Merge pull request #1796 from c9s/c9s/liqmaker/metrics-and-tests

IMPROVE: [liqmaker] improve interval checking and tickers
This commit is contained in:
c9s 2024-10-28 17:43:31 +08:00 committed by GitHub
commit 0c1f8f8c07
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 83 additions and 36 deletions

View File

@ -14,8 +14,8 @@ import (
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
"github.com/c9s/bbgo/pkg/util/backoff"
"github.com/c9s/bbgo/pkg/util/timejitter"
)
var ErrExceededSubmitOrderRetryLimit = errors.New("exceeded submit order retry limit")
@ -146,7 +146,7 @@ func (e *GeneralOrderExecutor) updateMarginAssetMaxBorrowable(
func (e *GeneralOrderExecutor) marginAssetMaxBorrowableUpdater(
ctx context.Context, interval time.Duration, marginService types.MarginBorrowRepayService, market types.Market,
) {
t := time.NewTicker(util.MillisecondsJitter(interval, 500))
t := time.NewTicker(timejitter.Milliseconds(interval, 500))
defer t.Stop()
e.updateMarginAssetMaxBorrowable(ctx, marginService, market)

View File

@ -7,7 +7,7 @@ import (
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/strategy/common"
"github.com/c9s/bbgo/pkg/util"
"github.com/c9s/bbgo/pkg/util/timejitter"
)
func (s *Strategy) syncPeriodically(ctx context.Context) {
@ -18,7 +18,7 @@ func (s *Strategy) syncPeriodically(ctx context.Context) {
defer syncPersistenceTicker.Stop()
// sync active orders
syncActiveOrdersTicker := time.NewTicker(util.MillisecondsJitter(10*time.Minute, 5*60*1000))
syncActiveOrdersTicker := time.NewTicker(timejitter.Milliseconds(10*time.Minute, 5*60*1000))
defer syncActiveOrdersTicker.Stop()
for {

View File

@ -14,7 +14,7 @@ import (
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
"github.com/c9s/bbgo/pkg/util/timejitter"
)
var syncWindow = -3 * time.Minute
@ -48,7 +48,7 @@ func (s *Strategy) recoverPeriodically(ctx context.Context) {
return
}
interval := util.MillisecondsJitter(25*time.Minute, 10*60*1000)
interval := timejitter.Milliseconds(25*time.Minute, 10*60*1000)
s.logger.Infof("[Recover] interval: %s", interval)
ticker := time.NewTicker(interval)
defer ticker.Stop()

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
@ -135,6 +136,22 @@ func (s *Strategy) Defaults() error {
return nil
}
func (s *Strategy) liquidityWorker(ctx context.Context, interval types.Interval) {
ticker := time.NewTicker(interval.Duration())
defer ticker.Stop()
s.placeLiquidityOrders(ctx)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.placeLiquidityOrders(ctx)
}
}
}
func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
if s.ProfitFixerBundle.ProfitFixerConfig != nil {
market, _ := session.Market(s.Symbol)
@ -182,19 +199,28 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
return err
}
session.UserDataStream.OnStart(func() {
s.placeLiquidityOrders(ctx)
})
session.MarketDataStream.OnKLineClosed(func(k types.KLine) {
if k.Interval == s.AdjustmentUpdateInterval {
s.placeAdjustmentOrders(ctx)
}
})
if intervalProvider, ok := session.Exchange.(types.CustomIntervalProvider); ok {
if intervalProvider.IsSupportedInterval(s.LiquidityUpdateInterval) {
session.UserDataStream.OnAuth(func() {
s.placeLiquidityOrders(ctx)
})
session.MarketDataStream.OnKLineClosed(func(k types.KLine) {
if k.Interval == s.LiquidityUpdateInterval {
s.placeLiquidityOrders(ctx)
}
})
} else {
session.UserDataStream.OnStart(func() {
go s.liquidityWorker(ctx, s.LiquidityUpdateInterval)
})
}
}
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
@ -366,7 +392,7 @@ func (s *Strategy) placeLiquidityOrders(ctx context.Context) {
if s.stopEMA != nil {
emaPrice := fixedpoint.NewFromFloat(s.stopEMA.Last(0))
if midPrice.Compare(emaPrice) > 0 {
s.logger.Infof("mid price %f < stop ema price %f, turning off ask orders", midPrice.Float64(), emaPrice.Float64())
s.logger.Infof("mid price %f > stop ema price %f, turning off bid orders", midPrice.Float64(), emaPrice.Float64())
placeBid = false
}
@ -419,6 +445,10 @@ func (s *Strategy) placeLiquidityOrders(ctx context.Context) {
}
}
s.logger.Infof("place bid: %v, place ask: %v", placeBid, placeAsk)
s.logger.Infof("bid liquidity amount %f, ask liquidity amount %f", s.BidLiquidityAmount.Float64(), s.AskLiquidityAmount.Float64())
var bidExposureInUsd = fixedpoint.Zero
var askExposureInUsd = fixedpoint.Zero
var orderForms []types.SubmitOrder

View File

@ -14,8 +14,8 @@ import (
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
"github.com/c9s/bbgo/pkg/util/templateutil"
"github.com/c9s/bbgo/pkg/util/timejitter"
)
const ID = "xbalance"
@ -283,7 +283,9 @@ func (s *Strategy) checkBalance(ctx context.Context, sessions map[string]*bbgo.E
}
}
func (s *Strategy) findHighestBalanceLevelSession(sessions map[string]*bbgo.ExchangeSession, requiredAmount fixedpoint.Value) (*bbgo.ExchangeSession, types.Balance, error) {
func (s *Strategy) findHighestBalanceLevelSession(
sessions map[string]*bbgo.ExchangeSession, requiredAmount fixedpoint.Value,
) (*bbgo.ExchangeSession, types.Balance, error) {
var balance types.Balance
var maxBalanceLevel = fixedpoint.Zero
var maxBalanceSession *bbgo.ExchangeSession = nil
@ -350,7 +352,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
}
go func() {
ticker := time.NewTicker(util.MillisecondsJitter(s.Interval.Duration(), 1000))
ticker := time.NewTicker(timejitter.Milliseconds(s.Interval.Duration(), 1000))
defer ticker.Stop()
for {

View File

@ -21,7 +21,7 @@ import (
"github.com/c9s/bbgo/pkg/strategy/common"
"github.com/c9s/bbgo/pkg/strategy/xmaker"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
"github.com/c9s/bbgo/pkg/util/timejitter"
"github.com/c9s/bbgo/pkg/util/tradingutil"
)
@ -409,10 +409,10 @@ func (s *Strategy) Defaults() error {
}
func (s *Strategy) quoteWorker(ctx context.Context) {
updateTicker := time.NewTicker(util.MillisecondsJitter(s.FastLayerUpdateInterval.Duration(), 200))
updateTicker := time.NewTicker(timejitter.Milliseconds(s.FastLayerUpdateInterval.Duration(), 200))
defer updateTicker.Stop()
fullReplenishTicker := time.NewTicker(util.MillisecondsJitter(s.FullReplenishInterval.Duration(), 200))
fullReplenishTicker := time.NewTicker(timejitter.Milliseconds(s.FullReplenishInterval.Duration(), 200))
defer fullReplenishTicker.Stop()
// clean up the previous open orders
@ -456,7 +456,7 @@ func (s *Strategy) quoteWorker(ctx context.Context) {
}
func (s *Strategy) hedgeWorker(ctx context.Context) {
ticker := time.NewTicker(util.MillisecondsJitter(s.HedgeInterval.Duration(), 200))
ticker := time.NewTicker(timejitter.Milliseconds(s.HedgeInterval.Duration(), 200))
defer ticker.Stop()
for {

View File

@ -14,7 +14,7 @@ import (
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/strategy/common"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
"github.com/c9s/bbgo/pkg/util/timejitter"
)
const ID = "xgap"
@ -169,7 +169,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
go func() {
ticker := time.NewTicker(
util.MillisecondsJitter(s.UpdateInterval.Duration(), 1000),
timejitter.Milliseconds(s.UpdateInterval.Duration(), 1000),
)
defer ticker.Stop()
@ -187,7 +187,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
}
// < 10 seconds jitter sleep
delay := util.MillisecondsJitter(s.UpdateInterval.Duration(), 10*1000)
delay := timejitter.Milliseconds(s.UpdateInterval.Duration(), 10*1000)
if delay < s.UpdateInterval.Duration() {
time.Sleep(delay)
}

View File

@ -22,6 +22,7 @@ import (
"github.com/c9s/bbgo/pkg/strategy/common"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
"github.com/c9s/bbgo/pkg/util/timejitter"
)
var defaultMargin = fixedpoint.NewFromFloat(0.003)
@ -1426,7 +1427,7 @@ func (s *Strategy) Validate() error {
}
func (s *Strategy) quoteWorker(ctx context.Context) {
ticker := time.NewTicker(util.MillisecondsJitter(s.UpdateInterval.Duration(), 200))
ticker := time.NewTicker(timejitter.Milliseconds(s.UpdateInterval.Duration(), 200))
defer ticker.Stop()
defer func() {
@ -1500,7 +1501,7 @@ func (s *Strategy) houseCleanWorker(ctx context.Context) {
}
func (s *Strategy) hedgeWorker(ctx context.Context) {
ticker := time.NewTicker(util.MillisecondsJitter(s.HedgeInterval.Duration(), 200))
ticker := time.NewTicker(timejitter.Milliseconds(s.HedgeInterval.Duration(), 200))
defer ticker.Stop()
profitChanged := false

View File

@ -9,8 +9,8 @@ import (
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
"github.com/c9s/bbgo/pkg/util/templateutil"
"github.com/c9s/bbgo/pkg/util/timejitter"
"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
@ -173,7 +173,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
if s.Interval != "" {
go func() {
ticker := time.NewTicker(util.MillisecondsJitter(s.Interval.Duration(), 1000))
ticker := time.NewTicker(timejitter.Milliseconds(s.Interval.Duration(), 1000))
defer ticker.Stop()
for {

View File

@ -1,16 +1,9 @@
package util
import (
"math/rand"
"time"
)
func MillisecondsJitter(d time.Duration, jitterInMilliseconds int) time.Duration {
n := rand.Intn(jitterInMilliseconds)
return d + time.Duration(n)*time.Millisecond
}
func UnixMilli() int64 {
return time.Now().UnixNano() / int64(time.Millisecond)
}

View File

@ -0,0 +1,21 @@
package timejitter
import (
"math/rand"
"time"
)
func Milliseconds(d time.Duration, jitterInMilliseconds int) time.Duration {
n := rand.Intn(jitterInMilliseconds)
return d + time.Duration(n)*time.Millisecond
}
func Seconds(d time.Duration, jitterInSeconds int) time.Duration {
n := rand.Intn(jitterInSeconds)
return d + time.Duration(n)*time.Second
}
func Microseconds(d time.Duration, us int) time.Duration {
n := rand.Intn(us)
return d + time.Duration(n)*time.Microsecond
}