binance: fix depth snapshot buffering

This commit is contained in:
c9s 2021-05-25 21:36:14 +08:00
parent d3f06bc9d7
commit 686dcef2c5
6 changed files with 90 additions and 39 deletions

View File

@ -48,16 +48,8 @@ var rootCmd = &cobra.Command{
stream.SetPublicOnly()
stream.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{})
stream.OnBookSnapshot(func(book types.SliceOrderBook) {
// log.Infof("book snapshot: %+v", book)
})
stream.OnBookUpdate(func(book types.SliceOrderBook) {
// log.Infof("book update: %+v", book)
})
streambook := types.NewStreamBook(symbol)
streambook.BindStream(stream)
streamBook := types.NewStreamBook(symbol)
streamBook.BindStream(stream)
go func() {
for {
@ -66,8 +58,8 @@ var rootCmd = &cobra.Command{
case <-ctx.Done():
return
case <-streambook.C:
book := streambook.Copy()
case <-streamBook.C:
book := streamBook.Copy()
if valid, err := book.IsValid(); !valid {
log.Errorf("order book is invalid, error: %v", err)

View File

@ -160,6 +160,18 @@ func (f *DepthFrame) PushEvent(e DepthEvent) {
// drop old events
if e.FinalUpdateID <= snapshot.FinalUpdateID {
log.Infof("DROP %s depth update event, updateID %d ~ %d (len %d)",
f.Symbol,
e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID)
return
}
if e.FirstUpdateID > snapshot.FinalUpdateID+1 {
log.Infof("MISSING %s depth update event, resetting, updateID %d ~ %d (len %d)",
f.Symbol,
e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID)
f.reset()
return
}
@ -181,6 +193,7 @@ func (f *DepthFrame) fetch(ctx context.Context) (*DepthEvent, error) {
}
event := DepthEvent{
Symbol: f.Symbol,
FirstUpdateID: 0,
FinalUpdateID: response.LastUpdateID,
}

View File

@ -319,17 +319,40 @@ type DepthEvent struct {
Asks []DepthEntry
}
func (e *DepthEvent) String() (o string) {
o += fmt.Sprintf("Depth %s bid/ask = ", e.Symbol)
if len(e.Bids) == 0 {
o += "empty"
} else {
o += e.Bids[0].PriceLevel
}
o += "/"
if len(e.Asks) == 0 {
o += "empty"
} else {
o += e.Asks[0].PriceLevel
}
o += fmt.Sprintf(" %d ~ %d", e.FirstUpdateID, e.FinalUpdateID)
return o
}
func (e *DepthEvent) OrderBook() (book types.SliceOrderBook, err error) {
book.Symbol = e.Symbol
for _, entry := range e.Bids {
quantity, err := fixedpoint.NewFromString(entry.Quantity)
if err != nil {
log.WithError(err).Errorf("depth quantity parse error: %s", entry.Quantity)
continue
}
price, err := fixedpoint.NewFromString(entry.PriceLevel)
if err != nil {
log.WithError(err).Errorf("depth price parse error: %s", entry.PriceLevel)
continue
}
@ -344,11 +367,13 @@ func (e *DepthEvent) OrderBook() (book types.SliceOrderBook, err error) {
for _, entry := range e.Asks {
quantity, err := fixedpoint.NewFromString(entry.Quantity)
if err != nil {
log.WithError(err).Errorf("depth quantity parse error: %s", entry.Quantity)
continue
}
price, err := fixedpoint.NewFromString(entry.PriceLevel)
if err != nil {
log.WithError(err).Errorf("depth price parse error: %s", entry.PriceLevel)
continue
}
@ -360,7 +385,7 @@ func (e *DepthEvent) OrderBook() (book types.SliceOrderBook, err error) {
book.Asks = book.Asks.Upsert(pv, false)
}
return
return book, nil
}
func parseDepthEntry(val *fastjson.Value) (*DepthEntry, error) {

View File

@ -88,27 +88,29 @@ func NewStream(client *binance.Client) *Stream {
stream.depthFrames[e.Symbol] = f
f.OnReady(func(e DepthEvent, bufEvents []DepthEvent) {
snapshot, err := e.OrderBook()
f.OnReady(func(snapshotDepth DepthEvent, bufEvents []DepthEvent) {
log.Infof("depth snapshot: %s", snapshotDepth.String())
snapshot, err := snapshotDepth.OrderBook()
if err != nil {
log.WithError(err).Error("book snapshot convert error")
return
}
if valid, err := snapshot.IsValid(); !valid {
log.Warnf("depth snapshot is invalid, event: %+v, error: %v", e, err)
log.Errorf("depth snapshot is invalid, event: %+v, error: %v", snapshotDepth, err)
}
stream.EmitBookSnapshot(snapshot)
for _, e := range bufEvents {
book, err := e.OrderBook()
bookUpdate, err := e.OrderBook()
if err != nil {
log.WithError(err).Error("book convert error")
return
}
stream.EmitBookUpdate(book)
stream.EmitBookUpdate(bookUpdate)
}
})
@ -185,7 +187,7 @@ func NewStream(client *binance.Client) *Stream {
})
stream.OnDisconnect(func() {
log.Infof("resetting depth snapshot...")
log.Infof("resetting depth snapshots...")
for _, f := range stream.depthFrames {
f.reset()
}

View File

@ -52,6 +52,27 @@ func NewMutexOrderBook(symbol string) *MutexOrderBook {
}
}
func (b *MutexOrderBook) IsValid() (ok bool, err error) {
b.Lock()
ok, err = b.OrderBook.IsValid()
b.Unlock()
return ok, err
}
func (b *MutexOrderBook) BestBid() (pv PriceVolume, ok bool) {
b.Lock()
pv, ok = b.OrderBook.BestBid()
b.Unlock()
return pv, ok
}
func (b *MutexOrderBook) BestAsk() (pv PriceVolume, ok bool) {
b.Lock()
pv, ok = b.OrderBook.BestAsk()
b.Unlock()
return pv, ok
}
func (b *MutexOrderBook) Load(book SliceOrderBook) {
b.Lock()
b.OrderBook.Load(book)
@ -66,14 +87,16 @@ func (b *MutexOrderBook) Reset() {
func (b *MutexOrderBook) CopyDepth(depth int) OrderBook {
b.Lock()
defer b.Unlock()
return b.OrderBook.CopyDepth(depth)
book := b.OrderBook.CopyDepth(depth)
b.Unlock()
return book
}
func (b *MutexOrderBook) Copy() OrderBook {
b.Lock()
defer b.Unlock()
return b.OrderBook.Copy()
book := b.OrderBook.Copy()
b.Unlock()
return book
}
func (b *MutexOrderBook) Update(update SliceOrderBook) {

View File

@ -93,10 +93,10 @@ func (b *SliceOrderBook) PriceVolumesBySide(side SideType) PriceVolumeSlice {
switch side {
case SideTypeBuy:
return b.Bids
return b.Bids.Copy()
case SideTypeSell:
return b.Asks
return b.Asks.Copy()
}
return nil
@ -122,11 +122,6 @@ func (b *SliceOrderBook) updateBids(pvs PriceVolumeSlice) {
}
}
func (b *SliceOrderBook) load(book SliceOrderBook) {
b.Reset()
b.update(book)
}
func (b *SliceOrderBook) update(book SliceOrderBook) {
b.updateBids(book.Bids)
b.updateAsks(book.Asks)
@ -138,7 +133,8 @@ func (b *SliceOrderBook) Reset() {
}
func (b *SliceOrderBook) Load(book SliceOrderBook) {
b.load(book)
b.Reset()
b.update(book)
b.EmitLoad(b)
}