From 6398f049d03142155b49fbf021b1cfca108b28ae Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 12 Oct 2020 22:46:06 +0800 Subject: [PATCH] bind market data store and query avg price before we start --- cmd/buyandhold/main.go | 84 +++++++++ cmd/cmdutil/db.go | 14 ++ cmd/cmdutil/exchange.go | 36 ++++ cmd/pnl.go | 37 +--- cmd/transfers.go | 3 +- pkg/bbgo/account.go | 24 ++- pkg/bbgo/kline_regression.go | 2 +- pkg/bbgo/order_processor.go | 6 +- pkg/bbgo/store.go | 2 +- pkg/bbgo/strategy_test.go | 6 +- pkg/bbgo/trader.go | 315 ++++++++++++++------------------ pkg/strategy/buyandhold/main.go | 30 +++ 12 files changed, 329 insertions(+), 230 deletions(-) create mode 100644 cmd/buyandhold/main.go create mode 100644 cmd/cmdutil/db.go create mode 100644 cmd/cmdutil/exchange.go create mode 100644 pkg/strategy/buyandhold/main.go diff --git a/cmd/buyandhold/main.go b/cmd/buyandhold/main.go new file mode 100644 index 000000000..07f9ace1b --- /dev/null +++ b/cmd/buyandhold/main.go @@ -0,0 +1,84 @@ +package buyandhold + +import ( + "context" + "fmt" + "syscall" + + "github.com/jmoiron/sqlx" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/viper" + + "github.com/c9s/bbgo/cmd/cmdutil" + "github.com/c9s/bbgo/pkg/bbgo" + "github.com/c9s/bbgo/pkg/strategy/buyandhold" + "github.com/c9s/bbgo/pkg/types" +) + +func init() { + rootCmd.Flags().String("exchange", "", "target exchange") + rootCmd.Flags().String("symbol", "", "trading symbol") +} + +func connectMysql() (*sqlx.DB, error) { + mysqlURL := viper.GetString("mysql-url") + mysqlURL = fmt.Sprintf("%s?parseTime=true", mysqlURL) + return sqlx.Connect("mysql", mysqlURL) +} + +var rootCmd = &cobra.Command{ + Use: "buyandhold", + Short: "buy and hold", + Long: "hold trader", + + // SilenceUsage is an option to silence usage when an error occurs. + SilenceUsage: true, + + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + exchangeNameStr, err := cmd.Flags().GetString("exchange") + if err != nil { + return err + } + + exchangeName, err := types.ValidExchangeName(exchangeNameStr) + if err != nil { + return err + } + + symbol, err := cmd.Flags().GetString("symbol") + if err != nil { + return err + } + + exchange, err := cmdutil.NewExchange(exchangeName) + if err != nil { + return err + } + + db, err := cmdutil.ConnectMySQL() + if err != nil { + return err + } + + sessionID := "main" + environ := bbgo.NewEnvironment(db) + environ.AddExchange(sessionID, exchange).Subscribe(types.KLineChannel, symbol, types.SubscribeOptions{}) + + trader := bbgo.NewTrader(environ) + trader.AttachStrategy(sessionID, buyandhold.New(symbol)) + trader.Run(ctx) + + cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM) + return nil + }, +} + +func main() { + if err := rootCmd.Execute(); err != nil { + log.WithError(err).Fatalf("cannot execute command") + } +} diff --git a/cmd/cmdutil/db.go b/cmd/cmdutil/db.go new file mode 100644 index 000000000..f50c3f7f2 --- /dev/null +++ b/cmd/cmdutil/db.go @@ -0,0 +1,14 @@ +package cmdutil + +import ( + "fmt" + + "github.com/jmoiron/sqlx" + "github.com/spf13/viper" +) + +func ConnectMySQL() (*sqlx.DB, error) { + mysqlURL := viper.GetString("mysql-url") + mysqlURL = fmt.Sprintf("%s?parseTime=true", mysqlURL) + return sqlx.Connect("mysql", mysqlURL) +} diff --git a/cmd/cmdutil/exchange.go b/cmd/cmdutil/exchange.go new file mode 100644 index 000000000..926690406 --- /dev/null +++ b/cmd/cmdutil/exchange.go @@ -0,0 +1,36 @@ +package cmdutil + +import ( + "github.com/pkg/errors" + "github.com/spf13/viper" + + "github.com/c9s/bbgo/pkg/exchange/binance" + "github.com/c9s/bbgo/pkg/exchange/max" + "github.com/c9s/bbgo/pkg/types" +) + +func NewExchange(n types.ExchangeName) (types.Exchange, error) { + switch n { + + case types.ExchangeBinance: + key := viper.GetString("binance-api-key") + secret := viper.GetString("binance-api-secret") + if len(key) == 0 || len(secret) == 0 { + return nil, errors.New("empty key or secret") + } + + return binance.New(key, secret), nil + + case types.ExchangeMax: + key := viper.GetString("max-api-key") + secret := viper.GetString("max-api-secret") + if len(key) == 0 || len(secret) == 0 { + return nil, errors.New("empty key or secret") + } + + return max.New(key, secret), nil + + } + + return nil, nil +} diff --git a/cmd/pnl.go b/cmd/pnl.go index ae7e120fd..e2759f4e7 100644 --- a/cmd/pnl.go +++ b/cmd/pnl.go @@ -2,19 +2,15 @@ package cmd import ( "context" - "fmt" "strings" "time" - "github.com/jmoiron/sqlx" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" - "github.com/spf13/viper" + "github.com/c9s/bbgo/cmd/cmdutil" "github.com/c9s/bbgo/pkg/accounting" "github.com/c9s/bbgo/pkg/bbgo" - "github.com/c9s/bbgo/pkg/exchange/binance" - "github.com/c9s/bbgo/pkg/exchange/max" "github.com/c9s/bbgo/pkg/service" "github.com/c9s/bbgo/pkg/types" ) @@ -26,30 +22,6 @@ func init() { RootCmd.AddCommand(pnlCmd) } -func connectMysql() (*sqlx.DB, error) { - mysqlURL := viper.GetString("mysql-url") - mysqlURL = fmt.Sprintf("%s?parseTime=true", mysqlURL) - return sqlx.Connect("mysql", mysqlURL) -} - -func newExchange(n types.ExchangeName) types.Exchange { - switch n { - - case types.ExchangeBinance: - key := viper.GetString("binance-api-key") - secret := viper.GetString("binance-api-secret") - return binance.New(key, secret) - - case types.ExchangeMax: - key := viper.GetString("max-api-key") - secret := viper.GetString("max-api-secret") - return max.New(key, secret) - - } - - return nil -} - var pnlCmd = &cobra.Command{ Use: "pnl", Short: "pnl calculator", @@ -72,9 +44,12 @@ var pnlCmd = &cobra.Command{ return err } - exchange := newExchange(exchangeName) + exchange, err := cmdutil.NewExchange(exchangeName) + if err != nil { + return err + } - db, err := connectMysql() + db, err := cmdutil.ConnectMySQL() if err != nil { return err } diff --git a/cmd/transfers.go b/cmd/transfers.go index 088b0daa5..8c13723d4 100644 --- a/cmd/transfers.go +++ b/cmd/transfers.go @@ -8,6 +8,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "github.com/c9s/bbgo/cmd/cmdutil" "github.com/c9s/bbgo/pkg/types" ) @@ -83,7 +84,7 @@ var transferHistoryCmd = &cobra.Command{ } } - exchange := newExchange(exchangeName) + exchange, _ := cmdutil.NewExchange(exchangeName) var records timeSlice diff --git a/pkg/bbgo/account.go b/pkg/bbgo/account.go index c19ef1459..5e0464adf 100644 --- a/pkg/bbgo/account.go +++ b/pkg/bbgo/account.go @@ -11,11 +11,12 @@ import ( ) type Account struct { - mu sync.Mutex + sync.Mutex Balances map[string]types.Balance } +// TODO: rewrite this as NewAccount(map balances) func LoadAccount(ctx context.Context, exchange types.Exchange) (*Account, error) { balances, err := exchange.QueryAccountBalances(ctx) return &Account{ @@ -23,19 +24,24 @@ func LoadAccount(ctx context.Context, exchange types.Exchange) (*Account, error) }, err } -func (a *Account) BindPrivateStream(stream types.Stream) { - stream.OnBalanceSnapshot(func(snapshot map[string]types.Balance) { - a.mu.Lock() - defer a.mu.Unlock() +func (a *Account) handleBalanceUpdates(balances map[string]types.Balance) { + a.Lock() + defer a.Unlock() - for _, balance := range snapshot { - a.Balances[balance.Currency] = balance - } - }) + for _, balance := range balances { + a.Balances[balance.Currency] = balance + } +} +func (a *Account) BindStream(stream types.Stream) { + stream.OnBalanceUpdate(a.handleBalanceUpdates) + stream.OnBalanceSnapshot(a.handleBalanceUpdates) } func (a *Account) Print() { + a.Lock() + defer a.Unlock() + for _, balance := range a.Balances { if util.NotZero(balance.Available) { log.Infof("[trader] balance %s %f", balance.Currency, balance.Available) diff --git a/pkg/bbgo/kline_regression.go b/pkg/bbgo/kline_regression.go index 5dc556a6e..9b5960885 100644 --- a/pkg/bbgo/kline_regression.go +++ b/pkg/bbgo/kline_regression.go @@ -37,7 +37,7 @@ func (trader *BackTestTrader) SubmitOrder(cxt context.Context, order *types.Subm trader.pendingOrders = append(trader.pendingOrders, order) } -func (trader *BackTestTrader) RunStrategy(ctx context.Context, strategy MarketStrategy) (chan struct{}, error) { +func (trader *BackTestTrader) RunStrategy(ctx context.Context, strategy SingleExchangeStrategy) (chan struct{}, error) { logrus.Infof("[regression] number of kline data: %d", len(trader.SourceKLines)) done := make(chan struct{}) diff --git a/pkg/bbgo/order_processor.go b/pkg/bbgo/order_processor.go index e44b34ac6..0cdd906ad 100644 --- a/pkg/bbgo/order_processor.go +++ b/pkg/bbgo/order_processor.go @@ -2,13 +2,10 @@ package bbgo import ( "context" - "fmt" - "math" "github.com/pkg/errors" "github.com/c9s/bbgo/pkg/types" - "github.com/c9s/bbgo/pkg/util" ) var ( @@ -41,6 +38,7 @@ type OrderProcessor struct { } func (p *OrderProcessor) Submit(ctx context.Context, order *types.SubmitOrder) error { + /* tradingCtx := p.Trader.Context currentPrice := tradingCtx.CurrentPrice market := order.Market @@ -126,6 +124,8 @@ func (p *OrderProcessor) Submit(ctx context.Context, order *types.SubmitOrder) e order.Quantity = quantity order.QuantityString = market.FormatVolume(quantity) + */ + return p.Exchange.SubmitOrder(ctx, order) } diff --git a/pkg/bbgo/store.go b/pkg/bbgo/store.go index 9f79c8ca5..3aba663e8 100644 --- a/pkg/bbgo/store.go +++ b/pkg/bbgo/store.go @@ -28,7 +28,7 @@ func NewMarketDataStore() *MarketDataStore { } } -func (store *MarketDataStore) BindPrivateStream(stream types.Stream) { +func (store *MarketDataStore) BindStream(stream types.Stream) { stream.OnKLineClosed(store.handleKLineClosed) } diff --git a/pkg/bbgo/strategy_test.go b/pkg/bbgo/strategy_test.go index c94977043..113b5a85e 100644 --- a/pkg/bbgo/strategy_test.go +++ b/pkg/bbgo/strategy_test.go @@ -5,7 +5,7 @@ import ( "fmt" "os" "testing" - time "time" + "time" "github.com/DATA-DOG/go-sqlmock" "github.com/jmoiron/sqlx" @@ -14,7 +14,6 @@ import ( "github.com/c9s/bbgo/pkg/exchange/binance" "github.com/c9s/bbgo/pkg/service" "github.com/c9s/bbgo/pkg/types" - ) func TestTradeService(t *testing.T) { @@ -59,10 +58,11 @@ func TestEnvironment_Connect(t *testing.T) { environment := NewEnvironment(xdb) environment.AddExchange("binance", exchange). - Subscribe(types.KLineChannel,"BTCUSDT", types.SubscribeOptions{ }) + Subscribe(types.KLineChannel,"BTCUSDT", types.SubscribeOptions{}) err = environment.Connect(ctx) assert.NoError(t, err) time.Sleep(5 * time.Second) } + diff --git a/pkg/bbgo/trader.go b/pkg/bbgo/trader.go index 6a606f303..342cb943d 100644 --- a/pkg/bbgo/trader.go +++ b/pkg/bbgo/trader.go @@ -2,31 +2,33 @@ package bbgo import ( "context" - "fmt" "strings" "time" - "github.com/fsnotify/fsnotify" "github.com/jmoiron/sqlx" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/c9s/bbgo/pkg/accounting" - "github.com/c9s/bbgo/pkg/bbgo/config" "github.com/c9s/bbgo/pkg/service" "github.com/c9s/bbgo/pkg/types" _ "github.com/go-sql-driver/mysql" ) -// MarketStrategy represents the single Exchange strategy -type MarketStrategy interface { - OnLoad(tradingContext *Context, trader types.Trader) error - OnNewStream(stream types.Stream) error +// SingleExchangeStrategy represents the single Exchange strategy +type SingleExchangeStrategy interface { + Run(trader types.Trader, session *ExchangeSession) error } +type CrossExchangeStrategy interface { + Run(trader types.Trader, sessions map[string]*ExchangeSession) error +} + +// ExchangeSession presents the exchange connection session +// It also maintains and collects the data returned from the stream. type ExchangeSession struct { - // Session name + // Exchange session name Name string // The exchange account states @@ -39,14 +41,16 @@ type ExchangeSession struct { Exchange types.Exchange - loadedSymbols map[string]struct{} - // Markets defines market configuration of a symbol Markets map[string]types.Market + LastPrices map[string]float64 + // Trades collects the executed trades from the exchange // map: symbol -> []trade Trades map[string][]types.Trade + + MarketDataStore *MarketDataStore } func (session *ExchangeSession) Subscribe(channel types.Channel, symbol string, options types.SubscribeOptions) *ExchangeSession { @@ -64,7 +68,7 @@ type Environment struct { TradeService *service.TradeService TradeSync *service.TradeSync - ExchangeSessions map[string]*ExchangeSession + sessions map[string]*ExchangeSession } func NewEnvironment(db *sqlx.DB) *Environment { @@ -74,26 +78,27 @@ func NewEnvironment(db *sqlx.DB) *Environment { TradeSync: &service.TradeSync{ Service: tradeService, }, - ExchangeSessions: make(map[string]*ExchangeSession), + sessions: make(map[string]*ExchangeSession), } } func (environ *Environment) AddExchange(name string, exchange types.Exchange) (session *ExchangeSession) { session = &ExchangeSession{ - Name: name, - Exchange: exchange, - Markets: make(map[string]types.Market), - Trades: make(map[string][]types.Trade), + Name: name, + Exchange: exchange, + Markets: make(map[string]types.Market), + Trades: make(map[string][]types.Trade), + LastPrices: make(map[string]float64), } - environ.ExchangeSessions[name] = session + environ.sessions[name] = session return session } -func (environ *Environment) Connect(ctx context.Context) (err error) { +func (environ *Environment) Init(ctx context.Context) (err error) { startTime := time.Now().AddDate(0, 0, -7) // sync from 7 days ago - for _, session := range environ.ExchangeSessions { + for _, session := range environ.sessions { loadedSymbols := make(map[string]struct{}) for _, sub := range session.Subscriptions { loadedSymbols[sub.Symbol] = struct{}{} @@ -119,6 +124,13 @@ func (environ *Environment) Connect(ctx context.Context) (err error) { log.Infof("symbol %s: %d trades loaded", symbol, len(trades)) session.Trades[symbol] = trades + + currentPrice, err := session.Exchange.QueryAveragePrice(ctx, symbol) + if err != nil { + return err + } + + session.LastPrices[symbol] = currentPrice } session.Account, err = LoadAccount(ctx, session.Exchange) @@ -127,6 +139,18 @@ func (environ *Environment) Connect(ctx context.Context) (err error) { } session.Stream = session.Exchange.NewStream() + + session.Account.BindStream(session.Stream) + + marketDataStore := NewMarketDataStore() + marketDataStore.BindStream(session.Stream) + + + // update last prices + session.Stream.OnKLineClosed(func(kline types.KLine) { + session.LastPrices[kline.Symbol] = kline.Close + }) + session.Stream.OnTrade(func(trade *types.Trade) { // append trades session.Trades[trade.Symbol] = append(session.Trades[trade.Symbol], *trade) @@ -135,197 +159,130 @@ func (environ *Environment) Connect(ctx context.Context) (err error) { log.WithError(err).Errorf("trade insert error: %+v", *trade) } }) + } + return nil +} + +func (environ *Environment) Connect(ctx context.Context) error { + for _, session := range environ.sessions { if err := session.Stream.Connect(ctx); err != nil { return err } } - return err + return nil } type Trader struct { - Symbol string - TradeService *service.TradeService - TradeSync *service.TradeSync - - // Context is trading Context - Context *Context - - Exchange types.Exchange - - reportTimer *time.Timer - + reportTimer *time.Timer ProfitAndLossCalculator *accounting.ProfitAndLossCalculator - Account *Account + notifiers []Notifier + environment *Environment - Notifiers []Notifier - - ExchangeSessions map[string]*ExchangeSession + crossExchangeStrategies []CrossExchangeStrategy + exchangeStrategies map[string][]SingleExchangeStrategy } -func NewTrader(db *sqlx.DB, exchange types.Exchange, symbol string) *Trader { - tradeService := &service.TradeService{DB: db} +func NewTrader(environ *Environment) *Trader { return &Trader{ - Symbol: symbol, - Exchange: exchange, - TradeService: tradeService, - TradeSync: &service.TradeSync{ - Service: tradeService, - }, + environment: environ, + exchangeStrategies: make(map[string][]SingleExchangeStrategy), } } func (trader *Trader) AddNotifier(notifier Notifier) { - trader.Notifiers = append(trader.Notifiers, notifier) + trader.notifiers = append(trader.notifiers, notifier) } -func (trader *Trader) Connect(ctx context.Context) (err error) { - log.Info("syncing trades from exchange...") - startTime := time.Now().AddDate(0, 0, -7) // sync from 7 days ago +// AttachStrategy attaches the single exchange strategy on an exchange session. +// Single exchange strategy is the default behavior. +func (trader *Trader) AttachStrategy(session string, strategy SingleExchangeStrategy) error { + if _, ok := trader.environment.sessions[session]; !ok { + return errors.New("session not defined") + } - for _, session := range trader.ExchangeSessions { - for symbol := range session.loadedSymbols { - market, ok := types.FindMarket(symbol) - if !ok { - return errors.Errorf("market %s is not defined", symbol) - } + trader.exchangeStrategies[session] = append(trader.exchangeStrategies[session], strategy) + return nil +} - session.Markets[symbol] = market +// AttachCrossExchangeStrategy attaches the cross exchange strategy +func (trader *Trader) AttachCrossExchangeStrategy(strategy CrossExchangeStrategy) error { + trader.crossExchangeStrategies = append(trader.crossExchangeStrategies, strategy) + return nil +} +func (trader *Trader) Run(ctx context.Context) error { + if err := trader.environment.Init(ctx); err != nil { + return err + } - if err := trader.TradeSync.Sync(ctx, session.Exchange, symbol, startTime); err != nil { - return err - } - - var trades []types.Trade - - tradingFeeCurrency := session.Exchange.PlatformFeeCurrency() - if strings.HasPrefix(symbol, tradingFeeCurrency) { - trades, err = trader.TradeService.QueryForTradingFeeCurrency(symbol, tradingFeeCurrency) - } else { - trades, err = trader.TradeService.Query(symbol) - } - + // load and run session strategies + for session, strategies := range trader.exchangeStrategies { + for _, strategy := range strategies { + err := strategy.Run(trader, trader.environment.sessions[session]) if err != nil { return err } - - log.Infof("symbol %s: %d trades loaded", symbol, len(trades)) - session.Trades[symbol] = trades - - stockManager := &StockDistribution{ - Symbol: symbol, - TradingFeeCurrency: tradingFeeCurrency, - } - - checkpoints, err := stockManager.AddTrades(trades) - if err != nil { - return err - } - - log.Infof("symbol %s: found stock checkpoints: %+v", symbol, checkpoints) } + } - session.Account, err = LoadAccount(ctx, session.Exchange) - if err != nil { - return err - } - - session.Stream = session.Exchange.NewStream() - if err != nil { - return err - } - - if err := session.Stream.Connect(ctx); err != nil { + for _, strategy := range trader.crossExchangeStrategies { + if err := strategy.Run(trader, trader.environment.sessions) ; err != nil { return err } } - return nil + + return trader.environment.Connect(ctx) + /* + stockManager := &StockDistribution{ + Symbol: symbol, + TradingFeeCurrency: tradingFeeCurrency, + } + + checkpoints, err := stockManager.AddTrades(trades) + if err != nil { + return err + } + + log.Infof("symbol %s: found stock checkpoints: %+v", symbol, checkpoints) + */ } func (trader *Trader) Initialize(ctx context.Context, startTime time.Time) error { - // query all trades from database so that we can get the correct pnl - var err error - var trades []types.Trade - tradingFeeCurrency := trader.Exchange.PlatformFeeCurrency() - if strings.HasPrefix(trader.Symbol, tradingFeeCurrency) { - trades, err = trader.TradeService.QueryForTradingFeeCurrency(trader.Symbol, tradingFeeCurrency) - } else { - trades, err = trader.TradeService.Query(trader.Symbol) - } - - if err != nil { - return err - } - - log.Infof("%d trades loaded", len(trades)) - - stockManager := &StockDistribution{ - Symbol: trader.Symbol, - TradingFeeCurrency: tradingFeeCurrency, - } - - checkpoints, err := stockManager.AddTrades(trades) - if err != nil { - return err - } - - log.Infof("found checkpoints: %+v", checkpoints) - - market, ok := types.FindMarket(trader.Symbol) - if !ok { - return fmt.Errorf("%s market not found", trader.Symbol) - } - - currentPrice, err := trader.Exchange.QueryAveragePrice(ctx, trader.Symbol) - if err != nil { - return err - } - - trader.Context = &Context{ - CurrentPrice: currentPrice, - Symbol: trader.Symbol, - Market: market, - StockManager: stockManager, - } - /* - if len(checkpoints) > 0 { - // get the last checkpoint - idx := checkpoints[len(checkpoints)-1] - if idx < len(trades)-1 { - trades = trades[idx:] - firstTrade := trades[0] - pnlStartTime = firstTrade.Time - notifier.Notify("%s Found the latest trade checkpoint %s", firstTrade.Symbol, firstTrade.Time, firstTrade) - } + currentPrice, err := trader.Exchange.QueryAveragePrice(ctx, trader.Symbol) + if err != nil { + return err + } + + trader.Context = &Context{ + CurrentPrice: currentPrice, + Symbol: trader.Symbol, + Market: market, + StockManager: stockManager, } */ - trader.ProfitAndLossCalculator = &accounting.ProfitAndLossCalculator{ - TradingFeeCurrency: tradingFeeCurrency, - Symbol: trader.Symbol, - StartTime: startTime, - CurrentPrice: currentPrice, - Trades: trades, - } - - account, err := LoadAccount(ctx, trader.Exchange) - if err != nil { - return err - } - - trader.Account = account - trader.Context.Balances = account.Balances - account.Print() + /* + trader.ProfitAndLossCalculator = &accounting.ProfitAndLossCalculator{ + TradingFeeCurrency: tradingFeeCurrency, + Symbol: trader.Symbol, + StartTime: startTime, + CurrentPrice: currentPrice, + Trades: trades, + } + */ + // trader.Context.Balances = account.Balances + // account.Print() return nil } -func (trader *Trader) RunStrategyWithHotReload(ctx context.Context, strategy MarketStrategy, configFile string) (chan struct{}, error) { +/* +func (trader *Trader) RunStrategyWithHotReload(ctx context.Context, strategy SingleExchangeStrategy, configFile string) (chan struct{}, error) { var done = make(chan struct{}) var configWatcherDone = make(chan struct{}) @@ -400,8 +357,10 @@ func (trader *Trader) RunStrategyWithHotReload(ctx context.Context, strategy Mar return done, nil } +*/ -func (trader *Trader) RunStrategy(ctx context.Context, strategy MarketStrategy) (chan struct{}, error) { +/* +func (trader *Trader) RunStrategy(ctx context.Context, strategy SingleExchangeStrategy) (chan struct{}, error) { if err := strategy.OnLoad(trader.Context, trader); err != nil { return nil, err } @@ -410,9 +369,9 @@ func (trader *Trader) RunStrategy(ctx context.Context, strategy MarketStrategy) // bind kline store to the stream klineStore := NewMarketDataStore() - klineStore.BindPrivateStream(stream) + klineStore.BindStream(stream) - trader.Account.BindPrivateStream(stream) + trader.Account.BindStream(stream) if err := strategy.OnNewStream(stream); err != nil { return nil, err @@ -423,14 +382,6 @@ func (trader *Trader) RunStrategy(ctx context.Context, strategy MarketStrategy) }) stream.OnTrade(func(trade *types.Trade) { - if trade.Symbol != trader.Symbol { - return - } - - if err := trader.TradeService.Insert(*trade); err != nil { - log.WithError(err).Error("trade insert error") - } - trader.NotifyTrade(trade) trader.ProfitAndLossCalculator.AddTrade(*trade) _, err := trader.Context.StockManager.AddTrades([]types.Trade{*trade}) @@ -472,6 +423,7 @@ func (trader *Trader) RunStrategy(ctx context.Context, strategy MarketStrategy) return done, nil } +*/ func (trader *Trader) reportPnL() { report := trader.ProfitAndLossCalculator.Calculate() @@ -480,19 +432,19 @@ func (trader *Trader) reportPnL() { } func (trader *Trader) NotifyPnL(report *accounting.ProfitAndLossReport) { - for _, n := range trader.Notifiers { + for _, n := range trader.notifiers { n.NotifyPnL(report) } } func (trader *Trader) NotifyTrade(trade *types.Trade) { - for _, n := range trader.Notifiers { + for _, n := range trader.notifiers { n.NotifyTrade(trade) } } func (trader *Trader) Notify(msg string, args ...interface{}) { - for _, n := range trader.Notifiers { + for _, n := range trader.notifiers { n.Notify(msg, args...) } } @@ -506,8 +458,9 @@ func (trader *Trader) SubmitOrder(ctx context.Context, order *types.SubmitOrder) MinAssetBalance: 0, MinProfitSpread: 0, MaxOrderAmount: 0, - Exchange: trader.Exchange, - Trader: trader, + // FIXME: + // Exchange: trader.Exchange, + Trader: trader, } err := orderProcessor.Submit(ctx, order) diff --git a/pkg/strategy/buyandhold/main.go b/pkg/strategy/buyandhold/main.go new file mode 100644 index 000000000..582646568 --- /dev/null +++ b/pkg/strategy/buyandhold/main.go @@ -0,0 +1,30 @@ +package buyandhold + +import ( + "github.com/c9s/bbgo/pkg/bbgo" + "github.com/c9s/bbgo/pkg/types" +) + +type Strategy struct { + symbol string +} + +func New(symbol string) *Strategy { + return &Strategy{ + symbol: symbol, + } +} + +func (s *Strategy) Run(trader types.Trader, session *bbgo.ExchangeSession) error { + session.Subscribe(types.KLineChannel, s.symbol, types.SubscribeOptions{}) + session.Stream.OnKLineClosed(func(kline types.KLine) { + // trader.SubmitOrder(ctx, ....) + }) + + return nil +} + + + + +