From b28b5e409791a061a2c8ed44a5e70a026d4b555b Mon Sep 17 00:00:00 2001 From: c9s Date: Sat, 11 Nov 2023 07:42:29 +0800 Subject: [PATCH] bbgo: add environment config for disabling some klines defaults --- pkg/bbgo/config.go | 7 ++++ pkg/bbgo/environment.go | 4 +-- pkg/bbgo/session.go | 78 ++++++++++++++++++++++------------------- 3 files changed, 50 insertions(+), 39 deletions(-) diff --git a/pkg/bbgo/config.go b/pkg/bbgo/config.go index 4cbc23048..591176484 100644 --- a/pkg/bbgo/config.go +++ b/pkg/bbgo/config.go @@ -326,6 +326,11 @@ type ServiceConfig struct { GoogleSpreadSheetService *GoogleSpreadSheetServiceConfig `json:"googleSpreadSheet" yaml:"googleSpreadSheet"` } +type EnvironmentConfig struct { + DisableDefaultKLineSubscription bool `json:"disableDefaultKLineSubscription"` + DisableHistoryKLinePreload bool `json:"disableHistoryKLinePreload"` +} + type Config struct { Build *BuildConfig `json:"build,omitempty" yaml:"build,omitempty"` @@ -343,6 +348,8 @@ type Config struct { Service *ServiceConfig `json:"services,omitempty" yaml:"services,omitempty"` + Environment *EnvironmentConfig `json:"environment,omitempty" yaml:"environment,omitempty"` + Sessions map[string]*ExchangeSession `json:"sessions,omitempty" yaml:"sessions,omitempty"` RiskControls *RiskControls `json:"riskControls,omitempty" yaml:"riskControls,omitempty"` diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index 71ef121eb..ce8b98fb5 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -108,13 +108,13 @@ type Environment struct { syncStatus SyncStatus syncConfig *SyncConfig - loggingConfig *LoggingConfig + loggingConfig *LoggingConfig + environmentConfig *EnvironmentConfig sessions map[string]*ExchangeSession } func NewEnvironment() *Environment { - now := time.Now() return &Environment{ // default trade scan time diff --git a/pkg/bbgo/session.go b/pkg/bbgo/session.go index 3d2cc82e9..101abd3f9 100644 --- a/pkg/bbgo/session.go +++ b/pkg/bbgo/session.go @@ -472,49 +472,53 @@ func (session *ExchangeSession) initSymbol(ctx context.Context, environ *Environ } if sub.Symbol == symbol { - klineSubscriptions[types.Interval(sub.Options.Interval)] = struct{}{} + klineSubscriptions[sub.Options.Interval] = struct{}{} } } } - // always subscribe the 1m kline so we can make sure the connection persists. - klineSubscriptions[minInterval] = struct{}{} - - for interval := range klineSubscriptions { - // avoid querying the last unclosed kline - endTime := environ.startTime - var i int64 - for i = 0; i < KLinePreloadLimit; i += 1000 { - var duration time.Duration = time.Duration(-i * int64(interval.Duration())) - e := endTime.Add(duration) - - kLines, err := session.Exchange.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{ - EndTime: &e, - Limit: 1000, // indicators need at least 100 - }) - if err != nil { - return err - } - - if len(kLines) == 0 { - log.Warnf("no kline data for %s %s (end time <= %s)", symbol, interval, e) - continue - } - - // update last prices by the given kline - lastKLine := kLines[len(kLines)-1] - if interval == minInterval { - session.lastPrices[symbol] = lastKLine.Close - } - - for _, k := range kLines { - // let market data store trigger the update, so that the indicator could be updated too. - marketDataStore.AddKLine(k) - } - } + if !(environ.environmentConfig != nil && environ.environmentConfig.DisableDefaultKLineSubscription) { + // subscribe the 1m kline by default so we can make sure the connection persists. + klineSubscriptions[minInterval] = struct{}{} } - log.Infof("%s last price: %v", symbol, session.lastPrices[symbol]) + if !(environ.environmentConfig != nil && environ.environmentConfig.DisableHistoryKLinePreload) { + for interval := range klineSubscriptions { + // avoid querying the last unclosed kline + endTime := environ.startTime + var i int64 + for i = 0; i < KLinePreloadLimit; i += 1000 { + var duration time.Duration = time.Duration(-i * int64(interval.Duration())) + e := endTime.Add(duration) + + kLines, err := session.Exchange.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{ + EndTime: &e, + Limit: 1000, // indicators need at least 100 + }) + if err != nil { + return err + } + + if len(kLines) == 0 { + log.Warnf("no kline data for %s %s (end time <= %s)", symbol, interval, e) + continue + } + + // update last prices by the given kline + lastKLine := kLines[len(kLines)-1] + if interval == minInterval { + session.lastPrices[symbol] = lastKLine.Close + } + + for _, k := range kLines { + // let market data store trigger the update, so that the indicator could be updated too. + marketDataStore.AddKLine(k) + } + } + } + + log.Infof("%s last price: %v", symbol, session.lastPrices[symbol]) + } session.initializedSymbols[symbol] = struct{}{} return nil