From 8025d05eac324d97519bb5538805a653d3b625fd Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 12 Dec 2023 18:18:34 +0800 Subject: [PATCH 1/5] core: log trades pruning --- pkg/core/tradestore.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/core/tradestore.go b/pkg/core/tradestore.go index 485f820dc..d54d804cb 100644 --- a/pkg/core/tradestore.go +++ b/pkg/core/tradestore.go @@ -4,6 +4,8 @@ import ( "sync" "time" + log "github.com/sirupsen/logrus" + "github.com/c9s/bbgo/pkg/types" ) @@ -112,14 +114,16 @@ func (s *TradeStore) touchLastTradeTime(trade types.Trade) { } } -// pruneExpiredTrades prunes trades that are older than the expiry time -// see TradeExpiryTime -func (s *TradeStore) pruneExpiredTrades(curTime time.Time) { +// Prune prunes trades that are older than the expiry time +// see TradeExpiryTime (24 hours) +func (s *TradeStore) Prune(curTime time.Time) { s.Lock() defer s.Unlock() var trades = make(map[uint64]types.Trade) var cutOffTime = curTime.Add(-TradeExpiryTime) + + log.Infof("pruning expired trades, cutoff time = %s", cutOffTime.String()) for _, trade := range s.trades { if trade.Time.Before(cutOffTime) { continue @@ -129,15 +133,13 @@ func (s *TradeStore) pruneExpiredTrades(curTime time.Time) { } s.trades = trades -} -func (s *TradeStore) Prune(curTime time.Time) { - s.pruneExpiredTrades(curTime) + log.Infof("trade pruning done, size: %d", len(trades)) } func (s *TradeStore) isCoolTrade(trade types.Trade) bool { - // if the time of last trade is over 1 hour, we call it's cool trade - return s.lastTradeTime != (time.Time{}) && time.Time(trade.Time).Sub(s.lastTradeTime) > time.Hour + // if the duration between the current trade and the last trade is over 1 hour, we call it "cool trade" + return !s.lastTradeTime.IsZero() && time.Time(trade.Time).Sub(s.lastTradeTime) > time.Hour } func (s *TradeStore) BindStream(stream types.Stream) { From 97c39921bd95b02b312c72a176249b1b7c7e9855 Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 12 Dec 2023 18:18:59 +0800 Subject: [PATCH 2/5] core: adjust TradeExpiryTime to 3 hour --- pkg/core/tradestore.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/core/tradestore.go b/pkg/core/tradestore.go index d54d804cb..9f1cc6af0 100644 --- a/pkg/core/tradestore.go +++ b/pkg/core/tradestore.go @@ -9,7 +9,7 @@ import ( "github.com/c9s/bbgo/pkg/types" ) -const TradeExpiryTime = 24 * time.Hour +const TradeExpiryTime = 3 * time.Hour const PruneTriggerNumOfTrades = 10_000 type TradeStore struct { @@ -115,7 +115,7 @@ func (s *TradeStore) touchLastTradeTime(trade types.Trade) { } // Prune prunes trades that are older than the expiry time -// see TradeExpiryTime (24 hours) +// see TradeExpiryTime (3 hours) func (s *TradeStore) Prune(curTime time.Time) { s.Lock() defer s.Unlock() From 685f332495180ec610506838ae61b682e2d2208c Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 12 Dec 2023 18:21:22 +0800 Subject: [PATCH 3/5] core: enable trade store's trade pruning in NewTradeCollector --- pkg/core/tradecollector.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/core/tradecollector.go b/pkg/core/tradecollector.go index 07c92b3dd..e6e5d515f 100644 --- a/pkg/core/tradecollector.go +++ b/pkg/core/tradecollector.go @@ -34,12 +34,15 @@ type TradeCollector struct { } func NewTradeCollector(symbol string, position *types.Position, orderStore *OrderStore) *TradeCollector { + tradeStore := NewTradeStore() + tradeStore.EnablePrune = true + return &TradeCollector{ Symbol: symbol, orderSig: sigchan.New(1), tradeC: make(chan types.Trade, 100), - tradeStore: NewTradeStore(), + tradeStore: tradeStore, doneTrades: make(map[types.TradeKey]struct{}), position: position, orderStore: orderStore, @@ -88,7 +91,9 @@ func (c *TradeCollector) Emit() { c.orderSig.Emit() } -func (c *TradeCollector) Recover(ctx context.Context, ex types.ExchangeTradeHistoryService, symbol string, from time.Time) error { +func (c *TradeCollector) Recover( + ctx context.Context, ex types.ExchangeTradeHistoryService, symbol string, from time.Time, +) error { logrus.Debugf("recovering %s trades...", symbol) trades, err := ex.QueryTrades(ctx, symbol, &types.TradeQueryOptions{ From 21c8593c45d400a2edfc55690eda6e505387ebee Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 12 Dec 2023 18:24:33 +0800 Subject: [PATCH 4/5] core: add exceededMaximumTradeStoreSize check --- pkg/core/tradestore.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/core/tradestore.go b/pkg/core/tradestore.go index 9f1cc6af0..340e3df15 100644 --- a/pkg/core/tradestore.go +++ b/pkg/core/tradestore.go @@ -10,7 +10,7 @@ import ( ) const TradeExpiryTime = 3 * time.Hour -const PruneTriggerNumOfTrades = 10_000 +const MaximumTradeStoreSize = 1_000 type TradeStore struct { // any created trades for tracking trades @@ -142,6 +142,10 @@ func (s *TradeStore) isCoolTrade(trade types.Trade) bool { return !s.lastTradeTime.IsZero() && time.Time(trade.Time).Sub(s.lastTradeTime) > time.Hour } +func (s *TradeStore) exceededMaximumTradeStoreSize() bool { + return len(s.trades) > MaximumTradeStoreSize +} + func (s *TradeStore) BindStream(stream types.Stream) { stream.OnTradeUpdate(func(trade types.Trade) { s.Add(trade) @@ -149,7 +153,7 @@ func (s *TradeStore) BindStream(stream types.Stream) { if s.EnablePrune { stream.OnTradeUpdate(func(trade types.Trade) { - if s.isCoolTrade(trade) { + if s.isCoolTrade(trade) || s.exceededMaximumTradeStoreSize() { s.Prune(time.Time(trade.Time)) } }) From 4e26b9d2adf2de88d78c38b1a7b4d20d45f73d1c Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 12 Dec 2023 18:25:09 +0800 Subject: [PATCH 5/5] core: pull out cool trade period to a constant --- pkg/core/tradestore.go | 3 ++- pkg/core/tradestore_test.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/core/tradestore.go b/pkg/core/tradestore.go index 340e3df15..f1acdb244 100644 --- a/pkg/core/tradestore.go +++ b/pkg/core/tradestore.go @@ -10,6 +10,7 @@ import ( ) const TradeExpiryTime = 3 * time.Hour +const CoolTradePeriod = 1 * time.Hour const MaximumTradeStoreSize = 1_000 type TradeStore struct { @@ -139,7 +140,7 @@ func (s *TradeStore) Prune(curTime time.Time) { func (s *TradeStore) isCoolTrade(trade types.Trade) bool { // if the duration between the current trade and the last trade is over 1 hour, we call it "cool trade" - return !s.lastTradeTime.IsZero() && time.Time(trade.Time).Sub(s.lastTradeTime) > time.Hour + return !s.lastTradeTime.IsZero() && time.Time(trade.Time).Sub(s.lastTradeTime) > CoolTradePeriod } func (s *TradeStore) exceededMaximumTradeStoreSize() bool { diff --git a/pkg/core/tradestore_test.go b/pkg/core/tradestore_test.go index 431572ab4..c820a2a02 100644 --- a/pkg/core/tradestore_test.go +++ b/pkg/core/tradestore_test.go @@ -30,7 +30,7 @@ func TestTradeStore_Prune(t *testing.T) { store := NewTradeStore() store.Add( types.Trade{ID: 1, Time: types.Time(now.Add(-25 * time.Hour))}, - types.Trade{ID: 2, Time: types.Time(now.Add(-23 * time.Hour))}, + types.Trade{ID: 2, Time: types.Time(now.Add(-2 * time.Hour))}, types.Trade{ID: 3, Time: types.Time(now.Add(-2 * time.Minute))}, types.Trade{ID: 4, Time: types.Time(now.Add(-1 * time.Minute))}, )