fix binance depth snapshot updating

This commit is contained in:
c9s 2021-01-23 16:59:51 +08:00
parent d0fc161ae7
commit 6a6dacd595
3 changed files with 97 additions and 17 deletions

View File

@ -49,15 +49,36 @@ var rootCmd = &cobra.Command{
stream.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{})
stream.OnBookSnapshot(func(book types.OrderBook) {
log.Infof("book snapshot: %+v", book)
// log.Infof("book snapshot: %+v", book)
})
stream.OnBookUpdate(func(book types.OrderBook) {
log.Infof("book update: %+v", book)
// log.Infof("book update: %+v", book)
})
streambook := types.NewStreamBook(symbol)
streambook.BindStream(stream)
streambook.OnUpdate(func(book *types.OrderBook) {
bestBid, hasBid := book.BestBid()
bestAsk, hasAsk := book.BestAsk()
if !book.IsValid() {
log.Warnf("order book is invalid")
return
}
if hasBid && hasAsk {
log.Infof("================================")
log.Infof("best ask %f % -12f",
bestAsk.Price.Float64(),
bestAsk.Volume.Float64(),
)
log.Infof("best bid %f % -12f",
bestBid.Price.Float64(),
bestBid.Volume.Float64(),
)
}
})
log.Info("connecting websocket...")
if err := stream.Connect(ctx); err != nil {

View File

@ -4,6 +4,8 @@ import (
"context"
"fmt"
"math/rand"
"os"
"strconv"
"strings"
"sync"
"time"
@ -16,9 +18,23 @@ import (
"github.com/c9s/bbgo/pkg/types"
)
var debugBinanceDepth bool
func init() {
// randomize pulling
rand.Seed(time.Now().UnixNano())
if s := os.Getenv("BINANCE_DEBUG_DEPTH"); len(s) > 0 {
v, err := strconv.ParseBool(s)
if err != nil {
log.Error(err)
} else {
debugBinanceDepth = v
if debugBinanceDepth {
log.Info("binance depth debugging is enabled")
}
}
}
}
type StreamRequest struct {
@ -457,21 +473,27 @@ func (f *DepthFrame) loadDepthSnapshot() {
}
f.mu.Lock()
// since we're buffering the update events, ideally the some of the head events
// should be older than the received depth snapshot.
// if the head event is newer than the depth we got,
// then there are something missed, we need to restart the process.
if len(f.BufEvents) > 0 {
e := f.BufEvents[0]
if e.FirstUpdateID > depth.FinalUpdateID+1 {
log.Warn("miss matched final update id for order book")
f.SnapshotDepth = nil
f.BufEvents = nil
f.mu.Unlock()
return
}
}
f.SnapshotDepth = depth
// filter the events by the event IDs
var events []DepthEvent
for _, e := range f.BufEvents {
/*
if i == 0 {
if e.FirstUpdateID > f.SnapshotDepth.FinalUpdateID+1 {
// FIXME: we missed some events
log.Warn("miss matched final update id for order book")
f.SnapshotDepth = nil
f.mu.Unlock()
return
}
}
*/
if e.FirstUpdateID <= f.SnapshotDepth.FinalUpdateID || e.FinalUpdateID <= f.SnapshotDepth.FinalUpdateID {
continue
}
@ -490,10 +512,16 @@ func (f *DepthFrame) PushEvent(e DepthEvent) {
f.BufEvents = append(f.BufEvents, e)
f.mu.Unlock()
go f.once.Do(f.loadDepthSnapshot)
go func() {
go f.once.Do(func() {
if debugBinanceDepth {
log.Infof("starting depth snapshot updater for %s market", f.Symbol)
}
ctx := context.Background()
ticker := time.NewTicker(5*time.Minute + time.Duration(rand.Intn(10))*time.Second)
f.loadDepthSnapshot()
ticker := time.NewTicker(1*time.Minute + time.Duration(rand.Intn(10))*time.Second)
defer ticker.Stop()
for {
select {
@ -503,7 +531,7 @@ func (f *DepthFrame) PushEvent(e DepthEvent) {
f.loadDepthSnapshot()
}
}
}()
})
} else {
if e.FirstUpdateID > f.SnapshotDepth.FinalUpdateID || e.FinalUpdateID > f.SnapshotDepth.FinalUpdateID {
if e.FinalUpdateID > f.SnapshotDepth.FinalUpdateID {
@ -518,6 +546,10 @@ func (f *DepthFrame) PushEvent(e DepthEvent) {
// fetch fetches the depth and convert to the depth event so that we can reuse the event structure to convert it to the global orderbook type
func (f *DepthFrame) fetch(ctx context.Context) (*DepthEvent, error) {
if debugBinanceDepth {
log.Infof("fetching %s depth snapshot", f.Symbol)
}
response, err := f.client.NewDepthService().Symbol(f.Symbol).Do(ctx)
if err != nil {
return nil, err

View File

@ -123,6 +123,33 @@ type OrderBook struct {
asksChangeCallbacks []func(pvs PriceVolumeSlice)
}
func (b *OrderBook) BestBid() (PriceVolume, bool) {
if len(b.Bids) == 0 {
return PriceVolume{}, false
}
return b.Bids[0], true
}
func (b *OrderBook) BestAsk() (PriceVolume, bool) {
if len(b.Asks) == 0 {
return PriceVolume{}, false
}
return b.Asks[0], true
}
func (b *OrderBook) IsValid() bool {
bid, hasBid := b.BestBid()
ask, hasAsk := b.BestAsk()
if !hasBid || !hasAsk {
return false
}
return bid.Price < ask.Price
}
func (b *OrderBook) PriceVolumesBySide(side SideType) PriceVolumeSlice {
switch side {