mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-22 14:55:16 +00:00
twap: improve rate limiter syntax parser and support order update rate limiter in twap
This commit is contained in:
parent
48029f95cc
commit
c92c395f67
|
@ -16,6 +16,7 @@ import (
|
||||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||||
_ "github.com/c9s/bbgo/pkg/twap"
|
_ "github.com/c9s/bbgo/pkg/twap"
|
||||||
"github.com/c9s/bbgo/pkg/types"
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
|
"github.com/c9s/bbgo/pkg/util"
|
||||||
|
|
||||||
"github.com/c9s/bbgo/pkg/twap/v2"
|
"github.com/c9s/bbgo/pkg/twap/v2"
|
||||||
)
|
)
|
||||||
|
@ -27,6 +28,7 @@ func init() {
|
||||||
executeOrderCmd.Flags().String("target-quantity", "", "target quantity")
|
executeOrderCmd.Flags().String("target-quantity", "", "target quantity")
|
||||||
executeOrderCmd.Flags().String("slice-quantity", "", "slice quantity")
|
executeOrderCmd.Flags().String("slice-quantity", "", "slice quantity")
|
||||||
executeOrderCmd.Flags().String("stop-price", "0", "stop price")
|
executeOrderCmd.Flags().String("stop-price", "0", "stop price")
|
||||||
|
executeOrderCmd.Flags().String("order-update-rate-limit", "1s", "order update rate limit, syntax: 1+1/1m")
|
||||||
executeOrderCmd.Flags().Duration("update-interval", time.Second*10, "order update time")
|
executeOrderCmd.Flags().Duration("update-interval", time.Second*10, "order update time")
|
||||||
executeOrderCmd.Flags().Duration("delay-interval", time.Second*3, "order delay time after filled")
|
executeOrderCmd.Flags().Duration("delay-interval", time.Second*3, "order delay time after filled")
|
||||||
executeOrderCmd.Flags().Duration("deadline", 0, "deadline duration of the order execution, e.g. 1h")
|
executeOrderCmd.Flags().Duration("deadline", 0, "deadline duration of the order execution, e.g. 1h")
|
||||||
|
@ -112,11 +114,21 @@ var executeOrderCmd = &cobra.Command{
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
orderUpdateRateLimitStr, err := cmd.Flags().GetString("order-update-rate-limit")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
updateInterval, err := cmd.Flags().GetDuration("update-interval")
|
updateInterval, err := cmd.Flags().GetDuration("update-interval")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
delayInterval, err := cmd.Flags().GetDuration("delay-interval")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
deadlineDuration, err := cmd.Flags().GetDuration("deadline")
|
deadlineDuration, err := cmd.Flags().GetDuration("deadline")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -150,7 +162,23 @@ var executeOrderCmd = &cobra.Command{
|
||||||
}
|
}
|
||||||
|
|
||||||
executor := twap.NewStreamExecutor(session.Exchange, symbol, market, side, targetQuantity, sliceQuantity)
|
executor := twap.NewStreamExecutor(session.Exchange, symbol, market, side, targetQuantity, sliceQuantity)
|
||||||
|
|
||||||
|
if updateInterval > 0 {
|
||||||
executor.SetUpdateInterval(updateInterval)
|
executor.SetUpdateInterval(updateInterval)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(orderUpdateRateLimitStr) > 0 {
|
||||||
|
rateLimit, err := util.ParseRateLimitSyntax(orderUpdateRateLimitStr)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
executor.SetOrderUpdateRateLimit(rateLimit)
|
||||||
|
}
|
||||||
|
|
||||||
|
if delayInterval > 0 {
|
||||||
|
executor.SetDelayInterval(delayInterval)
|
||||||
|
}
|
||||||
|
|
||||||
if stopPrice.Sign() > 0 {
|
if stopPrice.Sign() > 0 {
|
||||||
executor.SetStopPrice(stopPrice)
|
executor.SetStopPrice(stopPrice)
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
|
|
||||||
"github.com/c9s/bbgo/pkg/bbgo"
|
"github.com/c9s/bbgo/pkg/bbgo"
|
||||||
"github.com/c9s/bbgo/pkg/core"
|
"github.com/c9s/bbgo/pkg/core"
|
||||||
|
@ -84,6 +85,8 @@ type FixedQuantityExecutor struct {
|
||||||
executionCtx context.Context
|
executionCtx context.Context
|
||||||
cancelExecution context.CancelFunc
|
cancelExecution context.CancelFunc
|
||||||
|
|
||||||
|
orderUpdateRateLimit *rate.Limiter
|
||||||
|
|
||||||
userDataStreamCtx context.Context
|
userDataStreamCtx context.Context
|
||||||
cancelUserDataStream context.CancelFunc
|
cancelUserDataStream context.CancelFunc
|
||||||
|
|
||||||
|
@ -242,6 +245,10 @@ func (e *FixedQuantityExecutor) cancelContextIfTargetQuantityFilled() bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *FixedQuantityExecutor) SetOrderUpdateRateLimit(rateLimit *rate.Limiter) {
|
||||||
|
e.orderUpdateRateLimit = rateLimit
|
||||||
|
}
|
||||||
|
|
||||||
func (e *FixedQuantityExecutor) cancelActiveOrders(ctx context.Context) error {
|
func (e *FixedQuantityExecutor) cancelActiveOrders(ctx context.Context) error {
|
||||||
gracefulCtx, gracefulCancel := context.WithTimeout(ctx, 30*time.Second)
|
gracefulCtx, gracefulCancel := context.WithTimeout(ctx, 30*time.Second)
|
||||||
defer gracefulCancel()
|
defer gracefulCancel()
|
||||||
|
@ -249,8 +256,6 @@ func (e *FixedQuantityExecutor) cancelActiveOrders(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) {
|
func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) {
|
||||||
// updateLimiter := rate.NewLimiter(rate.Every(3*time.Second), 2)
|
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := e.cancelActiveOrders(ctx); err != nil {
|
if err := e.cancelActiveOrders(ctx); err != nil {
|
||||||
e.logger.WithError(err).Error("cancel active orders error")
|
e.logger.WithError(err).Error("cancel active orders error")
|
||||||
|
@ -277,12 +282,6 @@ func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// orderBook.C sends a signal when any price or quantity changes in the order book
|
// orderBook.C sends a signal when any price or quantity changes in the order book
|
||||||
/*
|
|
||||||
if !updateLimiter.Allow() {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
if e.cancelContextIfTargetQuantityFilled() {
|
if e.cancelContextIfTargetQuantityFilled() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -299,12 +298,6 @@ func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
if !updateLimiter.Allow() {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
if e.cancelContextIfTargetQuantityFilled() {
|
if e.cancelContextIfTargetQuantityFilled() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -317,6 +310,11 @@ func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *FixedQuantityExecutor) updateOrder(ctx context.Context) error {
|
func (e *FixedQuantityExecutor) updateOrder(ctx context.Context) error {
|
||||||
|
if e.orderUpdateRateLimit != nil && !e.orderUpdateRateLimit.Allow() {
|
||||||
|
e.logger.Infof("rate limit exceeded, skip updating order")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
book := e.orderBook.Copy()
|
book := e.orderBook.Copy()
|
||||||
sideBook := book.SideBook(e.side)
|
sideBook := book.SideBook(e.side)
|
||||||
|
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -17,3 +18,41 @@ func NewValidLimiter(r rate.Limit, b int) (*rate.Limiter, error) {
|
||||||
}
|
}
|
||||||
return rate.NewLimiter(r, b), nil
|
return rate.NewLimiter(r, b), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ParseRateLimitSyntax parses the rate limit syntax into the rate.Limiter parameters
|
||||||
|
// sample inputs:
|
||||||
|
//
|
||||||
|
// 2+1/5s (2 initial tokens, 1 token per 5 seconds)
|
||||||
|
// 5+3/1m (5 initial tokens, 3 tokens per minute)
|
||||||
|
// 3m (3 tokens per minute)
|
||||||
|
// 1/3m (1 token per 3 minutes)
|
||||||
|
func ParseRateLimitSyntax(desc string) (*rate.Limiter, error) {
|
||||||
|
var b = 0
|
||||||
|
var r = 1.0
|
||||||
|
var durStr string
|
||||||
|
var duration time.Duration
|
||||||
|
|
||||||
|
_, err := fmt.Sscanf(desc, "%d+%f/%s", &b, &r, &durStr)
|
||||||
|
if err != nil {
|
||||||
|
b = 0
|
||||||
|
r = 1.0
|
||||||
|
_, err = fmt.Sscanf(desc, "%f/%s", &r, &durStr)
|
||||||
|
if err != nil {
|
||||||
|
durStr = desc
|
||||||
|
// need to reset
|
||||||
|
b = 1
|
||||||
|
r = 1.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
duration, err = time.ParseDuration(durStr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid rate limit syntax: b+n/duration, err: %v", err)
|
||||||
|
}
|
||||||
|
if r == 1.0 {
|
||||||
|
return NewValidLimiter(rate.Every(duration), b)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("%v %v", duration, r)
|
||||||
|
return NewValidLimiter(rate.Every(time.Duration(float64(duration)/r)), b)
|
||||||
|
}
|
||||||
|
|
|
@ -41,3 +41,32 @@ func TestShouldDelay(t *testing.T) {
|
||||||
assert.True(t, ShouldDelay(limiter, minInterval) > 0)
|
assert.True(t, ShouldDelay(limiter, minInterval) > 0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestParseRateLimitSyntax(t *testing.T) {
|
||||||
|
var tests = []struct {
|
||||||
|
desc string
|
||||||
|
expectedBurst int
|
||||||
|
expectedRate float64
|
||||||
|
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
{"2+1/5s", 2, 1.0 / 5.0, false},
|
||||||
|
{"5+1/3m", 5, 1 / 3 * 60.0, false},
|
||||||
|
{"1m", 1, 1.0 / 60.0, false},
|
||||||
|
}
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run(test.desc, func(t *testing.T) {
|
||||||
|
limiter, err := ParseRateLimitSyntax(test.desc)
|
||||||
|
if test.wantErr {
|
||||||
|
assert.Error(t, err)
|
||||||
|
} else if assert.NoError(t, err) {
|
||||||
|
assert.NotNil(t, limiter)
|
||||||
|
burst := limiter.Burst()
|
||||||
|
assert.Equal(t, test.expectedBurst, burst)
|
||||||
|
|
||||||
|
limit := limiter.Limit()
|
||||||
|
assert.InDeltaf(t, test.expectedRate, float64(limit), 0.01, "expected rate %f, got %f", test.expectedRate, limit)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user