From c92c395f67bad544b76623b40a005e9da8164117 Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 20 Aug 2024 17:07:29 +0800 Subject: [PATCH] twap: improve rate limiter syntax parser and support order update rate limiter in twap --- pkg/cmd/execute_order.go | 30 +++++++++++++++++++++++++- pkg/twap/v2/stream_executor.go | 26 +++++++++++------------ pkg/util/rate_limit.go | 39 ++++++++++++++++++++++++++++++++++ pkg/util/rate_limit_test.go | 29 +++++++++++++++++++++++++ 4 files changed, 109 insertions(+), 15 deletions(-) diff --git a/pkg/cmd/execute_order.go b/pkg/cmd/execute_order.go index 84eb63ec8..d082e1f73 100644 --- a/pkg/cmd/execute_order.go +++ b/pkg/cmd/execute_order.go @@ -16,6 +16,7 @@ import ( "github.com/c9s/bbgo/pkg/fixedpoint" _ "github.com/c9s/bbgo/pkg/twap" "github.com/c9s/bbgo/pkg/types" + "github.com/c9s/bbgo/pkg/util" "github.com/c9s/bbgo/pkg/twap/v2" ) @@ -27,6 +28,7 @@ func init() { executeOrderCmd.Flags().String("target-quantity", "", "target quantity") executeOrderCmd.Flags().String("slice-quantity", "", "slice quantity") executeOrderCmd.Flags().String("stop-price", "0", "stop price") + executeOrderCmd.Flags().String("order-update-rate-limit", "1s", "order update rate limit, syntax: 1+1/1m") executeOrderCmd.Flags().Duration("update-interval", time.Second*10, "order update time") executeOrderCmd.Flags().Duration("delay-interval", time.Second*3, "order delay time after filled") executeOrderCmd.Flags().Duration("deadline", 0, "deadline duration of the order execution, e.g. 1h") @@ -112,11 +114,21 @@ var executeOrderCmd = &cobra.Command{ return err } + orderUpdateRateLimitStr, err := cmd.Flags().GetString("order-update-rate-limit") + if err != nil { + return err + } + updateInterval, err := cmd.Flags().GetDuration("update-interval") if err != nil { return err } + delayInterval, err := cmd.Flags().GetDuration("delay-interval") + if err != nil { + return err + } + deadlineDuration, err := cmd.Flags().GetDuration("deadline") if err != nil { return err @@ -150,7 +162,23 @@ var executeOrderCmd = &cobra.Command{ } executor := twap.NewStreamExecutor(session.Exchange, symbol, market, side, targetQuantity, sliceQuantity) - executor.SetUpdateInterval(updateInterval) + + if updateInterval > 0 { + executor.SetUpdateInterval(updateInterval) + } + + if len(orderUpdateRateLimitStr) > 0 { + rateLimit, err := util.ParseRateLimitSyntax(orderUpdateRateLimitStr) + if err != nil { + return err + } + + executor.SetOrderUpdateRateLimit(rateLimit) + } + + if delayInterval > 0 { + executor.SetDelayInterval(delayInterval) + } if stopPrice.Sign() > 0 { executor.SetStopPrice(stopPrice) diff --git a/pkg/twap/v2/stream_executor.go b/pkg/twap/v2/stream_executor.go index 47e030e11..4cd21a921 100644 --- a/pkg/twap/v2/stream_executor.go +++ b/pkg/twap/v2/stream_executor.go @@ -8,6 +8,7 @@ import ( "time" "github.com/sirupsen/logrus" + "golang.org/x/time/rate" "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/core" @@ -84,6 +85,8 @@ type FixedQuantityExecutor struct { executionCtx context.Context cancelExecution context.CancelFunc + orderUpdateRateLimit *rate.Limiter + userDataStreamCtx context.Context cancelUserDataStream context.CancelFunc @@ -242,6 +245,10 @@ func (e *FixedQuantityExecutor) cancelContextIfTargetQuantityFilled() bool { return false } +func (e *FixedQuantityExecutor) SetOrderUpdateRateLimit(rateLimit *rate.Limiter) { + e.orderUpdateRateLimit = rateLimit +} + func (e *FixedQuantityExecutor) cancelActiveOrders(ctx context.Context) error { gracefulCtx, gracefulCancel := context.WithTimeout(ctx, 30*time.Second) defer gracefulCancel() @@ -249,8 +256,6 @@ func (e *FixedQuantityExecutor) cancelActiveOrders(ctx context.Context) error { } func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) { - // updateLimiter := rate.NewLimiter(rate.Every(3*time.Second), 2) - defer func() { if err := e.cancelActiveOrders(ctx); err != nil { e.logger.WithError(err).Error("cancel active orders error") @@ -277,12 +282,6 @@ func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) { } // orderBook.C sends a signal when any price or quantity changes in the order book - /* - if !updateLimiter.Allow() { - break - } - */ - if e.cancelContextIfTargetQuantityFilled() { return } @@ -299,12 +298,6 @@ func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) { continue } - /* - if !updateLimiter.Allow() { - break - } - */ - if e.cancelContextIfTargetQuantityFilled() { return } @@ -317,6 +310,11 @@ func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) { } func (e *FixedQuantityExecutor) updateOrder(ctx context.Context) error { + if e.orderUpdateRateLimit != nil && !e.orderUpdateRateLimit.Allow() { + e.logger.Infof("rate limit exceeded, skip updating order") + return nil + } + book := e.orderBook.Copy() sideBook := book.SideBook(e.side) diff --git a/pkg/util/rate_limit.go b/pkg/util/rate_limit.go index 63c83bf82..4ac428da8 100644 --- a/pkg/util/rate_limit.go +++ b/pkg/util/rate_limit.go @@ -4,6 +4,7 @@ import ( "fmt" "time" + log "github.com/sirupsen/logrus" "golang.org/x/time/rate" ) @@ -17,3 +18,41 @@ func NewValidLimiter(r rate.Limit, b int) (*rate.Limiter, error) { } return rate.NewLimiter(r, b), nil } + +// ParseRateLimitSyntax parses the rate limit syntax into the rate.Limiter parameters +// sample inputs: +// +// 2+1/5s (2 initial tokens, 1 token per 5 seconds) +// 5+3/1m (5 initial tokens, 3 tokens per minute) +// 3m (3 tokens per minute) +// 1/3m (1 token per 3 minutes) +func ParseRateLimitSyntax(desc string) (*rate.Limiter, error) { + var b = 0 + var r = 1.0 + var durStr string + var duration time.Duration + + _, err := fmt.Sscanf(desc, "%d+%f/%s", &b, &r, &durStr) + if err != nil { + b = 0 + r = 1.0 + _, err = fmt.Sscanf(desc, "%f/%s", &r, &durStr) + if err != nil { + durStr = desc + // need to reset + b = 1 + r = 1.0 + } + } + + duration, err = time.ParseDuration(durStr) + if err != nil { + return nil, fmt.Errorf("invalid rate limit syntax: b+n/duration, err: %v", err) + } + if r == 1.0 { + return NewValidLimiter(rate.Every(duration), b) + } + + log.Infof("%v %v", duration, r) + return NewValidLimiter(rate.Every(time.Duration(float64(duration)/r)), b) +} diff --git a/pkg/util/rate_limit_test.go b/pkg/util/rate_limit_test.go index 6efd54a2a..543f1c9f1 100644 --- a/pkg/util/rate_limit_test.go +++ b/pkg/util/rate_limit_test.go @@ -41,3 +41,32 @@ func TestShouldDelay(t *testing.T) { assert.True(t, ShouldDelay(limiter, minInterval) > 0) } } + +func TestParseRateLimitSyntax(t *testing.T) { + var tests = []struct { + desc string + expectedBurst int + expectedRate float64 + + wantErr bool + }{ + {"2+1/5s", 2, 1.0 / 5.0, false}, + {"5+1/3m", 5, 1 / 3 * 60.0, false}, + {"1m", 1, 1.0 / 60.0, false}, + } + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + limiter, err := ParseRateLimitSyntax(test.desc) + if test.wantErr { + assert.Error(t, err) + } else if assert.NoError(t, err) { + assert.NotNil(t, limiter) + burst := limiter.Burst() + assert.Equal(t, test.expectedBurst, burst) + + limit := limiter.Limit() + assert.InDeltaf(t, test.expectedRate, float64(limit), 0.01, "expected rate %f, got %f", test.expectedRate, limit) + } + }) + } +}