add kline to the market data store

This commit is contained in:
c9s 2020-10-18 00:06:08 +08:00
parent 530da665d3
commit c224eb7af7
5 changed files with 33 additions and 15 deletions

View File

@ -6,7 +6,7 @@ import (
"time"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/c9s/bbgo/cmd/cmdutil"
@ -73,6 +73,7 @@ func (environ *Environment) Init(ctx context.Context) (err error) {
session.Markets = markets
for symbol := range loadedSymbols {
log.Infof("syncing trades from %s for symbol %s...", session.Exchange.Name(), symbol)
if err := environ.TradeSync.Sync(ctx, session.Exchange, symbol, startTime); err != nil {
return err
}
@ -90,7 +91,7 @@ func (environ *Environment) Init(ctx context.Context) (err error) {
return err
}
logrus.Infof("symbol %s: %d trades loaded", symbol, len(trades))
log.Infof("symbol %s: %d trades loaded", symbol, len(trades))
session.Trades[symbol] = trades
currentPrice, err := session.Exchange.QueryAveragePrice(ctx, symbol)
@ -99,8 +100,11 @@ func (environ *Environment) Init(ctx context.Context) (err error) {
}
session.LastPrices[symbol] = currentPrice
session.MarketDataStores[symbol] = NewMarketDataStore(symbol)
}
log.Infof("querying balances...")
balances, err := session.Exchange.QueryAccountBalances(ctx)
if err != nil {
return err
@ -108,27 +112,26 @@ func (environ *Environment) Init(ctx context.Context) (err error) {
stream := session.Exchange.NewStream()
session.Stream = stream
session.Account = &Account{balances: balances}
session.Account.BindStream(session.Stream)
marketDataStore := NewMarketDataStore()
marketDataStore.BindStream(session.Stream)
account := &Account{balances: balances}
account.BindStream(stream)
session.Account = account
// update last prices
session.Stream.OnKLineClosed(func(kline types.KLine) {
stream.OnKLineClosed(func(kline types.KLine) {
session.LastPrices[kline.Symbol] = kline.Close
session.MarketDataStores[kline.Symbol].AddKLine(kline)
})
session.Stream.OnTrade(func(trade *types.Trade) {
stream.OnTrade(func(trade *types.Trade) {
// append trades
session.Trades[trade.Symbol] = append(session.Trades[trade.Symbol], *trade)
if err := environ.TradeService.Insert(*trade); err != nil {
logrus.WithError(err).Errorf("trade insert error: %+v", *trade)
log.WithError(err).Errorf("trade insert error: %+v", *trade)
}
})
session.Stream = stream
}
return nil
@ -136,8 +139,10 @@ func (environ *Environment) Init(ctx context.Context) (err error) {
func (environ *Environment) Connect(ctx context.Context) error {
for _, session := range environ.sessions {
log.Infof("connecting session %s...", session.Name)
for _, s := range session.Subscriptions {
logrus.Infof("subscribing %s %s %v", s.Symbol, s.Channel, s.Options)
log.Infof("subscribing %s %s %v", s.Symbol, s.Channel, s.Options)
session.Stream.Subscribe(s.Channel, s.Symbol, s.Options)
}

View File

@ -15,14 +15,18 @@ type KLineCallback func(kline types.KLine)
//go:generate callbackgen -type MarketDataStore
type MarketDataStore struct {
Symbol string
// KLineWindows stores all loaded klines per interval
KLineWindows map[Interval]types.KLineWindow `json:"-"`
updateCallbacks []KLineCallback
}
func NewMarketDataStore() *MarketDataStore {
func NewMarketDataStore(symbol string) *MarketDataStore {
return &MarketDataStore{
Symbol: symbol,
// KLineWindows stores all loaded klines per interval
KLineWindows: make(map[Interval]types.KLineWindow),
}
@ -33,7 +37,9 @@ func (store *MarketDataStore) BindStream(stream types.Stream) {
}
func (store *MarketDataStore) handleKLineClosed(kline types.KLine) {
store.AddKLine(kline)
if kline.Symbol == store.Symbol {
store.AddKLine(kline)
}
}
func (store *MarketDataStore) AddKLine(kline types.KLine) {

View File

@ -65,6 +65,9 @@ var Cmd = &cobra.Command{
trader := bbgo.NewTrader(environ)
trader.AttachStrategy(string(exchangeName), New(symbol, interval, baseQuantity))
err = trader.Run(ctx)
if err != nil {
return err
}
cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM)
return err

View File

@ -11,6 +11,10 @@ import (
type ExchangeName string
func (n ExchangeName) String() string {
return string(n)
}
const (
ExchangeMax = ExchangeName("max")
ExchangeBinance = ExchangeName("binance")