Merge pull request #1423 from c9s/c9s/improve-trade-buffer-mem-size

This commit is contained in:
c9s 2023-11-17 19:45:55 +08:00 committed by GitHub
commit 8f7c22b0f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 31 additions and 32 deletions

View File

@ -329,6 +329,8 @@ type ServiceConfig struct {
type EnvironmentConfig struct {
DisableDefaultKLineSubscription bool `json:"disableDefaultKLineSubscription"`
DisableHistoryKLinePreload bool `json:"disableHistoryKLinePreload"`
DisableSessionTradeBuffer bool `json:"disableSessionTradeBuffer"`
MaxSessionTradeBufferSize int `json:"maxSessionTradeBufferSize"`
}
type Config struct {

View File

@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
@ -20,7 +19,6 @@ import (
exchange2 "github.com/c9s/bbgo/pkg/exchange"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
)
@ -398,50 +396,39 @@ func (session *ExchangeSession) initSymbol(ctx context.Context, environ *Environ
return fmt.Errorf("market %s is not defined", symbol)
}
var err error
var trades []types.Trade
if environ.SyncService != nil && environ.BacktestService == nil {
tradingFeeCurrency := session.Exchange.PlatformFeeCurrency()
if strings.HasPrefix(symbol, tradingFeeCurrency) {
trades, err = environ.TradeService.QueryForTradingFeeCurrency(session.Exchange.Name(), symbol, tradingFeeCurrency)
} else {
trades, err = environ.TradeService.Query(service.QueryTradesOptions{
Exchange: session.Exchange.Name(),
Symbol: symbol,
Ordering: "DESC",
Limit: 100,
})
}
if err != nil {
return err
}
trades = types.SortTradesAscending(trades)
log.Infof("symbol %s: %d trades loaded", symbol, len(trades))
disableSessionTradeBuffer := environ.environmentConfig != nil && environ.environmentConfig.DisableSessionTradeBuffer
maxSessionTradeBufferSize := 0
if environ.environmentConfig != nil && environ.environmentConfig.MaxSessionTradeBufferSize > 0 {
maxSessionTradeBufferSize = environ.environmentConfig.MaxSessionTradeBufferSize
}
session.Trades[symbol] = &types.TradeSlice{Trades: trades}
session.UserDataStream.OnTradeUpdate(func(trade types.Trade) {
if trade.Symbol != symbol {
return
}
session.Trades[symbol] = &types.TradeSlice{Trades: nil}
session.Trades[symbol].Append(trade)
})
if !disableSessionTradeBuffer {
session.UserDataStream.OnTradeUpdate(func(trade types.Trade) {
if trade.Symbol != symbol {
return
}
session.Trades[symbol].Append(trade)
if maxSessionTradeBufferSize > 0 {
session.Trades[symbol].Truncate(maxSessionTradeBufferSize)
}
})
}
// session wide position
position := &types.Position{
Symbol: symbol,
BaseCurrency: market.BaseCurrency,
QuoteCurrency: market.QuoteCurrency,
}
position.AddTrades(trades)
position.BindStream(session.UserDataStream)
session.positions[symbol] = position
orderStore := core.NewOrderStore(symbol)
orderStore.AddOrderUpdate = true
orderStore.BindStream(session.UserDataStream)
session.orderStores[symbol] = orderStore

View File

@ -47,6 +47,16 @@ func (s *TradeSlice) Append(t Trade) {
s.mu.Unlock()
}
func (s *TradeSlice) Truncate(size int) {
s.mu.Lock()
if len(s.Trades) > size {
s.Trades = s.Trades[len(s.Trades)-1-size:]
}
s.mu.Unlock()
}
type Trade struct {
// GID is the global ID
GID int64 `json:"gid" db:"gid"`