diff --git a/pkg/cmd/orderbook.go b/pkg/cmd/orderbook.go index 4d6e80464..3ca6d1dba 100644 --- a/pkg/cmd/orderbook.go +++ b/pkg/cmd/orderbook.go @@ -2,6 +2,7 @@ package cmd import ( "context" + "encoding/json" "fmt" "syscall" "time" @@ -14,6 +15,12 @@ import ( "github.com/c9s/bbgo/pkg/types" ) +type PV struct { + bids types.PriceVolumeSlice + asks types.PriceVolumeSlice + t int64 +} + // go run ./cmd/bbgo orderbook --session=binance --symbol=BTCUSDT var orderbookCmd = &cobra.Command{ Use: "orderbook --session=[exchange_name] --symbol=[pair_name]", @@ -56,9 +63,16 @@ var orderbookCmd = &cobra.Command{ orderBook := types.NewMutexOrderBook(symbol, session.Exchange.Name()) + pv1c := make(chan PV, 5000) + pv2c := make(chan PV, 5000) + s := session.Exchange.NewStream() + session.Key = "new" + s2 := session.Exchange.NewStream() s.SetPublicOnly() + s2.SetPublicOnly() s.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{}) + s2.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{}) s.OnBookSnapshot(func(book types.SliceOrderBook) { if dumpDepthUpdate { log.Infof("orderbook snapshot: %s", book.String()) @@ -70,11 +84,19 @@ var orderbookCmd = &cobra.Command{ log.WithError(err).Panicf("invalid error book snapshot") } - if bid, ask, ok := orderBook.BestBidAndAsk(); ok { - log.Infof("ASK | %f x %f / %f x %f | BID | %s", - ask.Volume.Float64(), ask.Price.Float64(), - bid.Price.Float64(), bid.Volume.Float64(), - book.Time.String()) + /* + if bid, ask, ok := orderBook.BestBidAndAsk(); ok { + log.Infof("1) ASK | %f x %f / %f x %f | BID | %s", + ask.Volume.Float64(), ask.Price.Float64(), + bid.Price.Float64(), bid.Volume.Float64(), + book.Time.String()) + } + */ + + pv1c <- PV{ + asks: orderBook.SideBook(types.SideTypeSell), + bids: orderBook.SideBook(types.SideTypeBuy), + t: book.LastUpdateId, } }) @@ -84,11 +106,64 @@ var orderbookCmd = &cobra.Command{ } orderBook.Update(book) - if bid, ask, ok := orderBook.BestBidAndAsk(); ok { - log.Infof("ASK | %f x %f / %f x %f | BID | %s", - ask.Volume.Float64(), ask.Price.Float64(), - bid.Price.Float64(), bid.Volume.Float64(), - book.Time.String()) + /* + if bid, ask, ok := orderBook.BestBidAndAsk(); ok { + log.Infof("1) ASK | %f x %f / %f x %f | BID | %s", + ask.Volume.Float64(), ask.Price.Float64(), + bid.Price.Float64(), bid.Volume.Float64(), + book.Time.String()) + } + */ + pv1c <- PV{ + asks: orderBook.SideBook(types.SideTypeSell), + bids: orderBook.SideBook(types.SideTypeBuy), + t: book.LastUpdateId, + } + }) + s2.OnBookSnapshot(func(book types.SliceOrderBook) { + if dumpDepthUpdate { + log.Infof("orderbook snapshot: %s", book.String()) + } + + orderBook.Load(book) + + if ok, err := orderBook.IsValid(); !ok { + log.WithError(err).Panicf("invalid error book snapshot") + } + + /* + if bid, ask, ok := orderBook.BestBidAndAsk(); ok { + log.Infof("2) ASK | %f x %f / %f x %f | BID | %s", + ask.Volume.Float64(), ask.Price.Float64(), + bid.Price.Float64(), bid.Volume.Float64(), + book.Time.String()) + } + */ + pv2c <- PV{ + asks: orderBook.SideBook(types.SideTypeSell), + bids: orderBook.SideBook(types.SideTypeBuy), + t: book.LastUpdateId, + } + }) + + s2.OnBookUpdate(func(book types.SliceOrderBook) { + if dumpDepthUpdate { + log.Infof("orderbook update: %s", book.String()) + } + orderBook.Update(book) + + /* + if bid, ask, ok := orderBook.BestBidAndAsk(); ok { + log.Infof("2) ASK | %f x %f / %f x %f | BID | %s", + ask.Volume.Float64(), ask.Price.Float64(), + bid.Price.Float64(), bid.Volume.Float64(), + book.Time.String()) + } + */ + pv2c <- PV{ + asks: orderBook.SideBook(types.SideTypeSell), + bids: orderBook.SideBook(types.SideTypeBuy), + t: book.LastUpdateId, } }) @@ -96,6 +171,9 @@ var orderbookCmd = &cobra.Command{ if err := s.Connect(ctx); err != nil { return fmt.Errorf("failed to connect to %s", sessionName) } + if err := s2.Connect(ctx); err != nil { + return fmt.Errorf("failed to connect to %s", sessionName) + } log.Infof("connected") defer func() { @@ -103,9 +181,86 @@ var orderbookCmd = &cobra.Command{ if err := s.Close(); err != nil { log.WithError(err).Errorf("connection close error") } + if err := s2.Close(); err != nil { + log.WithError(err).Errorf("connection close error") + } time.Sleep(1 * time.Second) }() + go func() { + flag := false + for { + select { + case pv1 := <-pv1c: + pv2 := <-pv2c + + if flag && pv1.t != pv2.t { + log.Info("skip") + } + + for pv1.t != pv2.t { + if pv1.t < pv2.t { + pv1 = <-pv1c + } else { + pv2 = <-pv2c + } + } + + log.Infof("version of 1: %d, version of 2: %d", pv1.t, pv2.t) + + flag = true + + if len(pv1.asks) != len(pv2.asks) { + log.Info("not equal for length") + log.Infof("pv1 asks: %+v", pv1.asks) + log.Infof("pv2 asks: %+v", pv2.asks) + x, _ := json.Marshal(pv1.asks) + log.Infof("pv1 bids: %s", string(x)) + y, _ := json.Marshal(pv2.asks) + log.Infof("pv2 bids: %s", string(y)) + + continue + } + + for i := range pv1.asks { + if !pv1.asks[i].Equals(pv2.asks[i]) { + log.Info("not euqal for value") + log.Infof("pv1 asks: %+v", pv1.asks) + log.Infof("pv2 asks: %+v", pv2.asks) + x, _ := json.Marshal(pv1.asks) + log.Infof("pv1 bids: %s", string(x)) + y, _ := json.Marshal(pv2.asks) + log.Infof("pv2 bids: %s", string(y)) + + continue + } + } + + if len(pv1.bids) != len(pv2.bids) { + log.Info("not equal for length") + x, _ := json.Marshal(pv1.bids) + log.Infof("pv1 bids: %s", string(x)) + y, _ := json.Marshal(pv2.bids) + log.Infof("pv2 bids: %s", string(y)) + + continue + } + + for i := range pv1.bids { + if !pv1.bids[i].Equals(pv2.bids[i]) { + log.Info("not euqal for value") + x, _ := json.Marshal(pv1.bids) + log.Infof("pv1 bids: %s", string(x)) + y, _ := json.Marshal(pv2.bids) + log.Infof("pv2 bids: %s", string(y)) + + continue + } + } + } + } + }() + cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM) return nil }, diff --git a/pkg/exchange/max/exchange.go b/pkg/exchange/max/exchange.go index cee0b151e..c0804159b 100644 --- a/pkg/exchange/max/exchange.go +++ b/pkg/exchange/max/exchange.go @@ -178,7 +178,7 @@ func (e *Exchange) QueryMarkets(ctx context.Context) (types.MarketMap, error) { } func (e *Exchange) NewStream() types.Stream { - stream := NewStream(e.key, e.secret) + stream := NewStream(e, e.key, e.secret) stream.MarginSettings = e.MarginSettings return stream } diff --git a/pkg/exchange/max/maxapi/public_parser.go b/pkg/exchange/max/maxapi/public_parser.go index 133dd2e68..55b7fe2d8 100644 --- a/pkg/exchange/max/maxapi/public_parser.go +++ b/pkg/exchange/max/maxapi/public_parser.go @@ -164,12 +164,15 @@ func parsePublicTradeEvent(val *fastjson.Value) (*PublicTradeEvent, error) { } type BookEvent struct { - Event string `json:"e"` - Market string `json:"M"` - Channel string `json:"c"` - Timestamp int64 `json:"t"` // Millisecond timestamp - Bids types.PriceVolumeSlice - Asks types.PriceVolumeSlice + Event string `json:"e"` + Market string `json:"M"` + Channel string `json:"c"` + Timestamp int64 `json:"t"` // Millisecond timestamp + Bids types.PriceVolumeSlice + Asks types.PriceVolumeSlice + FirstUpdateID int64 `json:"fi"` + LastUpdateID int64 `json:"li"` + Version int64 `json:"v"` } func (e *BookEvent) Time() time.Time { @@ -181,6 +184,7 @@ func (e *BookEvent) OrderBook() (snapshot types.SliceOrderBook, err error) { snapshot.Time = e.Time() snapshot.Bids = e.Bids snapshot.Asks = e.Asks + snapshot.LastUpdateId = e.LastUpdateID return snapshot, nil } @@ -212,10 +216,13 @@ func parseKLineEvent(val *fastjson.Value) (*KLineEvent, error) { func parseBookEvent(val *fastjson.Value) (event *BookEvent, err error) { event = &BookEvent{ - Event: string(val.GetStringBytes("e")), - Market: string(val.GetStringBytes("M")), - Channel: string(val.GetStringBytes("c")), - Timestamp: val.GetInt64("T"), + Event: string(val.GetStringBytes("e")), + Market: string(val.GetStringBytes("M")), + Channel: string(val.GetStringBytes("c")), + Timestamp: val.GetInt64("T"), + FirstUpdateID: val.GetInt64("fi"), + LastUpdateID: val.GetInt64("li"), + Version: val.GetInt64("v"), } // t := time.Unix(0, event.Timestamp*int64(time.Millisecond)) diff --git a/pkg/exchange/max/stream.go b/pkg/exchange/max/stream.go index 766562ecf..5162d6d0f 100644 --- a/pkg/exchange/max/stream.go +++ b/pkg/exchange/max/stream.go @@ -11,6 +11,7 @@ import ( "github.com/google/uuid" + "github.com/c9s/bbgo/pkg/depth" max "github.com/c9s/bbgo/pkg/exchange/max/maxapi" "github.com/c9s/bbgo/pkg/types" ) @@ -40,19 +41,24 @@ type Stream struct { accountSnapshotEventCallbacks []func(e max.AccountSnapshotEvent) accountUpdateEventCallbacks []func(e max.AccountUpdateEvent) + + // depthBuffers is used for storing the depth info + depthBuffers map[string]*depth.Buffer } -func NewStream(key, secret string) *Stream { +func NewStream(ex *Exchange, key, secret string) *Stream { stream := &Stream{ StandardStream: types.NewStandardStream(), key: key, // pragma: allowlist nextline secret - secret: secret, + secret: secret, + depthBuffers: make(map[string]*depth.Buffer), } stream.SetEndpointCreator(stream.getEndpoint) stream.SetParser(max.ParseMessage) stream.SetDispatcher(stream.dispatchEvent) stream.OnConnect(stream.handleConnect) + stream.OnDisconnect(stream.handleDisconnect) stream.OnAuthEvent(func(e max.AuthEvent) { log.Infof("max websocket connection authenticated: %+v", e) stream.EmitAuth() @@ -62,9 +68,13 @@ func NewStream(key, secret string) *Stream { stream.OnOrderSnapshotEvent(stream.handleOrderSnapshotEvent) stream.OnOrderUpdateEvent(stream.handleOrderUpdateEvent) stream.OnTradeUpdateEvent(stream.handleTradeEvent) - stream.OnBookEvent(stream.handleBookEvent) stream.OnAccountSnapshotEvent(stream.handleAccountSnapshotEvent) stream.OnAccountUpdateEvent(stream.handleAccountUpdateEvent) + if key == "new" { + stream.OnBookEvent(stream.handleBookEventNew(ex)) + } else { + stream.OnBookEvent(stream.handleBookEvent) + } return stream } @@ -156,6 +166,13 @@ func (s *Stream) handleConnect() { } } +func (s *Stream) handleDisconnect() { + log.Debugf("resetting depth snapshots...") + for _, f := range s.depthBuffers { + f.Reset() + } +} + func (s *Stream) handleKLineEvent(e max.KLineEvent) { kline := e.KLine.KLine() s.EmitKLine(kline) @@ -218,6 +235,41 @@ func (s *Stream) handleBookEvent(e max.BookEvent) { } } +func (s *Stream) handleBookEventNew(ex *Exchange) func(e max.BookEvent) { + return func(e max.BookEvent) { + symbol := toGlobalSymbol(e.Market) + f, ok := s.depthBuffers[symbol] + if ok { + err := f.AddUpdate(types.SliceOrderBook{ + Symbol: toGlobalSymbol(e.Market), + Time: e.Time(), + Bids: e.Bids, + Asks: e.Asks, + }, e.FirstUpdateID, e.LastUpdateID) + if err != nil { + log.WithError(err).Errorf("found missing %s update event", e.Market) + } + } else { + f = depth.NewBuffer(func() (types.SliceOrderBook, int64, error) { + log.Infof("fetching %s depth...", e.Market) + // the depth of websocket orderbook event is 50 by default, so we use 50 as limit here + return ex.QueryDepth(context.Background(), e.Market, 50) + }) + f.SetBufferingPeriod(time.Second) + f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) { + s.EmitBookSnapshot(snapshot) + for _, u := range updates { + s.EmitBookUpdate(u.Object) + } + }) + f.OnPush(func(update depth.Update) { + s.EmitBookUpdate(update.Object) + }) + s.depthBuffers[symbol] = f + } + } +} + func (s *Stream) handleAccountSnapshotEvent(e max.AccountSnapshotEvent) { snapshot := map[string]types.Balance{} for _, bm := range e.Balances {