diff --git a/pkg/depth/buffer.go b/pkg/depth/buffer.go new file mode 100644 index 000000000..30a72a869 --- /dev/null +++ b/pkg/depth/buffer.go @@ -0,0 +1,174 @@ +package depth + +import ( + "fmt" + "sync" + + "github.com/c9s/bbgo/pkg/types" + "github.com/c9s/bbgo/pkg/util" + log "github.com/sirupsen/logrus" +) + +type SnapshotFetcher func() (snapshot types.SliceOrderBook, finalUpdateID int64, err error) + +type Update struct { + FirstUpdateID, FinalUpdateID int64 + + // Object is the update object + Object types.SliceOrderBook +} + +//go:generate callbackgen -type Buffer +type Buffer struct { + Symbol string + buffer []Update + + finalUpdateID int64 + fetcher SnapshotFetcher + snapshot *types.SliceOrderBook + + resetCallbacks []func() + readyCallbacks []func(snapshot types.SliceOrderBook, updates []Update) + pushCallbacks []func(update Update) + + resetC chan struct{} + mu sync.Mutex + once util.Reonce +} + +func NewDepthBuffer(symbol string, fetcher SnapshotFetcher) *Buffer { + return &Buffer{ + Symbol: symbol, + fetcher: fetcher, + resetC: make(chan struct{}, 1), + } +} + +func (b *Buffer) resetSnapshot() { + b.snapshot = nil + b.finalUpdateID = 0 + b.EmitReset() +} + +func (b *Buffer) emitReset() { + select { + case b.resetC <- struct{}{}: + default: + } +} + +// AddUpdate adds the update to the buffer or push the update to the subscriber +func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArgs ...int64) error { + finalUpdateID := firstUpdateID + if len(finalArgs) > 0 { + finalUpdateID = finalArgs[0] + } + + u := Update{ + FirstUpdateID: firstUpdateID, + FinalUpdateID: finalUpdateID, + Object: o, + } + + // we lock here because there might be 2+ calls to the AddUpdate method + // we don't want to reset sync.Once 2 times here + b.mu.Lock() + select { + case <-b.resetC: + log.Warnf("received depth reset signal, resetting...") + + // if the once goroutine is still running, overwriting this once might cause "unlock of unlocked mutex" panic. + b.once.Reset() + default: + } + + // if the snapshot is set to nil, we need to buffer the message + if b.snapshot == nil { + b.buffer = append(b.buffer, u) + b.once.Do(func() { + go b.tryFetch() + }) + b.mu.Unlock() + return nil + } + + // if there is a missing update, we should reset the snapshot and re-fetch the snapshot + if u.FirstUpdateID > b.finalUpdateID+1 { + // emitReset will reset the once outside the mutex lock section + b.resetSnapshot() + b.buffer = nil + b.buffer = append(b.buffer, u) + b.emitReset() + b.mu.Unlock() + return fmt.Errorf("there is a missing update between %d and %d", u.FirstUpdateID, b.finalUpdateID+1) + } + + b.finalUpdateID = u.FinalUpdateID + b.EmitPush(u) + b.mu.Unlock() + return nil +} + +func (b *Buffer) fetchAndPush() error { + log.Info("fetching depth snapshot...") + book, finalUpdateID, err := b.fetcher() + if err != nil { + return err + } + + log.Infof("fetched depth snapshot, final update id %d", finalUpdateID) + + b.mu.Lock() + if len(b.buffer) > 0 { + // the snapshot is too early + if finalUpdateID < b.buffer[0].FirstUpdateID { + b.resetSnapshot() + b.emitReset() + b.mu.Unlock() + return fmt.Errorf("depth final update %d is < the first update id %d", finalUpdateID, b.buffer[0].FirstUpdateID) + } + } + + var pushUpdates []Update + for _, u := range b.buffer { + if u.FirstUpdateID > finalUpdateID+1 { + b.resetSnapshot() + b.emitReset() + b.mu.Unlock() + return fmt.Errorf("the update id %d > final update id %d + 1", u.FirstUpdateID, finalUpdateID) + } + + if u.FirstUpdateID < finalUpdateID+1 { + continue + } + + pushUpdates = append(pushUpdates, u) + + // update the final update id to the correct final update id + finalUpdateID = u.FinalUpdateID + } + + // clean the buffer since we have filtered out the buffer we want + b.buffer = nil + + // set the final update ID so that we will know if there is an update missing + b.finalUpdateID = finalUpdateID + + // set the snapshot + b.snapshot = &book + + b.mu.Unlock() + b.EmitReady(book, pushUpdates) + return nil +} + +func (b *Buffer) tryFetch() { + for { + err := b.fetchAndPush() + if err != nil { + log.WithError(err).Errorf("snapshot fetch failed") + continue + } + break + } +} diff --git a/pkg/depth/buffer_callbacks.go b/pkg/depth/buffer_callbacks.go new file mode 100644 index 000000000..bf6527166 --- /dev/null +++ b/pkg/depth/buffer_callbacks.go @@ -0,0 +1,37 @@ +// Code generated by "callbackgen -type Buffer"; DO NOT EDIT. + +package depth + +import ( + "github.com/c9s/bbgo/pkg/types" +) + +func (b *Buffer) OnReset(cb func()) { + b.resetCallbacks = append(b.resetCallbacks, cb) +} + +func (b *Buffer) EmitReset() { + for _, cb := range b.resetCallbacks { + cb() + } +} + +func (b *Buffer) OnReady(cb func(snapshot types.SliceOrderBook, updates []Update)) { + b.readyCallbacks = append(b.readyCallbacks, cb) +} + +func (b *Buffer) EmitReady(snapshot types.SliceOrderBook, updates []Update) { + for _, cb := range b.readyCallbacks { + cb(snapshot, updates) + } +} + +func (b *Buffer) OnPush(cb func(update Update)) { + b.pushCallbacks = append(b.pushCallbacks, cb) +} + +func (b *Buffer) EmitPush(update Update) { + for _, cb := range b.pushCallbacks { + cb(update) + } +} diff --git a/pkg/depth/buffer_test.go b/pkg/depth/buffer_test.go new file mode 100644 index 000000000..e538733db --- /dev/null +++ b/pkg/depth/buffer_test.go @@ -0,0 +1,151 @@ +package depth + +import ( + "context" + "testing" + "time" + + "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/types" + "github.com/stretchr/testify/assert" +) + +func TestDepthBuffer_ReadyState(t *testing.T) { + buf := NewDepthBuffer("", func() (book types.SliceOrderBook, finalID int64, err error) { + return types.SliceOrderBook{ + Bids: types.PriceVolumeSlice{ + {Price: 100, Volume: 1}, + }, + Asks: types.PriceVolumeSlice{ + {Price: 99, Volume: 1}, + }, + }, 33, nil + }) + + readyC := make(chan struct{}) + buf.OnReady(func(snapshot types.SliceOrderBook, updates []Update) { + assert.Greater(t, len(updates), 33) + close(readyC) + }) + + var updateID int64 = 1 + for ; updateID < 100; updateID++ { + buf.AddUpdate( + types.SliceOrderBook{ + Bids: types.PriceVolumeSlice{ + {Price: 100, Volume: fixedpoint.Value(updateID)}, + }, + Asks: types.PriceVolumeSlice{ + {Price: 99, Volume: fixedpoint.Value(updateID)}, + }, + }, updateID) + } + + <-readyC +} + +func TestDepthBuffer_CorruptedUpdateAtTheBeginning(t *testing.T) { + // snapshot starts from 30, + // the first ready event should have a snapshot(30) and updates (31~50) + var snapshotFinalID int64 = 0 + buf := NewDepthBuffer("", func() (types.SliceOrderBook, int64, error) { + snapshotFinalID += 30 + return types.SliceOrderBook{ + Bids: types.PriceVolumeSlice{ + {Price: 100, Volume: 1}, + }, + Asks: types.PriceVolumeSlice{ + {Price: 99, Volume: 1}, + }, + }, snapshotFinalID, nil + }) + + resetC := make(chan struct{}, 1) + + buf.OnReset(func() { + resetC <- struct{}{} + }) + + var updateID int64 = 10 + for ; updateID < 100; updateID++ { + if updateID == 50 { + updateID += 5 + } + + buf.AddUpdate(types.SliceOrderBook{ + Bids: types.PriceVolumeSlice{ + {Price: 100, Volume: fixedpoint.Value(updateID)}, + }, + Asks: types.PriceVolumeSlice{ + {Price: 99, Volume: fixedpoint.Value(updateID)}, + }, + }, updateID) + } + + <-resetC +} + +func TestDepthBuffer_ConcurrentRun(t *testing.T) { + var snapshotFinalID int64 = 0 + buf := NewDepthBuffer("", func() (types.SliceOrderBook, int64, error) { + snapshotFinalID += 30 + time.Sleep(10 * time.Millisecond) + return types.SliceOrderBook{ + Bids: types.PriceVolumeSlice{ + {Price: 100, Volume: 1}, + }, + Asks: types.PriceVolumeSlice{ + {Price: 99, Volume: 1}, + }, + }, snapshotFinalID, nil + }) + + readyCnt := 0 + resetCnt := 0 + pushCnt := 0 + + buf.OnPush(func(update Update) { + pushCnt++ + }) + buf.OnReady(func(snapshot types.SliceOrderBook, updates []Update) { + readyCnt++ + assert.Greater(t, len(updates), 0) + }) + buf.OnReset(func() { + resetCnt++ + }) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + ticker := time.NewTicker(time.Millisecond) + defer ticker.Stop() + + var updateID int64 = 10 + + for { + select { + case <-ctx.Done(): + assert.Greater(t, readyCnt, 1) + assert.Greater(t, resetCnt, 1) + assert.Greater(t, pushCnt, 1) + return + + case <-ticker.C: + updateID++ + if updateID%100 == 0 { + updateID++ + } + + buf.AddUpdate(types.SliceOrderBook{ + Bids: types.PriceVolumeSlice{ + {Price: 100, Volume: fixedpoint.Value(updateID)}, + }, + Asks: types.PriceVolumeSlice{ + {Price: 99, Volume: fixedpoint.Value(updateID)}, + }, + }, updateID) + + } + } +} diff --git a/pkg/util/reonce.go b/pkg/util/reonce.go new file mode 100644 index 000000000..f1fae5dda --- /dev/null +++ b/pkg/util/reonce.go @@ -0,0 +1,33 @@ +package util + +import ( + "sync" + "sync/atomic" +) + +type Reonce struct { + done uint32 + m sync.Mutex +} + +func (o *Reonce) Reset() { + o.m.Lock() + atomic.StoreUint32(&o.done, 0) + o.m.Unlock() +} + +func (o *Reonce) Do(f func()) { + if atomic.LoadUint32(&o.done) == 0 { + // Outlined slow-path to allow inlining of the fast-path. + o.doSlow(f) + } +} + +func (o *Reonce) doSlow(f func()) { + o.m.Lock() + defer o.m.Unlock() + if o.done == 0 { + defer atomic.StoreUint32(&o.done, 1) + f() + } +} diff --git a/pkg/util/reonce_test.go b/pkg/util/reonce_test.go new file mode 100644 index 000000000..647b3335a --- /dev/null +++ b/pkg/util/reonce_test.go @@ -0,0 +1,33 @@ +package util + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestReonce_DoAndReset(t *testing.T) { + var cnt = 0 + var reonce Reonce + go reonce.Do(func() { + t.Log("once #1") + time.Sleep(10 * time.Millisecond) + cnt++ + }) + + // make sure it's locked + time.Sleep(10 * time.Millisecond) + t.Logf("reset") + reonce.Reset() + + go reonce.Do(func() { + t.Log("once #2") + time.Sleep(10 * time.Millisecond) + cnt++ + }) + + time.Sleep(time.Second) + assert.Equal(t, 2, cnt) +} +