diff --git a/pkg/exchange/max/exchange.go b/pkg/exchange/max/exchange.go index cee0b151e..c0804159b 100644 --- a/pkg/exchange/max/exchange.go +++ b/pkg/exchange/max/exchange.go @@ -178,7 +178,7 @@ func (e *Exchange) QueryMarkets(ctx context.Context) (types.MarketMap, error) { } func (e *Exchange) NewStream() types.Stream { - stream := NewStream(e.key, e.secret) + stream := NewStream(e, e.key, e.secret) stream.MarginSettings = e.MarginSettings return stream } diff --git a/pkg/exchange/max/maxapi/public_parser.go b/pkg/exchange/max/maxapi/public_parser.go index 133dd2e68..55b7fe2d8 100644 --- a/pkg/exchange/max/maxapi/public_parser.go +++ b/pkg/exchange/max/maxapi/public_parser.go @@ -164,12 +164,15 @@ func parsePublicTradeEvent(val *fastjson.Value) (*PublicTradeEvent, error) { } type BookEvent struct { - Event string `json:"e"` - Market string `json:"M"` - Channel string `json:"c"` - Timestamp int64 `json:"t"` // Millisecond timestamp - Bids types.PriceVolumeSlice - Asks types.PriceVolumeSlice + Event string `json:"e"` + Market string `json:"M"` + Channel string `json:"c"` + Timestamp int64 `json:"t"` // Millisecond timestamp + Bids types.PriceVolumeSlice + Asks types.PriceVolumeSlice + FirstUpdateID int64 `json:"fi"` + LastUpdateID int64 `json:"li"` + Version int64 `json:"v"` } func (e *BookEvent) Time() time.Time { @@ -181,6 +184,7 @@ func (e *BookEvent) OrderBook() (snapshot types.SliceOrderBook, err error) { snapshot.Time = e.Time() snapshot.Bids = e.Bids snapshot.Asks = e.Asks + snapshot.LastUpdateId = e.LastUpdateID return snapshot, nil } @@ -212,10 +216,13 @@ func parseKLineEvent(val *fastjson.Value) (*KLineEvent, error) { func parseBookEvent(val *fastjson.Value) (event *BookEvent, err error) { event = &BookEvent{ - Event: string(val.GetStringBytes("e")), - Market: string(val.GetStringBytes("M")), - Channel: string(val.GetStringBytes("c")), - Timestamp: val.GetInt64("T"), + Event: string(val.GetStringBytes("e")), + Market: string(val.GetStringBytes("M")), + Channel: string(val.GetStringBytes("c")), + Timestamp: val.GetInt64("T"), + FirstUpdateID: val.GetInt64("fi"), + LastUpdateID: val.GetInt64("li"), + Version: val.GetInt64("v"), } // t := time.Unix(0, event.Timestamp*int64(time.Millisecond)) diff --git a/pkg/exchange/max/stream.go b/pkg/exchange/max/stream.go index 766562ecf..eb20fa071 100644 --- a/pkg/exchange/max/stream.go +++ b/pkg/exchange/max/stream.go @@ -11,6 +11,7 @@ import ( "github.com/google/uuid" + "github.com/c9s/bbgo/pkg/depth" max "github.com/c9s/bbgo/pkg/exchange/max/maxapi" "github.com/c9s/bbgo/pkg/types" ) @@ -40,19 +41,24 @@ type Stream struct { accountSnapshotEventCallbacks []func(e max.AccountSnapshotEvent) accountUpdateEventCallbacks []func(e max.AccountUpdateEvent) + + // depthBuffers is used for storing the depth info + depthBuffers map[string]*depth.Buffer } -func NewStream(key, secret string) *Stream { +func NewStream(ex *Exchange, key, secret string) *Stream { stream := &Stream{ StandardStream: types.NewStandardStream(), key: key, // pragma: allowlist nextline secret - secret: secret, + secret: secret, + depthBuffers: make(map[string]*depth.Buffer), } stream.SetEndpointCreator(stream.getEndpoint) stream.SetParser(max.ParseMessage) stream.SetDispatcher(stream.dispatchEvent) stream.OnConnect(stream.handleConnect) + stream.OnDisconnect(stream.handleDisconnect) stream.OnAuthEvent(func(e max.AuthEvent) { log.Infof("max websocket connection authenticated: %+v", e) stream.EmitAuth() @@ -62,9 +68,9 @@ func NewStream(key, secret string) *Stream { stream.OnOrderSnapshotEvent(stream.handleOrderSnapshotEvent) stream.OnOrderUpdateEvent(stream.handleOrderUpdateEvent) stream.OnTradeUpdateEvent(stream.handleTradeEvent) - stream.OnBookEvent(stream.handleBookEvent) stream.OnAccountSnapshotEvent(stream.handleAccountSnapshotEvent) stream.OnAccountUpdateEvent(stream.handleAccountUpdateEvent) + stream.OnBookEvent(stream.handleBookEvent(ex)) return stream } @@ -156,6 +162,13 @@ func (s *Stream) handleConnect() { } } +func (s *Stream) handleDisconnect() { + log.Debugf("resetting depth snapshots...") + for _, f := range s.depthBuffers { + f.Reset() + } +} + func (s *Stream) handleKLineEvent(e max.KLineEvent) { kline := e.KLine.KLine() s.EmitKLine(kline) @@ -200,21 +213,38 @@ func (s *Stream) handleTradeEvent(e max.TradeUpdateEvent) { } } -func (s *Stream) handleBookEvent(e max.BookEvent) { - newBook, err := e.OrderBook() - if err != nil { - log.WithError(err).Error("book convert error") - return - } - - newBook.Symbol = toGlobalSymbol(e.Market) - newBook.Time = e.Time() - - switch e.Event { - case "snapshot": - s.EmitBookSnapshot(newBook) - case "update": - s.EmitBookUpdate(newBook) +func (s *Stream) handleBookEvent(ex *Exchange) func(e max.BookEvent) { + return func(e max.BookEvent) { + symbol := toGlobalSymbol(e.Market) + f, ok := s.depthBuffers[symbol] + if ok { + err := f.AddUpdate(types.SliceOrderBook{ + Symbol: toGlobalSymbol(e.Market), + Time: e.Time(), + Bids: e.Bids, + Asks: e.Asks, + }, e.FirstUpdateID, e.LastUpdateID) + if err != nil { + log.WithError(err).Errorf("found missing %s update event", e.Market) + } + } else { + f = depth.NewBuffer(func() (types.SliceOrderBook, int64, error) { + log.Infof("fetching %s depth...", e.Market) + // the depth of websocket orderbook event is 50 by default, so we use 50 as limit here + return ex.QueryDepth(context.Background(), e.Market, 50) + }) + f.SetBufferingPeriod(time.Second) + f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) { + s.EmitBookSnapshot(snapshot) + for _, u := range updates { + s.EmitBookUpdate(u.Object) + } + }) + f.OnPush(func(update depth.Update) { + s.EmitBookUpdate(update.Object) + }) + s.depthBuffers[symbol] = f + } } }