depth: fix early snapshot id checking

This commit is contained in:
c9s 2024-11-16 14:58:05 +08:00
parent 40d0b59dfa
commit b2b363ba42
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
7 changed files with 24 additions and 25 deletions

View File

@ -3,7 +3,6 @@ package depth
import ( import (
"fmt" "fmt"
"sync" "sync"
"sync/atomic"
"time" "time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -41,13 +40,14 @@ type Buffer struct {
updateTimeout time.Duration updateTimeout time.Duration
// bufferingPeriod is used to buffer the update message before we get the full depth // bufferingPeriod is used to buffer the update message before we get the full depth
bufferingPeriod atomic.Value bufferingPeriod time.Duration
} }
func NewBuffer(fetcher SnapshotFetcher) *Buffer { func NewBuffer(fetcher SnapshotFetcher, bufferingPeriod time.Duration) *Buffer {
return &Buffer{ return &Buffer{
fetcher: fetcher, fetcher: fetcher,
resetC: make(chan struct{}, 1), resetC: make(chan struct{}, 1),
bufferingPeriod: bufferingPeriod,
} }
} }
@ -55,10 +55,6 @@ func (b *Buffer) SetUpdateTimeout(d time.Duration) {
b.updateTimeout = d b.updateTimeout = d
} }
func (b *Buffer) SetBufferingPeriod(d time.Duration) {
b.bufferingPeriod.Store(d)
}
func (b *Buffer) resetSnapshot() { func (b *Buffer) resetSnapshot() {
b.snapshot = nil b.snapshot = nil
b.finalUpdateID = 0 b.finalUpdateID = 0
@ -151,7 +147,7 @@ func (b *Buffer) fetchAndPush() error {
if len(b.buffer) > 0 { if len(b.buffer) > 0 {
// the snapshot is too early // the snapshot is too early
if finalUpdateID < b.buffer[0].FirstUpdateID { if finalUpdateID < b.buffer[0].FirstUpdateID-1 {
b.resetSnapshot() b.resetSnapshot()
b.emitReset() b.emitReset()
b.mu.Unlock() b.mu.Unlock()
@ -197,9 +193,7 @@ func (b *Buffer) fetchAndPush() error {
func (b *Buffer) tryFetch() { func (b *Buffer) tryFetch() {
for { for {
if period := b.bufferingPeriod.Load(); period != nil { <-time.After(b.bufferingPeriod)
<-time.After(period.(time.Duration))
}
err := b.fetchAndPush() err := b.fetchAndPush()
if err != nil { if err != nil {

View File

@ -26,8 +26,7 @@ func TestDepthBuffer_ReadyState(t *testing.T) {
{Price: itov(99), Volume: itov(1)}, {Price: itov(99), Volume: itov(1)},
}, },
}, 33, nil }, 33, nil
}) }, time.Millisecond*5)
buf.SetBufferingPeriod(time.Millisecond * 5)
readyC := make(chan struct{}) readyC := make(chan struct{})
buf.OnReady(func(snapshot types.SliceOrderBook, updates []Update) { buf.OnReady(func(snapshot types.SliceOrderBook, updates []Update) {

View File

@ -90,8 +90,7 @@ func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Clie
f = depth.NewBuffer(func() (types.SliceOrderBook, int64, error) { f = depth.NewBuffer(func() (types.SliceOrderBook, int64, error) {
log.Infof("fetching %s depth...", e.Symbol) log.Infof("fetching %s depth...", e.Symbol)
return ex.QueryDepth(context.Background(), e.Symbol) return ex.QueryDepth(context.Background(), e.Symbol)
}) }, 3*time.Second)
f.SetBufferingPeriod(time.Second)
f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) { f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) {
stream.EmitBookSnapshot(snapshot) stream.EmitBookSnapshot(snapshot)
for _, u := range updates { for _, u := range updates {

View File

@ -80,9 +80,8 @@ func (s *Stream) handleOrderBookL2Event(e *WebSocketOrderBookL2Event) {
} else { } else {
f = depth.NewBuffer(func() (types.SliceOrderBook, int64, error) { f = depth.NewBuffer(func() (types.SliceOrderBook, int64, error) {
return s.exchange.QueryDepth(context.Background(), e.Symbol) return s.exchange.QueryDepth(context.Background(), e.Symbol)
}) }, 3*time.Second)
s.depthBuffers[e.Symbol] = f s.depthBuffers[e.Symbol] = f
f.SetBufferingPeriod(time.Second)
f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) { f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) {
if valid, err := snapshot.IsValid(); !valid { if valid, err := snapshot.IsValid(); !valid {
log.Errorf("depth snapshot is invalid, error: %v", err) log.Errorf("depth snapshot is invalid, error: %v", err)

View File

@ -235,8 +235,7 @@ func (s *Stream) handleBookEvent(ex *Exchange) func(e max.BookEvent) {
log.Infof("fetching %s depth with depth = %d...", e.Market, bookDepth) log.Infof("fetching %s depth with depth = %d...", e.Market, bookDepth)
// the depth of websocket orderbook event is 50 by default, so we use 50 as limit here // the depth of websocket orderbook event is 50 by default, so we use 50 as limit here
return ex.QueryDepth(context.Background(), e.Market, bookDepth) return ex.QueryDepth(context.Background(), e.Market, bookDepth)
}) }, 3*time.Second)
f.SetBufferingPeriod(3 * time.Second)
f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) { f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) {
s.EmitBookSnapshot(snapshot) s.EmitBookSnapshot(snapshot)
for _, u := range updates { for _, u := range updates {

View File

@ -230,6 +230,8 @@ type Strategy struct {
StopHedgeQuoteBalance fixedpoint.Value `json:"stopHedgeQuoteBalance"` StopHedgeQuoteBalance fixedpoint.Value `json:"stopHedgeQuoteBalance"`
StopHedgeBaseBalance fixedpoint.Value `json:"stopHedgeBaseBalance"` StopHedgeBaseBalance fixedpoint.Value `json:"stopHedgeBaseBalance"`
SkipCleanUpOpenOrders bool `json:"skipCleanUpOpenOrders"`
// Quantity is used for fixed quantity of the first layer // Quantity is used for fixed quantity of the first layer
Quantity fixedpoint.Value `json:"quantity"` Quantity fixedpoint.Value `json:"quantity"`
@ -410,9 +412,11 @@ func (s *Strategy) quoteWorker(ctx context.Context) {
fullReplenishTicker := time.NewTicker(timejitter.Milliseconds(s.FullReplenishInterval.Duration(), 200)) fullReplenishTicker := time.NewTicker(timejitter.Milliseconds(s.FullReplenishInterval.Duration(), 200))
defer fullReplenishTicker.Stop() defer fullReplenishTicker.Stop()
// clean up the previous open orders // clean up the previous open orders before starting the quote worker
if err := s.cleanUpOpenOrders(ctx, s.makerSession); err != nil { if !s.SkipCleanUpOpenOrders {
log.WithError(err).Warnf("error cleaning up open orders") if err := s.cleanUpOpenOrders(ctx, s.makerSession); err != nil {
log.WithError(err).Warnf("error cleaning up open orders")
}
} }
s.updateQuote(ctx, 0) s.updateQuote(ctx, 0)
@ -966,10 +970,14 @@ func (s *Strategy) generateMakerOrders(
for _, side := range []types.SideType{types.SideTypeBuy, types.SideTypeSell} { for _, side := range []types.SideType{types.SideTypeBuy, types.SideTypeSell} {
sideBook := dupPricingBook.SideBook(side) sideBook := dupPricingBook.SideBook(side)
if sideBook.Len() == 0 { if sideBook.Len() == 0 {
log.Warnf("orderbook %s side is empty", side) s.logger.Warnf("orderbook %s side is empty", side)
continue continue
} }
if sideBook.Len() < 5 {
s.logger.Warnf("order book %s side is too thin, size: %d, levels: %+v", side, sideBook.Len(), sideBook)
}
availableSideBalance, ok := availableBalances[side] availableSideBalance, ok := availableBalances[side]
if !ok { if !ok {
log.Warnf("no available balance for side %s side", side) log.Warnf("no available balance for side %s side", side)

View File

@ -92,6 +92,7 @@ func (slice PriceVolumeSlice) ElemOrLast(i int) (PriceVolume, bool) {
return slice[i], true return slice[i], true
} }
// IndexByQuoteVolumeDepth returns the index of the price volume slice by the required quote volume depth
func (slice PriceVolumeSlice) IndexByQuoteVolumeDepth(requiredQuoteVolume fixedpoint.Value) int { func (slice PriceVolumeSlice) IndexByQuoteVolumeDepth(requiredQuoteVolume fixedpoint.Value) int {
var totalQuoteVolume = fixedpoint.Zero var totalQuoteVolume = fixedpoint.Zero
for x, pv := range slice { for x, pv := range slice {