From 300609e3db0301b1f87a1c66e5b87fd85fa0db53 Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 15 Oct 2020 22:36:22 +0800 Subject: [PATCH] fix subscription initialization --- cmd/cmdutil/exchange.go | 4 ++-- cmd/root.go | 6 ++++++ pkg/bbgo/trader.go | 22 ++++++++++++++------- strategies/buyandhold/main.go | 37 +++++++++++++++++++++++------------ 4 files changed, 48 insertions(+), 21 deletions(-) diff --git a/cmd/cmdutil/exchange.go b/cmd/cmdutil/exchange.go index 926690406..a5e68e86c 100644 --- a/cmd/cmdutil/exchange.go +++ b/cmd/cmdutil/exchange.go @@ -16,7 +16,7 @@ func NewExchange(n types.ExchangeName) (types.Exchange, error) { key := viper.GetString("binance-api-key") secret := viper.GetString("binance-api-secret") if len(key) == 0 || len(secret) == 0 { - return nil, errors.New("empty key or secret") + return nil, errors.New("binance: empty key or secret") } return binance.New(key, secret), nil @@ -25,7 +25,7 @@ func NewExchange(n types.ExchangeName) (types.Exchange, error) { key := viper.GetString("max-api-key") secret := viper.GetString("max-api-secret") if len(key) == 0 || len(secret) == 0 { - return nil, errors.New("empty key or secret") + return nil, errors.New("max: empty key or secret") } return max.New(key, secret), nil diff --git a/cmd/root.go b/cmd/root.go index 71ade459f..937a1fc4d 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -36,6 +36,12 @@ func init() { RootCmd.PersistentFlags().String("slack-token", "", "slack token") RootCmd.PersistentFlags().String("slack-trading-channel", "dev-bbgo", "slack trading channel") RootCmd.PersistentFlags().String("slack-error-channel", "bbgo-error", "slack error channel") + + RootCmd.PersistentFlags().String("binance-api-key", "", "binance api key") + RootCmd.PersistentFlags().String("binance-api-secret", "", "binance api secret") + + RootCmd.PersistentFlags().String("max-api-key", "", "max api key") + RootCmd.PersistentFlags().String("max-api-secret", "", "max api secret") } func Run() { diff --git a/pkg/bbgo/trader.go b/pkg/bbgo/trader.go index b81c7dd74..27f0352ab 100644 --- a/pkg/bbgo/trader.go +++ b/pkg/bbgo/trader.go @@ -36,7 +36,7 @@ type ExchangeSession struct { // Stream is the connection stream of the exchange Stream types.Stream - Subscriptions []types.Subscription + Subscriptions map[types.Subscription]types.Subscription Exchange types.Exchange @@ -53,12 +53,12 @@ type ExchangeSession struct { } func (session *ExchangeSession) Subscribe(channel types.Channel, symbol string, options types.SubscribeOptions) *ExchangeSession { - session.Subscriptions = append(session.Subscriptions, types.Subscription{ + sub := types.Subscription{ Channel: channel, Symbol: symbol, Options: options, - }) - + } + session.Subscriptions[sub] = sub return session } @@ -85,6 +85,7 @@ func (environ *Environment) AddExchange(name string, exchange types.Exchange) (s session = &ExchangeSession{ Name: name, Exchange: exchange, + Subscriptions: make(map[types.Subscription]types.Subscription), Markets: make(map[string]types.Market), Trades: make(map[string][]types.Trade), LastPrices: make(map[string]float64), @@ -144,10 +145,11 @@ func (environ *Environment) Init(ctx context.Context) (err error) { return err } + stream := session.Exchange.NewStream() + + session.Stream = stream + session.Account = &Account{Balances: balances} - - session.Stream = session.Exchange.NewStream() - session.Account.BindStream(session.Stream) marketDataStore := NewMarketDataStore() @@ -173,6 +175,12 @@ func (environ *Environment) Init(ctx context.Context) (err error) { func (environ *Environment) Connect(ctx context.Context) error { for _, session := range environ.sessions { + + for _, s := range session.Subscriptions { + log.Infof("subscribing %s %s %v", s.Symbol, s.Channel, s.Options) + session.Stream.Subscribe(s.Channel, s.Symbol, s.Options) + } + if err := session.Stream.Connect(ctx); err != nil { return err } diff --git a/strategies/buyandhold/main.go b/strategies/buyandhold/main.go index 860f0aea2..78a7a5e71 100644 --- a/strategies/buyandhold/main.go +++ b/strategies/buyandhold/main.go @@ -1,11 +1,10 @@ -package buyandhold +package main import ( "context" - "fmt" + "strings" "syscall" - "github.com/jmoiron/sqlx" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -17,14 +16,23 @@ import ( ) func init() { - rootCmd.Flags().String("exchange", "", "target exchange") - rootCmd.Flags().String("symbol", "", "trading symbol") -} + rootCmd.PersistentFlags().String("binance-api-key", "", "binance api key") + rootCmd.PersistentFlags().String("binance-api-secret", "", "binance api secret") + rootCmd.PersistentFlags().String("max-api-key", "", "max api key") + rootCmd.PersistentFlags().String("max-api-secret", "", "max api secret") -func connectMysql() (*sqlx.DB, error) { - mysqlURL := viper.GetString("mysql-url") - mysqlURL = fmt.Sprintf("%s?parseTime=true", mysqlURL) - return sqlx.Connect("mysql", mysqlURL) + rootCmd.Flags().String("exchange", "binance", "target exchange") + rootCmd.Flags().String("symbol", "BTCUSDT", "trading symbol") + rootCmd.Flags().String("interval", "1h", "kline interval for price change detection") + + viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_")) + + // Enable environment variable binding, the env vars are not overloaded yet. + viper.AutomaticEnv() + + if err := viper.BindPFlags(rootCmd.PersistentFlags()); err != nil { + log.WithError(err).Errorf("failed to bind persistent flags. please check the flag settings.") + } } var rootCmd = &cobra.Command{ @@ -54,6 +62,11 @@ var rootCmd = &cobra.Command{ return err } + interval, err := cmd.Flags().GetString("interval") + if err != nil { + return err + } + exchange, err := cmdutil.NewExchange(exchangeName) if err != nil { return err @@ -66,10 +79,10 @@ var rootCmd = &cobra.Command{ sessionID := "main" environ := bbgo.NewEnvironment(db) - environ.AddExchange(sessionID, exchange).Subscribe(types.KLineChannel, symbol, types.SubscribeOptions{}) + environ.AddExchange(sessionID, exchange) trader := bbgo.NewTrader(environ) - trader.AttachStrategy(sessionID, buyandhold.New(symbol, "1h", 0.1)) + trader.AttachStrategy(sessionID, buyandhold.New(symbol, interval, 0.1)) // trader.AttachCrossExchangeStrategy(...) err = trader.Run(ctx)