From b2b363ba42f9ff27724475f046bd8e4d9915b848 Mon Sep 17 00:00:00 2001 From: c9s Date: Sat, 16 Nov 2024 14:58:05 +0800 Subject: [PATCH] depth: fix early snapshot id checking --- pkg/depth/buffer.go | 20 +++++++------------- pkg/depth/buffer_test.go | 3 +-- pkg/exchange/binance/stream.go | 3 +-- pkg/exchange/kucoin/stream.go | 3 +-- pkg/exchange/max/stream.go | 3 +-- pkg/strategy/xdepthmaker/strategy.go | 16 ++++++++++++---- pkg/types/price_volume_slice.go | 1 + 7 files changed, 24 insertions(+), 25 deletions(-) diff --git a/pkg/depth/buffer.go b/pkg/depth/buffer.go index e3fbb6818..5c9c6204a 100644 --- a/pkg/depth/buffer.go +++ b/pkg/depth/buffer.go @@ -3,7 +3,6 @@ package depth import ( "fmt" "sync" - "sync/atomic" "time" log "github.com/sirupsen/logrus" @@ -41,13 +40,14 @@ type Buffer struct { updateTimeout time.Duration // bufferingPeriod is used to buffer the update message before we get the full depth - bufferingPeriod atomic.Value + bufferingPeriod time.Duration } -func NewBuffer(fetcher SnapshotFetcher) *Buffer { +func NewBuffer(fetcher SnapshotFetcher, bufferingPeriod time.Duration) *Buffer { return &Buffer{ - fetcher: fetcher, - resetC: make(chan struct{}, 1), + fetcher: fetcher, + resetC: make(chan struct{}, 1), + bufferingPeriod: bufferingPeriod, } } @@ -55,10 +55,6 @@ func (b *Buffer) SetUpdateTimeout(d time.Duration) { b.updateTimeout = d } -func (b *Buffer) SetBufferingPeriod(d time.Duration) { - b.bufferingPeriod.Store(d) -} - func (b *Buffer) resetSnapshot() { b.snapshot = nil b.finalUpdateID = 0 @@ -151,7 +147,7 @@ func (b *Buffer) fetchAndPush() error { if len(b.buffer) > 0 { // the snapshot is too early - if finalUpdateID < b.buffer[0].FirstUpdateID { + if finalUpdateID < b.buffer[0].FirstUpdateID-1 { b.resetSnapshot() b.emitReset() b.mu.Unlock() @@ -197,9 +193,7 @@ func (b *Buffer) fetchAndPush() error { func (b *Buffer) tryFetch() { for { - if period := b.bufferingPeriod.Load(); period != nil { - <-time.After(period.(time.Duration)) - } + <-time.After(b.bufferingPeriod) err := b.fetchAndPush() if err != nil { diff --git a/pkg/depth/buffer_test.go b/pkg/depth/buffer_test.go index 949db0f21..fe8023641 100644 --- a/pkg/depth/buffer_test.go +++ b/pkg/depth/buffer_test.go @@ -26,8 +26,7 @@ func TestDepthBuffer_ReadyState(t *testing.T) { {Price: itov(99), Volume: itov(1)}, }, }, 33, nil - }) - buf.SetBufferingPeriod(time.Millisecond * 5) + }, time.Millisecond*5) readyC := make(chan struct{}) buf.OnReady(func(snapshot types.SliceOrderBook, updates []Update) { diff --git a/pkg/exchange/binance/stream.go b/pkg/exchange/binance/stream.go index fec308c8d..27c7566e5 100644 --- a/pkg/exchange/binance/stream.go +++ b/pkg/exchange/binance/stream.go @@ -90,8 +90,7 @@ func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Clie f = depth.NewBuffer(func() (types.SliceOrderBook, int64, error) { log.Infof("fetching %s depth...", e.Symbol) return ex.QueryDepth(context.Background(), e.Symbol) - }) - f.SetBufferingPeriod(time.Second) + }, 3*time.Second) f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) { stream.EmitBookSnapshot(snapshot) for _, u := range updates { diff --git a/pkg/exchange/kucoin/stream.go b/pkg/exchange/kucoin/stream.go index f7a938693..ddc5cb626 100644 --- a/pkg/exchange/kucoin/stream.go +++ b/pkg/exchange/kucoin/stream.go @@ -80,9 +80,8 @@ func (s *Stream) handleOrderBookL2Event(e *WebSocketOrderBookL2Event) { } else { f = depth.NewBuffer(func() (types.SliceOrderBook, int64, error) { return s.exchange.QueryDepth(context.Background(), e.Symbol) - }) + }, 3*time.Second) s.depthBuffers[e.Symbol] = f - f.SetBufferingPeriod(time.Second) f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) { if valid, err := snapshot.IsValid(); !valid { log.Errorf("depth snapshot is invalid, error: %v", err) diff --git a/pkg/exchange/max/stream.go b/pkg/exchange/max/stream.go index 9999b57c5..d29d46570 100644 --- a/pkg/exchange/max/stream.go +++ b/pkg/exchange/max/stream.go @@ -235,8 +235,7 @@ func (s *Stream) handleBookEvent(ex *Exchange) func(e max.BookEvent) { log.Infof("fetching %s depth with depth = %d...", e.Market, bookDepth) // the depth of websocket orderbook event is 50 by default, so we use 50 as limit here return ex.QueryDepth(context.Background(), e.Market, bookDepth) - }) - f.SetBufferingPeriod(3 * time.Second) + }, 3*time.Second) f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) { s.EmitBookSnapshot(snapshot) for _, u := range updates { diff --git a/pkg/strategy/xdepthmaker/strategy.go b/pkg/strategy/xdepthmaker/strategy.go index 10843d995..972a48df7 100644 --- a/pkg/strategy/xdepthmaker/strategy.go +++ b/pkg/strategy/xdepthmaker/strategy.go @@ -230,6 +230,8 @@ type Strategy struct { StopHedgeQuoteBalance fixedpoint.Value `json:"stopHedgeQuoteBalance"` StopHedgeBaseBalance fixedpoint.Value `json:"stopHedgeBaseBalance"` + SkipCleanUpOpenOrders bool `json:"skipCleanUpOpenOrders"` + // Quantity is used for fixed quantity of the first layer Quantity fixedpoint.Value `json:"quantity"` @@ -410,9 +412,11 @@ func (s *Strategy) quoteWorker(ctx context.Context) { fullReplenishTicker := time.NewTicker(timejitter.Milliseconds(s.FullReplenishInterval.Duration(), 200)) defer fullReplenishTicker.Stop() - // clean up the previous open orders - if err := s.cleanUpOpenOrders(ctx, s.makerSession); err != nil { - log.WithError(err).Warnf("error cleaning up open orders") + // clean up the previous open orders before starting the quote worker + if !s.SkipCleanUpOpenOrders { + if err := s.cleanUpOpenOrders(ctx, s.makerSession); err != nil { + log.WithError(err).Warnf("error cleaning up open orders") + } } s.updateQuote(ctx, 0) @@ -966,10 +970,14 @@ func (s *Strategy) generateMakerOrders( for _, side := range []types.SideType{types.SideTypeBuy, types.SideTypeSell} { sideBook := dupPricingBook.SideBook(side) if sideBook.Len() == 0 { - log.Warnf("orderbook %s side is empty", side) + s.logger.Warnf("orderbook %s side is empty", side) continue } + if sideBook.Len() < 5 { + s.logger.Warnf("order book %s side is too thin, size: %d, levels: %+v", side, sideBook.Len(), sideBook) + } + availableSideBalance, ok := availableBalances[side] if !ok { log.Warnf("no available balance for side %s side", side) diff --git a/pkg/types/price_volume_slice.go b/pkg/types/price_volume_slice.go index 7f03941f6..08255747e 100644 --- a/pkg/types/price_volume_slice.go +++ b/pkg/types/price_volume_slice.go @@ -92,6 +92,7 @@ func (slice PriceVolumeSlice) ElemOrLast(i int) (PriceVolume, bool) { return slice[i], true } +// IndexByQuoteVolumeDepth returns the index of the price volume slice by the required quote volume depth func (slice PriceVolumeSlice) IndexByQuoteVolumeDepth(requiredQuoteVolume fixedpoint.Value) int { var totalQuoteVolume = fixedpoint.Zero for x, pv := range slice {