xmaker: refactor signal methods

This commit is contained in:
c9s 2024-10-09 12:33:11 +08:00
parent cea59ef9cf
commit 49e949dbc9
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
3 changed files with 48 additions and 38 deletions

View File

@ -21,6 +21,10 @@ func init() {
prometheus.MustRegister(orderBookSignalMetrics) prometheus.MustRegister(orderBookSignalMetrics)
} }
type StreamBookSetter interface {
SetStreamBook(book *types.StreamOrderBook)
}
type OrderBookBestPriceVolumeSignal struct { type OrderBookBestPriceVolumeSignal struct {
RatioThreshold fixedpoint.Value `json:"ratioThreshold"` RatioThreshold fixedpoint.Value `json:"ratioThreshold"`
MinVolume fixedpoint.Value `json:"minVolume"` MinVolume fixedpoint.Value `json:"minVolume"`
@ -29,7 +33,7 @@ type OrderBookBestPriceVolumeSignal struct {
book *types.StreamOrderBook book *types.StreamOrderBook
} }
func (s *OrderBookBestPriceVolumeSignal) BindStreamBook(book *types.StreamOrderBook) { func (s *OrderBookBestPriceVolumeSignal) SetStreamBook(book *types.StreamOrderBook) {
s.book = book s.book = book
} }

View File

@ -31,7 +31,7 @@ type DepthRatioSignal struct {
book *types.StreamOrderBook book *types.StreamOrderBook
} }
func (s *DepthRatioSignal) BindStreamBook(book *types.StreamOrderBook) { func (s *DepthRatioSignal) SetStreamBook(book *types.StreamOrderBook) {
s.book = book s.book = book
} }

View File

@ -90,6 +90,20 @@ type SignalConfig struct {
TradeVolumeWindowSignal *TradeVolumeWindowSignal `json:"tradeVolumeWindow,omitempty"` TradeVolumeWindowSignal *TradeVolumeWindowSignal `json:"tradeVolumeWindow,omitempty"`
} }
func (c *SignalConfig) Get() SignalProvider {
if c.OrderBookBestPriceSignal != nil {
return c.OrderBookBestPriceSignal
} else if c.DepthRatioSignal != nil {
return c.DepthRatioSignal
} else if c.BollingerBandTrendSignal != nil {
return c.BollingerBandTrendSignal
} else if c.TradeVolumeWindowSignal != nil {
return c.TradeVolumeWindowSignal
}
panic(fmt.Errorf("no valid signal provider found, please check your config"))
}
func init() { func init() {
bbgo.RegisterStrategy(ID, &Strategy{}) bbgo.RegisterStrategy(ID, &Strategy{})
} }
@ -422,18 +436,9 @@ func (s *Strategy) applyBollingerMargin(
func (s *Strategy) aggregateSignal(ctx context.Context) (float64, error) { func (s *Strategy) aggregateSignal(ctx context.Context) (float64, error) {
sum := 0.0 sum := 0.0
voters := 0.0 voters := 0.0
for _, signal := range s.SignalConfigList { for _, signalConfig := range s.SignalConfigList {
var sig float64 signalProvider := signalConfig.Get()
var err error sig, err := signalProvider.CalculateSignal(ctx)
if signal.OrderBookBestPriceSignal != nil {
sig, err = signal.OrderBookBestPriceSignal.CalculateSignal(ctx)
} else if signal.DepthRatioSignal != nil {
sig, err = signal.DepthRatioSignal.CalculateSignal(ctx)
} else if signal.BollingerBandTrendSignal != nil {
sig, err = signal.BollingerBandTrendSignal.CalculateSignal(ctx)
} else if signal.TradeVolumeWindowSignal != nil {
sig, err = signal.TradeVolumeWindowSignal.CalculateSignal(ctx)
}
if err != nil { if err != nil {
return 0, err return 0, err
@ -441,9 +446,9 @@ func (s *Strategy) aggregateSignal(ctx context.Context) (float64, error) {
continue continue
} }
if signal.Weight > 0.0 { if signalConfig.Weight > 0.0 {
sum += sig * signal.Weight sum += sig * signalConfig.Weight
voters += signal.Weight voters += signalConfig.Weight
} else { } else {
sum += sig sum += sig
voters++ voters++
@ -601,6 +606,9 @@ func (s *Strategy) updateQuote(ctx context.Context) error {
s.logger.WithError(err).Errorf("quote update error, %s price not updating, order book last update: %s ago", s.logger.WithError(err).Errorf("quote update error, %s price not updating, order book last update: %s ago",
s.Symbol, s.Symbol,
time.Since(bookLastUpdateTime)) time.Since(bookLastUpdateTime))
s.sourceBook.Reset()
s.sourceSession.MarketDataStream.Reconnect()
return err return err
} }
@ -608,6 +616,9 @@ func (s *Strategy) updateQuote(ctx context.Context) error {
s.logger.WithError(err).Errorf("quote update error, %s price not updating, order book last update: %s ago", s.logger.WithError(err).Errorf("quote update error, %s price not updating, order book last update: %s ago",
s.Symbol, s.Symbol,
time.Since(bookLastUpdateTime)) time.Since(bookLastUpdateTime))
s.sourceBook.Reset()
s.sourceSession.MarketDataStream.Reconnect()
return err return err
} }
@ -1595,6 +1606,8 @@ func (s *Strategy) CrossRun(
s.sourceBook.BindStream(s.sourceSession.MarketDataStream) s.sourceBook.BindStream(s.sourceSession.MarketDataStream)
if s.EnableSignalMargin { if s.EnableSignalMargin {
s.logger.Infof("signal margin is enabled")
scale, err := s.SignalMarginScale.Scale() scale, err := s.SignalMarginScale.Scale()
if err != nil { if err != nil {
return err return err
@ -1611,26 +1624,18 @@ func (s *Strategy) CrossRun(
minAdditionalMargin*100.0, minAdditionalMargin*100.0,
middleAdditionalMargin*100.0, middleAdditionalMargin*100.0,
maxAdditionalMargin*100.0) maxAdditionalMargin*100.0)
return nil
} }
for _, signalConfig := range s.SignalConfigList { for _, signalConfig := range s.SignalConfigList {
if signalConfig.OrderBookBestPriceSignal != nil { signal := signalConfig.Get()
signalConfig.OrderBookBestPriceSignal.book = s.sourceBook if setter, ok := signal.(StreamBookSetter); ok {
if err := signalConfig.OrderBookBestPriceSignal.Bind(ctx, s.sourceSession, s.Symbol); err != nil { s.logger.Infof("setting stream book on signal %T", signal)
return err setter.SetStreamBook(s.sourceBook)
} }
} else if signalConfig.DepthRatioSignal != nil {
signalConfig.DepthRatioSignal.book = s.sourceBook if binder, ok := signal.(SessionBinder); ok {
if err := signalConfig.DepthRatioSignal.Bind(ctx, s.sourceSession, s.Symbol); err != nil { s.logger.Infof("binding session on signal %T", signal)
return err if err := binder.Bind(ctx, s.sourceSession, s.Symbol); err != nil {
}
} else if signalConfig.BollingerBandTrendSignal != nil {
if err := signalConfig.BollingerBandTrendSignal.Bind(ctx, s.sourceSession, s.Symbol); err != nil {
return err
}
} else if signalConfig.TradeVolumeWindowSignal != nil {
if err := signalConfig.TradeVolumeWindowSignal.Bind(ctx, s.sourceSession, s.Symbol); err != nil {
return err return err
} }
} }
@ -1697,11 +1702,8 @@ func (s *Strategy) CrossRun(
s.connectivityGroup = types.NewConnectivityGroup(sourceConnectivity) s.connectivityGroup = types.NewConnectivityGroup(sourceConnectivity)
if s.RecoverTrade {
go s.tradeRecover(ctx)
}
go func() { go func() {
s.logger.Infof("waiting for authentication connections to be ready...")
select { select {
case <-ctx.Done(): case <-ctx.Done():
case <-s.connectivityGroup.AllAuthedC(ctx, 15*time.Second): case <-s.connectivityGroup.AllAuthedC(ctx, 15*time.Second):
@ -1712,6 +1714,10 @@ func (s *Strategy) CrossRun(
go s.accountUpdater(ctx) go s.accountUpdater(ctx)
go s.hedgeWorker(ctx) go s.hedgeWorker(ctx)
go s.quoteWorker(ctx) go s.quoteWorker(ctx)
if s.RecoverTrade {
go s.tradeRecover(ctx)
}
}() }()
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) { bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {