diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index eee35ca31..39840cb9e 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -365,14 +365,13 @@ func (environ *Environment) Connect(ctx context.Context) error { var logger = log.WithField("session", n) if len(session.Subscriptions) == 0 { - logger.Warnf("no subscriptions, exchange session %s will not be connected", session.Name) - continue - } - - // add the subscribe requests to the stream - for _, s := range session.Subscriptions { - logger.Infof("subscribing %s %s %v", s.Symbol, s.Channel, s.Options) - session.Stream.Subscribe(s.Channel, s.Symbol, s.Options) + logger.Warnf("exchange session %s has no subscriptions", session.Name) + } else { + // add the subscribe requests to the stream + for _, s := range session.Subscriptions { + logger.Infof("subscribing %s %s %v", s.Symbol, s.Channel, s.Options) + session.Stream.Subscribe(s.Channel, s.Symbol, s.Options) + } } logger.Infof("connecting session %s...", session.Name) diff --git a/pkg/bbgo/trader.go b/pkg/bbgo/trader.go index 67ce7e7cc..ff7158800 100644 --- a/pkg/bbgo/trader.go +++ b/pkg/bbgo/trader.go @@ -23,6 +23,10 @@ type ExchangeSessionSubscriber interface { Subscribe(session *ExchangeSession) } +type CrossExchangeSessionSubscriber interface { + Subscribe(sessions map[string]*ExchangeSession) +} + type CrossExchangeStrategy interface { Run(ctx context.Context, orderExecutionRouter OrderExecutionRouter, sessions map[string]*ExchangeSession) error } @@ -220,6 +224,12 @@ func (trader *Trader) Run(ctx context.Context) error { sessions: trader.environment.sessions, } + for _, strategy := range trader.crossExchangeStrategies { + if subscriber, ok := strategy.(CrossExchangeSessionSubscriber); ok { + subscriber.Subscribe(trader.environment.sessions) + } + } + for _, strategy := range trader.crossExchangeStrategies { if err := strategy.Run(ctx, router, trader.environment.sessions); err != nil { return err