diff --git a/examples/binance-book/main.go b/examples/binance-book/main.go index f9aa1288a..42d669a07 100644 --- a/examples/binance-book/main.go +++ b/examples/binance-book/main.go @@ -48,16 +48,8 @@ var rootCmd = &cobra.Command{ stream.SetPublicOnly() stream.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{}) - stream.OnBookSnapshot(func(book types.SliceOrderBook) { - // log.Infof("book snapshot: %+v", book) - }) - - stream.OnBookUpdate(func(book types.SliceOrderBook) { - // log.Infof("book update: %+v", book) - }) - - streambook := types.NewStreamBook(symbol) - streambook.BindStream(stream) + streamBook := types.NewStreamBook(symbol) + streamBook.BindStream(stream) go func() { for { @@ -66,8 +58,8 @@ var rootCmd = &cobra.Command{ case <-ctx.Done(): return - case <-streambook.C: - book := streambook.Copy() + case <-streamBook.C: + book := streamBook.Copy() if valid, err := book.IsValid(); !valid { log.Errorf("order book is invalid, error: %v", err) diff --git a/pkg/exchange/binance/depthframe.go b/pkg/exchange/binance/depthframe.go index 8129a3fbc..85223ce63 100644 --- a/pkg/exchange/binance/depthframe.go +++ b/pkg/exchange/binance/depthframe.go @@ -160,6 +160,18 @@ func (f *DepthFrame) PushEvent(e DepthEvent) { // drop old events if e.FinalUpdateID <= snapshot.FinalUpdateID { + log.Infof("DROP %s depth update event, updateID %d ~ %d (len %d)", + f.Symbol, + e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID) + return + } + + if e.FirstUpdateID > snapshot.FinalUpdateID+1 { + log.Infof("MISSING %s depth update event, resetting, updateID %d ~ %d (len %d)", + f.Symbol, + e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID) + + f.reset() return } @@ -181,6 +193,7 @@ func (f *DepthFrame) fetch(ctx context.Context) (*DepthEvent, error) { } event := DepthEvent{ + Symbol: f.Symbol, FirstUpdateID: 0, FinalUpdateID: response.LastUpdateID, } diff --git a/pkg/exchange/binance/parse.go b/pkg/exchange/binance/parse.go index b669b0b7b..9cc7f51f9 100644 --- a/pkg/exchange/binance/parse.go +++ b/pkg/exchange/binance/parse.go @@ -56,14 +56,14 @@ executionReport type ExecutionReportEvent struct { EventBase - Symbol string `json:"s"` - Side string `json:"S"` + Symbol string `json:"s"` + Side string `json:"S"` ClientOrderID string `json:"c"` OriginalClientOrderID string `json:"C"` OrderType string `json:"o"` - OrderCreationTime int64 `json:"O"` + OrderCreationTime int64 `json:"O"` TimeInForce string `json:"f"` IcebergQuantity string `json:"F"` @@ -71,13 +71,13 @@ type ExecutionReportEvent struct { OrderQuantity string `json:"q"` QuoteOrderQuantity string `json:"Q"` - OrderPrice string `json:"p"` - StopPrice string `json:"P"` + OrderPrice string `json:"p"` + StopPrice string `json:"P"` IsOnBook bool `json:"w"` - IsMaker bool `json:"m"` - Ignore bool `json:"M"` + IsMaker bool `json:"m"` + Ignore bool `json:"M"` CommissionAmount string `json:"n"` CommissionAsset string `json:"N"` @@ -319,17 +319,40 @@ type DepthEvent struct { Asks []DepthEntry } +func (e *DepthEvent) String() (o string) { + o += fmt.Sprintf("Depth %s bid/ask = ", e.Symbol) + + if len(e.Bids) == 0 { + o += "empty" + } else { + o += e.Bids[0].PriceLevel + } + + o += "/" + + if len(e.Asks) == 0 { + o += "empty" + } else { + o += e.Asks[0].PriceLevel + } + + o += fmt.Sprintf(" %d ~ %d", e.FirstUpdateID, e.FinalUpdateID) + return o +} + func (e *DepthEvent) OrderBook() (book types.SliceOrderBook, err error) { book.Symbol = e.Symbol for _, entry := range e.Bids { quantity, err := fixedpoint.NewFromString(entry.Quantity) if err != nil { + log.WithError(err).Errorf("depth quantity parse error: %s", entry.Quantity) continue } price, err := fixedpoint.NewFromString(entry.PriceLevel) if err != nil { + log.WithError(err).Errorf("depth price parse error: %s", entry.PriceLevel) continue } @@ -344,11 +367,13 @@ func (e *DepthEvent) OrderBook() (book types.SliceOrderBook, err error) { for _, entry := range e.Asks { quantity, err := fixedpoint.NewFromString(entry.Quantity) if err != nil { + log.WithError(err).Errorf("depth quantity parse error: %s", entry.Quantity) continue } price, err := fixedpoint.NewFromString(entry.PriceLevel) if err != nil { + log.WithError(err).Errorf("depth price parse error: %s", entry.PriceLevel) continue } @@ -360,7 +385,7 @@ func (e *DepthEvent) OrderBook() (book types.SliceOrderBook, err error) { book.Asks = book.Asks.Upsert(pv, false) } - return + return book, nil } func parseDepthEntry(val *fastjson.Value) (*DepthEntry, error) { diff --git a/pkg/exchange/binance/stream.go b/pkg/exchange/binance/stream.go index 41fe47f26..73664afb6 100644 --- a/pkg/exchange/binance/stream.go +++ b/pkg/exchange/binance/stream.go @@ -75,7 +75,7 @@ func NewStream(client *binance.Client) *Stream { stream.OnDepthEvent(func(e *DepthEvent) { if debugBinanceDepth { - log.Infof("received %s depth event updateID %d~%d (len %d)", e.Symbol, e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID) + log.Infof("received %s depth event updateID %d ~ %d (len %d)", e.Symbol, e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID) } f, ok := stream.depthFrames[e.Symbol] @@ -88,27 +88,29 @@ func NewStream(client *binance.Client) *Stream { stream.depthFrames[e.Symbol] = f - f.OnReady(func(e DepthEvent, bufEvents []DepthEvent) { - snapshot, err := e.OrderBook() + f.OnReady(func(snapshotDepth DepthEvent, bufEvents []DepthEvent) { + log.Infof("depth snapshot: %s", snapshotDepth.String()) + + snapshot, err := snapshotDepth.OrderBook() if err != nil { log.WithError(err).Error("book snapshot convert error") return } if valid, err := snapshot.IsValid(); !valid { - log.Warnf("depth snapshot is invalid, event: %+v, error: %v", e, err) + log.Errorf("depth snapshot is invalid, event: %+v, error: %v", snapshotDepth, err) } stream.EmitBookSnapshot(snapshot) for _, e := range bufEvents { - book, err := e.OrderBook() + bookUpdate, err := e.OrderBook() if err != nil { log.WithError(err).Error("book convert error") return } - stream.EmitBookUpdate(book) + stream.EmitBookUpdate(bookUpdate) } }) @@ -185,7 +187,7 @@ func NewStream(client *binance.Client) *Stream { }) stream.OnDisconnect(func() { - log.Infof("resetting depth snapshot...") + log.Infof("resetting depth snapshots...") for _, f := range stream.depthFrames { f.reset() } diff --git a/pkg/types/orderbook.go b/pkg/types/orderbook.go index 8b7f30feb..b5e5602c5 100644 --- a/pkg/types/orderbook.go +++ b/pkg/types/orderbook.go @@ -52,6 +52,27 @@ func NewMutexOrderBook(symbol string) *MutexOrderBook { } } +func (b *MutexOrderBook) IsValid() (ok bool, err error) { + b.Lock() + ok, err = b.OrderBook.IsValid() + b.Unlock() + return ok, err +} + +func (b *MutexOrderBook) BestBid() (pv PriceVolume, ok bool) { + b.Lock() + pv, ok = b.OrderBook.BestBid() + b.Unlock() + return pv, ok +} + +func (b *MutexOrderBook) BestAsk() (pv PriceVolume, ok bool) { + b.Lock() + pv, ok = b.OrderBook.BestAsk() + b.Unlock() + return pv, ok +} + func (b *MutexOrderBook) Load(book SliceOrderBook) { b.Lock() b.OrderBook.Load(book) @@ -66,14 +87,16 @@ func (b *MutexOrderBook) Reset() { func (b *MutexOrderBook) CopyDepth(depth int) OrderBook { b.Lock() - defer b.Unlock() - return b.OrderBook.CopyDepth(depth) + book := b.OrderBook.CopyDepth(depth) + b.Unlock() + return book } func (b *MutexOrderBook) Copy() OrderBook { b.Lock() - defer b.Unlock() - return b.OrderBook.Copy() + book := b.OrderBook.Copy() + b.Unlock() + return book } func (b *MutexOrderBook) Update(update SliceOrderBook) { diff --git a/pkg/types/sliceorderbook.go b/pkg/types/sliceorderbook.go index 813e73fd1..24f1339d5 100644 --- a/pkg/types/sliceorderbook.go +++ b/pkg/types/sliceorderbook.go @@ -93,10 +93,10 @@ func (b *SliceOrderBook) PriceVolumesBySide(side SideType) PriceVolumeSlice { switch side { case SideTypeBuy: - return b.Bids + return b.Bids.Copy() case SideTypeSell: - return b.Asks + return b.Asks.Copy() } return nil @@ -122,11 +122,6 @@ func (b *SliceOrderBook) updateBids(pvs PriceVolumeSlice) { } } -func (b *SliceOrderBook) load(book SliceOrderBook) { - b.Reset() - b.update(book) -} - func (b *SliceOrderBook) update(book SliceOrderBook) { b.updateBids(book.Bids) b.updateAsks(book.Asks) @@ -138,7 +133,8 @@ func (b *SliceOrderBook) Reset() { } func (b *SliceOrderBook) Load(book SliceOrderBook) { - b.load(book) + b.Reset() + b.update(book) b.EmitLoad(b) }