From c224eb7af7137b4f565448af14a6710c41ceacd9 Mon Sep 17 00:00:00 2001 From: c9s Date: Sun, 18 Oct 2020 00:06:08 +0800 Subject: [PATCH] add kline to the market data store --- pkg/bbgo/environment.go | 31 +++++++++++-------- pkg/bbgo/store.go | 10 ++++-- pkg/strategy/buyandhold/cmd.go | 3 ++ .../buyandhold/{main.go => strategy.go} | 0 pkg/types/exchange.go | 4 +++ 5 files changed, 33 insertions(+), 15 deletions(-) rename pkg/strategy/buyandhold/{main.go => strategy.go} (100%) diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index d90aea380..fe335fae5 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -6,7 +6,7 @@ import ( "time" "github.com/jmoiron/sqlx" - "github.com/sirupsen/logrus" + log "github.com/sirupsen/logrus" "github.com/spf13/viper" "github.com/c9s/bbgo/cmd/cmdutil" @@ -73,6 +73,7 @@ func (environ *Environment) Init(ctx context.Context) (err error) { session.Markets = markets for symbol := range loadedSymbols { + log.Infof("syncing trades from %s for symbol %s...", session.Exchange.Name(), symbol) if err := environ.TradeSync.Sync(ctx, session.Exchange, symbol, startTime); err != nil { return err } @@ -90,7 +91,7 @@ func (environ *Environment) Init(ctx context.Context) (err error) { return err } - logrus.Infof("symbol %s: %d trades loaded", symbol, len(trades)) + log.Infof("symbol %s: %d trades loaded", symbol, len(trades)) session.Trades[symbol] = trades currentPrice, err := session.Exchange.QueryAveragePrice(ctx, symbol) @@ -99,8 +100,11 @@ func (environ *Environment) Init(ctx context.Context) (err error) { } session.LastPrices[symbol] = currentPrice + + session.MarketDataStores[symbol] = NewMarketDataStore(symbol) } + log.Infof("querying balances...") balances, err := session.Exchange.QueryAccountBalances(ctx) if err != nil { return err @@ -108,27 +112,26 @@ func (environ *Environment) Init(ctx context.Context) (err error) { stream := session.Exchange.NewStream() - session.Stream = stream - - session.Account = &Account{balances: balances} - session.Account.BindStream(session.Stream) - - marketDataStore := NewMarketDataStore() - marketDataStore.BindStream(session.Stream) + account := &Account{balances: balances} + account.BindStream(stream) + session.Account = account // update last prices - session.Stream.OnKLineClosed(func(kline types.KLine) { + stream.OnKLineClosed(func(kline types.KLine) { session.LastPrices[kline.Symbol] = kline.Close + session.MarketDataStores[kline.Symbol].AddKLine(kline) }) - session.Stream.OnTrade(func(trade *types.Trade) { + stream.OnTrade(func(trade *types.Trade) { // append trades session.Trades[trade.Symbol] = append(session.Trades[trade.Symbol], *trade) if err := environ.TradeService.Insert(*trade); err != nil { - logrus.WithError(err).Errorf("trade insert error: %+v", *trade) + log.WithError(err).Errorf("trade insert error: %+v", *trade) } }) + + session.Stream = stream } return nil @@ -136,8 +139,10 @@ func (environ *Environment) Init(ctx context.Context) (err error) { func (environ *Environment) Connect(ctx context.Context) error { for _, session := range environ.sessions { + log.Infof("connecting session %s...", session.Name) + for _, s := range session.Subscriptions { - logrus.Infof("subscribing %s %s %v", s.Symbol, s.Channel, s.Options) + log.Infof("subscribing %s %s %v", s.Symbol, s.Channel, s.Options) session.Stream.Subscribe(s.Channel, s.Symbol, s.Options) } diff --git a/pkg/bbgo/store.go b/pkg/bbgo/store.go index 3aba663e8..67988954b 100644 --- a/pkg/bbgo/store.go +++ b/pkg/bbgo/store.go @@ -15,14 +15,18 @@ type KLineCallback func(kline types.KLine) //go:generate callbackgen -type MarketDataStore type MarketDataStore struct { + Symbol string + // KLineWindows stores all loaded klines per interval KLineWindows map[Interval]types.KLineWindow `json:"-"` updateCallbacks []KLineCallback } -func NewMarketDataStore() *MarketDataStore { +func NewMarketDataStore(symbol string) *MarketDataStore { return &MarketDataStore{ + Symbol: symbol, + // KLineWindows stores all loaded klines per interval KLineWindows: make(map[Interval]types.KLineWindow), } @@ -33,7 +37,9 @@ func (store *MarketDataStore) BindStream(stream types.Stream) { } func (store *MarketDataStore) handleKLineClosed(kline types.KLine) { - store.AddKLine(kline) + if kline.Symbol == store.Symbol { + store.AddKLine(kline) + } } func (store *MarketDataStore) AddKLine(kline types.KLine) { diff --git a/pkg/strategy/buyandhold/cmd.go b/pkg/strategy/buyandhold/cmd.go index d142f1e9d..286669a94 100644 --- a/pkg/strategy/buyandhold/cmd.go +++ b/pkg/strategy/buyandhold/cmd.go @@ -65,6 +65,9 @@ var Cmd = &cobra.Command{ trader := bbgo.NewTrader(environ) trader.AttachStrategy(string(exchangeName), New(symbol, interval, baseQuantity)) err = trader.Run(ctx) + if err != nil { + return err + } cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM) return err diff --git a/pkg/strategy/buyandhold/main.go b/pkg/strategy/buyandhold/strategy.go similarity index 100% rename from pkg/strategy/buyandhold/main.go rename to pkg/strategy/buyandhold/strategy.go diff --git a/pkg/types/exchange.go b/pkg/types/exchange.go index 85f7088b1..b85e409c6 100644 --- a/pkg/types/exchange.go +++ b/pkg/types/exchange.go @@ -11,6 +11,10 @@ import ( type ExchangeName string +func (n ExchangeName) String() string { + return string(n) +} + const ( ExchangeMax = ExchangeName("max") ExchangeBinance = ExchangeName("binance")