imporve CrossExchange subscription handling

This commit is contained in:
c9s 2020-11-15 13:27:33 +08:00
parent 94aaaf21b0
commit ded970f5a4
2 changed files with 17 additions and 8 deletions

View File

@ -365,14 +365,13 @@ func (environ *Environment) Connect(ctx context.Context) error {
var logger = log.WithField("session", n)
if len(session.Subscriptions) == 0 {
logger.Warnf("no subscriptions, exchange session %s will not be connected", session.Name)
continue
}
// add the subscribe requests to the stream
for _, s := range session.Subscriptions {
logger.Infof("subscribing %s %s %v", s.Symbol, s.Channel, s.Options)
session.Stream.Subscribe(s.Channel, s.Symbol, s.Options)
logger.Warnf("exchange session %s has no subscriptions", session.Name)
} else {
// add the subscribe requests to the stream
for _, s := range session.Subscriptions {
logger.Infof("subscribing %s %s %v", s.Symbol, s.Channel, s.Options)
session.Stream.Subscribe(s.Channel, s.Symbol, s.Options)
}
}
logger.Infof("connecting session %s...", session.Name)

View File

@ -23,6 +23,10 @@ type ExchangeSessionSubscriber interface {
Subscribe(session *ExchangeSession)
}
type CrossExchangeSessionSubscriber interface {
Subscribe(sessions map[string]*ExchangeSession)
}
type CrossExchangeStrategy interface {
Run(ctx context.Context, orderExecutionRouter OrderExecutionRouter, sessions map[string]*ExchangeSession) error
}
@ -220,6 +224,12 @@ func (trader *Trader) Run(ctx context.Context) error {
sessions: trader.environment.sessions,
}
for _, strategy := range trader.crossExchangeStrategies {
if subscriber, ok := strategy.(CrossExchangeSessionSubscriber); ok {
subscriber.Subscribe(trader.environment.sessions)
}
}
for _, strategy := range trader.crossExchangeStrategies {
if err := strategy.Run(ctx, router, trader.environment.sessions); err != nil {
return err