diff --git a/pkg/core/tradecollector.go b/pkg/core/tradecollector.go index 07c92b3dd..e6e5d515f 100644 --- a/pkg/core/tradecollector.go +++ b/pkg/core/tradecollector.go @@ -34,12 +34,15 @@ type TradeCollector struct { } func NewTradeCollector(symbol string, position *types.Position, orderStore *OrderStore) *TradeCollector { + tradeStore := NewTradeStore() + tradeStore.EnablePrune = true + return &TradeCollector{ Symbol: symbol, orderSig: sigchan.New(1), tradeC: make(chan types.Trade, 100), - tradeStore: NewTradeStore(), + tradeStore: tradeStore, doneTrades: make(map[types.TradeKey]struct{}), position: position, orderStore: orderStore, @@ -88,7 +91,9 @@ func (c *TradeCollector) Emit() { c.orderSig.Emit() } -func (c *TradeCollector) Recover(ctx context.Context, ex types.ExchangeTradeHistoryService, symbol string, from time.Time) error { +func (c *TradeCollector) Recover( + ctx context.Context, ex types.ExchangeTradeHistoryService, symbol string, from time.Time, +) error { logrus.Debugf("recovering %s trades...", symbol) trades, err := ex.QueryTrades(ctx, symbol, &types.TradeQueryOptions{ diff --git a/pkg/core/tradestore.go b/pkg/core/tradestore.go index 485f820dc..f1acdb244 100644 --- a/pkg/core/tradestore.go +++ b/pkg/core/tradestore.go @@ -4,11 +4,14 @@ import ( "sync" "time" + log "github.com/sirupsen/logrus" + "github.com/c9s/bbgo/pkg/types" ) -const TradeExpiryTime = 24 * time.Hour -const PruneTriggerNumOfTrades = 10_000 +const TradeExpiryTime = 3 * time.Hour +const CoolTradePeriod = 1 * time.Hour +const MaximumTradeStoreSize = 1_000 type TradeStore struct { // any created trades for tracking trades @@ -112,14 +115,16 @@ func (s *TradeStore) touchLastTradeTime(trade types.Trade) { } } -// pruneExpiredTrades prunes trades that are older than the expiry time -// see TradeExpiryTime -func (s *TradeStore) pruneExpiredTrades(curTime time.Time) { +// Prune prunes trades that are older than the expiry time +// see TradeExpiryTime (3 hours) +func (s *TradeStore) Prune(curTime time.Time) { s.Lock() defer s.Unlock() var trades = make(map[uint64]types.Trade) var cutOffTime = curTime.Add(-TradeExpiryTime) + + log.Infof("pruning expired trades, cutoff time = %s", cutOffTime.String()) for _, trade := range s.trades { if trade.Time.Before(cutOffTime) { continue @@ -129,15 +134,17 @@ func (s *TradeStore) pruneExpiredTrades(curTime time.Time) { } s.trades = trades -} -func (s *TradeStore) Prune(curTime time.Time) { - s.pruneExpiredTrades(curTime) + log.Infof("trade pruning done, size: %d", len(trades)) } func (s *TradeStore) isCoolTrade(trade types.Trade) bool { - // if the time of last trade is over 1 hour, we call it's cool trade - return s.lastTradeTime != (time.Time{}) && time.Time(trade.Time).Sub(s.lastTradeTime) > time.Hour + // if the duration between the current trade and the last trade is over 1 hour, we call it "cool trade" + return !s.lastTradeTime.IsZero() && time.Time(trade.Time).Sub(s.lastTradeTime) > CoolTradePeriod +} + +func (s *TradeStore) exceededMaximumTradeStoreSize() bool { + return len(s.trades) > MaximumTradeStoreSize } func (s *TradeStore) BindStream(stream types.Stream) { @@ -147,7 +154,7 @@ func (s *TradeStore) BindStream(stream types.Stream) { if s.EnablePrune { stream.OnTradeUpdate(func(trade types.Trade) { - if s.isCoolTrade(trade) { + if s.isCoolTrade(trade) || s.exceededMaximumTradeStoreSize() { s.Prune(time.Time(trade.Time)) } }) diff --git a/pkg/core/tradestore_test.go b/pkg/core/tradestore_test.go index 431572ab4..c820a2a02 100644 --- a/pkg/core/tradestore_test.go +++ b/pkg/core/tradestore_test.go @@ -30,7 +30,7 @@ func TestTradeStore_Prune(t *testing.T) { store := NewTradeStore() store.Add( types.Trade{ID: 1, Time: types.Time(now.Add(-25 * time.Hour))}, - types.Trade{ID: 2, Time: types.Time(now.Add(-23 * time.Hour))}, + types.Trade{ID: 2, Time: types.Time(now.Add(-2 * time.Hour))}, types.Trade{ID: 3, Time: types.Time(now.Add(-2 * time.Minute))}, types.Trade{ID: 4, Time: types.Time(now.Add(-1 * time.Minute))}, )