diff --git a/pkg/bbgo/config.go b/pkg/bbgo/config.go index 591176484..1fac9e314 100644 --- a/pkg/bbgo/config.go +++ b/pkg/bbgo/config.go @@ -329,6 +329,8 @@ type ServiceConfig struct { type EnvironmentConfig struct { DisableDefaultKLineSubscription bool `json:"disableDefaultKLineSubscription"` DisableHistoryKLinePreload bool `json:"disableHistoryKLinePreload"` + DisableSessionTradeBuffer bool `json:"disableSessionTradeBuffer"` + MaxSessionTradeBufferSize int `json:"maxSessionTradeBufferSize"` } type Config struct { diff --git a/pkg/bbgo/session.go b/pkg/bbgo/session.go index 617abef2f..75012fb03 100644 --- a/pkg/bbgo/session.go +++ b/pkg/bbgo/session.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "strings" "sync" "time" @@ -20,7 +19,6 @@ import ( exchange2 "github.com/c9s/bbgo/pkg/exchange" "github.com/c9s/bbgo/pkg/fixedpoint" - "github.com/c9s/bbgo/pkg/service" "github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/util" ) @@ -398,50 +396,39 @@ func (session *ExchangeSession) initSymbol(ctx context.Context, environ *Environ return fmt.Errorf("market %s is not defined", symbol) } - var err error - var trades []types.Trade - if environ.SyncService != nil && environ.BacktestService == nil { - tradingFeeCurrency := session.Exchange.PlatformFeeCurrency() - if strings.HasPrefix(symbol, tradingFeeCurrency) { - trades, err = environ.TradeService.QueryForTradingFeeCurrency(session.Exchange.Name(), symbol, tradingFeeCurrency) - } else { - trades, err = environ.TradeService.Query(service.QueryTradesOptions{ - Exchange: session.Exchange.Name(), - Symbol: symbol, - Ordering: "DESC", - Limit: 100, - }) - } - - if err != nil { - return err - } - - trades = types.SortTradesAscending(trades) - log.Infof("symbol %s: %d trades loaded", symbol, len(trades)) + disableSessionTradeBuffer := environ.environmentConfig != nil && environ.environmentConfig.DisableSessionTradeBuffer + maxSessionTradeBufferSize := 0 + if environ.environmentConfig != nil && environ.environmentConfig.MaxSessionTradeBufferSize > 0 { + maxSessionTradeBufferSize = environ.environmentConfig.MaxSessionTradeBufferSize } - session.Trades[symbol] = &types.TradeSlice{Trades: trades} - session.UserDataStream.OnTradeUpdate(func(trade types.Trade) { - if trade.Symbol != symbol { - return - } + session.Trades[symbol] = &types.TradeSlice{Trades: nil} - session.Trades[symbol].Append(trade) - }) + if !disableSessionTradeBuffer { + session.UserDataStream.OnTradeUpdate(func(trade types.Trade) { + if trade.Symbol != symbol { + return + } + session.Trades[symbol].Append(trade) + + if maxSessionTradeBufferSize > 0 { + session.Trades[symbol].Truncate(maxSessionTradeBufferSize) + } + }) + } + + // session wide position position := &types.Position{ Symbol: symbol, BaseCurrency: market.BaseCurrency, QuoteCurrency: market.QuoteCurrency, } - position.AddTrades(trades) position.BindStream(session.UserDataStream) session.positions[symbol] = position orderStore := core.NewOrderStore(symbol) orderStore.AddOrderUpdate = true - orderStore.BindStream(session.UserDataStream) session.orderStores[symbol] = orderStore diff --git a/pkg/types/trade.go b/pkg/types/trade.go index f80b864a2..5a7eb0a7a 100644 --- a/pkg/types/trade.go +++ b/pkg/types/trade.go @@ -47,6 +47,16 @@ func (s *TradeSlice) Append(t Trade) { s.mu.Unlock() } +func (s *TradeSlice) Truncate(size int) { + s.mu.Lock() + + if len(s.Trades) > size { + s.Trades = s.Trades[len(s.Trades)-1-size:] + } + + s.mu.Unlock() +} + type Trade struct { // GID is the global ID GID int64 `json:"gid" db:"gid"`