From eac0195815cd281e4fadcfdfeb9dbb3396e7c0ea Mon Sep 17 00:00:00 2001 From: c9s Date: Fri, 17 Nov 2023 17:11:22 +0800 Subject: [PATCH] bbgo: truncate trade buffer if it gets too large --- pkg/bbgo/config.go | 2 ++ pkg/bbgo/session.go | 28 ++++++++++++++++++++-------- pkg/types/trade.go | 10 ++++++++++ 3 files changed, 32 insertions(+), 8 deletions(-) diff --git a/pkg/bbgo/config.go b/pkg/bbgo/config.go index 591176484..1fac9e314 100644 --- a/pkg/bbgo/config.go +++ b/pkg/bbgo/config.go @@ -329,6 +329,8 @@ type ServiceConfig struct { type EnvironmentConfig struct { DisableDefaultKLineSubscription bool `json:"disableDefaultKLineSubscription"` DisableHistoryKLinePreload bool `json:"disableHistoryKLinePreload"` + DisableSessionTradeBuffer bool `json:"disableSessionTradeBuffer"` + MaxSessionTradeBufferSize int `json:"maxSessionTradeBufferSize"` } type Config struct { diff --git a/pkg/bbgo/session.go b/pkg/bbgo/session.go index aacb86b69..75012fb03 100644 --- a/pkg/bbgo/session.go +++ b/pkg/bbgo/session.go @@ -396,14 +396,27 @@ func (session *ExchangeSession) initSymbol(ctx context.Context, environ *Environ return fmt.Errorf("market %s is not defined", symbol) } - session.Trades[symbol] = &types.TradeSlice{Trades: nil} - session.UserDataStream.OnTradeUpdate(func(trade types.Trade) { - if trade.Symbol != symbol { - return - } + disableSessionTradeBuffer := environ.environmentConfig != nil && environ.environmentConfig.DisableSessionTradeBuffer + maxSessionTradeBufferSize := 0 + if environ.environmentConfig != nil && environ.environmentConfig.MaxSessionTradeBufferSize > 0 { + maxSessionTradeBufferSize = environ.environmentConfig.MaxSessionTradeBufferSize + } - session.Trades[symbol].Append(trade) - }) + session.Trades[symbol] = &types.TradeSlice{Trades: nil} + + if !disableSessionTradeBuffer { + session.UserDataStream.OnTradeUpdate(func(trade types.Trade) { + if trade.Symbol != symbol { + return + } + + session.Trades[symbol].Append(trade) + + if maxSessionTradeBufferSize > 0 { + session.Trades[symbol].Truncate(maxSessionTradeBufferSize) + } + }) + } // session wide position position := &types.Position{ @@ -416,7 +429,6 @@ func (session *ExchangeSession) initSymbol(ctx context.Context, environ *Environ orderStore := core.NewOrderStore(symbol) orderStore.AddOrderUpdate = true - orderStore.BindStream(session.UserDataStream) session.orderStores[symbol] = orderStore diff --git a/pkg/types/trade.go b/pkg/types/trade.go index f80b864a2..5a7eb0a7a 100644 --- a/pkg/types/trade.go +++ b/pkg/types/trade.go @@ -47,6 +47,16 @@ func (s *TradeSlice) Append(t Trade) { s.mu.Unlock() } +func (s *TradeSlice) Truncate(size int) { + s.mu.Lock() + + if len(s.Trades) > size { + s.Trades = s.Trades[len(s.Trades)-1-size:] + } + + s.mu.Unlock() +} + type Trade struct { // GID is the global ID GID int64 `json:"gid" db:"gid"`