diff --git a/bbgo/exchange/binance/stream.go b/bbgo/exchange/binance/stream.go index fae2371c9..12fa6b69e 100644 --- a/bbgo/exchange/binance/stream.go +++ b/bbgo/exchange/binance/stream.go @@ -2,6 +2,7 @@ package binance import ( "context" + "fmt" "strings" "time" @@ -71,7 +72,7 @@ func (s *Stream) connect(ctx context.Context) error { var params []string for _, subscription := range s.Subscriptions { - params = append(params, subscription.String()) + params = append(params, convertSubscription(subscription)) } log.Infof("[binance] subscribing channels: %+v", params) @@ -82,6 +83,21 @@ func (s *Stream) connect(ctx context.Context) error { }) } +func convertSubscription(s types.Subscription) string { + // binance uses lower case symbol name, + // for kline, it's "@kline_" + // for depth, it's "@depth OR @depth@100ms" + switch s.Channel { + case types.KLineChannel: + return fmt.Sprintf("%s@%s_%s", strings.ToLower(s.Symbol), s.Channel, s.Options.String()) + + case types.BookChannel: + return fmt.Sprintf("%s@depth", strings.ToLower(s.Symbol)) + } + + return fmt.Sprintf("%s@%s", strings.ToLower(s.Symbol), s.Channel) +} + func (s *Stream) Connect(ctx context.Context) error { err := s.connect(ctx) if err != nil { diff --git a/bbgo/exchange/max/maxapi/websocket.go b/bbgo/exchange/max/maxapi/websocket.go index e9c56a03f..273f7f204 100644 --- a/bbgo/exchange/max/maxapi/websocket.go +++ b/bbgo/exchange/max/maxapi/websocket.go @@ -213,7 +213,7 @@ func (s *WebSocketService) Reconnect() { // Subscribe is a helper method for building subscription request from the internal mapping types. // (Internal public method) -func (s *WebSocketService) Subscribe(channel string, market string) { +func (s *WebSocketService) Subscribe(channel, market string) { s.AddSubscription(Subscription{ Channel: channel, Market: market, diff --git a/bbgo/exchange/max/stream.go b/bbgo/exchange/max/stream.go index e28864192..043a05591 100644 --- a/bbgo/exchange/max/stream.go +++ b/bbgo/exchange/max/stream.go @@ -19,10 +19,15 @@ type Stream struct { func NewStream(key, secret string) *Stream { wss := max.NewWebSocketService(max.WebSocketURL, key, secret) + stream := &Stream{ websocketService: wss, } + wss.OnMessage(func(message []byte) { + logger.Infof("M: %s", message) + }) + wss.OnBookEvent(func(e max.BookEvent) { newbook, err := e.OrderBook() if err != nil { @@ -38,11 +43,50 @@ func NewStream(key, secret string) *Stream { } }) + wss.OnAccountSnapshotEvent(func(e max.AccountSnapshotEvent) { + snapshot := map[string]types.Balance{} + for _, bm := range e.Balances { + balance, err := bm.Balance() + if err != nil { + continue + } + + snapshot[balance.Currency] = *balance + } + + stream.EmitBalanceSnapshot(snapshot) + }) + + wss.OnAccountUpdateEvent(func(e max.AccountUpdateEvent) { + snapshot := map[string]types.Balance{} + for _, bm := range e.Balances { + balance, err := bm.Balance() + if err != nil { + continue + } + + snapshot[balance.Currency] = *balance + } + + stream.EmitBalanceUpdate(snapshot) + }) + + return stream } +func (s *Stream) Subscribe(channel types.Channel, symbol string, options types.SubscribeOptions) { + // "book" + switch channel { + case types.KLineChannel: + panic("kline channel is not supported in max") + } + + s.websocketService.Subscribe(string(channel), symbol) +} + func (s *Stream) Connect(ctx context.Context) error { - return nil + return s.websocketService.Connect(ctx) } func (s *Stream) Close() error { diff --git a/bbgo/sigchan/sigchan.go b/bbgo/sigchan/sigchan.go new file mode 100644 index 000000000..8f32326d2 --- /dev/null +++ b/bbgo/sigchan/sigchan.go @@ -0,0 +1,39 @@ +package sigchan + +import "time" + +type Chan chan struct{} + +func New(cap int) Chan { + return make(Chan, cap) +} + +func (c Chan) Drain(duration, deadline time.Duration) (cnt int) { + cnt = 0 + + deadlineC := time.After(deadline) + for { + select { + case <-c: + cnt++ + + case <-deadlineC: + return cnt + + case <-time.After(duration): + return cnt + } + } +} + + +func (c Chan) Emit() { + select { + case c <- struct{}{}: + default: + } +} + +func (c Chan) Close() { + close(c) +} diff --git a/bbgo/types/stream.go b/bbgo/types/stream.go index a57213324..ce6953233 100644 --- a/bbgo/types/stream.go +++ b/bbgo/types/stream.go @@ -2,38 +2,43 @@ package types import ( "context" - "fmt" - "strings" ) type PrivateStream interface { StandardStreamEventHub - Subscribe(channel string, symbol string, options SubscribeOptions) + Subscribe(channel Channel, symbol string, options SubscribeOptions) Connect(ctx context.Context) error Close() error } +type Channel string + +var BookChannel = Channel("book") + +var KLineChannel = Channel("kline") + + //go:generate callbackgen -type StandardStream -interface type StandardStream struct { Subscriptions []Subscription // private trade callbacks - tradeCallbacks []func(trade *Trade) + tradeCallbacks []func(trade *Trade) // balance snapshot callbacks balanceSnapshotCallbacks []func(balances map[string]Balance) balanceUpdateCallbacks []func(balances map[string]Balance) - kLineClosedCallbacks []func(kline KLine) + kLineClosedCallbacks []func(kline KLine) bookUpdateCallbacks []func(book OrderBook) bookSnapshotCallbacks []func(book OrderBook) } -func (stream *StandardStream) Subscribe(channel string, symbol string, options SubscribeOptions) { +func (stream *StandardStream) Subscribe(channel Channel, symbol string, options SubscribeOptions) { stream.Subscriptions = append(stream.Subscriptions, Subscription{ Channel: channel, Symbol: symbol, @@ -41,6 +46,7 @@ func (stream *StandardStream) Subscribe(channel string, symbol string, options S }) } +// SubscribeOptions provides the standard stream options type SubscribeOptions struct { Interval string Depth string @@ -56,21 +62,6 @@ func (o SubscribeOptions) String() string { type Subscription struct { Symbol string - Channel string + Channel Channel Options SubscribeOptions } - -func (s *Subscription) String() string { - // binance uses lower case symbol name, - // for kline, it's "@kline_" - // for depth, it's "@depth OR @depth@100ms" - switch s.Channel { - case "kline": - return fmt.Sprintf("%s@%s_%s", strings.ToLower(s.Symbol), s.Channel, s.Options.String()) - case "depth", "book": - return fmt.Sprintf("%s@%s", strings.ToLower(s.Symbol), s.Channel) - default: - return fmt.Sprintf("%s@%s", strings.ToLower(s.Symbol), s.Channel) - } -} -