all: move jitter helpr to a single package

This commit is contained in:
c9s 2024-10-28 17:28:56 +08:00
parent 48bb7a280b
commit eae2d63ac1
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
11 changed files with 78 additions and 35 deletions

View File

@ -14,8 +14,8 @@ import (
"github.com/c9s/bbgo/pkg/exchange/retry" "github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
"github.com/c9s/bbgo/pkg/util/backoff" "github.com/c9s/bbgo/pkg/util/backoff"
"github.com/c9s/bbgo/pkg/util/timejitter"
) )
var ErrExceededSubmitOrderRetryLimit = errors.New("exceeded submit order retry limit") var ErrExceededSubmitOrderRetryLimit = errors.New("exceeded submit order retry limit")
@ -146,7 +146,7 @@ func (e *GeneralOrderExecutor) updateMarginAssetMaxBorrowable(
func (e *GeneralOrderExecutor) marginAssetMaxBorrowableUpdater( func (e *GeneralOrderExecutor) marginAssetMaxBorrowableUpdater(
ctx context.Context, interval time.Duration, marginService types.MarginBorrowRepayService, market types.Market, ctx context.Context, interval time.Duration, marginService types.MarginBorrowRepayService, market types.Market,
) { ) {
t := time.NewTicker(util.MillisecondsJitter(interval, 500)) t := time.NewTicker(timejitter.Milliseconds(interval, 500))
defer t.Stop() defer t.Stop()
e.updateMarginAssetMaxBorrowable(ctx, marginService, market) e.updateMarginAssetMaxBorrowable(ctx, marginService, market)

View File

@ -7,7 +7,7 @@ import (
"github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/exchange/retry" "github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/strategy/common" "github.com/c9s/bbgo/pkg/strategy/common"
"github.com/c9s/bbgo/pkg/util" "github.com/c9s/bbgo/pkg/util/timejitter"
) )
func (s *Strategy) syncPeriodically(ctx context.Context) { func (s *Strategy) syncPeriodically(ctx context.Context) {
@ -18,7 +18,7 @@ func (s *Strategy) syncPeriodically(ctx context.Context) {
defer syncPersistenceTicker.Stop() defer syncPersistenceTicker.Stop()
// sync active orders // sync active orders
syncActiveOrdersTicker := time.NewTicker(util.MillisecondsJitter(10*time.Minute, 5*60*1000)) syncActiveOrdersTicker := time.NewTicker(timejitter.Milliseconds(10*time.Minute, 5*60*1000))
defer syncActiveOrdersTicker.Stop() defer syncActiveOrdersTicker.Stop()
for { for {

View File

@ -14,7 +14,7 @@ import (
"github.com/c9s/bbgo/pkg/exchange/retry" "github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util" "github.com/c9s/bbgo/pkg/util/timejitter"
) )
var syncWindow = -3 * time.Minute var syncWindow = -3 * time.Minute
@ -48,7 +48,7 @@ func (s *Strategy) recoverPeriodically(ctx context.Context) {
return return
} }
interval := util.MillisecondsJitter(25*time.Minute, 10*60*1000) interval := timejitter.Milliseconds(25*time.Minute, 10*60*1000)
s.logger.Infof("[Recover] interval: %s", interval) s.logger.Infof("[Recover] interval: %s", interval)
ticker := time.NewTicker(interval) ticker := time.NewTicker(interval)
defer ticker.Stop() defer ticker.Stop()

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"sync" "sync"
"time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -135,6 +136,22 @@ func (s *Strategy) Defaults() error {
return nil return nil
} }
func (s *Strategy) liquidityWorker(ctx context.Context, interval types.Interval) {
ticker := time.NewTicker(interval.Duration())
defer ticker.Stop()
s.placeLiquidityOrders(ctx)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.placeLiquidityOrders(ctx)
}
}
}
func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
if s.ProfitFixerBundle.ProfitFixerConfig != nil { if s.ProfitFixerBundle.ProfitFixerConfig != nil {
market, _ := session.Market(s.Symbol) market, _ := session.Market(s.Symbol)
@ -182,20 +199,29 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
return err return err
} }
session.UserDataStream.OnStart(func() {
s.placeLiquidityOrders(ctx)
})
session.MarketDataStream.OnKLineClosed(func(k types.KLine) { session.MarketDataStream.OnKLineClosed(func(k types.KLine) {
if k.Interval == s.AdjustmentUpdateInterval { if k.Interval == s.AdjustmentUpdateInterval {
s.placeAdjustmentOrders(ctx) s.placeAdjustmentOrders(ctx)
} }
if k.Interval == s.LiquidityUpdateInterval {
s.placeLiquidityOrders(ctx)
}
}) })
if intervalProvider, ok := session.Exchange.(types.CustomIntervalProvider); ok {
if intervalProvider.IsSupportedInterval(s.LiquidityUpdateInterval) {
session.UserDataStream.OnAuth(func() {
s.placeLiquidityOrders(ctx)
})
session.MarketDataStream.OnKLineClosed(func(k types.KLine) {
if k.Interval == s.LiquidityUpdateInterval {
s.placeLiquidityOrders(ctx)
}
})
} else {
session.UserDataStream.OnStart(func() {
go s.liquidityWorker(ctx, s.LiquidityUpdateInterval)
})
}
}
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) { bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()

View File

@ -14,8 +14,8 @@ import (
"github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
"github.com/c9s/bbgo/pkg/util/templateutil" "github.com/c9s/bbgo/pkg/util/templateutil"
"github.com/c9s/bbgo/pkg/util/timejitter"
) )
const ID = "xbalance" const ID = "xbalance"
@ -283,7 +283,9 @@ func (s *Strategy) checkBalance(ctx context.Context, sessions map[string]*bbgo.E
} }
} }
func (s *Strategy) findHighestBalanceLevelSession(sessions map[string]*bbgo.ExchangeSession, requiredAmount fixedpoint.Value) (*bbgo.ExchangeSession, types.Balance, error) { func (s *Strategy) findHighestBalanceLevelSession(
sessions map[string]*bbgo.ExchangeSession, requiredAmount fixedpoint.Value,
) (*bbgo.ExchangeSession, types.Balance, error) {
var balance types.Balance var balance types.Balance
var maxBalanceLevel = fixedpoint.Zero var maxBalanceLevel = fixedpoint.Zero
var maxBalanceSession *bbgo.ExchangeSession = nil var maxBalanceSession *bbgo.ExchangeSession = nil
@ -350,7 +352,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
} }
go func() { go func() {
ticker := time.NewTicker(util.MillisecondsJitter(s.Interval.Duration(), 1000)) ticker := time.NewTicker(timejitter.Milliseconds(s.Interval.Duration(), 1000))
defer ticker.Stop() defer ticker.Stop()
for { for {

View File

@ -21,7 +21,7 @@ import (
"github.com/c9s/bbgo/pkg/strategy/common" "github.com/c9s/bbgo/pkg/strategy/common"
"github.com/c9s/bbgo/pkg/strategy/xmaker" "github.com/c9s/bbgo/pkg/strategy/xmaker"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util" "github.com/c9s/bbgo/pkg/util/timejitter"
"github.com/c9s/bbgo/pkg/util/tradingutil" "github.com/c9s/bbgo/pkg/util/tradingutil"
) )
@ -409,10 +409,10 @@ func (s *Strategy) Defaults() error {
} }
func (s *Strategy) quoteWorker(ctx context.Context) { func (s *Strategy) quoteWorker(ctx context.Context) {
updateTicker := time.NewTicker(util.MillisecondsJitter(s.FastLayerUpdateInterval.Duration(), 200)) updateTicker := time.NewTicker(timejitter.Milliseconds(s.FastLayerUpdateInterval.Duration(), 200))
defer updateTicker.Stop() defer updateTicker.Stop()
fullReplenishTicker := time.NewTicker(util.MillisecondsJitter(s.FullReplenishInterval.Duration(), 200)) fullReplenishTicker := time.NewTicker(timejitter.Milliseconds(s.FullReplenishInterval.Duration(), 200))
defer fullReplenishTicker.Stop() defer fullReplenishTicker.Stop()
// clean up the previous open orders // clean up the previous open orders
@ -456,7 +456,7 @@ func (s *Strategy) quoteWorker(ctx context.Context) {
} }
func (s *Strategy) hedgeWorker(ctx context.Context) { func (s *Strategy) hedgeWorker(ctx context.Context) {
ticker := time.NewTicker(util.MillisecondsJitter(s.HedgeInterval.Duration(), 200)) ticker := time.NewTicker(timejitter.Milliseconds(s.HedgeInterval.Duration(), 200))
defer ticker.Stop() defer ticker.Stop()
for { for {

View File

@ -14,7 +14,7 @@ import (
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/strategy/common" "github.com/c9s/bbgo/pkg/strategy/common"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util" "github.com/c9s/bbgo/pkg/util/timejitter"
) )
const ID = "xgap" const ID = "xgap"
@ -169,7 +169,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
go func() { go func() {
ticker := time.NewTicker( ticker := time.NewTicker(
util.MillisecondsJitter(s.UpdateInterval.Duration(), 1000), timejitter.Milliseconds(s.UpdateInterval.Duration(), 1000),
) )
defer ticker.Stop() defer ticker.Stop()
@ -187,7 +187,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
} }
// < 10 seconds jitter sleep // < 10 seconds jitter sleep
delay := util.MillisecondsJitter(s.UpdateInterval.Duration(), 10*1000) delay := timejitter.Milliseconds(s.UpdateInterval.Duration(), 10*1000)
if delay < s.UpdateInterval.Duration() { if delay < s.UpdateInterval.Duration() {
time.Sleep(delay) time.Sleep(delay)
} }

View File

@ -22,6 +22,7 @@ import (
"github.com/c9s/bbgo/pkg/strategy/common" "github.com/c9s/bbgo/pkg/strategy/common"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util" "github.com/c9s/bbgo/pkg/util"
"github.com/c9s/bbgo/pkg/util/timejitter"
) )
var defaultMargin = fixedpoint.NewFromFloat(0.003) var defaultMargin = fixedpoint.NewFromFloat(0.003)
@ -1426,7 +1427,7 @@ func (s *Strategy) Validate() error {
} }
func (s *Strategy) quoteWorker(ctx context.Context) { func (s *Strategy) quoteWorker(ctx context.Context) {
ticker := time.NewTicker(util.MillisecondsJitter(s.UpdateInterval.Duration(), 200)) ticker := time.NewTicker(timejitter.Milliseconds(s.UpdateInterval.Duration(), 200))
defer ticker.Stop() defer ticker.Stop()
defer func() { defer func() {
@ -1500,7 +1501,7 @@ func (s *Strategy) houseCleanWorker(ctx context.Context) {
} }
func (s *Strategy) hedgeWorker(ctx context.Context) { func (s *Strategy) hedgeWorker(ctx context.Context) {
ticker := time.NewTicker(util.MillisecondsJitter(s.HedgeInterval.Duration(), 200)) ticker := time.NewTicker(timejitter.Milliseconds(s.HedgeInterval.Duration(), 200))
defer ticker.Stop() defer ticker.Stop()
profitChanged := false profitChanged := false

View File

@ -9,8 +9,8 @@ import (
"github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
"github.com/c9s/bbgo/pkg/util/templateutil" "github.com/c9s/bbgo/pkg/util/templateutil"
"github.com/c9s/bbgo/pkg/util/timejitter"
"github.com/robfig/cron/v3" "github.com/robfig/cron/v3"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -173,7 +173,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
if s.Interval != "" { if s.Interval != "" {
go func() { go func() {
ticker := time.NewTicker(util.MillisecondsJitter(s.Interval.Duration(), 1000)) ticker := time.NewTicker(timejitter.Milliseconds(s.Interval.Duration(), 1000))
defer ticker.Stop() defer ticker.Stop()
for { for {

View File

@ -1,16 +1,9 @@
package util package util
import ( import (
"math/rand"
"time" "time"
) )
func MillisecondsJitter(d time.Duration, jitterInMilliseconds int) time.Duration {
n := rand.Intn(jitterInMilliseconds)
return d + time.Duration(n)*time.Millisecond
}
func UnixMilli() int64 { func UnixMilli() int64 {
return time.Now().UnixNano() / int64(time.Millisecond) return time.Now().UnixNano() / int64(time.Millisecond)
} }

View File

@ -0,0 +1,21 @@
package timejitter
import (
"math/rand"
"time"
)
func Milliseconds(d time.Duration, jitterInMilliseconds int) time.Duration {
n := rand.Intn(jitterInMilliseconds)
return d + time.Duration(n)*time.Millisecond
}
func Seconds(d time.Duration, jitterInSeconds int) time.Duration {
n := rand.Intn(jitterInSeconds)
return d + time.Duration(n)*time.Second
}
func Microseconds(d time.Duration, us int) time.Duration {
n := rand.Intn(us)
return d + time.Duration(n)*time.Microsecond
}