diff --git a/pkg/cmd/orderbook.go b/pkg/cmd/orderbook.go index 3ca6d1dba..4d6e80464 100644 --- a/pkg/cmd/orderbook.go +++ b/pkg/cmd/orderbook.go @@ -2,7 +2,6 @@ package cmd import ( "context" - "encoding/json" "fmt" "syscall" "time" @@ -15,12 +14,6 @@ import ( "github.com/c9s/bbgo/pkg/types" ) -type PV struct { - bids types.PriceVolumeSlice - asks types.PriceVolumeSlice - t int64 -} - // go run ./cmd/bbgo orderbook --session=binance --symbol=BTCUSDT var orderbookCmd = &cobra.Command{ Use: "orderbook --session=[exchange_name] --symbol=[pair_name]", @@ -63,16 +56,9 @@ var orderbookCmd = &cobra.Command{ orderBook := types.NewMutexOrderBook(symbol, session.Exchange.Name()) - pv1c := make(chan PV, 5000) - pv2c := make(chan PV, 5000) - s := session.Exchange.NewStream() - session.Key = "new" - s2 := session.Exchange.NewStream() s.SetPublicOnly() - s2.SetPublicOnly() s.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{}) - s2.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{}) s.OnBookSnapshot(func(book types.SliceOrderBook) { if dumpDepthUpdate { log.Infof("orderbook snapshot: %s", book.String()) @@ -84,19 +70,11 @@ var orderbookCmd = &cobra.Command{ log.WithError(err).Panicf("invalid error book snapshot") } - /* - if bid, ask, ok := orderBook.BestBidAndAsk(); ok { - log.Infof("1) ASK | %f x %f / %f x %f | BID | %s", - ask.Volume.Float64(), ask.Price.Float64(), - bid.Price.Float64(), bid.Volume.Float64(), - book.Time.String()) - } - */ - - pv1c <- PV{ - asks: orderBook.SideBook(types.SideTypeSell), - bids: orderBook.SideBook(types.SideTypeBuy), - t: book.LastUpdateId, + if bid, ask, ok := orderBook.BestBidAndAsk(); ok { + log.Infof("ASK | %f x %f / %f x %f | BID | %s", + ask.Volume.Float64(), ask.Price.Float64(), + bid.Price.Float64(), bid.Volume.Float64(), + book.Time.String()) } }) @@ -106,64 +84,11 @@ var orderbookCmd = &cobra.Command{ } orderBook.Update(book) - /* - if bid, ask, ok := orderBook.BestBidAndAsk(); ok { - log.Infof("1) ASK | %f x %f / %f x %f | BID | %s", - ask.Volume.Float64(), ask.Price.Float64(), - bid.Price.Float64(), bid.Volume.Float64(), - book.Time.String()) - } - */ - pv1c <- PV{ - asks: orderBook.SideBook(types.SideTypeSell), - bids: orderBook.SideBook(types.SideTypeBuy), - t: book.LastUpdateId, - } - }) - s2.OnBookSnapshot(func(book types.SliceOrderBook) { - if dumpDepthUpdate { - log.Infof("orderbook snapshot: %s", book.String()) - } - - orderBook.Load(book) - - if ok, err := orderBook.IsValid(); !ok { - log.WithError(err).Panicf("invalid error book snapshot") - } - - /* - if bid, ask, ok := orderBook.BestBidAndAsk(); ok { - log.Infof("2) ASK | %f x %f / %f x %f | BID | %s", - ask.Volume.Float64(), ask.Price.Float64(), - bid.Price.Float64(), bid.Volume.Float64(), - book.Time.String()) - } - */ - pv2c <- PV{ - asks: orderBook.SideBook(types.SideTypeSell), - bids: orderBook.SideBook(types.SideTypeBuy), - t: book.LastUpdateId, - } - }) - - s2.OnBookUpdate(func(book types.SliceOrderBook) { - if dumpDepthUpdate { - log.Infof("orderbook update: %s", book.String()) - } - orderBook.Update(book) - - /* - if bid, ask, ok := orderBook.BestBidAndAsk(); ok { - log.Infof("2) ASK | %f x %f / %f x %f | BID | %s", - ask.Volume.Float64(), ask.Price.Float64(), - bid.Price.Float64(), bid.Volume.Float64(), - book.Time.String()) - } - */ - pv2c <- PV{ - asks: orderBook.SideBook(types.SideTypeSell), - bids: orderBook.SideBook(types.SideTypeBuy), - t: book.LastUpdateId, + if bid, ask, ok := orderBook.BestBidAndAsk(); ok { + log.Infof("ASK | %f x %f / %f x %f | BID | %s", + ask.Volume.Float64(), ask.Price.Float64(), + bid.Price.Float64(), bid.Volume.Float64(), + book.Time.String()) } }) @@ -171,9 +96,6 @@ var orderbookCmd = &cobra.Command{ if err := s.Connect(ctx); err != nil { return fmt.Errorf("failed to connect to %s", sessionName) } - if err := s2.Connect(ctx); err != nil { - return fmt.Errorf("failed to connect to %s", sessionName) - } log.Infof("connected") defer func() { @@ -181,86 +103,9 @@ var orderbookCmd = &cobra.Command{ if err := s.Close(); err != nil { log.WithError(err).Errorf("connection close error") } - if err := s2.Close(); err != nil { - log.WithError(err).Errorf("connection close error") - } time.Sleep(1 * time.Second) }() - go func() { - flag := false - for { - select { - case pv1 := <-pv1c: - pv2 := <-pv2c - - if flag && pv1.t != pv2.t { - log.Info("skip") - } - - for pv1.t != pv2.t { - if pv1.t < pv2.t { - pv1 = <-pv1c - } else { - pv2 = <-pv2c - } - } - - log.Infof("version of 1: %d, version of 2: %d", pv1.t, pv2.t) - - flag = true - - if len(pv1.asks) != len(pv2.asks) { - log.Info("not equal for length") - log.Infof("pv1 asks: %+v", pv1.asks) - log.Infof("pv2 asks: %+v", pv2.asks) - x, _ := json.Marshal(pv1.asks) - log.Infof("pv1 bids: %s", string(x)) - y, _ := json.Marshal(pv2.asks) - log.Infof("pv2 bids: %s", string(y)) - - continue - } - - for i := range pv1.asks { - if !pv1.asks[i].Equals(pv2.asks[i]) { - log.Info("not euqal for value") - log.Infof("pv1 asks: %+v", pv1.asks) - log.Infof("pv2 asks: %+v", pv2.asks) - x, _ := json.Marshal(pv1.asks) - log.Infof("pv1 bids: %s", string(x)) - y, _ := json.Marshal(pv2.asks) - log.Infof("pv2 bids: %s", string(y)) - - continue - } - } - - if len(pv1.bids) != len(pv2.bids) { - log.Info("not equal for length") - x, _ := json.Marshal(pv1.bids) - log.Infof("pv1 bids: %s", string(x)) - y, _ := json.Marshal(pv2.bids) - log.Infof("pv2 bids: %s", string(y)) - - continue - } - - for i := range pv1.bids { - if !pv1.bids[i].Equals(pv2.bids[i]) { - log.Info("not euqal for value") - x, _ := json.Marshal(pv1.bids) - log.Infof("pv1 bids: %s", string(x)) - y, _ := json.Marshal(pv2.bids) - log.Infof("pv2 bids: %s", string(y)) - - continue - } - } - } - } - }() - cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM) return nil }, diff --git a/pkg/exchange/max/stream.go b/pkg/exchange/max/stream.go index 5162d6d0f..eb20fa071 100644 --- a/pkg/exchange/max/stream.go +++ b/pkg/exchange/max/stream.go @@ -70,11 +70,7 @@ func NewStream(ex *Exchange, key, secret string) *Stream { stream.OnTradeUpdateEvent(stream.handleTradeEvent) stream.OnAccountSnapshotEvent(stream.handleAccountSnapshotEvent) stream.OnAccountUpdateEvent(stream.handleAccountUpdateEvent) - if key == "new" { - stream.OnBookEvent(stream.handleBookEventNew(ex)) - } else { - stream.OnBookEvent(stream.handleBookEvent) - } + stream.OnBookEvent(stream.handleBookEvent(ex)) return stream } @@ -217,25 +213,7 @@ func (s *Stream) handleTradeEvent(e max.TradeUpdateEvent) { } } -func (s *Stream) handleBookEvent(e max.BookEvent) { - newBook, err := e.OrderBook() - if err != nil { - log.WithError(err).Error("book convert error") - return - } - - newBook.Symbol = toGlobalSymbol(e.Market) - newBook.Time = e.Time() - - switch e.Event { - case "snapshot": - s.EmitBookSnapshot(newBook) - case "update": - s.EmitBookUpdate(newBook) - } -} - -func (s *Stream) handleBookEventNew(ex *Exchange) func(e max.BookEvent) { +func (s *Stream) handleBookEvent(ex *Exchange) func(e max.BookEvent) { return func(e max.BookEvent) { symbol := toGlobalSymbol(e.Market) f, ok := s.depthBuffers[symbol]