diff --git a/examples/binance-book/main.go b/examples/binance-book/main.go index ff89b33c7..9823bc454 100644 --- a/examples/binance-book/main.go +++ b/examples/binance-book/main.go @@ -49,15 +49,36 @@ var rootCmd = &cobra.Command{ stream.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{}) stream.OnBookSnapshot(func(book types.OrderBook) { - log.Infof("book snapshot: %+v", book) + // log.Infof("book snapshot: %+v", book) }) stream.OnBookUpdate(func(book types.OrderBook) { - log.Infof("book update: %+v", book) + // log.Infof("book update: %+v", book) }) streambook := types.NewStreamBook(symbol) streambook.BindStream(stream) + streambook.OnUpdate(func(book *types.OrderBook) { + bestBid, hasBid := book.BestBid() + bestAsk, hasAsk := book.BestAsk() + + if !book.IsValid() { + log.Warnf("order book is invalid") + return + } + + if hasBid && hasAsk { + log.Infof("================================") + log.Infof("best ask %f % -12f", + bestAsk.Price.Float64(), + bestAsk.Volume.Float64(), + ) + log.Infof("best bid %f % -12f", + bestBid.Price.Float64(), + bestBid.Volume.Float64(), + ) + } + }) log.Info("connecting websocket...") if err := stream.Connect(ctx); err != nil { diff --git a/pkg/exchange/binance/stream.go b/pkg/exchange/binance/stream.go index b9eb94fd4..0ef5eaed3 100644 --- a/pkg/exchange/binance/stream.go +++ b/pkg/exchange/binance/stream.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "math/rand" + "os" + "strconv" "strings" "sync" "time" @@ -16,9 +18,23 @@ import ( "github.com/c9s/bbgo/pkg/types" ) +var debugBinanceDepth bool + func init() { // randomize pulling rand.Seed(time.Now().UnixNano()) + + if s := os.Getenv("BINANCE_DEBUG_DEPTH"); len(s) > 0 { + v, err := strconv.ParseBool(s) + if err != nil { + log.Error(err) + } else { + debugBinanceDepth = v + if debugBinanceDepth { + log.Info("binance depth debugging is enabled") + } + } + } } type StreamRequest struct { @@ -457,21 +473,27 @@ func (f *DepthFrame) loadDepthSnapshot() { } f.mu.Lock() + + // since we're buffering the update events, ideally the some of the head events + // should be older than the received depth snapshot. + // if the head event is newer than the depth we got, + // then there are something missed, we need to restart the process. + if len(f.BufEvents) > 0 { + e := f.BufEvents[0] + if e.FirstUpdateID > depth.FinalUpdateID+1 { + log.Warn("miss matched final update id for order book") + f.SnapshotDepth = nil + f.BufEvents = nil + f.mu.Unlock() + return + } + } + f.SnapshotDepth = depth + // filter the events by the event IDs var events []DepthEvent for _, e := range f.BufEvents { - /* - if i == 0 { - if e.FirstUpdateID > f.SnapshotDepth.FinalUpdateID+1 { - // FIXME: we missed some events - log.Warn("miss matched final update id for order book") - f.SnapshotDepth = nil - f.mu.Unlock() - return - } - } - */ if e.FirstUpdateID <= f.SnapshotDepth.FinalUpdateID || e.FinalUpdateID <= f.SnapshotDepth.FinalUpdateID { continue } @@ -490,10 +512,16 @@ func (f *DepthFrame) PushEvent(e DepthEvent) { f.BufEvents = append(f.BufEvents, e) f.mu.Unlock() - go f.once.Do(f.loadDepthSnapshot) - go func() { + go f.once.Do(func() { + if debugBinanceDepth { + log.Infof("starting depth snapshot updater for %s market", f.Symbol) + } + ctx := context.Background() - ticker := time.NewTicker(5*time.Minute + time.Duration(rand.Intn(10))*time.Second) + + f.loadDepthSnapshot() + + ticker := time.NewTicker(1*time.Minute + time.Duration(rand.Intn(10))*time.Second) defer ticker.Stop() for { select { @@ -503,7 +531,7 @@ func (f *DepthFrame) PushEvent(e DepthEvent) { f.loadDepthSnapshot() } } - }() + }) } else { if e.FirstUpdateID > f.SnapshotDepth.FinalUpdateID || e.FinalUpdateID > f.SnapshotDepth.FinalUpdateID { if e.FinalUpdateID > f.SnapshotDepth.FinalUpdateID { @@ -518,6 +546,10 @@ func (f *DepthFrame) PushEvent(e DepthEvent) { // fetch fetches the depth and convert to the depth event so that we can reuse the event structure to convert it to the global orderbook type func (f *DepthFrame) fetch(ctx context.Context) (*DepthEvent, error) { + if debugBinanceDepth { + log.Infof("fetching %s depth snapshot", f.Symbol) + } + response, err := f.client.NewDepthService().Symbol(f.Symbol).Do(ctx) if err != nil { return nil, err diff --git a/pkg/types/orderbook.go b/pkg/types/orderbook.go index 79c8a022d..66c1c05b9 100644 --- a/pkg/types/orderbook.go +++ b/pkg/types/orderbook.go @@ -123,6 +123,33 @@ type OrderBook struct { asksChangeCallbacks []func(pvs PriceVolumeSlice) } +func (b *OrderBook) BestBid() (PriceVolume, bool) { + if len(b.Bids) == 0 { + return PriceVolume{}, false + } + + return b.Bids[0], true +} + +func (b *OrderBook) BestAsk() (PriceVolume, bool) { + if len(b.Asks) == 0 { + return PriceVolume{}, false + } + + return b.Asks[0], true +} + +func (b *OrderBook) IsValid() bool { + bid, hasBid := b.BestBid() + ask, hasAsk := b.BestAsk() + + if !hasBid || !hasAsk { + return false + } + + return bid.Price < ask.Price +} + func (b *OrderBook) PriceVolumesBySide(side SideType) PriceVolumeSlice { switch side {