diff --git a/bbgo/kline_regression.go b/bbgo/kline_regression.go index 99f6017cf..0a3a7af25 100644 --- a/bbgo/kline_regression.go +++ b/bbgo/kline_regression.go @@ -38,13 +38,13 @@ func (trader *BackTestTrader) SubmitOrder(cxt context.Context, order *types.Subm trader.pendingOrders = append(trader.pendingOrders, order) } -func (trader *BackTestTrader) RunStrategy(ctx context.Context, strategy Strategy) (chan struct{}, error) { +func (trader *BackTestTrader) RunStrategy(ctx context.Context, strategy MarketStrategy) (chan struct{}, error) { logrus.Infof("[regression] number of kline data: %d", len(trader.SourceKLines)) done := make(chan struct{}) defer close(done) - if err := strategy.Load(trader.Context, trader); err != nil { + if err := strategy.OnLoad(trader.Context, trader); err != nil { return nil, err } diff --git a/bbgo/service/trade.go b/bbgo/service/trade.go index eb58da5f2..219259e79 100644 --- a/bbgo/service/trade.go +++ b/bbgo/service/trade.go @@ -8,16 +8,14 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" - "github.com/c9s/bbgo/pkg/bbgo/exchange/binance" "github.com/c9s/bbgo/pkg/bbgo/types" ) type TradeSync struct { Service *TradeService - Exchange *binance.Exchange } -func (s *TradeSync) Sync(ctx context.Context, symbol string, startTime time.Time) error { +func (s *TradeSync) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error { lastTrade, err := s.Service.QueryLast(symbol) if err != nil { return err @@ -31,7 +29,7 @@ func (s *TradeSync) Sync(ctx context.Context, symbol string, startTime time.Time log.Infof("found last trade, start from lastID = %d since %s", lastTrade.ID, startTime) } - trades, err := s.Exchange.BatchQueryTrades(ctx, symbol, &types.TradeQueryOptions{ + trades, err := exchange.BatchQueryTrades(ctx, symbol, &types.TradeQueryOptions{ StartTime: &startTime, Limit: 200, LastTradeID: lastID, diff --git a/bbgo/trader.go b/bbgo/trader.go index afc69d922..3aafae352 100644 --- a/bbgo/trader.go +++ b/bbgo/trader.go @@ -12,17 +12,68 @@ import ( "github.com/c9s/bbgo/pkg/bbgo/accounting" "github.com/c9s/bbgo/pkg/bbgo/config" - "github.com/c9s/bbgo/pkg/bbgo/service" - "github.com/c9s/bbgo/pkg/bbgo/exchange/binance" + "github.com/c9s/bbgo/pkg/bbgo/service" "github.com/c9s/bbgo/pkg/bbgo/types" ) -type Strategy interface { - Load(tradingContext *Context, trader types.Trader) error +// MarketStrategy represents the single Exchange strategy +type MarketStrategy interface { + OnLoad(tradingContext *Context, trader types.Trader) error OnNewStream(stream types.PrivateStream) error } +type ExchangeSession struct { + Name string + + Account *Account + + Stream types.PrivateStream + + Subscriptions []types.Subscription + + Exchange *binance.Exchange + + Strategies []MarketStrategy + + loadedSymbols map[string]struct{} + + Markets map[string]types.Market + + Trades map[string][]types.Trade +} + +func (session *ExchangeSession) Subscribe(channel string, symbol string, options types.SubscribeOptions) *ExchangeSession { + session.Symbols(symbol) + + session.Subscriptions = append(session.Subscriptions, types.Subscription{ + Channel: channel, + Symbol: symbol, + Options: options, + }) + + return session +} + +func (session *ExchangeSession) Symbols(symbols ...string) *ExchangeSession { + for _, symbol := range symbols { + session.loadedSymbols[symbol] = struct{}{} + + if market, ok := types.FindMarket(symbol); ok { + session.Markets[symbol] = market + } else { + log.Panicf("market of symbol %s not found", symbol) + } + } + + return session +} + +func (session *ExchangeSession) AddStrategy(strategy MarketStrategy) *ExchangeSession { + session.Strategies = append(session.Strategies, strategy) + return session +} + type Trader struct { Symbol string TradeService *service.TradeService @@ -39,16 +90,9 @@ type Trader struct { Account *Account - // new fieldss - Exchanges map[string]*binance.Exchange - - ExchangeAccounts map[string]*Account - - ExchangeStreams map[string]types.PrivateStream - - ExchangeSubscriptions map[string][]types.Subscription - Notifiers []Notifier + + ExchangeSessions map[string]*ExchangeSession } func New(db *sqlx.DB, exchange *binance.Exchange, symbol string) *Trader { @@ -58,8 +102,7 @@ func New(db *sqlx.DB, exchange *binance.Exchange, symbol string) *Trader { Exchange: exchange, TradeService: tradeService, TradeSync: &service.TradeSync{ - Service: tradeService, - Exchange: exchange, + Service: tradeService, }, } } @@ -68,36 +111,67 @@ func (trader *Trader) AddNotifier(notifier Notifier) { trader.Notifiers = append(trader.Notifiers, notifier) } -func (trader *Trader) AddExchange(name string, exchange *binance.Exchange) { - trader.Exchanges[name] = exchange +func (trader *Trader) AddExchange(name string, exchange *binance.Exchange) (session *ExchangeSession) { + session = &ExchangeSession{ + Name: name, + Exchange: exchange, + } + + trader.ExchangeSessions[name] = session + return session } -func (trader *Trader) Subscribe(exchange string, channel string, symbol string, options types.SubscribeOptions) { - trader.ExchangeSubscriptions[exchange] = append(trader.ExchangeSubscriptions[exchange], types.Subscription{ - Channel: channel, - Symbol: symbol, - Options: options, - }) -} +func (trader *Trader) Connect(ctx context.Context) (err error) { + log.Info("syncing trades from exchange...") + startTime := time.Now().AddDate(0, 0, -7) // sync from 7 days ago -func (trader *Trader) Connect(ctx context.Context) error { - for n, ex := range trader.Exchanges { - account, err := LoadAccount(ctx, ex) + for _, session := range trader.ExchangeSessions { + + for symbol := range session.loadedSymbols { + if err := trader.TradeSync.Sync(ctx, session.Exchange, symbol, startTime); err != nil { + return err + } + + var trades []types.Trade + + tradingFeeCurrency := session.Exchange.PlatformFeeCurrency() + if strings.HasPrefix(symbol, tradingFeeCurrency) { + trades, err = trader.TradeService.QueryForTradingFeeCurrency(symbol, tradingFeeCurrency) + } else { + trades, err = trader.TradeService.Query(symbol) + } + + if err != nil { + return err + } + + log.Infof("symbol %s: %d trades loaded", symbol, len(trades)) + session.Trades[symbol] = trades + + stockManager := &StockManager{ + Symbol: symbol, + TradingFeeCurrency: tradingFeeCurrency, + } + + checkpoints, err := stockManager.AddTrades(trades) + if err != nil { + return err + } + + log.Infof("symbol %s: found stock checkpoints: %+v", symbol, checkpoints) + } + + session.Account, err = LoadAccount(ctx, session.Exchange) if err != nil { return err } - trader.ExchangeAccounts[n] = account - - - stream, err := ex.NewPrivateStream() + session.Stream, err = session.Exchange.NewPrivateStream() if err != nil { return err } - trader.ExchangeStreams[n] = stream - - if err := stream.Connect(ctx); err != nil { + if err := session.Stream.Connect(ctx); err != nil { return err } } @@ -106,11 +180,7 @@ func (trader *Trader) Connect(ctx context.Context) error { } func (trader *Trader) Initialize(ctx context.Context, startTime time.Time) error { - log.Info("syncing trades from exchange...") - if err := trader.TradeSync.Sync(ctx, trader.Symbol, startTime); err != nil { - return err - } - + // query all trades from database so that we can get the correct pnl var err error var trades []types.Trade tradingFeeCurrency := trader.Exchange.PlatformFeeCurrency() @@ -188,7 +258,7 @@ func (trader *Trader) Initialize(ctx context.Context, startTime time.Time) error return nil } -func (trader *Trader) RunStrategyWithHotReload(ctx context.Context, strategy Strategy, configFile string) (chan struct{}, error) { +func (trader *Trader) RunStrategyWithHotReload(ctx context.Context, strategy MarketStrategy, configFile string) (chan struct{}, error) { var done = make(chan struct{}) var configWatcherDone = make(chan struct{}) @@ -264,8 +334,8 @@ func (trader *Trader) RunStrategyWithHotReload(ctx context.Context, strategy Str return done, nil } -func (trader *Trader) RunStrategy(ctx context.Context, strategy Strategy) (chan struct{}, error) { - if err := strategy.Load(trader.Context, trader); err != nil { +func (trader *Trader) RunStrategy(ctx context.Context, strategy MarketStrategy) (chan struct{}, error) { + if err := strategy.OnLoad(trader.Context, trader); err != nil { return nil, err } diff --git a/bbgo/types/exchange.go b/bbgo/types/exchange.go index eb7da5133..fbf12ad96 100644 --- a/bbgo/types/exchange.go +++ b/bbgo/types/exchange.go @@ -13,7 +13,9 @@ type Exchange interface { QueryAccountBalances(ctx context.Context) (map[string]Balance, error) QueryKLines(ctx context.Context, symbol string, interval string, options KLineQueryOptions) ([]KLine, error) + QueryTrades(ctx context.Context, symbol string, options *TradeQueryOptions) ([]Trade, error) + BatchQueryTrades(ctx context.Context, symbol string, options *TradeQueryOptions) ([]Trade, error) SubmitOrder(ctx context.Context, order *SubmitOrder) error }