core: move market data subscription to market data stream

This commit is contained in:
c9s 2021-05-27 15:09:04 +08:00
parent 45f1a13870
commit b7c87c7744
2 changed files with 10 additions and 5 deletions

View File

@ -508,11 +508,16 @@ func (environ *Environment) Connect(ctx context.Context) error {
// add the subscribe requests to the stream
for _, s := range session.Subscriptions {
logger.Infof("subscribing %s %s %v", s.Symbol, s.Channel, s.Options)
session.UserDataStream.Subscribe(s.Channel, s.Symbol, s.Options)
session.MarketDataStream.Subscribe(s.Channel, s.Symbol, s.Options)
}
}
logger.Infof("connecting session %s...", session.Name)
logger.Infof("connecting %s market data stream...", session.Name)
if err := session.MarketDataStream.Connect(ctx); err != nil {
return err
}
logger.Infof("connecting %s user data stream...", session.Name)
if err := session.UserDataStream.Connect(ctx); err != nil {
return err
}

View File

@ -281,12 +281,12 @@ func (session *ExchangeSession) Init(ctx context.Context, environ *Environment)
})
}
session.UserDataStream.OnKLineClosed(func(kline types.KLine) {
session.MarketDataStream.OnKLineClosed(func(kline types.KLine) {
log.WithField("marketData", "kline").Infof("kline closed: %+v", kline)
})
// update last prices
session.UserDataStream.OnKLineClosed(func(kline types.KLine) {
session.MarketDataStream.OnKLineClosed(func(kline types.KLine) {
if _, ok := session.startPrices[kline.Symbol]; !ok {
session.startPrices[kline.Symbol] = kline.Open
}
@ -371,7 +371,7 @@ func (session *ExchangeSession) initSymbol(ctx context.Context, environ *Environ
session.orderStores[symbol] = orderStore
marketDataStore := NewMarketDataStore(symbol)
marketDataStore.BindStream(session.UserDataStream)
marketDataStore.BindStream(session.MarketDataStream)
session.marketDataStores[symbol] = marketDataStore
standardIndicatorSet := NewStandardIndicatorSet(symbol, marketDataStore)