From dcff850c64e8b4ca7b160b1997b691b3b70da41c Mon Sep 17 00:00:00 2001 From: chiahung Date: Mon, 6 Nov 2023 18:52:01 +0800 Subject: [PATCH 1/5] FEATURE: add ttl for position/grid2.profit_stats persistence --- pkg/strategy/grid2/profit_stats.go | 11 +++++++++++ pkg/strategy/grid2/strategy.go | 5 +++++ pkg/types/position.go | 11 +++++++++++ 3 files changed, 27 insertions(+) diff --git a/pkg/strategy/grid2/profit_stats.go b/pkg/strategy/grid2/profit_stats.go index cd8367c23..c39602937 100644 --- a/pkg/strategy/grid2/profit_stats.go +++ b/pkg/strategy/grid2/profit_stats.go @@ -24,6 +24,9 @@ type GridProfitStats struct { Market types.Market `json:"market,omitempty"` Since *time.Time `json:"since,omitempty"` InitialOrderID uint64 `json:"initialOrderID"` + + // ttl is the ttl to keep in persistence + ttl time.Duration } func newGridProfitStats(market types.Market) *GridProfitStats { @@ -40,6 +43,14 @@ func newGridProfitStats(market types.Market) *GridProfitStats { } } +func (s *GridProfitStats) SetTTL(ttl time.Duration) { + s.ttl = ttl +} + +func (s *GridProfitStats) Expiration() time.Duration { + return s.ttl +} + func (s *GridProfitStats) AddTrade(trade types.Trade) { if s.TotalFee == nil { s.TotalFee = make(map[string]fixedpoint.Value) diff --git a/pkg/strategy/grid2/strategy.go b/pkg/strategy/grid2/strategy.go index 0c8248d93..bc361a72b 100644 --- a/pkg/strategy/grid2/strategy.go +++ b/pkg/strategy/grid2/strategy.go @@ -177,6 +177,7 @@ type Strategy struct { GridProfitStats *GridProfitStats `persistence:"grid_profit_stats"` Position *types.Position `persistence:"position"` + PersistenceTTL types.Duration `json:"persistenceTTL"` // ExchangeSession is an injection field ExchangeSession *bbgo.ExchangeSession @@ -1835,13 +1836,17 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. s.ProfitSpread = s.Market.TruncatePrice(s.ProfitSpread) } + s.logger.Infof("ttl: %s", s.PersistenceTTL.Duration()) + if s.GridProfitStats == nil { s.GridProfitStats = newGridProfitStats(s.Market) } + s.GridProfitStats.SetTTL(s.PersistenceTTL.Duration()) if s.Position == nil { s.Position = types.NewPositionFromMarket(s.Market) } + s.Position.SetTTL(s.PersistenceTTL.Duration()) // initialize and register prometheus metrics if s.PrometheusLabels != nil { diff --git a/pkg/types/position.go b/pkg/types/position.go index 983c1c551..4b649a48f 100644 --- a/pkg/types/position.go +++ b/pkg/types/position.go @@ -65,6 +65,17 @@ type Position struct { // Modify position callbacks modifyCallbacks []func(baseQty fixedpoint.Value, quoteQty fixedpoint.Value, price fixedpoint.Value) + + // ttl is the ttl to keep in persistence + ttl time.Duration +} + +func (s *Position) SetTTL(ttl time.Duration) { + s.ttl = ttl +} + +func (s *Position) Expiration() time.Duration { + return s.ttl } func (p *Position) CsvHeader() []string { From c8becbe4f5f558afbcdbaa9e7896bdcf9e051ab7 Mon Sep 17 00:00:00 2001 From: chiahung Date: Tue, 7 Nov 2023 10:56:19 +0800 Subject: [PATCH 2/5] bbgo.sync when syncActiveOrders --- pkg/strategy/grid2/active_order_recover.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/pkg/strategy/grid2/active_order_recover.go b/pkg/strategy/grid2/active_order_recover.go index cfdaeac80..55b48fcdf 100644 --- a/pkg/strategy/grid2/active_order_recover.go +++ b/pkg/strategy/grid2/active_order_recover.go @@ -30,7 +30,7 @@ func (s *Strategy) initializeRecoverC() bool { if s.recoverC == nil { s.logger.Info("initializing recover channel") - s.recoverC = make(chan struct{}, 1) + s.recoverC = make(chan struct{}, 10) } else { s.logger.Info("recover channel is already initialized, trigger active orders recover") isInitialize = true @@ -66,22 +66,26 @@ func (s *Strategy) recoverActiveOrdersPeriodically(ctx context.Context) { exchange: s.session.Exchange, } + var lastRecoverTime time.Time + for { select { - case <-ctx.Done(): return case <-ticker.C: - if err := syncActiveOrders(ctx, opts); err != nil { - log.WithError(err).Errorf("unable to sync active orders") - } - + s.recoverC <- struct{}{} + bbgo.Sync(ctx, s) case <-s.recoverC: - if err := syncActiveOrders(ctx, opts); err != nil { - log.WithError(err).Errorf("unable to sync active orders") + if !time.Now().After(lastRecoverTime.Add(10 * time.Minute)) { + continue } + if err := syncActiveOrders(ctx, opts); err != nil { + log.WithError(err).Errorf("unable to sync active orders") + } else { + lastRecoverTime = time.Now() + } } } } From 7de49155eb515e5c0a56e1896a88f78ce0a39d30 Mon Sep 17 00:00:00 2001 From: chiahung Date: Tue, 7 Nov 2023 13:30:58 +0800 Subject: [PATCH 3/5] fix --- pkg/strategy/grid2/profit_stats.go | 3 +++ pkg/strategy/grid2/strategy.go | 2 +- pkg/types/position.go | 3 +++ 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/strategy/grid2/profit_stats.go b/pkg/strategy/grid2/profit_stats.go index c39602937..a770b67fe 100644 --- a/pkg/strategy/grid2/profit_stats.go +++ b/pkg/strategy/grid2/profit_stats.go @@ -44,6 +44,9 @@ func newGridProfitStats(market types.Market) *GridProfitStats { } func (s *GridProfitStats) SetTTL(ttl time.Duration) { + if ttl.Nanoseconds() <= 0 { + return + } s.ttl = ttl } diff --git a/pkg/strategy/grid2/strategy.go b/pkg/strategy/grid2/strategy.go index bc361a72b..620d91ce1 100644 --- a/pkg/strategy/grid2/strategy.go +++ b/pkg/strategy/grid2/strategy.go @@ -1836,7 +1836,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. s.ProfitSpread = s.Market.TruncatePrice(s.ProfitSpread) } - s.logger.Infof("ttl: %s", s.PersistenceTTL.Duration()) + s.logger.Infof("persistence ttl: %s", s.PersistenceTTL.Duration()) if s.GridProfitStats == nil { s.GridProfitStats = newGridProfitStats(s.Market) diff --git a/pkg/types/position.go b/pkg/types/position.go index 4b649a48f..589fa2a2b 100644 --- a/pkg/types/position.go +++ b/pkg/types/position.go @@ -71,6 +71,9 @@ type Position struct { } func (s *Position) SetTTL(ttl time.Duration) { + if ttl.Nanoseconds() <= 0 { + return + } s.ttl = ttl } From e6fc0067477d61a56da611e684547f2a2242e013 Mon Sep 17 00:00:00 2001 From: chiahung Date: Tue, 7 Nov 2023 15:21:48 +0800 Subject: [PATCH 4/5] recoverC back to size 1 --- pkg/strategy/grid2/active_order_recover.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/strategy/grid2/active_order_recover.go b/pkg/strategy/grid2/active_order_recover.go index 55b48fcdf..5c64488b4 100644 --- a/pkg/strategy/grid2/active_order_recover.go +++ b/pkg/strategy/grid2/active_order_recover.go @@ -30,7 +30,7 @@ func (s *Strategy) initializeRecoverC() bool { if s.recoverC == nil { s.logger.Info("initializing recover channel") - s.recoverC = make(chan struct{}, 10) + s.recoverC = make(chan struct{}, 1) } else { s.logger.Info("recover channel is already initialized, trigger active orders recover") isInitialize = true From 52d4f50c883fb945a1968a9d374895ce371f28c8 Mon Sep 17 00:00:00 2001 From: chiahung Date: Wed, 8 Nov 2023 11:15:06 +0800 Subject: [PATCH 5/5] remove sync every ticker --- pkg/strategy/grid2/active_order_recover.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/strategy/grid2/active_order_recover.go b/pkg/strategy/grid2/active_order_recover.go index 5c64488b4..c25d45bc4 100644 --- a/pkg/strategy/grid2/active_order_recover.go +++ b/pkg/strategy/grid2/active_order_recover.go @@ -75,7 +75,6 @@ func (s *Strategy) recoverActiveOrdersPeriodically(ctx context.Context) { case <-ticker.C: s.recoverC <- struct{}{} - bbgo.Sync(ctx, s) case <-s.recoverC: if !time.Now().After(lastRecoverTime.Add(10 * time.Minute)) { continue