diff --git a/pkg/strategy/xmaker/signal_book.go b/pkg/strategy/xmaker/signal_book.go index 4ab19a003..93e13dcc9 100644 --- a/pkg/strategy/xmaker/signal_book.go +++ b/pkg/strategy/xmaker/signal_book.go @@ -21,6 +21,10 @@ func init() { prometheus.MustRegister(orderBookSignalMetrics) } +type StreamBookSetter interface { + SetStreamBook(book *types.StreamOrderBook) +} + type OrderBookBestPriceVolumeSignal struct { RatioThreshold fixedpoint.Value `json:"ratioThreshold"` MinVolume fixedpoint.Value `json:"minVolume"` @@ -29,7 +33,7 @@ type OrderBookBestPriceVolumeSignal struct { book *types.StreamOrderBook } -func (s *OrderBookBestPriceVolumeSignal) BindStreamBook(book *types.StreamOrderBook) { +func (s *OrderBookBestPriceVolumeSignal) SetStreamBook(book *types.StreamOrderBook) { s.book = book } diff --git a/pkg/strategy/xmaker/signal_depth.go b/pkg/strategy/xmaker/signal_depth.go index db9d31b9c..b8455f542 100644 --- a/pkg/strategy/xmaker/signal_depth.go +++ b/pkg/strategy/xmaker/signal_depth.go @@ -31,7 +31,7 @@ type DepthRatioSignal struct { book *types.StreamOrderBook } -func (s *DepthRatioSignal) BindStreamBook(book *types.StreamOrderBook) { +func (s *DepthRatioSignal) SetStreamBook(book *types.StreamOrderBook) { s.book = book } diff --git a/pkg/strategy/xmaker/strategy.go b/pkg/strategy/xmaker/strategy.go index ee86b88e9..49c4ac2b5 100644 --- a/pkg/strategy/xmaker/strategy.go +++ b/pkg/strategy/xmaker/strategy.go @@ -90,6 +90,20 @@ type SignalConfig struct { TradeVolumeWindowSignal *TradeVolumeWindowSignal `json:"tradeVolumeWindow,omitempty"` } +func (c *SignalConfig) Get() SignalProvider { + if c.OrderBookBestPriceSignal != nil { + return c.OrderBookBestPriceSignal + } else if c.DepthRatioSignal != nil { + return c.DepthRatioSignal + } else if c.BollingerBandTrendSignal != nil { + return c.BollingerBandTrendSignal + } else if c.TradeVolumeWindowSignal != nil { + return c.TradeVolumeWindowSignal + } + + panic(fmt.Errorf("no valid signal provider found, please check your config")) +} + func init() { bbgo.RegisterStrategy(ID, &Strategy{}) } @@ -422,18 +436,9 @@ func (s *Strategy) applyBollingerMargin( func (s *Strategy) aggregateSignal(ctx context.Context) (float64, error) { sum := 0.0 voters := 0.0 - for _, signal := range s.SignalConfigList { - var sig float64 - var err error - if signal.OrderBookBestPriceSignal != nil { - sig, err = signal.OrderBookBestPriceSignal.CalculateSignal(ctx) - } else if signal.DepthRatioSignal != nil { - sig, err = signal.DepthRatioSignal.CalculateSignal(ctx) - } else if signal.BollingerBandTrendSignal != nil { - sig, err = signal.BollingerBandTrendSignal.CalculateSignal(ctx) - } else if signal.TradeVolumeWindowSignal != nil { - sig, err = signal.TradeVolumeWindowSignal.CalculateSignal(ctx) - } + for _, signalConfig := range s.SignalConfigList { + signalProvider := signalConfig.Get() + sig, err := signalProvider.CalculateSignal(ctx) if err != nil { return 0, err @@ -441,9 +446,9 @@ func (s *Strategy) aggregateSignal(ctx context.Context) (float64, error) { continue } - if signal.Weight > 0.0 { - sum += sig * signal.Weight - voters += signal.Weight + if signalConfig.Weight > 0.0 { + sum += sig * signalConfig.Weight + voters += signalConfig.Weight } else { sum += sig voters++ @@ -601,6 +606,9 @@ func (s *Strategy) updateQuote(ctx context.Context) error { s.logger.WithError(err).Errorf("quote update error, %s price not updating, order book last update: %s ago", s.Symbol, time.Since(bookLastUpdateTime)) + + s.sourceBook.Reset() + s.sourceSession.MarketDataStream.Reconnect() return err } @@ -608,6 +616,9 @@ func (s *Strategy) updateQuote(ctx context.Context) error { s.logger.WithError(err).Errorf("quote update error, %s price not updating, order book last update: %s ago", s.Symbol, time.Since(bookLastUpdateTime)) + + s.sourceBook.Reset() + s.sourceSession.MarketDataStream.Reconnect() return err } @@ -1595,6 +1606,8 @@ func (s *Strategy) CrossRun( s.sourceBook.BindStream(s.sourceSession.MarketDataStream) if s.EnableSignalMargin { + s.logger.Infof("signal margin is enabled") + scale, err := s.SignalMarginScale.Scale() if err != nil { return err @@ -1611,26 +1624,18 @@ func (s *Strategy) CrossRun( minAdditionalMargin*100.0, middleAdditionalMargin*100.0, maxAdditionalMargin*100.0) - return nil } for _, signalConfig := range s.SignalConfigList { - if signalConfig.OrderBookBestPriceSignal != nil { - signalConfig.OrderBookBestPriceSignal.book = s.sourceBook - if err := signalConfig.OrderBookBestPriceSignal.Bind(ctx, s.sourceSession, s.Symbol); err != nil { - return err - } - } else if signalConfig.DepthRatioSignal != nil { - signalConfig.DepthRatioSignal.book = s.sourceBook - if err := signalConfig.DepthRatioSignal.Bind(ctx, s.sourceSession, s.Symbol); err != nil { - return err - } - } else if signalConfig.BollingerBandTrendSignal != nil { - if err := signalConfig.BollingerBandTrendSignal.Bind(ctx, s.sourceSession, s.Symbol); err != nil { - return err - } - } else if signalConfig.TradeVolumeWindowSignal != nil { - if err := signalConfig.TradeVolumeWindowSignal.Bind(ctx, s.sourceSession, s.Symbol); err != nil { + signal := signalConfig.Get() + if setter, ok := signal.(StreamBookSetter); ok { + s.logger.Infof("setting stream book on signal %T", signal) + setter.SetStreamBook(s.sourceBook) + } + + if binder, ok := signal.(SessionBinder); ok { + s.logger.Infof("binding session on signal %T", signal) + if err := binder.Bind(ctx, s.sourceSession, s.Symbol); err != nil { return err } } @@ -1697,11 +1702,8 @@ func (s *Strategy) CrossRun( s.connectivityGroup = types.NewConnectivityGroup(sourceConnectivity) - if s.RecoverTrade { - go s.tradeRecover(ctx) - } - go func() { + s.logger.Infof("waiting for authentication connections to be ready...") select { case <-ctx.Done(): case <-s.connectivityGroup.AllAuthedC(ctx, 15*time.Second): @@ -1712,6 +1714,10 @@ func (s *Strategy) CrossRun( go s.accountUpdater(ctx) go s.hedgeWorker(ctx) go s.quoteWorker(ctx) + + if s.RecoverTrade { + go s.tradeRecover(ctx) + } }() bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {