diff --git a/pkg/depth/buffer.go b/pkg/depth/buffer.go index 0b627f255..c960dfb2f 100644 --- a/pkg/depth/buffer.go +++ b/pkg/depth/buffer.go @@ -92,10 +92,6 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg Object: o, } - // we lock here because there might be 2+ calls to the AddUpdate method - // we don't want to reset sync.Once 2 times here - b.mu.Lock() - defer b.mu.Unlock() select { case <-b.resetC: log.Warnf("received depth reset signal, resetting...") @@ -106,11 +102,13 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg } // if the snapshot is set to nil, we need to buffer the message + b.mu.Lock() if b.snapshot == nil { b.buffer = append(b.buffer, u) b.once.Do(func() { go b.tryFetch() }) + b.mu.Unlock() return nil } @@ -121,6 +119,7 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg finalUpdateID = b.finalUpdateID b.resetSnapshot() b.emitReset() + b.mu.Unlock() return fmt.Errorf("found missing update between finalUpdateID %d and firstUpdateID %d, diff: %d", finalUpdateID+1, u.FirstUpdateID, @@ -129,18 +128,19 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg log.Debugf("depth update id %d -> %d", b.finalUpdateID, u.FinalUpdateID) b.finalUpdateID = u.FinalUpdateID + b.mu.Unlock() + b.EmitPush(u) return nil } func (b *Buffer) fetchAndPush() error { - b.mu.Lock() - defer b.mu.Unlock() book, finalUpdateID, err := b.fetcher() if err != nil { return err } + b.mu.Lock() log.Debugf("fetched depth snapshot, final update id %d", finalUpdateID) if len(b.buffer) > 0 { @@ -148,6 +148,7 @@ func (b *Buffer) fetchAndPush() error { if finalUpdateID < b.buffer[0].FirstUpdateID { b.resetSnapshot() b.emitReset() + b.mu.Unlock() return fmt.Errorf("depth snapshot is too early, final update %d is < the first update id %d", finalUpdateID, b.buffer[0].FirstUpdateID) } } @@ -162,6 +163,7 @@ func (b *Buffer) fetchAndPush() error { if u.FirstUpdateID > finalUpdateID+1 { b.resetSnapshot() b.emitReset() + b.mu.Unlock() return fmt.Errorf("there is a missing depth update, the update id %d > final update id %d + 1", u.FirstUpdateID, finalUpdateID) } @@ -180,6 +182,9 @@ func (b *Buffer) fetchAndPush() error { // set the snapshot b.snapshot = &book + b.mu.Unlock() + + // should unlock first then call ready b.EmitReady(book, pushUpdates) return nil } diff --git a/pkg/depth/buffer_test.go b/pkg/depth/buffer_test.go index 9c33effdd..949db0f21 100644 --- a/pkg/depth/buffer_test.go +++ b/pkg/depth/buffer_test.go @@ -1,3 +1,6 @@ +//go:build !race +// +build !race + package depth import ( @@ -5,9 +8,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" - "github.com/stretchr/testify/assert" ) var itov = fixedpoint.NewFromInt