diff --git a/pkg/exchange/binance/exchange.go b/pkg/exchange/binance/exchange.go index c4fb420be..4c103dce4 100644 --- a/pkg/exchange/binance/exchange.go +++ b/pkg/exchange/binance/exchange.go @@ -26,6 +26,9 @@ func init() { type Exchange struct { Client *binance.Client + + useMargin bool + useMarginIsolated bool } func New(key, secret string) *Exchange { @@ -39,6 +42,11 @@ func (e *Exchange) Name() types.ExchangeName { return types.ExchangeBinance } +func (e *Exchange) UseMargin(isolated bool) { + e.useMargin = true + e.useMarginIsolated = isolated +} + func (e *Exchange) QueryMarkets(ctx context.Context) (types.MarketMap, error) { log.Info("querying market info...") @@ -96,7 +104,13 @@ func (e *Exchange) QueryAveragePrice(ctx context.Context, symbol string) (float6 } func (e *Exchange) NewStream() types.Stream { - return NewStream(e.Client) + stream := NewStream(e.Client) + + if e.useMargin { + stream.UseMargin(e.useMarginIsolated) + } + + return stream } func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (allWithdraws []types.Withdraw, err error) { diff --git a/pkg/exchange/binance/stream.go b/pkg/exchange/binance/stream.go index 08c4a92d0..170916979 100644 --- a/pkg/exchange/binance/stream.go +++ b/pkg/exchange/binance/stream.go @@ -45,29 +45,17 @@ type Stream struct { balanceUpdateEventCallbacks []func(event *BalanceUpdateEvent) outboundAccountInfoEventCallbacks []func(event *OutboundAccountInfoEvent) executionReportEventCallbacks []func(event *ExecutionReportEvent) + + useMargin bool + useMarginIsolated bool } func NewStream(client *binance.Client) *Stream { - // binance BalanceUpdate = withdrawal or deposit changes - /* - stream.OnBalanceUpdateEvent(func(e *binance.BalanceUpdateEvent) { - a.mu.Lock() - defer a.mu.Unlock() - - delta := util.MustParseFloat(e.Delta) - if balance, ok := a.Balances[e.Asset]; ok { - balance.Available += delta - a.Balances[e.Asset] = balance - } - }) - */ - stream := &Stream{ Client: client, } - var depthFrames = make(map[string]*DepthFrame) - + depthFrames := make(map[string]*DepthFrame) stream.OnDepthEvent(func(e *DepthEvent) { f, ok := depthFrames[e.Symbol] if !ok { @@ -201,12 +189,34 @@ func (s *Stream) dial(listenKey string) (*websocket.Conn, error) { return conn, nil } +func (s *Stream) UseMargin(isolated bool) { + s.useMargin = true + s.useMarginIsolated = isolated +} + +func (s *Stream) fetchListenKey(ctx context.Context) (string, error) { + if s.useMargin { + return s.Client.NewStartMarginUserStreamService().Do(ctx) + } + + return s.Client.NewStartUserStreamService().Do(ctx) +} + +func (s *Stream) keepaliveListenKey(ctx context.Context, listenKey string) error { + if s.useMargin { + return s.Client.NewKeepaliveMarginUserStreamService().ListenKey(listenKey).Do(ctx) + } + + return s.Client.NewKeepaliveUserStreamService().ListenKey(listenKey).Do(ctx) +} + func (s *Stream) connect(ctx context.Context) error { if s.publicOnly { log.Infof("stream is set to public only mode") } else { - log.Infof("creating user data stream...") - listenKey, err := s.Client.NewStartUserStreamService().Do(ctx) + log.Infof("request listen key for creating user data stream...") + + listenKey, err := s.fetchListenKey(ctx) if err != nil { return err } @@ -268,7 +278,7 @@ func (s *Stream) read(ctx context.Context) { case <-keepAliveTicker.C: if !s.publicOnly { - if err := s.Client.NewKeepaliveUserStreamService().ListenKey(s.ListenKey).Do(ctx); err != nil { + if err := s.keepaliveListenKey(ctx, s.ListenKey); err != nil { log.WithError(err).Errorf("listen key keep-alive error: %v key: %s", err, maskListenKey(s.ListenKey)) } } @@ -348,9 +358,16 @@ func (s *Stream) read(ctx context.Context) { } } -func (s *Stream) invalidateListenKey(ctx context.Context, listenKey string) error { - // use background context to invalidate the user stream - err := s.Client.NewCloseUserStreamService().ListenKey(listenKey).Do(ctx) +func (s *Stream) invalidateListenKey(ctx context.Context, listenKey string) (err error) { + // should use background context to invalidate the user stream + log.Info("[binance] closing listen key") + + if s.useMargin { + err = s.Client.NewCloseMarginUserStreamService().ListenKey(listenKey).Do(ctx) + } else { + err = s.Client.NewCloseUserStreamService().ListenKey(listenKey).Do(ctx) + } + if err != nil { log.WithError(err).Error("[binance] error deleting listen key") return err diff --git a/scripts/maxapi.sh b/scripts/maxapi.sh index f040ebee0..24367c761 100755 --- a/scripts/maxapi.sh +++ b/scripts/maxapi.sh @@ -93,6 +93,14 @@ function me() send_auth_request "GET" "/api/v2/members/me" params } +function depth() +{ + local market=$1 + declare -A params=() + params[market]=$market + send_auth_request "GET" "/api/v2/depth" params +} + function submitOrder() { local -n params=$1 diff --git a/utils/binance-margin-stream/main.go b/utils/binance-margin-stream/main.go new file mode 100644 index 000000000..7768cdde4 --- /dev/null +++ b/utils/binance-margin-stream/main.go @@ -0,0 +1,35 @@ +package main + +import ( + "context" + "os" + "syscall" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/c9s/bbgo/pkg/cmd/cmdutil" + "github.com/c9s/bbgo/pkg/exchange/binance" +) + +func main() { + log.SetLevel(log.DebugLevel) + + ctx, cancel := context.WithCancel(context.Background()) + + // gobinance.NewClient(os.Getenv("BINANCE_API_KEY"), os.Getenv("BINANCE_API_SECRET")) + + ex := binance.New(os.Getenv("BINANCE_API_KEY"), os.Getenv("BINANCE_API_SECRET")) + ex.UseMargin(true) + stream := ex.NewStream() + + if err := stream.Connect(ctx); err != nil { + log.Fatal(err) + } + + cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM) + cancel() + time.Sleep(5 * time.Second) + + return +}