fix session initialization issue

This commit is contained in:
c9s 2020-10-19 21:58:50 +08:00
parent b0b1d2bd49
commit 6d6e79eab3
2 changed files with 45 additions and 32 deletions

View File

@ -58,7 +58,6 @@ func (environ *Environment) AddExchange(name string, exchange types.Exchange) (s
}
func (environ *Environment) Init(ctx context.Context) (err error) {
startTime := time.Now().AddDate(0, 0, -7) // sync from 7 days ago
for _, session := range environ.sessions {
loadedSymbols := make(map[string]struct{})
@ -76,6 +75,40 @@ func (environ *Environment) Init(ctx context.Context) (err error) {
// allocate stream before anything
stream := session.Exchange.NewStream()
log.Infof("querying balances...")
balances, err := session.Exchange.QueryAccountBalances(ctx)
if err != nil {
return err
}
account := &types.Account{}
session.Account = account
account.UpdateBalances(balances)
account.BindStream(stream)
session.Stream = stream
}
return nil
}
func (environ *Environment) Connect(ctx context.Context) error {
var err error
var startTime = time.Now().AddDate(0, 0, -7) // sync from 7 days ago
for n := range environ.sessions {
// avoid using the placeholder variable for the session because we use that in the callbacks
var session = environ.sessions[n]
loadedSymbols := make(map[string]struct{})
for _, s := range session.Subscriptions {
symbol := strings.ToUpper(s.Symbol)
loadedSymbols[symbol] = struct{}{}
log.Infof("subscribing %s %s %v", s.Symbol, s.Channel, s.Options)
session.Stream.Subscribe(s.Channel, s.Symbol, s.Options)
}
for symbol := range loadedSymbols {
log.Infof("syncing trades from %s for symbol %s...", session.Exchange.Name(), symbol)
if err := environ.TradeSync.Sync(ctx, session.Exchange, symbol, startTime); err != nil {
@ -106,30 +139,21 @@ func (environ *Environment) Init(ctx context.Context) (err error) {
session.lastPrices[symbol] = currentPrice
marketDataStore := store.NewMarketDataStore(symbol)
marketDataStore.BindStream(stream)
marketDataStore.BindStream(session.Stream)
session.marketDataStores[symbol] = marketDataStore
}
log.Infof("querying balances...")
balances, err := session.Exchange.QueryAccountBalances(ctx)
if err != nil {
return err
}
account := &types.Account{}
account.UpdateBalances(balances)
account.BindStream(stream)
session.Account = account
log.Infof("loaded symbol: %+v", loadedSymbols)
// update last prices
stream.OnKLineClosed(func(kline types.KLine) {
session.Stream.OnKLineClosed(func(kline types.KLine) {
log.Infof("kline closed: %+v", kline)
session.lastPrices[kline.Symbol] = kline.Close
session.marketDataStores[kline.Symbol].AddKLine(kline)
})
stream.OnTrade(func(trade *types.Trade) {
session.Stream.OnTrade(func(trade *types.Trade) {
// append trades
session.Trades[trade.Symbol] = append(session.Trades[trade.Symbol], *trade)
@ -138,21 +162,7 @@ func (environ *Environment) Init(ctx context.Context) (err error) {
}
})
session.Stream = stream
}
return nil
}
func (environ *Environment) Connect(ctx context.Context) error {
for _, session := range environ.sessions {
log.Infof("connecting session %s...", session.Name)
for _, s := range session.Subscriptions {
log.Infof("subscribing %s %s %v", s.Symbol, s.Channel, s.Options)
session.Stream.Subscribe(s.Channel, s.Symbol, s.Options)
}
if err := session.Stream.Connect(ctx); err != nil {
return err
}

View File

@ -31,8 +31,12 @@ func NewMarketDataStore(symbol string) *MarketDataStore {
}
}
func (store *MarketDataStore) SetKLineWindows(windows map[types.Interval]types.KLineWindow) {
store.KLineWindows = windows
}
func (store *MarketDataStore) OrderBook() types.OrderBook {
return store.orderBook.Copy()
return store.orderBook.Copy()
}
// KLinesOfInterval returns the kline window of the given interval
@ -76,8 +80,7 @@ func (store *MarketDataStore) handleKLineClosed(kline types.KLine) {
}
func (store *MarketDataStore) AddKLine(kline types.KLine) {
var interval = types.Interval(kline.Interval)
var window = store.KLineWindows[interval]
window := store.KLineWindows[types.Interval(kline.Interval)]
window.Add(kline)
store.LastKLine = kline