Merge pull request #1448 from c9s/c9s/fix-grid2-memory-leaks

FIX: [core] solve memory leaks
This commit is contained in:
c9s 2023-12-13 09:01:56 +08:00 committed by GitHub
commit cc3302816a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 26 additions and 14 deletions

View File

@ -34,12 +34,15 @@ type TradeCollector struct {
} }
func NewTradeCollector(symbol string, position *types.Position, orderStore *OrderStore) *TradeCollector { func NewTradeCollector(symbol string, position *types.Position, orderStore *OrderStore) *TradeCollector {
tradeStore := NewTradeStore()
tradeStore.EnablePrune = true
return &TradeCollector{ return &TradeCollector{
Symbol: symbol, Symbol: symbol,
orderSig: sigchan.New(1), orderSig: sigchan.New(1),
tradeC: make(chan types.Trade, 100), tradeC: make(chan types.Trade, 100),
tradeStore: NewTradeStore(), tradeStore: tradeStore,
doneTrades: make(map[types.TradeKey]struct{}), doneTrades: make(map[types.TradeKey]struct{}),
position: position, position: position,
orderStore: orderStore, orderStore: orderStore,
@ -88,7 +91,9 @@ func (c *TradeCollector) Emit() {
c.orderSig.Emit() c.orderSig.Emit()
} }
func (c *TradeCollector) Recover(ctx context.Context, ex types.ExchangeTradeHistoryService, symbol string, from time.Time) error { func (c *TradeCollector) Recover(
ctx context.Context, ex types.ExchangeTradeHistoryService, symbol string, from time.Time,
) error {
logrus.Debugf("recovering %s trades...", symbol) logrus.Debugf("recovering %s trades...", symbol)
trades, err := ex.QueryTrades(ctx, symbol, &types.TradeQueryOptions{ trades, err := ex.QueryTrades(ctx, symbol, &types.TradeQueryOptions{

View File

@ -4,11 +4,14 @@ import (
"sync" "sync"
"time" "time"
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
const TradeExpiryTime = 24 * time.Hour const TradeExpiryTime = 3 * time.Hour
const PruneTriggerNumOfTrades = 10_000 const CoolTradePeriod = 1 * time.Hour
const MaximumTradeStoreSize = 1_000
type TradeStore struct { type TradeStore struct {
// any created trades for tracking trades // any created trades for tracking trades
@ -112,14 +115,16 @@ func (s *TradeStore) touchLastTradeTime(trade types.Trade) {
} }
} }
// pruneExpiredTrades prunes trades that are older than the expiry time // Prune prunes trades that are older than the expiry time
// see TradeExpiryTime // see TradeExpiryTime (3 hours)
func (s *TradeStore) pruneExpiredTrades(curTime time.Time) { func (s *TradeStore) Prune(curTime time.Time) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
var trades = make(map[uint64]types.Trade) var trades = make(map[uint64]types.Trade)
var cutOffTime = curTime.Add(-TradeExpiryTime) var cutOffTime = curTime.Add(-TradeExpiryTime)
log.Infof("pruning expired trades, cutoff time = %s", cutOffTime.String())
for _, trade := range s.trades { for _, trade := range s.trades {
if trade.Time.Before(cutOffTime) { if trade.Time.Before(cutOffTime) {
continue continue
@ -129,15 +134,17 @@ func (s *TradeStore) pruneExpiredTrades(curTime time.Time) {
} }
s.trades = trades s.trades = trades
}
func (s *TradeStore) Prune(curTime time.Time) { log.Infof("trade pruning done, size: %d", len(trades))
s.pruneExpiredTrades(curTime)
} }
func (s *TradeStore) isCoolTrade(trade types.Trade) bool { func (s *TradeStore) isCoolTrade(trade types.Trade) bool {
// if the time of last trade is over 1 hour, we call it's cool trade // if the duration between the current trade and the last trade is over 1 hour, we call it "cool trade"
return s.lastTradeTime != (time.Time{}) && time.Time(trade.Time).Sub(s.lastTradeTime) > time.Hour return !s.lastTradeTime.IsZero() && time.Time(trade.Time).Sub(s.lastTradeTime) > CoolTradePeriod
}
func (s *TradeStore) exceededMaximumTradeStoreSize() bool {
return len(s.trades) > MaximumTradeStoreSize
} }
func (s *TradeStore) BindStream(stream types.Stream) { func (s *TradeStore) BindStream(stream types.Stream) {
@ -147,7 +154,7 @@ func (s *TradeStore) BindStream(stream types.Stream) {
if s.EnablePrune { if s.EnablePrune {
stream.OnTradeUpdate(func(trade types.Trade) { stream.OnTradeUpdate(func(trade types.Trade) {
if s.isCoolTrade(trade) { if s.isCoolTrade(trade) || s.exceededMaximumTradeStoreSize() {
s.Prune(time.Time(trade.Time)) s.Prune(time.Time(trade.Time))
} }
}) })

View File

@ -30,7 +30,7 @@ func TestTradeStore_Prune(t *testing.T) {
store := NewTradeStore() store := NewTradeStore()
store.Add( store.Add(
types.Trade{ID: 1, Time: types.Time(now.Add(-25 * time.Hour))}, types.Trade{ID: 1, Time: types.Time(now.Add(-25 * time.Hour))},
types.Trade{ID: 2, Time: types.Time(now.Add(-23 * time.Hour))}, types.Trade{ID: 2, Time: types.Time(now.Add(-2 * time.Hour))},
types.Trade{ID: 3, Time: types.Time(now.Add(-2 * time.Minute))}, types.Trade{ID: 3, Time: types.Time(now.Add(-2 * time.Minute))},
types.Trade{ID: 4, Time: types.Time(now.Add(-1 * time.Minute))}, types.Trade{ID: 4, Time: types.Time(now.Add(-1 * time.Minute))},
) )