bbgo: move orderbook to the session level so that we can access it eaiser

This commit is contained in:
c9s 2021-06-10 19:05:07 +08:00
parent 2614b25de3
commit e23c459697
4 changed files with 96 additions and 122 deletions

View File

@ -210,75 +210,6 @@ func (environ *Environment) AddExchangesByViperKeys() error {
return nil return nil
} }
func InitExchangeSession(name string, session *ExchangeSession) error {
var err error
var exchangeName = session.ExchangeName
var exchange types.Exchange
if session.Key != "" && session.Secret != "" {
if !session.PublicOnly {
if len(session.Key) == 0 || len(session.Secret) == 0 {
return fmt.Errorf("can not create exchange %s: empty key or secret", exchangeName)
}
}
exchange, err = cmdutil.NewExchangeStandard(exchangeName, session.Key, session.Secret, "", session.SubAccount)
} else {
exchange, err = cmdutil.NewExchangeWithEnvVarPrefix(exchangeName, session.EnvVarPrefix)
}
if err != nil {
return err
}
// configure exchange
if session.Margin {
marginExchange, ok := exchange.(types.MarginExchange)
if !ok {
return fmt.Errorf("exchange %s does not support margin", exchangeName)
}
if session.IsolatedMargin {
marginExchange.UseIsolatedMargin(session.IsolatedMarginSymbol)
} else {
marginExchange.UseMargin()
}
}
session.Name = name
session.Notifiability = Notifiability{
SymbolChannelRouter: NewPatternChannelRouter(nil),
SessionChannelRouter: NewPatternChannelRouter(nil),
ObjectChannelRouter: NewObjectChannelRouter(),
}
session.Exchange = exchange
session.UserDataStream = exchange.NewStream()
session.MarketDataStream = exchange.NewStream()
session.MarketDataStream.SetPublicOnly()
// pointer fields
session.Subscriptions = make(map[types.Subscription]types.Subscription)
session.Account = &types.Account{}
session.Trades = make(map[string]*types.TradeSlice)
session.markets = make(map[string]types.Market)
session.lastPrices = make(map[string]float64)
session.startPrices = make(map[string]float64)
session.marketDataStores = make(map[string]*MarketDataStore)
session.positions = make(map[string]*Position)
session.standardIndicatorSets = make(map[string]*StandardIndicatorSet)
session.orderStores = make(map[string]*OrderStore)
session.OrderExecutor = &ExchangeOrderExecutor{
// copy the notification system so that we can route
Notifiability: session.Notifiability,
Session: session,
}
session.usedSymbols = make(map[string]struct{})
session.initializedSymbols = make(map[string]struct{})
session.logger = log.WithField("session", name)
return nil
}
func (environ *Environment) AddExchangesFromSessionConfig(sessions map[string]*ExchangeSession) error { func (environ *Environment) AddExchangesFromSessionConfig(sessions map[string]*ExchangeSession) error {
for sessionName, session := range sessions { for sessionName, session := range sessions {
if err := InitExchangeSession(sessionName, session); err != nil { if err := InitExchangeSession(sessionName, session); err != nil {

View File

@ -11,18 +11,12 @@ type MarketDataStore struct {
KLineWindows map[types.Interval]types.KLineWindow `json:"-"` KLineWindows map[types.Interval]types.KLineWindow `json:"-"`
kLineWindowUpdateCallbacks []func(interval types.Interval, kline types.KLineWindow) kLineWindowUpdateCallbacks []func(interval types.Interval, kline types.KLineWindow)
orderBook *types.StreamOrderBook
orderBookUpdateCallbacks []func(orderBook *types.StreamOrderBook)
} }
func NewMarketDataStore(symbol string) *MarketDataStore { func NewMarketDataStore(symbol string) *MarketDataStore {
return &MarketDataStore{ return &MarketDataStore{
Symbol: symbol, Symbol: symbol,
orderBook: types.NewStreamBook(symbol),
// KLineWindows stores all loaded klines per interval // KLineWindows stores all loaded klines per interval
KLineWindows: make(map[types.Interval]types.KLineWindow, len(types.SupportedIntervals)), // 12 interval, 1m,5m,15m,30m,1h,2h,4h,6h,12h,1d,3d,1w KLineWindows: make(map[types.Interval]types.KLineWindow, len(types.SupportedIntervals)), // 12 interval, 1m,5m,15m,30m,1h,2h,4h,6h,12h,1d,3d,1w
} }
@ -32,40 +26,14 @@ func (store *MarketDataStore) SetKLineWindows(windows map[types.Interval]types.K
store.KLineWindows = windows store.KLineWindows = windows
} }
func (store *MarketDataStore) OrderBook() types.OrderBook {
return store.orderBook.Copy()
}
// KLinesOfInterval returns the kline window of the given interval // KLinesOfInterval returns the kline window of the given interval
func (store *MarketDataStore) KLinesOfInterval(interval types.Interval) (kLines types.KLineWindow, ok bool) { func (store *MarketDataStore) KLinesOfInterval(interval types.Interval) (kLines types.KLineWindow, ok bool) {
kLines, ok = store.KLineWindows[interval] kLines, ok = store.KLineWindows[interval]
return kLines, ok return kLines, ok
} }
func (store *MarketDataStore) handleOrderBookUpdate(book types.SliceOrderBook) {
if book.Symbol != store.Symbol {
return
}
store.orderBook.Update(book)
store.EmitOrderBookUpdate(store.orderBook)
}
func (store *MarketDataStore) handleOrderBookSnapshot(book types.SliceOrderBook) {
if book.Symbol != store.Symbol {
return
}
store.orderBook.Load(book)
}
func (store *MarketDataStore) BindStream(stream types.Stream) { func (store *MarketDataStore) BindStream(stream types.Stream) {
stream.OnKLineClosed(store.handleKLineClosed) stream.OnKLineClosed(store.handleKLineClosed)
stream.OnBookSnapshot(store.handleOrderBookSnapshot)
stream.OnBookUpdate(store.handleOrderBookUpdate)
store.orderBook.BindStream(stream)
} }
func (store *MarketDataStore) handleKLineClosed(kline types.KLine) { func (store *MarketDataStore) handleKLineClosed(kline types.KLine) {

View File

@ -15,13 +15,3 @@ func (store *MarketDataStore) EmitKLineWindowUpdate(interval types.Interval, kli
cb(interval, kline) cb(interval, kline)
} }
} }
func (store *MarketDataStore) OnOrderBookUpdate(cb func(orderBook *types.StreamOrderBook)) {
store.orderBookUpdateCallbacks = append(store.orderBookUpdateCallbacks, cb)
}
func (store *MarketDataStore) EmitOrderBookUpdate(orderBook *types.StreamOrderBook) {
for _, cb := range store.orderBookUpdateCallbacks {
cb(orderBook)
}
}

View File

@ -6,6 +6,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/viper" "github.com/spf13/viper"
@ -163,6 +164,9 @@ type ExchangeSession struct {
// markets defines market configuration of a symbol // markets defines market configuration of a symbol
markets map[string]types.Market markets map[string]types.Market
// orderBooks stores the streaming order book
orderBooks map[string]*types.StreamOrderBook
// startPrices is used for backtest // startPrices is used for backtest
startPrices map[string]float64 startPrices map[string]float64
@ -205,6 +209,7 @@ func NewExchangeSession(name string, exchange types.Exchange) *ExchangeSession {
Account: &types.Account{}, Account: &types.Account{},
Trades: make(map[string]*types.TradeSlice), Trades: make(map[string]*types.TradeSlice),
orderBooks: make(map[string]*types.StreamOrderBook),
markets: make(map[string]types.Market), markets: make(map[string]types.Market),
startPrices: make(map[string]float64), startPrices: make(map[string]float64),
lastPrices: make(map[string]float64), lastPrices: make(map[string]float64),
@ -378,27 +383,31 @@ func (session *ExchangeSession) initSymbol(ctx context.Context, environ *Environ
session.standardIndicatorSets[symbol] = standardIndicatorSet session.standardIndicatorSets[symbol] = standardIndicatorSet
// used kline intervals by the given symbol // used kline intervals by the given symbol
var usedKLineIntervals = map[types.Interval]struct{}{} var klineSubscriptions = map[types.Interval]struct{}{}
// always subscribe the 1m kline so we can make sure the connection persists. // always subscribe the 1m kline so we can make sure the connection persists.
usedKLineIntervals[types.Interval1m] = struct{}{} klineSubscriptions[types.Interval1m] = struct{}{}
for _, sub := range session.Subscriptions { for _, sub := range session.Subscriptions {
if sub.Channel != types.KLineChannel { switch sub.Channel {
continue case types.BookChannel:
} book := types.NewStreamBook(sub.Symbol)
book.BindStream(session.MarketDataStream)
session.orderBooks[sub.Symbol] = book
if sub.Options.Interval == "" { case types.KLineChannel:
continue if sub.Options.Interval == "" {
} continue
}
if sub.Symbol == symbol { if sub.Symbol == symbol {
usedKLineIntervals[types.Interval(sub.Options.Interval)] = struct{}{} klineSubscriptions[types.Interval(sub.Options.Interval)] = struct{}{}
}
} }
} }
var lastPriceTime time.Time var lastPriceTime time.Time
for interval := range usedKLineIntervals { for interval := range klineSubscriptions {
// avoid querying the last unclosed kline // avoid querying the last unclosed kline
endTime := environ.startTime.Add(-interval.Duration()) endTime := environ.startTime.Add(-interval.Duration())
kLines, err := session.Exchange.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{ kLines, err := session.Exchange.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{
@ -472,6 +481,12 @@ func (session *ExchangeSession) MarketDataStore(symbol string) (s *MarketDataSto
return s, ok return s, ok
} }
// MarketDataStore returns the market data store of a symbol
func (session *ExchangeSession) OrderBook(symbol string) (s *types.StreamOrderBook, ok bool) {
s, ok = session.orderBooks[symbol]
return s, ok
}
func (session *ExchangeSession) StartPrice(symbol string) (price float64, ok bool) { func (session *ExchangeSession) StartPrice(symbol string) (price float64, ok bool) {
price, ok = session.startPrices[symbol] price, ok = session.startPrices[symbol]
return price, ok return price, ok
@ -616,3 +631,73 @@ func (session *ExchangeSession) FindPossibleSymbols() (symbols []string, err err
return symbols, nil return symbols, nil
} }
func InitExchangeSession(name string, session *ExchangeSession) error {
var err error
var exchangeName = session.ExchangeName
var exchange types.Exchange
if session.Key != "" && session.Secret != "" {
if !session.PublicOnly {
if len(session.Key) == 0 || len(session.Secret) == 0 {
return fmt.Errorf("can not create exchange %s: empty key or secret", exchangeName)
}
}
exchange, err = cmdutil.NewExchangeStandard(exchangeName, session.Key, session.Secret, "", session.SubAccount)
} else {
exchange, err = cmdutil.NewExchangeWithEnvVarPrefix(exchangeName, session.EnvVarPrefix)
}
if err != nil {
return err
}
// configure exchange
if session.Margin {
marginExchange, ok := exchange.(types.MarginExchange)
if !ok {
return fmt.Errorf("exchange %s does not support margin", exchangeName)
}
if session.IsolatedMargin {
marginExchange.UseIsolatedMargin(session.IsolatedMarginSymbol)
} else {
marginExchange.UseMargin()
}
}
session.Name = name
session.Notifiability = Notifiability{
SymbolChannelRouter: NewPatternChannelRouter(nil),
SessionChannelRouter: NewPatternChannelRouter(nil),
ObjectChannelRouter: NewObjectChannelRouter(),
}
session.Exchange = exchange
session.UserDataStream = exchange.NewStream()
session.MarketDataStream = exchange.NewStream()
session.MarketDataStream.SetPublicOnly()
// pointer fields
session.Subscriptions = make(map[types.Subscription]types.Subscription)
session.Account = &types.Account{}
session.Trades = make(map[string]*types.TradeSlice)
session.orderBooks = make(map[string]*types.StreamOrderBook)
session.markets = make(map[string]types.Market)
session.lastPrices = make(map[string]float64)
session.startPrices = make(map[string]float64)
session.marketDataStores = make(map[string]*MarketDataStore)
session.positions = make(map[string]*Position)
session.standardIndicatorSets = make(map[string]*StandardIndicatorSet)
session.orderStores = make(map[string]*OrderStore)
session.OrderExecutor = &ExchangeOrderExecutor{
// copy the notification system so that we can route
Notifiability: session.Notifiability,
Session: session,
}
session.usedSymbols = make(map[string]struct{})
session.initializedSymbols = make(map[string]struct{})
session.logger = log.WithField("session", name)
return nil
}