add depth buffer logs

This commit is contained in:
c9s 2022-01-12 21:55:26 +08:00
parent 8c2228f428
commit 1a61935850
2 changed files with 13 additions and 8 deletions

View File

@ -123,6 +123,7 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg
return fmt.Errorf("there is a missing update between %d and %d", u.FirstUpdateID, b.finalUpdateID+1) return fmt.Errorf("there is a missing update between %d and %d", u.FirstUpdateID, b.finalUpdateID+1)
} }
log.Debugf("depth update id %d -> %d", b.finalUpdateID, u.FinalUpdateID)
b.finalUpdateID = u.FinalUpdateID b.finalUpdateID = u.FinalUpdateID
b.EmitPush(u) b.EmitPush(u)
b.mu.Unlock() b.mu.Unlock()
@ -130,13 +131,12 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg
} }
func (b *Buffer) fetchAndPush() error { func (b *Buffer) fetchAndPush() error {
log.Info("fetching depth snapshot...")
book, finalUpdateID, err := b.fetcher() book, finalUpdateID, err := b.fetcher()
if err != nil { if err != nil {
return err return err
} }
log.Infof("fetched depth snapshot, final update id %d", finalUpdateID) log.Debugf("fetched depth snapshot, final update id %d", finalUpdateID)
b.mu.Lock() b.mu.Lock()
if len(b.buffer) > 0 { if len(b.buffer) > 0 {
@ -151,6 +151,11 @@ func (b *Buffer) fetchAndPush() error {
var pushUpdates []Update var pushUpdates []Update
for _, u := range b.buffer { for _, u := range b.buffer {
// skip old events
if u.FirstUpdateID < finalUpdateID+1 {
continue
}
if u.FirstUpdateID > finalUpdateID+1 { if u.FirstUpdateID > finalUpdateID+1 {
b.resetSnapshot() b.resetSnapshot()
b.emitReset() b.emitReset()
@ -158,10 +163,6 @@ func (b *Buffer) fetchAndPush() error {
return fmt.Errorf("there is a missing depth update, the update id %d > final update id %d + 1", u.FirstUpdateID, finalUpdateID) return fmt.Errorf("there is a missing depth update, the update id %d > final update id %d + 1", u.FirstUpdateID, finalUpdateID)
} }
if u.FirstUpdateID < finalUpdateID+1 {
continue
}
pushUpdates = append(pushUpdates, u) pushUpdates = append(pushUpdates, u)
// update the final update id to the correct final update id // update the final update id to the correct final update id

View File

@ -79,19 +79,23 @@ func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Clie
stream.OnDepthEvent(func(e *DepthEvent) { stream.OnDepthEvent(func(e *DepthEvent) {
f, ok := stream.depthBuffers[e.Symbol] f, ok := stream.depthBuffers[e.Symbol]
if ok { if ok {
f.AddUpdate(types.SliceOrderBook{ err := f.AddUpdate(types.SliceOrderBook{
Symbol: e.Symbol, Symbol: e.Symbol,
Bids: e.Bids, Bids: e.Bids,
Asks: e.Asks, Asks: e.Asks,
}, e.FirstUpdateID, e.FinalUpdateID) }, e.FirstUpdateID, e.FinalUpdateID)
if err != nil {
log.WithError(err).Errorf("found missing %s update event", e.Symbol)
}
} else { } else {
f = depth.NewBuffer(func() (types.SliceOrderBook, int64, error) { f = depth.NewBuffer(func() (types.SliceOrderBook, int64, error) {
log.Infof("fetching %s depth...", e.Symbol)
return ex.QueryDepth(context.Background(), e.Symbol) return ex.QueryDepth(context.Background(), e.Symbol)
}) })
f.SetBufferingPeriod(time.Second) f.SetBufferingPeriod(time.Second)
f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) { f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) {
if valid, err := snapshot.IsValid(); !valid { if valid, err := snapshot.IsValid(); !valid {
log.Errorf("depth snapshot is invalid, error: %v", err) log.Errorf("%s depth snapshot is invalid, error: %v", e.Symbol, err)
return return
} }