Merge pull request #1782 from c9s/c9s/xmaker/improvements2
Some checks failed
Go / build (1.21, 6.2) (push) Has been cancelled
golang-lint / lint (push) Has been cancelled

IMPROVE: [grid2][xmaker] prune expired trades
This commit is contained in:
c9s 2024-10-17 13:22:24 +08:00 committed by GitHub
commit 5a1249e871
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 69 additions and 10 deletions

View File

@ -2,6 +2,7 @@ package core
import ( import (
"sync" "sync"
"time"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
@ -138,6 +139,26 @@ func (s *OrderStore) BindStream(stream types.Stream) {
}) })
} }
func (s *OrderStore) Prune(expiryDuration time.Duration) {
cutOffTime := time.Now().Add(-expiryDuration)
orders := make(map[uint64]types.Order, len(s.orders))
s.mu.Lock()
defer s.mu.Unlock()
for idx, o := range s.orders {
if o.Status == types.OrderStatusCanceled || o.Status == types.OrderStatusFilled {
if o.UpdateTime.Time().Before(cutOffTime) {
continue
}
}
orders[idx] = o
}
s.orders = orders
}
func (s *OrderStore) HandleOrderUpdate(order types.Order) { func (s *OrderStore) HandleOrderUpdate(order types.Order) {
switch order.Status { switch order.Status {

View File

@ -125,7 +125,7 @@ type TradeCollector struct {
func NewTradeCollector(symbol string, position *types.Position, orderStore *OrderStore) *TradeCollector { func NewTradeCollector(symbol string, position *types.Position, orderStore *OrderStore) *TradeCollector {
tradeStore := NewTradeStore() tradeStore := NewTradeStore()
tradeStore.EnablePrune = true tradeStore.pruneEnabled = true
return &TradeCollector{ return &TradeCollector{
Symbol: symbol, Symbol: symbol,

View File

@ -17,18 +17,33 @@ type TradeStore struct {
// any created trades for tracking trades // any created trades for tracking trades
sync.Mutex sync.Mutex
EnablePrune bool pruneEnabled bool
storeSize int
trades map[uint64]types.Trade trades map[uint64]types.Trade
lastTradeTime time.Time tradeExpiryDuration time.Duration
lastTradeTime time.Time
} }
func NewTradeStore() *TradeStore { func NewTradeStore() *TradeStore {
return &TradeStore{ return &TradeStore{
trades: make(map[uint64]types.Trade), trades: make(map[uint64]types.Trade),
storeSize: MaximumTradeStoreSize,
tradeExpiryDuration: TradeExpiryTime,
} }
} }
func (s *TradeStore) SetPruneEnabled(enabled bool) {
s.pruneEnabled = enabled
}
func (s *TradeStore) SetTradeExpiryDuration(d time.Duration) {
s.tradeExpiryDuration = d
}
func (s *TradeStore) SetStoreSize(size int) {
s.storeSize = size
}
func (s *TradeStore) Num() (num int) { func (s *TradeStore) Num() (num int) {
s.Lock() s.Lock()
num = len(s.trades) num = len(s.trades)
@ -122,7 +137,7 @@ func (s *TradeStore) Prune(curTime time.Time) {
defer s.Unlock() defer s.Unlock()
var trades = make(map[uint64]types.Trade) var trades = make(map[uint64]types.Trade)
var cutOffTime = curTime.Add(-TradeExpiryTime) var cutOffTime = curTime.Add(-s.tradeExpiryDuration)
log.Infof("pruning expired trades, cutoff time = %s", cutOffTime.String()) log.Infof("pruning expired trades, cutoff time = %s", cutOffTime.String())
for _, trade := range s.trades { for _, trade := range s.trades {
@ -144,7 +159,7 @@ func (s *TradeStore) isCoolTrade(trade types.Trade) bool {
} }
func (s *TradeStore) exceededMaximumTradeStoreSize() bool { func (s *TradeStore) exceededMaximumTradeStoreSize() bool {
return len(s.trades) > MaximumTradeStoreSize return len(s.trades) > s.storeSize
} }
func (s *TradeStore) BindStream(stream types.Stream) { func (s *TradeStore) BindStream(stream types.Stream) {
@ -152,7 +167,7 @@ func (s *TradeStore) BindStream(stream types.Stream) {
s.Add(trade) s.Add(trade)
}) })
if s.EnablePrune { if s.pruneEnabled {
stream.OnTradeUpdate(func(trade types.Trade) { stream.OnTradeUpdate(func(trade types.Trade) {
if s.isCoolTrade(trade) || s.exceededMaximumTradeStoreSize() { if s.isCoolTrade(trade) || s.exceededMaximumTradeStoreSize() {
s.Prune(time.Time(trade.Time)) s.Prune(time.Time(trade.Time))

View File

@ -1892,7 +1892,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
} }
s.historicalTrades = core.NewTradeStore() s.historicalTrades = core.NewTradeStore()
s.historicalTrades.EnablePrune = true s.historicalTrades.SetPruneEnabled(true)
s.historicalTrades.BindStream(session.UserDataStream) s.historicalTrades.BindStream(session.UserDataStream)
orderExecutor := bbgo.NewGeneralOrderExecutor(session, s.Symbol, ID, instanceID, s.Position) orderExecutor := bbgo.NewGeneralOrderExecutor(session, s.Symbol, ID, instanceID, s.Position)

View File

@ -1297,6 +1297,8 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) {
} else { } else {
s.coveredPosition.Add(quantity.Neg()) s.coveredPosition.Add(quantity.Neg())
} }
s.resetPositionStartTime()
} }
func (s *Strategy) tradeRecover(ctx context.Context) { func (s *Strategy) tradeRecover(ctx context.Context) {
@ -1475,6 +1477,25 @@ func (s *Strategy) accountUpdater(ctx context.Context) {
} }
} }
func (s *Strategy) houseCleanWorker(ctx context.Context) {
expiryDuration := 3 * time.Hour
ticker := time.NewTicker(time.Hour)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.orderStore.Prune(expiryDuration)
}
}
}
func (s *Strategy) hedgeWorker(ctx context.Context) { func (s *Strategy) hedgeWorker(ctx context.Context) {
ticker := time.NewTicker(util.MillisecondsJitter(s.HedgeInterval.Duration(), 200)) ticker := time.NewTicker(util.MillisecondsJitter(s.HedgeInterval.Duration(), 200))
defer ticker.Stop() defer ticker.Stop()
@ -1743,6 +1764,7 @@ func (s *Strategy) CrossRun(
s.orderStore.BindStream(s.makerSession.UserDataStream) s.orderStore.BindStream(s.makerSession.UserDataStream)
s.tradeCollector = core.NewTradeCollector(s.Symbol, s.Position, s.orderStore) s.tradeCollector = core.NewTradeCollector(s.Symbol, s.Position, s.orderStore)
s.tradeCollector.TradeStore().SetPruneEnabled(true)
if s.NotifyTrade { if s.NotifyTrade {
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
@ -1808,6 +1830,7 @@ func (s *Strategy) CrossRun(
go s.accountUpdater(ctx) go s.accountUpdater(ctx)
go s.hedgeWorker(ctx) go s.hedgeWorker(ctx)
go s.quoteWorker(ctx) go s.quoteWorker(ctx)
go s.houseCleanWorker(ctx)
if s.RecoverTrade { if s.RecoverTrade {
go s.tradeRecover(ctx) go s.tradeRecover(ctx)