diff --git a/.travis.yml b/.travis.yml index cad382afd..999bbd6b7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,4 +6,4 @@ go: before_script: - go mod download script: - - go test -v ./... + - go test -v ./pkg/... diff --git a/README.md b/README.md index 42d067f74..6ca6972d5 100644 --- a/README.md +++ b/README.md @@ -118,30 +118,43 @@ import ( "github.com/c9s/bbgo" ) -mysqlURL := viper.GetString("mysql-url") -mysqlURL = fmt.Sprintf("%s?parseTime=true", mysqlURL) -db, err := sqlx.Connect("mysql", mysqlURL) +func main() { + mysqlURL := viper.GetString("mysql-url") + mysqlURL = fmt.Sprintf("%s?parseTime=true", mysqlURL) + db, err := sqlx.Connect("mysql", mysqlURL) + if err != nil { + return err + } -if err != nil { - return err + environment := environ.New() + environment.AddExchange("binance", binance.New(viper.Getenv("binance-api-key"), viper.Getenv("binance-api-secret")))) + environment.AddExchange("max", max.New(viper.Getenv("max-key"), viper.Getenv("max-secret")))) + + trader := bbgo.NewTrader(bbgo.Config{ + Environment: environment, + DB: db, + }) + + trader.AddNotifier(slacknotifier.New(slackToken)) + trader.AddLogHook(slacklog.NewLogHook(slackToken)) + + // when any trade execution happened + trader.OnTrade(func(session string, exchange types.Exchange, trade types.Trade) { + notify(trade) + + notifyPnL() + }) + + // mount strategy on an exchange + trader.AddExchangeStrategy("binance", + bondtrade.New("btcusdt", "5m"), + bondtrade.New("ethusdt", "5m")) + + // mount cross exchange strategy + trader.AddCrossExchangeStrategy(hedgemaker.New("max", "binance")) + + t.Run(ctx) } - -t := bbgo.New(bbgo.Config{ - DB: db, -}) -t.AddNotifier(slacknotifier.New(slackToken)) -t.AddLogHook(slacklog.NewLogHook(slackToken)) - -t.AddExchange("binance", binance.New(viper.Getenv("binance-api-key"), viper.Getenv("binance-api-secret")))). - Subscribe("binance", "btcusdt", "kline@5m", "book", "trade"). - AddStrategy(bondtrade.New, bondtrade.New). - Symbols("btcusdt", "bnbusdt") - -t.AddExchange("max", max.New(viper.Getenv("max-key"), viper.Getenv("max-secret")))). - Subscribe("max", "btctwd", "kline@5m", "book", "trade"). - AddStrategy(flashdrop.New, bondtrade.New) - -t.AddCrossExchangeStrategy(hedgemaker.New(...)) ``` ## Support diff --git a/cmd/buyandhold/main.go b/cmd/buyandhold/main.go new file mode 100644 index 000000000..07f9ace1b --- /dev/null +++ b/cmd/buyandhold/main.go @@ -0,0 +1,84 @@ +package buyandhold + +import ( + "context" + "fmt" + "syscall" + + "github.com/jmoiron/sqlx" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/viper" + + "github.com/c9s/bbgo/cmd/cmdutil" + "github.com/c9s/bbgo/pkg/bbgo" + "github.com/c9s/bbgo/pkg/strategy/buyandhold" + "github.com/c9s/bbgo/pkg/types" +) + +func init() { + rootCmd.Flags().String("exchange", "", "target exchange") + rootCmd.Flags().String("symbol", "", "trading symbol") +} + +func connectMysql() (*sqlx.DB, error) { + mysqlURL := viper.GetString("mysql-url") + mysqlURL = fmt.Sprintf("%s?parseTime=true", mysqlURL) + return sqlx.Connect("mysql", mysqlURL) +} + +var rootCmd = &cobra.Command{ + Use: "buyandhold", + Short: "buy and hold", + Long: "hold trader", + + // SilenceUsage is an option to silence usage when an error occurs. + SilenceUsage: true, + + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + exchangeNameStr, err := cmd.Flags().GetString("exchange") + if err != nil { + return err + } + + exchangeName, err := types.ValidExchangeName(exchangeNameStr) + if err != nil { + return err + } + + symbol, err := cmd.Flags().GetString("symbol") + if err != nil { + return err + } + + exchange, err := cmdutil.NewExchange(exchangeName) + if err != nil { + return err + } + + db, err := cmdutil.ConnectMySQL() + if err != nil { + return err + } + + sessionID := "main" + environ := bbgo.NewEnvironment(db) + environ.AddExchange(sessionID, exchange).Subscribe(types.KLineChannel, symbol, types.SubscribeOptions{}) + + trader := bbgo.NewTrader(environ) + trader.AttachStrategy(sessionID, buyandhold.New(symbol)) + trader.Run(ctx) + + cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM) + return nil + }, +} + +func main() { + if err := rootCmd.Execute(); err != nil { + log.WithError(err).Fatalf("cannot execute command") + } +} diff --git a/cmd/cmdutil/db.go b/cmd/cmdutil/db.go new file mode 100644 index 000000000..f50c3f7f2 --- /dev/null +++ b/cmd/cmdutil/db.go @@ -0,0 +1,14 @@ +package cmdutil + +import ( + "fmt" + + "github.com/jmoiron/sqlx" + "github.com/spf13/viper" +) + +func ConnectMySQL() (*sqlx.DB, error) { + mysqlURL := viper.GetString("mysql-url") + mysqlURL = fmt.Sprintf("%s?parseTime=true", mysqlURL) + return sqlx.Connect("mysql", mysqlURL) +} diff --git a/cmd/cmdutil/exchange.go b/cmd/cmdutil/exchange.go new file mode 100644 index 000000000..926690406 --- /dev/null +++ b/cmd/cmdutil/exchange.go @@ -0,0 +1,36 @@ +package cmdutil + +import ( + "github.com/pkg/errors" + "github.com/spf13/viper" + + "github.com/c9s/bbgo/pkg/exchange/binance" + "github.com/c9s/bbgo/pkg/exchange/max" + "github.com/c9s/bbgo/pkg/types" +) + +func NewExchange(n types.ExchangeName) (types.Exchange, error) { + switch n { + + case types.ExchangeBinance: + key := viper.GetString("binance-api-key") + secret := viper.GetString("binance-api-secret") + if len(key) == 0 || len(secret) == 0 { + return nil, errors.New("empty key or secret") + } + + return binance.New(key, secret), nil + + case types.ExchangeMax: + key := viper.GetString("max-api-key") + secret := viper.GetString("max-api-secret") + if len(key) == 0 || len(secret) == 0 { + return nil, errors.New("empty key or secret") + } + + return max.New(key, secret), nil + + } + + return nil, nil +} diff --git a/cmd/pnl.go b/cmd/pnl.go index ae7e120fd..e2759f4e7 100644 --- a/cmd/pnl.go +++ b/cmd/pnl.go @@ -2,19 +2,15 @@ package cmd import ( "context" - "fmt" "strings" "time" - "github.com/jmoiron/sqlx" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" - "github.com/spf13/viper" + "github.com/c9s/bbgo/cmd/cmdutil" "github.com/c9s/bbgo/pkg/accounting" "github.com/c9s/bbgo/pkg/bbgo" - "github.com/c9s/bbgo/pkg/exchange/binance" - "github.com/c9s/bbgo/pkg/exchange/max" "github.com/c9s/bbgo/pkg/service" "github.com/c9s/bbgo/pkg/types" ) @@ -26,30 +22,6 @@ func init() { RootCmd.AddCommand(pnlCmd) } -func connectMysql() (*sqlx.DB, error) { - mysqlURL := viper.GetString("mysql-url") - mysqlURL = fmt.Sprintf("%s?parseTime=true", mysqlURL) - return sqlx.Connect("mysql", mysqlURL) -} - -func newExchange(n types.ExchangeName) types.Exchange { - switch n { - - case types.ExchangeBinance: - key := viper.GetString("binance-api-key") - secret := viper.GetString("binance-api-secret") - return binance.New(key, secret) - - case types.ExchangeMax: - key := viper.GetString("max-api-key") - secret := viper.GetString("max-api-secret") - return max.New(key, secret) - - } - - return nil -} - var pnlCmd = &cobra.Command{ Use: "pnl", Short: "pnl calculator", @@ -72,9 +44,12 @@ var pnlCmd = &cobra.Command{ return err } - exchange := newExchange(exchangeName) + exchange, err := cmdutil.NewExchange(exchangeName) + if err != nil { + return err + } - db, err := connectMysql() + db, err := cmdutil.ConnectMySQL() if err != nil { return err } diff --git a/cmd/transfers.go b/cmd/transfers.go index b541b1aea..8c13723d4 100644 --- a/cmd/transfers.go +++ b/cmd/transfers.go @@ -8,24 +8,23 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "github.com/c9s/bbgo/cmd/cmdutil" "github.com/c9s/bbgo/pkg/types" ) func init() { transferHistoryCmd.Flags().String("exchange", "", "target exchange") - transferHistoryCmd.Flags().String("asset", "BTC", "trading symbol") + transferHistoryCmd.Flags().String("asset", "", "trading symbol") transferHistoryCmd.Flags().String("since", "", "since time") RootCmd.AddCommand(transferHistoryCmd) } - - -type TimeRecord struct { +type timeRecord struct { Record interface{} - Time time.Time + Time time.Time } -type timeSlice []TimeRecord +type timeSlice []timeRecord func (p timeSlice) Len() int { return len(p) @@ -39,13 +38,9 @@ func (p timeSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } - - - - var transferHistoryCmd = &cobra.Command{ - Use: "transfer-history", - Short: "show transfer history", + Use: "transfer-history", + Short: "show transfer history", SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { @@ -89,8 +84,7 @@ var transferHistoryCmd = &cobra.Command{ } } - - exchange := newExchange(exchangeName) + exchange, _ := cmdutil.NewExchange(exchangeName) var records timeSlice @@ -99,7 +93,7 @@ var transferHistoryCmd = &cobra.Command{ return err } for _, d := range deposits { - records = append(records, TimeRecord{ + records = append(records, timeRecord{ Record: d, Time: d.EffectiveTime(), }) @@ -110,7 +104,7 @@ var transferHistoryCmd = &cobra.Command{ return err } for _, w := range withdraws { - records = append(records, TimeRecord{ + records = append(records, timeRecord{ Record: w, Time: w.EffectiveTime(), }) @@ -134,39 +128,63 @@ var transferHistoryCmd = &cobra.Command{ } stats := calBaselineStats(asset, deposits, withdraws) - log.Infof("total %s deposit: %f (x %d)", asset, stats.TotalDeposit, stats.NumOfDeposit) - log.Infof("total %s withdraw: %f (x %d)", asset, stats.TotalWithdraw, stats.NumOfWithdraw) - log.Infof("baseline %s balance: %f", asset, stats.BaselineBalance) + for asset, quantity := range stats.TotalDeposit { + log.Infof("total %s deposit: %f", asset, quantity) + } + + for asset, quantity := range stats.TotalWithdraw { + log.Infof("total %s withdraw: %f", asset, quantity) + } + + for asset, quantity := range stats.BaselineBalance { + log.Infof("baseline %s balance: %f", asset, quantity) + } + return nil }, } type BaselineStats struct { - Asset string - NumOfDeposit int - NumOfWithdraw int - TotalDeposit float64 - TotalWithdraw float64 - BaselineBalance float64 + Asset string + TotalDeposit map[string]float64 + TotalWithdraw map[string]float64 + BaselineBalance map[string]float64 } func calBaselineStats(asset string, deposits []types.Deposit, withdraws []types.Withdraw) (stats BaselineStats) { stats.Asset = asset - stats.NumOfDeposit = len(deposits) - stats.NumOfWithdraw = len(withdraws) + stats.TotalDeposit = make(map[string]float64) + stats.TotalWithdraw = make(map[string]float64) + stats.BaselineBalance = make(map[string]float64) for _, deposit := range deposits { if deposit.Status == types.DepositSuccess { - stats.TotalDeposit += deposit.Amount + if _, ok := stats.TotalDeposit[deposit.Asset]; !ok { + stats.TotalDeposit[deposit.Asset] = 0.0 + } + + stats.TotalDeposit[deposit.Asset] += deposit.Amount } } for _, withdraw := range withdraws { if withdraw.Status == "completed" { - stats.TotalWithdraw += withdraw.Amount + if _, ok := stats.TotalWithdraw[withdraw.Asset]; !ok { + stats.TotalWithdraw[withdraw.Asset] = 0.0 + } + + stats.TotalWithdraw[withdraw.Asset] += withdraw.Amount } } - stats.BaselineBalance = stats.TotalDeposit - stats.TotalWithdraw + for asset, deposit := range stats.TotalDeposit { + withdraw, ok := stats.TotalWithdraw[asset] + if !ok { + withdraw = 0.0 + } + + stats.BaselineBalance[asset] = deposit - withdraw + } + return stats } diff --git a/go.mod b/go.mod index 10235d61f..c59ba51cf 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/c9s/bbgo go 1.13 require ( + github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/adshao/go-binance v0.0.0-20200604145522-bf563a35f17f github.com/c9s/goose v0.0.0-20200415105707-8da682162a5b github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect diff --git a/go.sum b/go.sum index 71b82a8ef..1ada96608 100644 --- a/go.sum +++ b/go.sum @@ -14,6 +14,8 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7 github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= +github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/adshao/go-binance v0.0.0-20200604145522-bf563a35f17f h1:lVxx5HSt/imprfR8v577N3gCQmKmRgkGNz30FlHISO4= github.com/adshao/go-binance v0.0.0-20200604145522-bf563a35f17f/go.mod h1:XlIpE7brbCEQxp6VRouG/ZgjLjygQWE1xnc1DtQNp6I= diff --git a/pkg/bbgo/account.go b/pkg/bbgo/account.go index eb43c1121..46fceb045 100644 --- a/pkg/bbgo/account.go +++ b/pkg/bbgo/account.go @@ -1,10 +1,8 @@ package bbgo import ( - "context" "sync" - "github.com/c9s/bbgo/pkg/exchange/binance" "github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/util" @@ -12,31 +10,28 @@ import ( ) type Account struct { - mu sync.Mutex - + sync.Mutex Balances map[string]types.Balance } -func LoadAccount(ctx context.Context, exchange types.Exchange) (*Account, error) { - balances, err := exchange.QueryAccountBalances(ctx) - return &Account{ - Balances: balances, - }, err +func (a *Account) handleBalanceUpdates(balances map[string]types.Balance) { + a.Lock() + defer a.Unlock() + + for _, balance := range balances { + a.Balances[balance.Currency] = balance + } } -func (a *Account) BindPrivateStream(stream types.Stream) { - stream.OnBalanceSnapshot(func(snapshot map[string]types.Balance) { - a.mu.Lock() - defer a.mu.Unlock() - - for _, balance := range snapshot { - a.Balances[balance.Currency] = balance - } - }) - +func (a *Account) BindStream(stream types.Stream) { + stream.OnBalanceUpdate(a.handleBalanceUpdates) + stream.OnBalanceSnapshot(a.handleBalanceUpdates) } func (a *Account) Print() { + a.Lock() + defer a.Unlock() + for _, balance := range a.Balances { if util.NotZero(balance.Available) { log.Infof("[trader] balance %s %f", balance.Currency, balance.Available) diff --git a/pkg/bbgo/kline_regression.go b/pkg/bbgo/backtest.go similarity index 96% rename from pkg/bbgo/kline_regression.go rename to pkg/bbgo/backtest.go index 5dc556a6e..40c0fcfc4 100644 --- a/pkg/bbgo/kline_regression.go +++ b/pkg/bbgo/backtest.go @@ -2,13 +2,9 @@ package bbgo import ( "context" - "fmt" - - "github.com/sirupsen/logrus" "github.com/c9s/bbgo/pkg/accounting" "github.com/c9s/bbgo/pkg/types" - "github.com/c9s/bbgo/pkg/util" ) type BackTestStream struct { @@ -37,7 +33,8 @@ func (trader *BackTestTrader) SubmitOrder(cxt context.Context, order *types.Subm trader.pendingOrders = append(trader.pendingOrders, order) } -func (trader *BackTestTrader) RunStrategy(ctx context.Context, strategy MarketStrategy) (chan struct{}, error) { +/* +func (trader *BackTestTrader) RunStrategy(ctx context.Context, strategy SingleExchangeStrategy) (chan struct{}, error) { logrus.Infof("[regression] number of kline data: %d", len(trader.SourceKLines)) done := make(chan struct{}) @@ -148,3 +145,4 @@ func (trader *BackTestTrader) RunStrategy(ctx context.Context, strategy MarketSt return done, nil } +*/ diff --git a/pkg/bbgo/order_processor.go b/pkg/bbgo/order_processor.go index e44b34ac6..0cdd906ad 100644 --- a/pkg/bbgo/order_processor.go +++ b/pkg/bbgo/order_processor.go @@ -2,13 +2,10 @@ package bbgo import ( "context" - "fmt" - "math" "github.com/pkg/errors" "github.com/c9s/bbgo/pkg/types" - "github.com/c9s/bbgo/pkg/util" ) var ( @@ -41,6 +38,7 @@ type OrderProcessor struct { } func (p *OrderProcessor) Submit(ctx context.Context, order *types.SubmitOrder) error { + /* tradingCtx := p.Trader.Context currentPrice := tradingCtx.CurrentPrice market := order.Market @@ -126,6 +124,8 @@ func (p *OrderProcessor) Submit(ctx context.Context, order *types.SubmitOrder) e order.Quantity = quantity order.QuantityString = market.FormatVolume(quantity) + */ + return p.Exchange.SubmitOrder(ctx, order) } diff --git a/pkg/bbgo/store.go b/pkg/bbgo/store.go index 9f79c8ca5..3aba663e8 100644 --- a/pkg/bbgo/store.go +++ b/pkg/bbgo/store.go @@ -28,7 +28,7 @@ func NewMarketDataStore() *MarketDataStore { } } -func (store *MarketDataStore) BindPrivateStream(stream types.Stream) { +func (store *MarketDataStore) BindStream(stream types.Stream) { stream.OnKLineClosed(store.handleKLineClosed) } diff --git a/pkg/bbgo/strategy_test.go b/pkg/bbgo/strategy_test.go new file mode 100644 index 000000000..113b5a85e --- /dev/null +++ b/pkg/bbgo/strategy_test.go @@ -0,0 +1,68 @@ +package bbgo + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/jmoiron/sqlx" + "github.com/stretchr/testify/assert" + + "github.com/c9s/bbgo/pkg/exchange/binance" + "github.com/c9s/bbgo/pkg/service" + "github.com/c9s/bbgo/pkg/types" +) + +func TestTradeService(t *testing.T) { + db, mock, err := sqlmock.New() + assert.NoError(t, err) + _ = mock + + xdb := sqlx.NewDb(db, "mysql") + service.NewTradeService(xdb) + /* + + stmt := mock.ExpectQuery(`SELECT \* FROM trades WHERE symbol = \? ORDER BY gid DESC LIMIT 1`) + stmt.WithArgs("BTCUSDT") + stmt.WillReturnRows(sqlmock.NewRows([]string{"gid", "id", "exchange", "symbol", "price", "quantity"})) + + stmt2 := mock.ExpectQuery(`INSERT INTO trades (id, exchange, symbol, price, quantity, quote_quantity, side, is_buyer, is_maker, fee, fee_currency, traded_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`) + stmt2.WithArgs() + */ +} + +func TestEnvironment_Connect(t *testing.T) { + mysqlURL := os.Getenv("MYSQL_URL") + if len(mysqlURL) == 0 { + t.Skip("require mysql url") + } + mysqlURL = fmt.Sprintf("%s?parseTime=true", mysqlURL) + + key, secret := os.Getenv("BINANCE_API_KEY"), os.Getenv("BINANCE_API_SECRET") + if len(key) == 0 || len(secret) == 0 { + t.Skip("require key and secret") + } + + exchange := binance.New(key, secret) + assert.NotNil(t, exchange) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + xdb, err := sqlx.Connect("mysql", mysqlURL) + assert.NoError(t, err) + + environment := NewEnvironment(xdb) + environment.AddExchange("binance", exchange). + Subscribe(types.KLineChannel,"BTCUSDT", types.SubscribeOptions{}) + + err = environment.Connect(ctx) + assert.NoError(t, err) + + time.Sleep(5 * time.Second) +} + diff --git a/pkg/bbgo/trader.go b/pkg/bbgo/trader.go index 8a01bcdd6..ed2bb5b6a 100644 --- a/pkg/bbgo/trader.go +++ b/pkg/bbgo/trader.go @@ -2,50 +2,58 @@ package bbgo import ( "context" - "fmt" "strings" "time" - "github.com/fsnotify/fsnotify" "github.com/jmoiron/sqlx" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/c9s/bbgo/pkg/accounting" - "github.com/c9s/bbgo/pkg/bbgo/config" - "github.com/c9s/bbgo/pkg/exchange/binance" "github.com/c9s/bbgo/pkg/service" "github.com/c9s/bbgo/pkg/types" + + _ "github.com/go-sql-driver/mysql" ) -// MarketStrategy represents the single Exchange strategy -type MarketStrategy interface { - OnLoad(tradingContext *Context, trader types.Trader) error - OnNewStream(stream types.Stream) error +// SingleExchangeStrategy represents the single Exchange strategy +type SingleExchangeStrategy interface { + Run(trader types.Trader, session *ExchangeSession) error } +type CrossExchangeStrategy interface { + Run(trader types.Trader, sessions map[string]*ExchangeSession) error +} + +// ExchangeSession presents the exchange connection session +// It also maintains and collects the data returned from the stream. type ExchangeSession struct { + // Exchange session name Name string + // The exchange account states Account *Account + // Stream is the connection stream of the exchange Stream types.Stream Subscriptions []types.Subscription - Exchange *binance.Exchange - - Strategies []MarketStrategy - - loadedSymbols map[string]struct{} + Exchange types.Exchange + // Markets defines market configuration of a symbol Markets map[string]types.Market + LastPrices map[string]float64 + + // Trades collects the executed trades from the exchange + // map: symbol -> []trade Trades map[string][]types.Trade + + MarketDataStore *MarketDataStore } func (session *ExchangeSession) Subscribe(channel types.Channel, symbol string, options types.SubscribeOptions) *ExchangeSession { - session.Symbols(symbol) - session.Subscriptions = append(session.Subscriptions, types.Subscription{ Channel: channel, Symbol: symbol, @@ -55,92 +63,49 @@ func (session *ExchangeSession) Subscribe(channel types.Channel, symbol string, return session } -func (session *ExchangeSession) Symbols(symbols ...string) *ExchangeSession { - if session.loadedSymbols == nil { - session.loadedSymbols = make(map[string]struct{}) - } - - if session.Markets == nil { - session.Markets = make(map[string]types.Market) - } - - for _, symbol := range symbols { - session.loadedSymbols[symbol] = struct{}{} - - if market, ok := types.FindMarket(symbol); ok { - session.Markets[symbol] = market - } else { - log.Panicf("market of symbol %s not found", symbol) - } - } - - return session -} - -func (session *ExchangeSession) AddStrategy(strategy MarketStrategy) *ExchangeSession { - session.Strategies = append(session.Strategies, strategy) - return session -} - -type Trader struct { - Symbol string +// Environment presents the real exchange data layer +type Environment struct { TradeService *service.TradeService TradeSync *service.TradeSync - // Context is trading Context - Context *Context - - Exchange types.Exchange - - reportTimer *time.Timer - - ProfitAndLossCalculator *accounting.ProfitAndLossCalculator - - Account *Account - - Notifiers []Notifier - - ExchangeSessions map[string]*ExchangeSession + sessions map[string]*ExchangeSession } -func New(db *sqlx.DB, exchange types.Exchange, symbol string) *Trader { +func NewEnvironment(db *sqlx.DB) *Environment { tradeService := &service.TradeService{DB: db} - return &Trader{ - Symbol: symbol, - Exchange: exchange, + return &Environment{ TradeService: tradeService, TradeSync: &service.TradeSync{ Service: tradeService, }, + sessions: make(map[string]*ExchangeSession), } } -func (trader *Trader) AddNotifier(notifier Notifier) { - trader.Notifiers = append(trader.Notifiers, notifier) -} - -func (trader *Trader) AddExchange(name string, exchange *binance.Exchange) (session *ExchangeSession) { +func (environ *Environment) AddExchange(name string, exchange types.Exchange) (session *ExchangeSession) { session = &ExchangeSession{ - Name: name, - Exchange: exchange, + Name: name, + Exchange: exchange, + Markets: make(map[string]types.Market), + Trades: make(map[string][]types.Trade), + LastPrices: make(map[string]float64), } - if trader.ExchangeSessions == nil { - trader.ExchangeSessions = make(map[string]*ExchangeSession) - } - - trader.ExchangeSessions[name] = session + environ.sessions[name] = session return session } -func (trader *Trader) Connect(ctx context.Context) (err error) { - log.Info("syncing trades from exchange...") +func (environ *Environment) Init(ctx context.Context) (err error) { startTime := time.Now().AddDate(0, 0, -7) // sync from 7 days ago - for _, session := range trader.ExchangeSessions { + for _, session := range environ.sessions { + loadedSymbols := make(map[string]struct{}) + for _, sub := range session.Subscriptions { + loadedSymbols[sub.Symbol] = struct{}{} + } - for symbol := range session.loadedSymbols { - if err := trader.TradeSync.Sync(ctx, session.Exchange, symbol, startTime); err != nil { + for symbol := range loadedSymbols { + if err := environ.TradeSync.Sync(ctx, session.Exchange, symbol, startTime); err != nil { return err } @@ -148,9 +113,9 @@ func (trader *Trader) Connect(ctx context.Context) (err error) { tradingFeeCurrency := session.Exchange.PlatformFeeCurrency() if strings.HasPrefix(symbol, tradingFeeCurrency) { - trades, err = trader.TradeService.QueryForTradingFeeCurrency(symbol, tradingFeeCurrency) + trades, err = environ.TradeService.QueryForTradingFeeCurrency(symbol, tradingFeeCurrency) } else { - trades, err = trader.TradeService.Query(symbol) + trades, err = environ.TradeService.Query(symbol) } if err != nil { @@ -158,34 +123,51 @@ func (trader *Trader) Connect(ctx context.Context) (err error) { } log.Infof("symbol %s: %d trades loaded", symbol, len(trades)) - if session.Trades == nil { - session.Trades = make(map[string][]types.Trade) - } session.Trades[symbol] = trades - stockManager := &StockDistribution{ - Symbol: symbol, - TradingFeeCurrency: tradingFeeCurrency, - } - - checkpoints, err := stockManager.AddTrades(trades) + currentPrice, err := session.Exchange.QueryAveragePrice(ctx, symbol) if err != nil { return err } - log.Infof("symbol %s: found stock checkpoints: %+v", symbol, checkpoints) + session.LastPrices[symbol] = currentPrice } - session.Account, err = LoadAccount(ctx, session.Exchange) + balances, err := session.Exchange.QueryAccountBalances(ctx) if err != nil { return err } + session.Account = &Account{ Balances: balances } + session.Stream = session.Exchange.NewStream() - if err != nil { - return err - } + session.Account.BindStream(session.Stream) + + marketDataStore := NewMarketDataStore() + marketDataStore.BindStream(session.Stream) + + + // update last prices + session.Stream.OnKLineClosed(func(kline types.KLine) { + session.LastPrices[kline.Symbol] = kline.Close + }) + + session.Stream.OnTrade(func(trade *types.Trade) { + // append trades + session.Trades[trade.Symbol] = append(session.Trades[trade.Symbol], *trade) + + if err := environ.TradeService.Insert(*trade); err != nil { + log.WithError(err).Errorf("trade insert error: %+v", *trade) + } + }) + } + + return nil +} + +func (environ *Environment) Connect(ctx context.Context) error { + for _, session := range environ.sessions { if err := session.Stream.Connect(ctx); err != nil { return err } @@ -194,86 +176,115 @@ func (trader *Trader) Connect(ctx context.Context) (err error) { return nil } -func (trader *Trader) Initialize(ctx context.Context, startTime time.Time) error { - // query all trades from database so that we can get the correct pnl - var err error - var trades []types.Trade - tradingFeeCurrency := trader.Exchange.PlatformFeeCurrency() - if strings.HasPrefix(trader.Symbol, tradingFeeCurrency) { - trades, err = trader.TradeService.QueryForTradingFeeCurrency(trader.Symbol, tradingFeeCurrency) - } else { - trades, err = trader.TradeService.Query(trader.Symbol) +type Trader struct { + reportTimer *time.Timer + ProfitAndLossCalculator *accounting.ProfitAndLossCalculator + + notifiers []Notifier + environment *Environment + + crossExchangeStrategies []CrossExchangeStrategy + exchangeStrategies map[string][]SingleExchangeStrategy +} + +func NewTrader(environ *Environment) *Trader { + return &Trader{ + environment: environ, + exchangeStrategies: make(map[string][]SingleExchangeStrategy), + } +} + +func (trader *Trader) AddNotifier(notifier Notifier) { + trader.notifiers = append(trader.notifiers, notifier) +} + +// AttachStrategy attaches the single exchange strategy on an exchange session. +// Single exchange strategy is the default behavior. +func (trader *Trader) AttachStrategy(session string, strategy SingleExchangeStrategy) error { + if _, ok := trader.environment.sessions[session]; !ok { + return errors.New("session not defined") } - if err != nil { - return err - } - - log.Infof("%d trades loaded", len(trades)) - - stockManager := &StockDistribution{ - Symbol: trader.Symbol, - TradingFeeCurrency: tradingFeeCurrency, - } - - checkpoints, err := stockManager.AddTrades(trades) - if err != nil { - return err - } - - log.Infof("found checkpoints: %+v", checkpoints) - - market, ok := types.FindMarket(trader.Symbol) - if !ok { - return fmt.Errorf("%s market not found", trader.Symbol) - } - - currentPrice, err := trader.Exchange.QueryAveragePrice(ctx, trader.Symbol) - if err != nil { - return err - } - - trader.Context = &Context{ - CurrentPrice: currentPrice, - Symbol: trader.Symbol, - Market: market, - StockManager: stockManager, - } - - /* - if len(checkpoints) > 0 { - // get the last checkpoint - idx := checkpoints[len(checkpoints)-1] - if idx < len(trades)-1 { - trades = trades[idx:] - firstTrade := trades[0] - pnlStartTime = firstTrade.Time - notifier.Notify("%s Found the latest trade checkpoint %s", firstTrade.Symbol, firstTrade.Time, firstTrade) - } - } - */ - - trader.ProfitAndLossCalculator = &accounting.ProfitAndLossCalculator{ - TradingFeeCurrency: tradingFeeCurrency, - Symbol: trader.Symbol, - StartTime: startTime, - CurrentPrice: currentPrice, - Trades: trades, - } - - account, err := LoadAccount(ctx, trader.Exchange) - if err != nil { - return err - } - - trader.Account = account - trader.Context.Balances = account.Balances - account.Print() - + trader.exchangeStrategies[session] = append(trader.exchangeStrategies[session], strategy) return nil } -func (trader *Trader) RunStrategyWithHotReload(ctx context.Context, strategy MarketStrategy, configFile string) (chan struct{}, error) { +// AttachCrossExchangeStrategy attaches the cross exchange strategy +func (trader *Trader) AttachCrossExchangeStrategy(strategy CrossExchangeStrategy) error { + trader.crossExchangeStrategies = append(trader.crossExchangeStrategies, strategy) + return nil +} + +func (trader *Trader) Run(ctx context.Context) error { + if err := trader.environment.Init(ctx); err != nil { + return err + } + + // load and run session strategies + for session, strategies := range trader.exchangeStrategies { + for _, strategy := range strategies { + err := strategy.Run(trader, trader.environment.sessions[session]) + if err != nil { + return err + } + } + } + + for _, strategy := range trader.crossExchangeStrategies { + if err := strategy.Run(trader, trader.environment.sessions) ; err != nil { + return err + } + } + + + return trader.environment.Connect(ctx) + /* + stockManager := &StockDistribution{ + Symbol: symbol, + TradingFeeCurrency: tradingFeeCurrency, + } + + checkpoints, err := stockManager.AddTrades(trades) + if err != nil { + return err + } + + log.Infof("symbol %s: found stock checkpoints: %+v", symbol, checkpoints) + */ +} + +func (trader *Trader) Initialize(ctx context.Context, startTime time.Time) error { + /* + currentPrice, err := trader.Exchange.QueryAveragePrice(ctx, trader.Symbol) + if err != nil { + return err + } + + trader.Context = &Context{ + CurrentPrice: currentPrice, + Symbol: trader.Symbol, + Market: market, + StockManager: stockManager, + } + */ + + /* + trader.ProfitAndLossCalculator = &accounting.ProfitAndLossCalculator{ + TradingFeeCurrency: tradingFeeCurrency, + Symbol: trader.Symbol, + StartTime: startTime, + CurrentPrice: currentPrice, + Trades: trades, + } + */ + + // trader.Context.Balances = account.Balances + // account.Print() + return nil +} + +/* +func (trader *Trader) RunStrategyWithHotReload(ctx context.Context, strategy SingleExchangeStrategy, configFile string) (chan struct{}, error) { var done = make(chan struct{}) var configWatcherDone = make(chan struct{}) @@ -348,8 +359,10 @@ func (trader *Trader) RunStrategyWithHotReload(ctx context.Context, strategy Mar return done, nil } +*/ -func (trader *Trader) RunStrategy(ctx context.Context, strategy MarketStrategy) (chan struct{}, error) { +/* +func (trader *Trader) RunStrategy(ctx context.Context, strategy SingleExchangeStrategy) (chan struct{}, error) { if err := strategy.OnLoad(trader.Context, trader); err != nil { return nil, err } @@ -358,9 +371,9 @@ func (trader *Trader) RunStrategy(ctx context.Context, strategy MarketStrategy) // bind kline store to the stream klineStore := NewMarketDataStore() - klineStore.BindPrivateStream(stream) + klineStore.BindStream(stream) - trader.Account.BindPrivateStream(stream) + trader.Account.BindStream(stream) if err := strategy.OnNewStream(stream); err != nil { return nil, err @@ -371,14 +384,6 @@ func (trader *Trader) RunStrategy(ctx context.Context, strategy MarketStrategy) }) stream.OnTrade(func(trade *types.Trade) { - if trade.Symbol != trader.Symbol { - return - } - - if err := trader.TradeService.Insert(*trade); err != nil { - log.WithError(err).Error("trade insert error") - } - trader.NotifyTrade(trade) trader.ProfitAndLossCalculator.AddTrade(*trade) _, err := trader.Context.StockManager.AddTrades([]types.Trade{*trade}) @@ -420,6 +425,7 @@ func (trader *Trader) RunStrategy(ctx context.Context, strategy MarketStrategy) return done, nil } +*/ func (trader *Trader) reportPnL() { report := trader.ProfitAndLossCalculator.Calculate() @@ -428,19 +434,19 @@ func (trader *Trader) reportPnL() { } func (trader *Trader) NotifyPnL(report *accounting.ProfitAndLossReport) { - for _, n := range trader.Notifiers { + for _, n := range trader.notifiers { n.NotifyPnL(report) } } func (trader *Trader) NotifyTrade(trade *types.Trade) { - for _, n := range trader.Notifiers { + for _, n := range trader.notifiers { n.NotifyTrade(trade) } } func (trader *Trader) Notify(msg string, args ...interface{}) { - for _, n := range trader.Notifiers { + for _, n := range trader.notifiers { n.Notify(msg, args...) } } @@ -454,8 +460,9 @@ func (trader *Trader) SubmitOrder(ctx context.Context, order *types.SubmitOrder) MinAssetBalance: 0, MinProfitSpread: 0, MaxOrderAmount: 0, - Exchange: trader.Exchange, - Trader: trader, + // FIXME: + // Exchange: trader.Exchange, + Trader: trader, } err := orderProcessor.Submit(ctx, order) diff --git a/pkg/exchange/binance/exchange.go b/pkg/exchange/binance/exchange.go index 23921cbc8..acf65dbe1 100644 --- a/pkg/exchange/binance/exchange.go +++ b/pkg/exchange/binance/exchange.go @@ -62,8 +62,12 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since endTime = until } - withdraws, err := e.Client.NewListWithdrawsService(). - Asset(asset). + req := e.Client.NewListWithdrawsService() + if len(asset) > 0 { + req.Asset(asset) + } + + withdraws, err := req. StartTime(startTime.UnixNano() / int64(time.Millisecond)). EndTime(endTime.UnixNano() / int64(time.Millisecond)). Do(ctx) @@ -130,8 +134,12 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, endTime = until } - deposits, err := e.Client.NewListDepositsService(). - Asset(asset). + req := e.Client.NewListDepositsService() + if len(asset) > 0 { + req.Asset(asset) + } + + deposits, err := req. StartTime(startTime.UnixNano() / int64(time.Millisecond)). EndTime(endTime.UnixNano() / int64(time.Millisecond)). Do(ctx) diff --git a/pkg/exchange/max/exchange.go b/pkg/exchange/max/exchange.go index 927a746f5..8d3d9eb7a 100644 --- a/pkg/exchange/max/exchange.go +++ b/pkg/exchange/max/exchange.go @@ -99,8 +99,12 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since } log.Infof("querying withdraw %s: %s <=> %s", asset, startTime, endTime) - withdraws, err := e.client.AccountService.NewGetWithdrawalHistoryRequest(). - Currency(toLocalCurrency(asset)). + req := e.client.AccountService.NewGetWithdrawalHistoryRequest() + if len(asset) > 0 { + req.Currency(toLocalCurrency(asset)) + } + + withdraws, err := req. From(startTime.Unix()). To(endTime.Unix()). Do(ctx) @@ -165,8 +169,12 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, } log.Infof("querying deposit history %s: %s <=> %s", asset, startTime, endTime) - deposits, err := e.client.AccountService.NewGetDepositHistoryRequest(). - Currency(toLocalCurrency(asset)). + req := e.client.AccountService.NewGetDepositHistoryRequest() + if len(asset) > 0 { + req.Currency(toLocalCurrency(asset)) + } + + deposits, err := req. From(startTime.Unix()). To(endTime.Unix()).Do(ctx) diff --git a/pkg/exchange/max/maxapi/account.go b/pkg/exchange/max/maxapi/account.go index 7977f3502..2a43f2fbb 100644 --- a/pkg/exchange/max/maxapi/account.go +++ b/pkg/exchange/max/maxapi/account.go @@ -126,7 +126,7 @@ type Deposit struct { type GetDepositHistoryRequestParams struct { *PrivateRequestParams - Currency string `json:"currency"` + Currency string `json:"currency,omitempty"` From int64 `json:"from,omitempty"` // seconds To int64 `json:"to,omitempty"` // seconds State string `json:"state,omitempty"` // submitting, submitted, rejected, accepted, checking, refunded, canceled, suspect @@ -212,7 +212,7 @@ type Withdraw struct { type GetWithdrawHistoryRequestParams struct { *PrivateRequestParams - Currency string `json:"currency"` + Currency string `json:"currency,omitempty"` From int64 `json:"from,omitempty"` // seconds To int64 `json:"to,omitempty"` // seconds State string `json:"state,omitempty"` // submitting, submitted, rejected, accepted, checking, refunded, canceled, suspect diff --git a/pkg/strategy/buyandhold/main.go b/pkg/strategy/buyandhold/main.go new file mode 100644 index 000000000..582646568 --- /dev/null +++ b/pkg/strategy/buyandhold/main.go @@ -0,0 +1,30 @@ +package buyandhold + +import ( + "github.com/c9s/bbgo/pkg/bbgo" + "github.com/c9s/bbgo/pkg/types" +) + +type Strategy struct { + symbol string +} + +func New(symbol string) *Strategy { + return &Strategy{ + symbol: symbol, + } +} + +func (s *Strategy) Run(trader types.Trader, session *bbgo.ExchangeSession) error { + session.Subscribe(types.KLineChannel, s.symbol, types.SubscribeOptions{}) + session.Stream.OnKLineClosed(func(kline types.KLine) { + // trader.SubmitOrder(ctx, ....) + }) + + return nil +} + + + + +