From d3f06bc9d71a9964736f5c37126461ee893d5390 Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 25 May 2021 19:13:10 +0800 Subject: [PATCH 1/2] fix binance depth stream buffering --- examples/binance-book/main.go | 2 +- pkg/exchange/binance/convert.go | 17 ++ pkg/exchange/binance/depthframe.go | 170 ++++++++++-------- pkg/exchange/binance/stream.go | 279 ++++++++++++++++++----------- 4 files changed, 282 insertions(+), 186 deletions(-) diff --git a/examples/binance-book/main.go b/examples/binance-book/main.go index 257e2eadc..f9aa1288a 100644 --- a/examples/binance-book/main.go +++ b/examples/binance-book/main.go @@ -71,7 +71,7 @@ var rootCmd = &cobra.Command{ if valid, err := book.IsValid(); !valid { log.Errorf("order book is invalid, error: %v", err) - return + continue } bestBid, hasBid := book.BestBid() diff --git a/pkg/exchange/binance/convert.go b/pkg/exchange/binance/convert.go index 5060704d0..681fdaa04 100644 --- a/pkg/exchange/binance/convert.go +++ b/pkg/exchange/binance/convert.go @@ -3,6 +3,7 @@ package binance import ( "fmt" "strconv" + "strings" "time" "github.com/adshao/go-binance/v2" @@ -283,3 +284,19 @@ func ConvertTrades(remoteTrades []*binance.TradeV3) (trades []types.Trade, err e return trades, err } + +func convertSubscription(s types.Subscription) string { + // binance uses lower case symbol name, + // for kline, it's "@kline_" + // for depth, it's "@depth OR @depth@100ms" + switch s.Channel { + case types.KLineChannel: + return fmt.Sprintf("%s@%s_%s", strings.ToLower(s.Symbol), s.Channel, s.Options.String()) + + case types.BookChannel: + return fmt.Sprintf("%s@depth", strings.ToLower(s.Symbol)) + } + + return fmt.Sprintf("%s@%s", strings.ToLower(s.Symbol), s.Channel) +} + diff --git a/pkg/exchange/binance/depthframe.go b/pkg/exchange/binance/depthframe.go index 4ddb9d2bd..8129a3fbc 100644 --- a/pkg/exchange/binance/depthframe.go +++ b/pkg/exchange/binance/depthframe.go @@ -2,7 +2,6 @@ package binance import ( "context" - "math/rand" "sync" "time" @@ -11,28 +10,51 @@ import ( //go:generate callbackgen -type DepthFrame type DepthFrame struct { + Symbol string + client *binance.Client context context.Context - mu sync.Mutex - once sync.Once - SnapshotDepth *DepthEvent - Symbol string - BufEvents []DepthEvent + snapshotMutex sync.Mutex + snapshotDepth *DepthEvent + + bufMutex sync.Mutex + bufEvents []DepthEvent + + once sync.Once readyCallbacks []func(snapshotDepth DepthEvent, bufEvents []DepthEvent) pushCallbacks []func(e DepthEvent) } func (f *DepthFrame) reset() { - f.mu.Lock() - f.SnapshotDepth = nil - f.BufEvents = nil - f.mu.Unlock() + if debugBinanceDepth { + log.Infof("resetting %s depth frame", f.Symbol) + } + + f.bufMutex.Lock() + f.bufEvents = nil + f.bufMutex.Unlock() + + f.snapshotMutex.Lock() + f.snapshotDepth = nil + f.once = sync.Once{} + f.snapshotMutex.Unlock() +} + +func (f *DepthFrame) bufferEvent(e DepthEvent) { + if debugBinanceDepth { + log.Infof("buffering %s depth event FirstUpdateID = %d, FinalUpdateID = %d", f.Symbol, e.FirstUpdateID, e.FinalUpdateID) + } + + f.bufMutex.Lock() + f.bufEvents = append(f.bufEvents, e) + f.bufMutex.Unlock() } func (f *DepthFrame) loadDepthSnapshot() { - f.mu.Lock() + log.Infof("buffering %s depth events for 3 seconds...", f.Symbol) + time.Sleep(3 * time.Second) if debugBinanceDepth { log.Infof("loading %s depth from the restful api", f.Symbol) @@ -41,29 +63,52 @@ func (f *DepthFrame) loadDepthSnapshot() { depth, err := f.fetch(f.context) if err != nil { log.WithError(err).Errorf("depth api error") - f.mu.Unlock() return } if len(depth.Asks) == 0 { log.Errorf("depth response error: empty asks") - f.mu.Unlock() return } if len(depth.Bids) == 0 { log.Errorf("depth response error: empty bids") - f.mu.Unlock() return } + if debugBinanceDepth { + log.Infof("loaded %s depth, last update ID = %d", f.Symbol, depth.FinalUpdateID) + } + + f.snapshotMutex.Lock() + f.snapshotDepth = depth + f.snapshotMutex.Unlock() + // filter the events by the event IDs + f.bufMutex.Lock() + bufEvents := f.bufEvents + f.bufEvents = nil + f.bufMutex.Unlock() + var events []DepthEvent - for _, e := range f.BufEvents { - if e.FirstUpdateID <= depth.FinalUpdateID || e.FinalUpdateID <= depth.FinalUpdateID { + for _, e := range bufEvents { + if e.FinalUpdateID < depth.FinalUpdateID { + if debugBinanceDepth { + log.Infof("DROP %s depth event (final update id is %d older than the last update), updateID %d ~ %d (len %d)", + f.Symbol, + depth.FinalUpdateID-e.FinalUpdateID, + e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID) + } + continue } + if debugBinanceDepth { + log.Infof("KEEP %s depth event, updateID %d ~ %d (len %d)", + f.Symbol, + e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID) + } + events = append(events, e) } @@ -72,85 +117,56 @@ func (f *DepthFrame) loadDepthSnapshot() { // if the head event is newer than the depth we got, // then there are something missed, we need to restart the process. if len(events) > 0 { + // The first processed event should have U (final update ID) <= lastUpdateId+1 AND (first update id) >= lastUpdateId+1. firstEvent := events[0] - if firstEvent.FirstUpdateID > depth.FinalUpdateID+1 { + + // valid + nextID := depth.FinalUpdateID + 1 + if firstEvent.FirstUpdateID > nextID || firstEvent.FinalUpdateID < nextID { log.Warn("miss matched final update id for order book, resetting depth...") - f.SnapshotDepth = nil - f.BufEvents = nil - f.mu.Unlock() + f.reset() return } + + if debugBinanceDepth { + log.Infof("VALID first %s depth event, updateID %d ~ %d (len %d)", + f.Symbol, + firstEvent.FirstUpdateID, firstEvent.FinalUpdateID, firstEvent.FinalUpdateID-firstEvent.FirstUpdateID) + } } - f.SnapshotDepth = depth - f.BufEvents = nil - f.mu.Unlock() + if debugBinanceDepth { + log.Infof("READY %s depth, %d bufferred events", f.Symbol, len(events)) + } f.EmitReady(*depth, events) } func (f *DepthFrame) PushEvent(e DepthEvent) { - f.mu.Lock() + f.snapshotMutex.Lock() + snapshot := f.snapshotDepth + f.snapshotMutex.Unlock() // before the snapshot is loaded, we need to buffer the events until we loaded the snapshot. - if f.SnapshotDepth == nil { + if snapshot == nil { // buffer the events until we loaded the snapshot - f.BufEvents = append(f.BufEvents, e) - f.mu.Unlock() + f.bufferEvent(e) - f.loadDepthSnapshot() - - // start a worker to update the snapshot periodically. go f.once.Do(func() { - if debugBinanceDepth { - log.Infof("starting depth snapshot updater for %s market", f.Symbol) - } - - ticker := time.NewTicker(30*time.Minute + time.Duration(rand.Intn(10))*time.Millisecond) - defer ticker.Stop() - for { - select { - case <-f.context.Done(): - return - - case <-ticker.C: - f.loadDepthSnapshot() - } - } + f.loadDepthSnapshot() }) - } else { - // if we have the snapshot, we could use that final update ID filter the events - - // too old: drop any update ID < the final update ID - if e.FinalUpdateID < f.SnapshotDepth.FinalUpdateID { - if debugBinanceDepth { - log.Warnf("event final update id %d < depth final update id %d, skip", e.FinalUpdateID, f.SnapshotDepth.FinalUpdateID) - } - - f.mu.Unlock() - return - } - - // too new: if the first update ID > final update ID + 1, it means something is missing, we need to reload. - if e.FirstUpdateID > f.SnapshotDepth.FinalUpdateID+1 { - if debugBinanceDepth { - log.Warnf("event first update id %d > final update id + 1 (%d), resetting snapshot", e.FirstUpdateID, f.SnapshotDepth.FirstUpdateID+1) - } - - f.SnapshotDepth = nil - - // save the new event for later - f.BufEvents = append(f.BufEvents, e) - f.mu.Unlock() - return - } - - // update the final update ID, so that we can check the next event - f.SnapshotDepth.FinalUpdateID = e.FinalUpdateID - f.mu.Unlock() - - f.EmitPush(e) + return } + + // drop old events + if e.FinalUpdateID <= snapshot.FinalUpdateID { + return + } + + f.snapshotMutex.Lock() + f.snapshotDepth.FinalUpdateID = e.FinalUpdateID + f.snapshotMutex.Unlock() + f.EmitPush(e) } // fetch fetches the depth and convert to the depth event so that we can reuse the event structure to convert it to the global orderbook type diff --git a/pkg/exchange/binance/stream.go b/pkg/exchange/binance/stream.go index c190690fe..41fe47f26 100644 --- a/pkg/exchange/binance/stream.go +++ b/pkg/exchange/binance/stream.go @@ -2,8 +2,8 @@ package binance import ( "context" - "fmt" "math/rand" + "net" "os" "strconv" "strings" @@ -42,10 +42,14 @@ type Stream struct { types.StandardStream - Client *binance.Client - ListenKey string - Conn *websocket.Conn - connLock sync.Mutex + Client *binance.Client + ListenKey string + Conn *websocket.Conn + connLock sync.Mutex + reconnectC chan struct{} + + connCtx context.Context + connCancel context.CancelFunc publicOnly bool @@ -66,9 +70,14 @@ func NewStream(client *binance.Client) *Stream { stream := &Stream{ Client: client, depthFrames: make(map[string]*DepthFrame), + reconnectC: make(chan struct{}, 1), } stream.OnDepthEvent(func(e *DepthEvent) { + if debugBinanceDepth { + log.Infof("received %s depth event updateID %d~%d (len %d)", e.Symbol, e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID) + } + f, ok := stream.depthFrames[e.Symbol] if !ok { f = &DepthFrame{ @@ -175,13 +184,14 @@ func NewStream(client *binance.Client) *Stream { } }) - stream.OnConnect(func() { - // reset the previous frames + stream.OnDisconnect(func() { + log.Infof("resetting depth snapshot...") for _, f := range stream.depthFrames { f.reset() - f.loadDepthSnapshot() } + }) + stream.OnConnect(func() { var params []string for _, subscription := range stream.Subscriptions { params = append(params, convertSubscription(subscription)) @@ -261,49 +271,11 @@ func (s *Stream) keepaliveListenKey(ctx context.Context, listenKey string) error return s.Client.NewKeepaliveUserStreamService().ListenKey(listenKey).Do(ctx) } -func (s *Stream) connect(ctx context.Context) error { - if s.publicOnly { - log.Infof("stream is set to public only mode") - } else { - log.Infof("request listen key for creating user data stream...") - - listenKey, err := s.fetchListenKey(ctx) - if err != nil { - return err - } - - s.ListenKey = listenKey - log.Infof("user data stream created. listenKey: %s", maskListenKey(s.ListenKey)) +func (s *Stream) emitReconnect() { + select { + case s.reconnectC <- struct{}{}: + default: } - - conn, err := s.dial(s.ListenKey) - if err != nil { - return err - } - - log.Infof("websocket connected") - - s.connLock.Lock() - s.Conn = conn - s.connLock.Unlock() - - s.EmitConnect() - return nil -} - -func convertSubscription(s types.Subscription) string { - // binance uses lower case symbol name, - // for kline, it's "@kline_" - // for depth, it's "@depth OR @depth@100ms" - switch s.Channel { - case types.KLineChannel: - return fmt.Sprintf("%s@%s_%s", strings.ToLower(s.Symbol), s.Channel, s.Options.String()) - - case types.BookChannel: - return fmt.Sprintf("%s@depth", strings.ToLower(s.Symbol)) - } - - return fmt.Sprintf("%s@%s", strings.ToLower(s.Symbol), s.Channel) } func (s *Stream) Connect(ctx context.Context) error { @@ -312,46 +284,140 @@ func (s *Stream) Connect(ctx context.Context) error { return err } - go s.read(ctx) + // start one re-connector goroutine with the base context + go s.reconnector(ctx) s.EmitStart() return nil } -func (s *Stream) read(ctx context.Context) { +func (s *Stream) reconnector(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return - pingTicker := time.NewTicker(10 * time.Second) + case <-s.reconnectC: + // ensure the previous context is cancelled + if s.connCancel != nil { + s.connCancel() + } + + log.Warnf("received reconnect signal, reconnecting...") + time.Sleep(3 * time.Second) + + if err := s.connect(ctx); err != nil { + log.WithError(err).Errorf("connect error, try to reconnect again...") + s.emitReconnect() + } + } + } +} + +func (s *Stream) connect(ctx context.Context) error { + // should only start one connection one time, so we lock the mutex + s.connLock.Lock() + + // create a new context + s.connCtx, s.connCancel = context.WithCancel(ctx) + + if s.publicOnly { + log.Infof("stream is set to public only mode") + } else { + log.Infof("request listen key for creating user data stream...") + + listenKey, err := s.fetchListenKey(ctx) + if err != nil { + s.connCancel() + s.connLock.Unlock() + return err + } + + s.ListenKey = listenKey + log.Infof("user data stream created. listenKey: %s", maskListenKey(s.ListenKey)) + + go s.listenKeyKeepAlive(s.connCtx, listenKey) + } + + // when in public mode, the listen key is an empty string + conn, err := s.dial(s.ListenKey) + if err != nil { + s.connCancel() + s.connLock.Unlock() + return err + } + + log.Infof("websocket connected") + + s.Conn = conn + s.connLock.Unlock() + + s.EmitConnect() + + go s.read(s.connCtx) + go s.ping(s.connCtx) + return nil +} + +func (s *Stream) ping(ctx context.Context) { + pingTicker := time.NewTicker(15 * time.Second) defer pingTicker.Stop() + for { + select { + + case <-ctx.Done(): + log.Info("ping worker stopped") + return + + case <-pingTicker.C: + s.connLock.Lock() + if err := s.Conn.WriteControl(websocket.PingMessage, []byte("hb"), time.Now().Add(3*time.Second)); err != nil { + log.WithError(err).Error("ping error", err) + s.emitReconnect() + } + s.connLock.Unlock() + } + } +} + +func (s *Stream) listenKeyKeepAlive(ctx context.Context, listenKey string) { keepAliveTicker := time.NewTicker(5 * time.Minute) defer keepAliveTicker.Stop() - go func() { - for { - select { - - case <-ctx.Done(): - return - - case <-pingTicker.C: - s.connLock.Lock() - if err := s.Conn.WriteControl(websocket.PingMessage, []byte("hb"), time.Now().Add(3*time.Second)); err != nil { - log.WithError(err).Error("ping error", err) - } - - s.connLock.Unlock() - - case <-keepAliveTicker.C: - if !s.publicOnly { - if err := s.keepaliveListenKey(ctx, s.ListenKey); err != nil { - log.WithError(err).Errorf("listen key keep-alive error: %v key: %s", err, maskListenKey(s.ListenKey)) - } - } - - } + // if we exit, we should invalidate the existing listen key + defer func() { + log.Info("keepalive worker stopped") + if err := s.invalidateListenKey(ctx, listenKey); err != nil { + log.WithError(err).Error("invalidate listen key error") } }() + for { + select { + + case <-ctx.Done(): + return + + case <-keepAliveTicker.C: + if err := s.keepaliveListenKey(ctx, listenKey); err != nil { + log.WithError(err).Errorf("listen key keep-alive error: %v key: %s", err, maskListenKey(listenKey)) + s.emitReconnect() + return + } + + } + } +} + +func (s *Stream) read(ctx context.Context) { + defer func() { + if s.connCancel != nil { + s.connCancel() + } + s.EmitDisconnect() + }() + for { select { @@ -359,39 +425,39 @@ func (s *Stream) read(ctx context.Context) { return default: - if err := s.Conn.SetReadDeadline(time.Now().Add(3 * time.Minute)); err != nil { + s.connLock.Lock() + if err := s.Conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil { log.WithError(err).Errorf("set read deadline error: %s", err.Error()) } mt, message, err := s.Conn.ReadMessage() + s.connLock.Unlock() + if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { - log.WithError(err).Errorf("read error: %s", err.Error()) - } else { - log.Info("websocket connection closed, going away") - } + // if it's a network timeout error, we should re-connect + switch err := err.(type) { - s.EmitDisconnect() - - // reconnect - for err != nil { - select { - case <-ctx.Done(): + // if it's a websocket related error + case *websocket.CloseError: + if err.Code == websocket.CloseNormalClosure { return - - default: - if !s.publicOnly { - if err := s.invalidateListenKey(ctx, s.ListenKey); err != nil { - log.WithError(err).Error("invalidate listen key error") - } - } - - err = s.connect(ctx) - time.Sleep(5 * time.Second) } - } - continue + // for unexpected close error, we should re-connect + // emit reconnect to start a new connection + s.emitReconnect() + return + + case net.Error: + log.WithError(err).Error("network error") + s.emitReconnect() + return + + default: + log.WithError(err).Error("unexpected connection error") + s.emitReconnect() + return + } } // skip non-text messages @@ -465,17 +531,14 @@ func (s *Stream) invalidateListenKey(ctx context.Context, listenKey string) (err func (s *Stream) Close() error { log.Infof("closing user data stream...") - if !s.publicOnly { - if err := s.invalidateListenKey(context.Background(), s.ListenKey); err != nil { - log.WithError(err).Error("invalidate listen key error") - } - log.Infof("user data stream closed") + if s.connCancel != nil { + s.connCancel() } s.connLock.Lock() - defer s.connLock.Unlock() - - return s.Conn.Close() + err := s.Conn.Close() + s.connLock.Unlock() + return err } func maskListenKey(listenKey string) string { From 686dcef2c5ce0fa9112a6d28437a6fbd02b6bec5 Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 25 May 2021 21:36:14 +0800 Subject: [PATCH 2/2] binance: fix depth snapshot buffering --- examples/binance-book/main.go | 16 +++--------- pkg/exchange/binance/depthframe.go | 13 ++++++++++ pkg/exchange/binance/parse.go | 41 ++++++++++++++++++++++++------ pkg/exchange/binance/stream.go | 16 +++++++----- pkg/types/orderbook.go | 31 +++++++++++++++++++--- pkg/types/sliceorderbook.go | 12 +++------ 6 files changed, 90 insertions(+), 39 deletions(-) diff --git a/examples/binance-book/main.go b/examples/binance-book/main.go index f9aa1288a..42d669a07 100644 --- a/examples/binance-book/main.go +++ b/examples/binance-book/main.go @@ -48,16 +48,8 @@ var rootCmd = &cobra.Command{ stream.SetPublicOnly() stream.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{}) - stream.OnBookSnapshot(func(book types.SliceOrderBook) { - // log.Infof("book snapshot: %+v", book) - }) - - stream.OnBookUpdate(func(book types.SliceOrderBook) { - // log.Infof("book update: %+v", book) - }) - - streambook := types.NewStreamBook(symbol) - streambook.BindStream(stream) + streamBook := types.NewStreamBook(symbol) + streamBook.BindStream(stream) go func() { for { @@ -66,8 +58,8 @@ var rootCmd = &cobra.Command{ case <-ctx.Done(): return - case <-streambook.C: - book := streambook.Copy() + case <-streamBook.C: + book := streamBook.Copy() if valid, err := book.IsValid(); !valid { log.Errorf("order book is invalid, error: %v", err) diff --git a/pkg/exchange/binance/depthframe.go b/pkg/exchange/binance/depthframe.go index 8129a3fbc..85223ce63 100644 --- a/pkg/exchange/binance/depthframe.go +++ b/pkg/exchange/binance/depthframe.go @@ -160,6 +160,18 @@ func (f *DepthFrame) PushEvent(e DepthEvent) { // drop old events if e.FinalUpdateID <= snapshot.FinalUpdateID { + log.Infof("DROP %s depth update event, updateID %d ~ %d (len %d)", + f.Symbol, + e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID) + return + } + + if e.FirstUpdateID > snapshot.FinalUpdateID+1 { + log.Infof("MISSING %s depth update event, resetting, updateID %d ~ %d (len %d)", + f.Symbol, + e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID) + + f.reset() return } @@ -181,6 +193,7 @@ func (f *DepthFrame) fetch(ctx context.Context) (*DepthEvent, error) { } event := DepthEvent{ + Symbol: f.Symbol, FirstUpdateID: 0, FinalUpdateID: response.LastUpdateID, } diff --git a/pkg/exchange/binance/parse.go b/pkg/exchange/binance/parse.go index b669b0b7b..9cc7f51f9 100644 --- a/pkg/exchange/binance/parse.go +++ b/pkg/exchange/binance/parse.go @@ -56,14 +56,14 @@ executionReport type ExecutionReportEvent struct { EventBase - Symbol string `json:"s"` - Side string `json:"S"` + Symbol string `json:"s"` + Side string `json:"S"` ClientOrderID string `json:"c"` OriginalClientOrderID string `json:"C"` OrderType string `json:"o"` - OrderCreationTime int64 `json:"O"` + OrderCreationTime int64 `json:"O"` TimeInForce string `json:"f"` IcebergQuantity string `json:"F"` @@ -71,13 +71,13 @@ type ExecutionReportEvent struct { OrderQuantity string `json:"q"` QuoteOrderQuantity string `json:"Q"` - OrderPrice string `json:"p"` - StopPrice string `json:"P"` + OrderPrice string `json:"p"` + StopPrice string `json:"P"` IsOnBook bool `json:"w"` - IsMaker bool `json:"m"` - Ignore bool `json:"M"` + IsMaker bool `json:"m"` + Ignore bool `json:"M"` CommissionAmount string `json:"n"` CommissionAsset string `json:"N"` @@ -319,17 +319,40 @@ type DepthEvent struct { Asks []DepthEntry } +func (e *DepthEvent) String() (o string) { + o += fmt.Sprintf("Depth %s bid/ask = ", e.Symbol) + + if len(e.Bids) == 0 { + o += "empty" + } else { + o += e.Bids[0].PriceLevel + } + + o += "/" + + if len(e.Asks) == 0 { + o += "empty" + } else { + o += e.Asks[0].PriceLevel + } + + o += fmt.Sprintf(" %d ~ %d", e.FirstUpdateID, e.FinalUpdateID) + return o +} + func (e *DepthEvent) OrderBook() (book types.SliceOrderBook, err error) { book.Symbol = e.Symbol for _, entry := range e.Bids { quantity, err := fixedpoint.NewFromString(entry.Quantity) if err != nil { + log.WithError(err).Errorf("depth quantity parse error: %s", entry.Quantity) continue } price, err := fixedpoint.NewFromString(entry.PriceLevel) if err != nil { + log.WithError(err).Errorf("depth price parse error: %s", entry.PriceLevel) continue } @@ -344,11 +367,13 @@ func (e *DepthEvent) OrderBook() (book types.SliceOrderBook, err error) { for _, entry := range e.Asks { quantity, err := fixedpoint.NewFromString(entry.Quantity) if err != nil { + log.WithError(err).Errorf("depth quantity parse error: %s", entry.Quantity) continue } price, err := fixedpoint.NewFromString(entry.PriceLevel) if err != nil { + log.WithError(err).Errorf("depth price parse error: %s", entry.PriceLevel) continue } @@ -360,7 +385,7 @@ func (e *DepthEvent) OrderBook() (book types.SliceOrderBook, err error) { book.Asks = book.Asks.Upsert(pv, false) } - return + return book, nil } func parseDepthEntry(val *fastjson.Value) (*DepthEntry, error) { diff --git a/pkg/exchange/binance/stream.go b/pkg/exchange/binance/stream.go index 41fe47f26..73664afb6 100644 --- a/pkg/exchange/binance/stream.go +++ b/pkg/exchange/binance/stream.go @@ -75,7 +75,7 @@ func NewStream(client *binance.Client) *Stream { stream.OnDepthEvent(func(e *DepthEvent) { if debugBinanceDepth { - log.Infof("received %s depth event updateID %d~%d (len %d)", e.Symbol, e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID) + log.Infof("received %s depth event updateID %d ~ %d (len %d)", e.Symbol, e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID) } f, ok := stream.depthFrames[e.Symbol] @@ -88,27 +88,29 @@ func NewStream(client *binance.Client) *Stream { stream.depthFrames[e.Symbol] = f - f.OnReady(func(e DepthEvent, bufEvents []DepthEvent) { - snapshot, err := e.OrderBook() + f.OnReady(func(snapshotDepth DepthEvent, bufEvents []DepthEvent) { + log.Infof("depth snapshot: %s", snapshotDepth.String()) + + snapshot, err := snapshotDepth.OrderBook() if err != nil { log.WithError(err).Error("book snapshot convert error") return } if valid, err := snapshot.IsValid(); !valid { - log.Warnf("depth snapshot is invalid, event: %+v, error: %v", e, err) + log.Errorf("depth snapshot is invalid, event: %+v, error: %v", snapshotDepth, err) } stream.EmitBookSnapshot(snapshot) for _, e := range bufEvents { - book, err := e.OrderBook() + bookUpdate, err := e.OrderBook() if err != nil { log.WithError(err).Error("book convert error") return } - stream.EmitBookUpdate(book) + stream.EmitBookUpdate(bookUpdate) } }) @@ -185,7 +187,7 @@ func NewStream(client *binance.Client) *Stream { }) stream.OnDisconnect(func() { - log.Infof("resetting depth snapshot...") + log.Infof("resetting depth snapshots...") for _, f := range stream.depthFrames { f.reset() } diff --git a/pkg/types/orderbook.go b/pkg/types/orderbook.go index 8b7f30feb..b5e5602c5 100644 --- a/pkg/types/orderbook.go +++ b/pkg/types/orderbook.go @@ -52,6 +52,27 @@ func NewMutexOrderBook(symbol string) *MutexOrderBook { } } +func (b *MutexOrderBook) IsValid() (ok bool, err error) { + b.Lock() + ok, err = b.OrderBook.IsValid() + b.Unlock() + return ok, err +} + +func (b *MutexOrderBook) BestBid() (pv PriceVolume, ok bool) { + b.Lock() + pv, ok = b.OrderBook.BestBid() + b.Unlock() + return pv, ok +} + +func (b *MutexOrderBook) BestAsk() (pv PriceVolume, ok bool) { + b.Lock() + pv, ok = b.OrderBook.BestAsk() + b.Unlock() + return pv, ok +} + func (b *MutexOrderBook) Load(book SliceOrderBook) { b.Lock() b.OrderBook.Load(book) @@ -66,14 +87,16 @@ func (b *MutexOrderBook) Reset() { func (b *MutexOrderBook) CopyDepth(depth int) OrderBook { b.Lock() - defer b.Unlock() - return b.OrderBook.CopyDepth(depth) + book := b.OrderBook.CopyDepth(depth) + b.Unlock() + return book } func (b *MutexOrderBook) Copy() OrderBook { b.Lock() - defer b.Unlock() - return b.OrderBook.Copy() + book := b.OrderBook.Copy() + b.Unlock() + return book } func (b *MutexOrderBook) Update(update SliceOrderBook) { diff --git a/pkg/types/sliceorderbook.go b/pkg/types/sliceorderbook.go index 813e73fd1..24f1339d5 100644 --- a/pkg/types/sliceorderbook.go +++ b/pkg/types/sliceorderbook.go @@ -93,10 +93,10 @@ func (b *SliceOrderBook) PriceVolumesBySide(side SideType) PriceVolumeSlice { switch side { case SideTypeBuy: - return b.Bids + return b.Bids.Copy() case SideTypeSell: - return b.Asks + return b.Asks.Copy() } return nil @@ -122,11 +122,6 @@ func (b *SliceOrderBook) updateBids(pvs PriceVolumeSlice) { } } -func (b *SliceOrderBook) load(book SliceOrderBook) { - b.Reset() - b.update(book) -} - func (b *SliceOrderBook) update(book SliceOrderBook) { b.updateBids(book.Bids) b.updateAsks(book.Asks) @@ -138,7 +133,8 @@ func (b *SliceOrderBook) Reset() { } func (b *SliceOrderBook) Load(book SliceOrderBook) { - b.load(book) + b.Reset() + b.update(book) b.EmitLoad(b) }