diff --git a/pkg/core/orderstore.go b/pkg/core/orderstore.go index b639e8ce1..7ba228248 100644 --- a/pkg/core/orderstore.go +++ b/pkg/core/orderstore.go @@ -2,6 +2,7 @@ package core import ( "sync" + "time" "github.com/c9s/bbgo/pkg/types" ) @@ -138,6 +139,26 @@ func (s *OrderStore) BindStream(stream types.Stream) { }) } +func (s *OrderStore) Prune(expiryDuration time.Duration) { + cutOffTime := time.Now().Add(-expiryDuration) + orders := make(map[uint64]types.Order, len(s.orders)) + + s.mu.Lock() + defer s.mu.Unlock() + + for idx, o := range s.orders { + if o.Status == types.OrderStatusCanceled || o.Status == types.OrderStatusFilled { + if o.UpdateTime.Time().Before(cutOffTime) { + continue + } + } + + orders[idx] = o + } + + s.orders = orders +} + func (s *OrderStore) HandleOrderUpdate(order types.Order) { switch order.Status { diff --git a/pkg/core/tradecollector.go b/pkg/core/tradecollector.go index ed581704f..54568b30e 100644 --- a/pkg/core/tradecollector.go +++ b/pkg/core/tradecollector.go @@ -125,7 +125,7 @@ type TradeCollector struct { func NewTradeCollector(symbol string, position *types.Position, orderStore *OrderStore) *TradeCollector { tradeStore := NewTradeStore() - tradeStore.EnablePrune = true + tradeStore.pruneEnabled = true return &TradeCollector{ Symbol: symbol, diff --git a/pkg/core/tradestore.go b/pkg/core/tradestore.go index f1acdb244..85168027f 100644 --- a/pkg/core/tradestore.go +++ b/pkg/core/tradestore.go @@ -17,18 +17,33 @@ type TradeStore struct { // any created trades for tracking trades sync.Mutex - EnablePrune bool - - trades map[uint64]types.Trade - lastTradeTime time.Time + pruneEnabled bool + storeSize int + trades map[uint64]types.Trade + tradeExpiryDuration time.Duration + lastTradeTime time.Time } func NewTradeStore() *TradeStore { return &TradeStore{ - trades: make(map[uint64]types.Trade), + trades: make(map[uint64]types.Trade), + storeSize: MaximumTradeStoreSize, + tradeExpiryDuration: TradeExpiryTime, } } +func (s *TradeStore) SetPruneEnabled(enabled bool) { + s.pruneEnabled = enabled +} + +func (s *TradeStore) SetTradeExpiryDuration(d time.Duration) { + s.tradeExpiryDuration = d +} + +func (s *TradeStore) SetStoreSize(size int) { + s.storeSize = size +} + func (s *TradeStore) Num() (num int) { s.Lock() num = len(s.trades) @@ -122,7 +137,7 @@ func (s *TradeStore) Prune(curTime time.Time) { defer s.Unlock() var trades = make(map[uint64]types.Trade) - var cutOffTime = curTime.Add(-TradeExpiryTime) + var cutOffTime = curTime.Add(-s.tradeExpiryDuration) log.Infof("pruning expired trades, cutoff time = %s", cutOffTime.String()) for _, trade := range s.trades { @@ -144,7 +159,7 @@ func (s *TradeStore) isCoolTrade(trade types.Trade) bool { } func (s *TradeStore) exceededMaximumTradeStoreSize() bool { - return len(s.trades) > MaximumTradeStoreSize + return len(s.trades) > s.storeSize } func (s *TradeStore) BindStream(stream types.Stream) { @@ -152,7 +167,7 @@ func (s *TradeStore) BindStream(stream types.Stream) { s.Add(trade) }) - if s.EnablePrune { + if s.pruneEnabled { stream.OnTradeUpdate(func(trade types.Trade) { if s.isCoolTrade(trade) || s.exceededMaximumTradeStoreSize() { s.Prune(time.Time(trade.Time)) diff --git a/pkg/strategy/grid2/strategy.go b/pkg/strategy/grid2/strategy.go index bc1ca2bb5..7c7c6288e 100644 --- a/pkg/strategy/grid2/strategy.go +++ b/pkg/strategy/grid2/strategy.go @@ -1892,7 +1892,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. } s.historicalTrades = core.NewTradeStore() - s.historicalTrades.EnablePrune = true + s.historicalTrades.SetPruneEnabled(true) s.historicalTrades.BindStream(session.UserDataStream) orderExecutor := bbgo.NewGeneralOrderExecutor(session, s.Symbol, ID, instanceID, s.Position) diff --git a/pkg/strategy/xmaker/strategy.go b/pkg/strategy/xmaker/strategy.go index 5a150a8bf..2b94a9034 100644 --- a/pkg/strategy/xmaker/strategy.go +++ b/pkg/strategy/xmaker/strategy.go @@ -1297,6 +1297,8 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) { } else { s.coveredPosition.Add(quantity.Neg()) } + + s.resetPositionStartTime() } func (s *Strategy) tradeRecover(ctx context.Context) { @@ -1475,6 +1477,25 @@ func (s *Strategy) accountUpdater(ctx context.Context) { } } +func (s *Strategy) houseCleanWorker(ctx context.Context) { + expiryDuration := 3 * time.Hour + ticker := time.NewTicker(time.Hour) + + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + + case <-ticker.C: + s.orderStore.Prune(expiryDuration) + + } + + } + +} + func (s *Strategy) hedgeWorker(ctx context.Context) { ticker := time.NewTicker(util.MillisecondsJitter(s.HedgeInterval.Duration(), 200)) defer ticker.Stop() @@ -1743,6 +1764,7 @@ func (s *Strategy) CrossRun( s.orderStore.BindStream(s.makerSession.UserDataStream) s.tradeCollector = core.NewTradeCollector(s.Symbol, s.Position, s.orderStore) + s.tradeCollector.TradeStore().SetPruneEnabled(true) if s.NotifyTrade { s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { @@ -1808,6 +1830,7 @@ func (s *Strategy) CrossRun( go s.accountUpdater(ctx) go s.hedgeWorker(ctx) go s.quoteWorker(ctx) + go s.houseCleanWorker(ctx) if s.RecoverTrade { go s.tradeRecover(ctx)