fix depth reset

This commit is contained in:
c9s 2021-05-26 01:20:24 +08:00
parent 44ff833c91
commit 07ded04a9b
2 changed files with 34 additions and 14 deletions

View File

@ -6,6 +6,7 @@ import (
"time" "time"
"github.com/adshao/go-binance/v2" "github.com/adshao/go-binance/v2"
"github.com/pkg/errors"
) )
//go:generate callbackgen -type DepthFrame //go:generate callbackgen -type DepthFrame
@ -21,7 +22,8 @@ type DepthFrame struct {
bufMutex sync.Mutex bufMutex sync.Mutex
bufEvents []DepthEvent bufEvents []DepthEvent
once sync.Once resetC chan struct{}
once sync.Once
readyCallbacks []func(snapshotDepth DepthEvent, bufEvents []DepthEvent) readyCallbacks []func(snapshotDepth DepthEvent, bufEvents []DepthEvent)
pushCallbacks []func(e DepthEvent) pushCallbacks []func(e DepthEvent)
@ -42,6 +44,13 @@ func (f *DepthFrame) reset() {
f.snapshotMutex.Unlock() f.snapshotMutex.Unlock()
} }
func (f *DepthFrame) emitReset() {
select {
case f.resetC <- struct{}{}:
default:
}
}
func (f *DepthFrame) bufferEvent(e DepthEvent) { func (f *DepthFrame) bufferEvent(e DepthEvent) {
if debugBinanceDepth { if debugBinanceDepth {
log.Infof("buffering %s depth event FirstUpdateID = %d, FinalUpdateID = %d", f.Symbol, e.FirstUpdateID, e.FinalUpdateID) log.Infof("buffering %s depth event FirstUpdateID = %d, FinalUpdateID = %d", f.Symbol, e.FirstUpdateID, e.FinalUpdateID)
@ -52,7 +61,7 @@ func (f *DepthFrame) bufferEvent(e DepthEvent) {
f.bufMutex.Unlock() f.bufMutex.Unlock()
} }
func (f *DepthFrame) loadDepthSnapshot() { func (f *DepthFrame) loadDepthSnapshot() error {
if debugBinanceDepth { if debugBinanceDepth {
log.Infof("buffering %s depth events...", f.Symbol) log.Infof("buffering %s depth events...", f.Symbol)
} }
@ -66,17 +75,16 @@ func (f *DepthFrame) loadDepthSnapshot() {
depth, err := f.fetch(f.context) depth, err := f.fetch(f.context)
if err != nil { if err != nil {
log.WithError(err).Errorf("depth api error") log.WithError(err).Errorf("depth api error")
return return err
} }
if len(depth.Asks) == 0 { if len(depth.Asks) == 0 {
log.Errorf("depth response error: empty asks") log.Errorf("depth response error: empty asks")
return return errors.New("depth response error: empty asks")
} }
if len(depth.Bids) == 0 { if len(depth.Bids) == 0 {
log.Errorf("depth response error: empty bids") return errors.New("depth response error: empty bids")
return
} }
if debugBinanceDepth { if debugBinanceDepth {
@ -123,7 +131,7 @@ func (f *DepthFrame) loadDepthSnapshot() {
nextID := depth.FinalUpdateID + 1 nextID := depth.FinalUpdateID + 1
if firstEvent.FirstUpdateID > nextID || firstEvent.FinalUpdateID < nextID { if firstEvent.FirstUpdateID > nextID || firstEvent.FinalUpdateID < nextID {
log.Warn("MISMATCH final update id for order book, resetting depth...") log.Warn("MISMATCH final update id for order book, resetting depth...")
return return errors.New("MISMATCH final update id for order book, resetting depth...")
} }
if debugBinanceDepth { if debugBinanceDepth {
@ -142,6 +150,7 @@ func (f *DepthFrame) loadDepthSnapshot() {
f.snapshotMutex.Unlock() f.snapshotMutex.Unlock()
f.EmitReady(*depth, events) f.EmitReady(*depth, events)
return nil
} }
func (f *DepthFrame) PushEvent(e DepthEvent) { func (f *DepthFrame) PushEvent(e DepthEvent) {
@ -151,10 +160,20 @@ func (f *DepthFrame) PushEvent(e DepthEvent) {
// before the snapshot is loaded, we need to buffer the events until we loaded the snapshot. // before the snapshot is loaded, we need to buffer the events until we loaded the snapshot.
if snapshot == nil { if snapshot == nil {
// buffer the events until we loaded the snapshot
f.bufferEvent(e) select {
f.once.Do(func() { case <-f.resetC:
f.loadDepthSnapshot() f.reset()
default:
// buffer the events until we loaded the snapshot
f.bufferEvent(e)
}
go f.once.Do(func() {
if err := f.loadDepthSnapshot(); err != nil {
log.WithError(err).Error("depth snapshot load failed, resetting..")
f.emitReset()
}
}) })
return return
} }
@ -172,7 +191,7 @@ func (f *DepthFrame) PushEvent(e DepthEvent) {
f.Symbol, f.Symbol,
e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID) e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID)
f.reset() f.emitReset()
return return
} }

View File

@ -84,12 +84,13 @@ func NewStream(client *binance.Client) *Stream {
client: client, client: client,
context: context.Background(), context: context.Background(),
Symbol: e.Symbol, Symbol: e.Symbol,
resetC: make(chan struct{}, 1),
} }
stream.depthFrames[e.Symbol] = f stream.depthFrames[e.Symbol] = f
f.OnReady(func(snapshotDepth DepthEvent, bufEvents []DepthEvent) { f.OnReady(func(snapshotDepth DepthEvent, bufEvents []DepthEvent) {
log.Infof("depth snapshot: %s", snapshotDepth.String()) log.Infof("depth snapshot ready: %s", snapshotDepth.String())
snapshot, err := snapshotDepth.OrderBook() snapshot, err := snapshotDepth.OrderBook()
if err != nil { if err != nil {
@ -189,7 +190,7 @@ func NewStream(client *binance.Client) *Stream {
stream.OnDisconnect(func() { stream.OnDisconnect(func() {
log.Infof("resetting depth snapshots...") log.Infof("resetting depth snapshots...")
for _, f := range stream.depthFrames { for _, f := range stream.depthFrames {
f.reset() f.emitReset()
} }
}) })