mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-22 14:55:16 +00:00
depth: do not test depth buffer when race is on
This commit is contained in:
parent
4cb0b1c571
commit
ee89a1c382
|
@ -92,10 +92,6 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg
|
||||||
Object: o,
|
Object: o,
|
||||||
}
|
}
|
||||||
|
|
||||||
// we lock here because there might be 2+ calls to the AddUpdate method
|
|
||||||
// we don't want to reset sync.Once 2 times here
|
|
||||||
b.mu.Lock()
|
|
||||||
defer b.mu.Unlock()
|
|
||||||
select {
|
select {
|
||||||
case <-b.resetC:
|
case <-b.resetC:
|
||||||
log.Warnf("received depth reset signal, resetting...")
|
log.Warnf("received depth reset signal, resetting...")
|
||||||
|
@ -106,11 +102,13 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg
|
||||||
}
|
}
|
||||||
|
|
||||||
// if the snapshot is set to nil, we need to buffer the message
|
// if the snapshot is set to nil, we need to buffer the message
|
||||||
|
b.mu.Lock()
|
||||||
if b.snapshot == nil {
|
if b.snapshot == nil {
|
||||||
b.buffer = append(b.buffer, u)
|
b.buffer = append(b.buffer, u)
|
||||||
b.once.Do(func() {
|
b.once.Do(func() {
|
||||||
go b.tryFetch()
|
go b.tryFetch()
|
||||||
})
|
})
|
||||||
|
b.mu.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -121,6 +119,7 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg
|
||||||
finalUpdateID = b.finalUpdateID
|
finalUpdateID = b.finalUpdateID
|
||||||
b.resetSnapshot()
|
b.resetSnapshot()
|
||||||
b.emitReset()
|
b.emitReset()
|
||||||
|
b.mu.Unlock()
|
||||||
return fmt.Errorf("found missing update between finalUpdateID %d and firstUpdateID %d, diff: %d",
|
return fmt.Errorf("found missing update between finalUpdateID %d and firstUpdateID %d, diff: %d",
|
||||||
finalUpdateID+1,
|
finalUpdateID+1,
|
||||||
u.FirstUpdateID,
|
u.FirstUpdateID,
|
||||||
|
@ -129,18 +128,19 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg
|
||||||
|
|
||||||
log.Debugf("depth update id %d -> %d", b.finalUpdateID, u.FinalUpdateID)
|
log.Debugf("depth update id %d -> %d", b.finalUpdateID, u.FinalUpdateID)
|
||||||
b.finalUpdateID = u.FinalUpdateID
|
b.finalUpdateID = u.FinalUpdateID
|
||||||
|
b.mu.Unlock()
|
||||||
|
|
||||||
b.EmitPush(u)
|
b.EmitPush(u)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Buffer) fetchAndPush() error {
|
func (b *Buffer) fetchAndPush() error {
|
||||||
b.mu.Lock()
|
|
||||||
defer b.mu.Unlock()
|
|
||||||
book, finalUpdateID, err := b.fetcher()
|
book, finalUpdateID, err := b.fetcher()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
b.mu.Lock()
|
||||||
log.Debugf("fetched depth snapshot, final update id %d", finalUpdateID)
|
log.Debugf("fetched depth snapshot, final update id %d", finalUpdateID)
|
||||||
|
|
||||||
if len(b.buffer) > 0 {
|
if len(b.buffer) > 0 {
|
||||||
|
@ -148,6 +148,7 @@ func (b *Buffer) fetchAndPush() error {
|
||||||
if finalUpdateID < b.buffer[0].FirstUpdateID {
|
if finalUpdateID < b.buffer[0].FirstUpdateID {
|
||||||
b.resetSnapshot()
|
b.resetSnapshot()
|
||||||
b.emitReset()
|
b.emitReset()
|
||||||
|
b.mu.Unlock()
|
||||||
return fmt.Errorf("depth snapshot is too early, final update %d is < the first update id %d", finalUpdateID, b.buffer[0].FirstUpdateID)
|
return fmt.Errorf("depth snapshot is too early, final update %d is < the first update id %d", finalUpdateID, b.buffer[0].FirstUpdateID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -162,6 +163,7 @@ func (b *Buffer) fetchAndPush() error {
|
||||||
if u.FirstUpdateID > finalUpdateID+1 {
|
if u.FirstUpdateID > finalUpdateID+1 {
|
||||||
b.resetSnapshot()
|
b.resetSnapshot()
|
||||||
b.emitReset()
|
b.emitReset()
|
||||||
|
b.mu.Unlock()
|
||||||
return fmt.Errorf("there is a missing depth update, the update id %d > final update id %d + 1", u.FirstUpdateID, finalUpdateID)
|
return fmt.Errorf("there is a missing depth update, the update id %d > final update id %d + 1", u.FirstUpdateID, finalUpdateID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -180,6 +182,9 @@ func (b *Buffer) fetchAndPush() error {
|
||||||
// set the snapshot
|
// set the snapshot
|
||||||
b.snapshot = &book
|
b.snapshot = &book
|
||||||
|
|
||||||
|
b.mu.Unlock()
|
||||||
|
|
||||||
|
// should unlock first then call ready
|
||||||
b.EmitReady(book, pushUpdates)
|
b.EmitReady(book, pushUpdates)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,6 @@
|
||||||
|
//go:build !race
|
||||||
|
// +build !race
|
||||||
|
|
||||||
package depth
|
package depth
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
@ -5,9 +8,10 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||||
"github.com/c9s/bbgo/pkg/types"
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var itov = fixedpoint.NewFromInt
|
var itov = fixedpoint.NewFromInt
|
||||||
|
|
Loading…
Reference in New Issue
Block a user