diff --git a/pkg/bbgo/order_executor_general.go b/pkg/bbgo/order_executor_general.go index 2046b2ffa..822b6f223 100644 --- a/pkg/bbgo/order_executor_general.go +++ b/pkg/bbgo/order_executor_general.go @@ -14,8 +14,8 @@ import ( "github.com/c9s/bbgo/pkg/exchange/retry" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" - "github.com/c9s/bbgo/pkg/util" "github.com/c9s/bbgo/pkg/util/backoff" + "github.com/c9s/bbgo/pkg/util/timejitter" ) var ErrExceededSubmitOrderRetryLimit = errors.New("exceeded submit order retry limit") @@ -146,7 +146,7 @@ func (e *GeneralOrderExecutor) updateMarginAssetMaxBorrowable( func (e *GeneralOrderExecutor) marginAssetMaxBorrowableUpdater( ctx context.Context, interval time.Duration, marginService types.MarginBorrowRepayService, market types.Market, ) { - t := time.NewTicker(util.MillisecondsJitter(interval, 500)) + t := time.NewTicker(timejitter.Milliseconds(interval, 500)) defer t.Stop() e.updateMarginAssetMaxBorrowable(ctx, marginService, market) diff --git a/pkg/strategy/dca2/sync.go b/pkg/strategy/dca2/sync.go index 0b7b6eb52..a0f8a41ee 100644 --- a/pkg/strategy/dca2/sync.go +++ b/pkg/strategy/dca2/sync.go @@ -7,7 +7,7 @@ import ( "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/exchange/retry" "github.com/c9s/bbgo/pkg/strategy/common" - "github.com/c9s/bbgo/pkg/util" + "github.com/c9s/bbgo/pkg/util/timejitter" ) func (s *Strategy) syncPeriodically(ctx context.Context) { @@ -18,7 +18,7 @@ func (s *Strategy) syncPeriodically(ctx context.Context) { defer syncPersistenceTicker.Stop() // sync active orders - syncActiveOrdersTicker := time.NewTicker(util.MillisecondsJitter(10*time.Minute, 5*60*1000)) + syncActiveOrdersTicker := time.NewTicker(timejitter.Milliseconds(10*time.Minute, 5*60*1000)) defer syncActiveOrdersTicker.Stop() for { diff --git a/pkg/strategy/grid2/recover.go b/pkg/strategy/grid2/recover.go index e3a3a7fa1..412605823 100644 --- a/pkg/strategy/grid2/recover.go +++ b/pkg/strategy/grid2/recover.go @@ -14,7 +14,7 @@ import ( "github.com/c9s/bbgo/pkg/exchange/retry" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" - "github.com/c9s/bbgo/pkg/util" + "github.com/c9s/bbgo/pkg/util/timejitter" ) var syncWindow = -3 * time.Minute @@ -48,7 +48,7 @@ func (s *Strategy) recoverPeriodically(ctx context.Context) { return } - interval := util.MillisecondsJitter(25*time.Minute, 10*60*1000) + interval := timejitter.Milliseconds(25*time.Minute, 10*60*1000) s.logger.Infof("[Recover] interval: %s", interval) ticker := time.NewTicker(interval) defer ticker.Stop() diff --git a/pkg/strategy/liquiditymaker/strategy.go b/pkg/strategy/liquiditymaker/strategy.go index a34cd66a0..a269fbafa 100644 --- a/pkg/strategy/liquiditymaker/strategy.go +++ b/pkg/strategy/liquiditymaker/strategy.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" @@ -135,6 +136,22 @@ func (s *Strategy) Defaults() error { return nil } +func (s *Strategy) liquidityWorker(ctx context.Context, interval types.Interval) { + ticker := time.NewTicker(interval.Duration()) + defer ticker.Stop() + + s.placeLiquidityOrders(ctx) + for { + select { + case <-ctx.Done(): + return + + case <-ticker.C: + s.placeLiquidityOrders(ctx) + } + } +} + func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { if s.ProfitFixerBundle.ProfitFixerConfig != nil { market, _ := session.Market(s.Symbol) @@ -182,20 +199,29 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. return err } - session.UserDataStream.OnStart(func() { - s.placeLiquidityOrders(ctx) - }) - session.MarketDataStream.OnKLineClosed(func(k types.KLine) { if k.Interval == s.AdjustmentUpdateInterval { s.placeAdjustmentOrders(ctx) } - - if k.Interval == s.LiquidityUpdateInterval { - s.placeLiquidityOrders(ctx) - } }) + if intervalProvider, ok := session.Exchange.(types.CustomIntervalProvider); ok { + if intervalProvider.IsSupportedInterval(s.LiquidityUpdateInterval) { + session.UserDataStream.OnAuth(func() { + s.placeLiquidityOrders(ctx) + }) + session.MarketDataStream.OnKLineClosed(func(k types.KLine) { + if k.Interval == s.LiquidityUpdateInterval { + s.placeLiquidityOrders(ctx) + } + }) + } else { + session.UserDataStream.OnStart(func() { + go s.liquidityWorker(ctx, s.LiquidityUpdateInterval) + }) + } + } + bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() diff --git a/pkg/strategy/xbalance/strategy.go b/pkg/strategy/xbalance/strategy.go index 8499dd428..b71bbfa8e 100644 --- a/pkg/strategy/xbalance/strategy.go +++ b/pkg/strategy/xbalance/strategy.go @@ -14,8 +14,8 @@ import ( "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" - "github.com/c9s/bbgo/pkg/util" "github.com/c9s/bbgo/pkg/util/templateutil" + "github.com/c9s/bbgo/pkg/util/timejitter" ) const ID = "xbalance" @@ -283,7 +283,9 @@ func (s *Strategy) checkBalance(ctx context.Context, sessions map[string]*bbgo.E } } -func (s *Strategy) findHighestBalanceLevelSession(sessions map[string]*bbgo.ExchangeSession, requiredAmount fixedpoint.Value) (*bbgo.ExchangeSession, types.Balance, error) { +func (s *Strategy) findHighestBalanceLevelSession( + sessions map[string]*bbgo.ExchangeSession, requiredAmount fixedpoint.Value, +) (*bbgo.ExchangeSession, types.Balance, error) { var balance types.Balance var maxBalanceLevel = fixedpoint.Zero var maxBalanceSession *bbgo.ExchangeSession = nil @@ -350,7 +352,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se } go func() { - ticker := time.NewTicker(util.MillisecondsJitter(s.Interval.Duration(), 1000)) + ticker := time.NewTicker(timejitter.Milliseconds(s.Interval.Duration(), 1000)) defer ticker.Stop() for { diff --git a/pkg/strategy/xdepthmaker/strategy.go b/pkg/strategy/xdepthmaker/strategy.go index adc7c7814..b075b2edb 100644 --- a/pkg/strategy/xdepthmaker/strategy.go +++ b/pkg/strategy/xdepthmaker/strategy.go @@ -21,7 +21,7 @@ import ( "github.com/c9s/bbgo/pkg/strategy/common" "github.com/c9s/bbgo/pkg/strategy/xmaker" "github.com/c9s/bbgo/pkg/types" - "github.com/c9s/bbgo/pkg/util" + "github.com/c9s/bbgo/pkg/util/timejitter" "github.com/c9s/bbgo/pkg/util/tradingutil" ) @@ -409,10 +409,10 @@ func (s *Strategy) Defaults() error { } func (s *Strategy) quoteWorker(ctx context.Context) { - updateTicker := time.NewTicker(util.MillisecondsJitter(s.FastLayerUpdateInterval.Duration(), 200)) + updateTicker := time.NewTicker(timejitter.Milliseconds(s.FastLayerUpdateInterval.Duration(), 200)) defer updateTicker.Stop() - fullReplenishTicker := time.NewTicker(util.MillisecondsJitter(s.FullReplenishInterval.Duration(), 200)) + fullReplenishTicker := time.NewTicker(timejitter.Milliseconds(s.FullReplenishInterval.Duration(), 200)) defer fullReplenishTicker.Stop() // clean up the previous open orders @@ -456,7 +456,7 @@ func (s *Strategy) quoteWorker(ctx context.Context) { } func (s *Strategy) hedgeWorker(ctx context.Context) { - ticker := time.NewTicker(util.MillisecondsJitter(s.HedgeInterval.Duration(), 200)) + ticker := time.NewTicker(timejitter.Milliseconds(s.HedgeInterval.Duration(), 200)) defer ticker.Stop() for { diff --git a/pkg/strategy/xgap/strategy.go b/pkg/strategy/xgap/strategy.go index ca812551f..bf38ed267 100644 --- a/pkg/strategy/xgap/strategy.go +++ b/pkg/strategy/xgap/strategy.go @@ -14,7 +14,7 @@ import ( "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/strategy/common" "github.com/c9s/bbgo/pkg/types" - "github.com/c9s/bbgo/pkg/util" + "github.com/c9s/bbgo/pkg/util/timejitter" ) const ID = "xgap" @@ -169,7 +169,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se go func() { ticker := time.NewTicker( - util.MillisecondsJitter(s.UpdateInterval.Duration(), 1000), + timejitter.Milliseconds(s.UpdateInterval.Duration(), 1000), ) defer ticker.Stop() @@ -187,7 +187,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se } // < 10 seconds jitter sleep - delay := util.MillisecondsJitter(s.UpdateInterval.Duration(), 10*1000) + delay := timejitter.Milliseconds(s.UpdateInterval.Duration(), 10*1000) if delay < s.UpdateInterval.Duration() { time.Sleep(delay) } diff --git a/pkg/strategy/xmaker/strategy.go b/pkg/strategy/xmaker/strategy.go index 8ff2ce4bd..ad4c0ecbc 100644 --- a/pkg/strategy/xmaker/strategy.go +++ b/pkg/strategy/xmaker/strategy.go @@ -22,6 +22,7 @@ import ( "github.com/c9s/bbgo/pkg/strategy/common" "github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/util" + "github.com/c9s/bbgo/pkg/util/timejitter" ) var defaultMargin = fixedpoint.NewFromFloat(0.003) @@ -1426,7 +1427,7 @@ func (s *Strategy) Validate() error { } func (s *Strategy) quoteWorker(ctx context.Context) { - ticker := time.NewTicker(util.MillisecondsJitter(s.UpdateInterval.Duration(), 200)) + ticker := time.NewTicker(timejitter.Milliseconds(s.UpdateInterval.Duration(), 200)) defer ticker.Stop() defer func() { @@ -1500,7 +1501,7 @@ func (s *Strategy) houseCleanWorker(ctx context.Context) { } func (s *Strategy) hedgeWorker(ctx context.Context) { - ticker := time.NewTicker(util.MillisecondsJitter(s.HedgeInterval.Duration(), 200)) + ticker := time.NewTicker(timejitter.Milliseconds(s.HedgeInterval.Duration(), 200)) defer ticker.Stop() profitChanged := false diff --git a/pkg/strategy/xnav/strategy.go b/pkg/strategy/xnav/strategy.go index a517dbfca..51bb84089 100644 --- a/pkg/strategy/xnav/strategy.go +++ b/pkg/strategy/xnav/strategy.go @@ -9,8 +9,8 @@ import ( "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" - "github.com/c9s/bbgo/pkg/util" "github.com/c9s/bbgo/pkg/util/templateutil" + "github.com/c9s/bbgo/pkg/util/timejitter" "github.com/robfig/cron/v3" "github.com/sirupsen/logrus" @@ -173,7 +173,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se if s.Interval != "" { go func() { - ticker := time.NewTicker(util.MillisecondsJitter(s.Interval.Duration(), 1000)) + ticker := time.NewTicker(timejitter.Milliseconds(s.Interval.Duration(), 1000)) defer ticker.Stop() for { diff --git a/pkg/util/time.go b/pkg/util/time.go index df1ac8d88..1d644f652 100644 --- a/pkg/util/time.go +++ b/pkg/util/time.go @@ -1,16 +1,9 @@ package util import ( - "math/rand" "time" ) -func MillisecondsJitter(d time.Duration, jitterInMilliseconds int) time.Duration { - n := rand.Intn(jitterInMilliseconds) - return d + time.Duration(n)*time.Millisecond -} - func UnixMilli() int64 { return time.Now().UnixNano() / int64(time.Millisecond) } - diff --git a/pkg/util/timejitter/milliseconds.go b/pkg/util/timejitter/milliseconds.go new file mode 100644 index 000000000..4ea27600f --- /dev/null +++ b/pkg/util/timejitter/milliseconds.go @@ -0,0 +1,21 @@ +package timejitter + +import ( + "math/rand" + "time" +) + +func Milliseconds(d time.Duration, jitterInMilliseconds int) time.Duration { + n := rand.Intn(jitterInMilliseconds) + return d + time.Duration(n)*time.Millisecond +} + +func Seconds(d time.Duration, jitterInSeconds int) time.Duration { + n := rand.Intn(jitterInSeconds) + return d + time.Duration(n)*time.Second +} + +func Microseconds(d time.Duration, us int) time.Duration { + n := rand.Intn(us) + return d + time.Duration(n)*time.Microsecond +}