depth: implement depth.Buffer

This commit is contained in:
c9s 2021-12-25 01:22:24 +08:00
parent d1c5e93e4f
commit b217a0dec8
5 changed files with 428 additions and 0 deletions

174
pkg/depth/buffer.go Normal file
View File

@ -0,0 +1,174 @@
package depth
import (
"fmt"
"sync"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
log "github.com/sirupsen/logrus"
)
type SnapshotFetcher func() (snapshot types.SliceOrderBook, finalUpdateID int64, err error)
type Update struct {
FirstUpdateID, FinalUpdateID int64
// Object is the update object
Object types.SliceOrderBook
}
//go:generate callbackgen -type Buffer
type Buffer struct {
Symbol string
buffer []Update
finalUpdateID int64
fetcher SnapshotFetcher
snapshot *types.SliceOrderBook
resetCallbacks []func()
readyCallbacks []func(snapshot types.SliceOrderBook, updates []Update)
pushCallbacks []func(update Update)
resetC chan struct{}
mu sync.Mutex
once util.Reonce
}
func NewDepthBuffer(symbol string, fetcher SnapshotFetcher) *Buffer {
return &Buffer{
Symbol: symbol,
fetcher: fetcher,
resetC: make(chan struct{}, 1),
}
}
func (b *Buffer) resetSnapshot() {
b.snapshot = nil
b.finalUpdateID = 0
b.EmitReset()
}
func (b *Buffer) emitReset() {
select {
case b.resetC <- struct{}{}:
default:
}
}
// AddUpdate adds the update to the buffer or push the update to the subscriber
func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArgs ...int64) error {
finalUpdateID := firstUpdateID
if len(finalArgs) > 0 {
finalUpdateID = finalArgs[0]
}
u := Update{
FirstUpdateID: firstUpdateID,
FinalUpdateID: finalUpdateID,
Object: o,
}
// we lock here because there might be 2+ calls to the AddUpdate method
// we don't want to reset sync.Once 2 times here
b.mu.Lock()
select {
case <-b.resetC:
log.Warnf("received depth reset signal, resetting...")
// if the once goroutine is still running, overwriting this once might cause "unlock of unlocked mutex" panic.
b.once.Reset()
default:
}
// if the snapshot is set to nil, we need to buffer the message
if b.snapshot == nil {
b.buffer = append(b.buffer, u)
b.once.Do(func() {
go b.tryFetch()
})
b.mu.Unlock()
return nil
}
// if there is a missing update, we should reset the snapshot and re-fetch the snapshot
if u.FirstUpdateID > b.finalUpdateID+1 {
// emitReset will reset the once outside the mutex lock section
b.resetSnapshot()
b.buffer = nil
b.buffer = append(b.buffer, u)
b.emitReset()
b.mu.Unlock()
return fmt.Errorf("there is a missing update between %d and %d", u.FirstUpdateID, b.finalUpdateID+1)
}
b.finalUpdateID = u.FinalUpdateID
b.EmitPush(u)
b.mu.Unlock()
return nil
}
func (b *Buffer) fetchAndPush() error {
log.Info("fetching depth snapshot...")
book, finalUpdateID, err := b.fetcher()
if err != nil {
return err
}
log.Infof("fetched depth snapshot, final update id %d", finalUpdateID)
b.mu.Lock()
if len(b.buffer) > 0 {
// the snapshot is too early
if finalUpdateID < b.buffer[0].FirstUpdateID {
b.resetSnapshot()
b.emitReset()
b.mu.Unlock()
return fmt.Errorf("depth final update %d is < the first update id %d", finalUpdateID, b.buffer[0].FirstUpdateID)
}
}
var pushUpdates []Update
for _, u := range b.buffer {
if u.FirstUpdateID > finalUpdateID+1 {
b.resetSnapshot()
b.emitReset()
b.mu.Unlock()
return fmt.Errorf("the update id %d > final update id %d + 1", u.FirstUpdateID, finalUpdateID)
}
if u.FirstUpdateID < finalUpdateID+1 {
continue
}
pushUpdates = append(pushUpdates, u)
// update the final update id to the correct final update id
finalUpdateID = u.FinalUpdateID
}
// clean the buffer since we have filtered out the buffer we want
b.buffer = nil
// set the final update ID so that we will know if there is an update missing
b.finalUpdateID = finalUpdateID
// set the snapshot
b.snapshot = &book
b.mu.Unlock()
b.EmitReady(book, pushUpdates)
return nil
}
func (b *Buffer) tryFetch() {
for {
err := b.fetchAndPush()
if err != nil {
log.WithError(err).Errorf("snapshot fetch failed")
continue
}
break
}
}

View File

@ -0,0 +1,37 @@
// Code generated by "callbackgen -type Buffer"; DO NOT EDIT.
package depth
import (
"github.com/c9s/bbgo/pkg/types"
)
func (b *Buffer) OnReset(cb func()) {
b.resetCallbacks = append(b.resetCallbacks, cb)
}
func (b *Buffer) EmitReset() {
for _, cb := range b.resetCallbacks {
cb()
}
}
func (b *Buffer) OnReady(cb func(snapshot types.SliceOrderBook, updates []Update)) {
b.readyCallbacks = append(b.readyCallbacks, cb)
}
func (b *Buffer) EmitReady(snapshot types.SliceOrderBook, updates []Update) {
for _, cb := range b.readyCallbacks {
cb(snapshot, updates)
}
}
func (b *Buffer) OnPush(cb func(update Update)) {
b.pushCallbacks = append(b.pushCallbacks, cb)
}
func (b *Buffer) EmitPush(update Update) {
for _, cb := range b.pushCallbacks {
cb(update)
}
}

151
pkg/depth/buffer_test.go Normal file
View File

@ -0,0 +1,151 @@
package depth
import (
"context"
"testing"
"time"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/stretchr/testify/assert"
)
func TestDepthBuffer_ReadyState(t *testing.T) {
buf := NewDepthBuffer("", func() (book types.SliceOrderBook, finalID int64, err error) {
return types.SliceOrderBook{
Bids: types.PriceVolumeSlice{
{Price: 100, Volume: 1},
},
Asks: types.PriceVolumeSlice{
{Price: 99, Volume: 1},
},
}, 33, nil
})
readyC := make(chan struct{})
buf.OnReady(func(snapshot types.SliceOrderBook, updates []Update) {
assert.Greater(t, len(updates), 33)
close(readyC)
})
var updateID int64 = 1
for ; updateID < 100; updateID++ {
buf.AddUpdate(
types.SliceOrderBook{
Bids: types.PriceVolumeSlice{
{Price: 100, Volume: fixedpoint.Value(updateID)},
},
Asks: types.PriceVolumeSlice{
{Price: 99, Volume: fixedpoint.Value(updateID)},
},
}, updateID)
}
<-readyC
}
func TestDepthBuffer_CorruptedUpdateAtTheBeginning(t *testing.T) {
// snapshot starts from 30,
// the first ready event should have a snapshot(30) and updates (31~50)
var snapshotFinalID int64 = 0
buf := NewDepthBuffer("", func() (types.SliceOrderBook, int64, error) {
snapshotFinalID += 30
return types.SliceOrderBook{
Bids: types.PriceVolumeSlice{
{Price: 100, Volume: 1},
},
Asks: types.PriceVolumeSlice{
{Price: 99, Volume: 1},
},
}, snapshotFinalID, nil
})
resetC := make(chan struct{}, 1)
buf.OnReset(func() {
resetC <- struct{}{}
})
var updateID int64 = 10
for ; updateID < 100; updateID++ {
if updateID == 50 {
updateID += 5
}
buf.AddUpdate(types.SliceOrderBook{
Bids: types.PriceVolumeSlice{
{Price: 100, Volume: fixedpoint.Value(updateID)},
},
Asks: types.PriceVolumeSlice{
{Price: 99, Volume: fixedpoint.Value(updateID)},
},
}, updateID)
}
<-resetC
}
func TestDepthBuffer_ConcurrentRun(t *testing.T) {
var snapshotFinalID int64 = 0
buf := NewDepthBuffer("", func() (types.SliceOrderBook, int64, error) {
snapshotFinalID += 30
time.Sleep(10 * time.Millisecond)
return types.SliceOrderBook{
Bids: types.PriceVolumeSlice{
{Price: 100, Volume: 1},
},
Asks: types.PriceVolumeSlice{
{Price: 99, Volume: 1},
},
}, snapshotFinalID, nil
})
readyCnt := 0
resetCnt := 0
pushCnt := 0
buf.OnPush(func(update Update) {
pushCnt++
})
buf.OnReady(func(snapshot types.SliceOrderBook, updates []Update) {
readyCnt++
assert.Greater(t, len(updates), 0)
})
buf.OnReset(func() {
resetCnt++
})
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
ticker := time.NewTicker(time.Millisecond)
defer ticker.Stop()
var updateID int64 = 10
for {
select {
case <-ctx.Done():
assert.Greater(t, readyCnt, 1)
assert.Greater(t, resetCnt, 1)
assert.Greater(t, pushCnt, 1)
return
case <-ticker.C:
updateID++
if updateID%100 == 0 {
updateID++
}
buf.AddUpdate(types.SliceOrderBook{
Bids: types.PriceVolumeSlice{
{Price: 100, Volume: fixedpoint.Value(updateID)},
},
Asks: types.PriceVolumeSlice{
{Price: 99, Volume: fixedpoint.Value(updateID)},
},
}, updateID)
}
}
}

33
pkg/util/reonce.go Normal file
View File

@ -0,0 +1,33 @@
package util
import (
"sync"
"sync/atomic"
)
type Reonce struct {
done uint32
m sync.Mutex
}
func (o *Reonce) Reset() {
o.m.Lock()
atomic.StoreUint32(&o.done, 0)
o.m.Unlock()
}
func (o *Reonce) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
// Outlined slow-path to allow inlining of the fast-path.
o.doSlow(f)
}
}
func (o *Reonce) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}

33
pkg/util/reonce_test.go Normal file
View File

@ -0,0 +1,33 @@
package util
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestReonce_DoAndReset(t *testing.T) {
var cnt = 0
var reonce Reonce
go reonce.Do(func() {
t.Log("once #1")
time.Sleep(10 * time.Millisecond)
cnt++
})
// make sure it's locked
time.Sleep(10 * time.Millisecond)
t.Logf("reset")
reonce.Reset()
go reonce.Do(func() {
t.Log("once #2")
time.Sleep(10 * time.Millisecond)
cnt++
})
time.Sleep(time.Second)
assert.Equal(t, 2, cnt)
}