From 4b49fda4636fd267c03c5bdc35684f82a0f34420 Mon Sep 17 00:00:00 2001 From: c9s Date: Sun, 14 Mar 2021 11:04:56 +0800 Subject: [PATCH] refactor sync service --- README.md | 1 + pkg/bbgo/environment.go | 8 +- pkg/cmd/pnl.go | 89 ++++++++++++++--- pkg/cmd/transfer.go | 6 +- pkg/service/backtest.go | 50 ++++++---- pkg/service/sync.go | 208 ++-------------------------------------- 6 files changed, 128 insertions(+), 234 deletions(-) diff --git a/README.md b/README.md index debb3b2df..c61911f0b 100644 --- a/README.md +++ b/README.md @@ -341,6 +341,7 @@ for lorca make embed && go run -tags web ./cmd/bbgo-lorca ``` + ## Support ### By contributing pull requests diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index c651d065c..63e9f2fb9 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -161,9 +161,11 @@ func (environ *Environment) ConfigureDatabaseDriver(ctx context.Context, driver environ.RewardService = &service.RewardService{DB: db} environ.SyncService = &service.SyncService{ - TradeService: environ.TradeService, - OrderService: environ.OrderService, - RewardService: environ.RewardService, + TradeService: environ.TradeService, + OrderService: environ.OrderService, + RewardService: environ.RewardService, + WithdrawService: &service.WithdrawService{DB: db}, + DepositService: &service.DepositService{DB: db}, } return nil diff --git a/pkg/cmd/pnl.go b/pkg/cmd/pnl.go index b42cfb4b4..37f7b2071 100644 --- a/pkg/cmd/pnl.go +++ b/pkg/cmd/pnl.go @@ -2,7 +2,10 @@ package cmd import ( "context" + "fmt" + "os" "strings" + "time" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -11,14 +14,14 @@ import ( "github.com/c9s/bbgo/pkg/accounting" "github.com/c9s/bbgo/pkg/accounting/pnl" "github.com/c9s/bbgo/pkg/bbgo" - "github.com/c9s/bbgo/pkg/cmd/cmdutil" "github.com/c9s/bbgo/pkg/service" "github.com/c9s/bbgo/pkg/types" ) func init() { - PnLCmd.Flags().String("exchange", "", "target exchange") - PnLCmd.Flags().String("symbol", "BTCUSDT", "trading symbol") + PnLCmd.Flags().String("session", "", "target exchange") + PnLCmd.Flags().String("symbol", "", "trading symbol") + PnLCmd.Flags().Bool("include-transfer", false, "convert transfer records into trades") PnLCmd.Flags().Int("limit", 500, "number of trades") RootCmd.AddCommand(PnLCmd) } @@ -30,12 +33,26 @@ var PnLCmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { ctx := context.Background() - exchangeNameStr, err := cmd.Flags().GetString("exchange") + configFile, err := cmd.Flags().GetString("config") if err != nil { return err } - exchangeName, err := types.ValidExchangeName(exchangeNameStr) + if len(configFile) == 0 { + return errors.New("--config option is required") + } + + if _, err := os.Stat(configFile); os.IsNotExist(err) { + return err + } + + userConfig, err := bbgo.Load(configFile, false) + if err != nil { + return err + } + + + sessionName, err := cmd.Flags().GetString("session") if err != nil { return err } @@ -45,23 +62,73 @@ var PnLCmd = &cobra.Command{ return err } + if len(symbol) == 0 { + return errors.New("--symbol [SYMBOL] is required") + } + limit, err := cmd.Flags().GetInt("limit") if err != nil { return err } - exchange, err := cmdutil.NewExchange(exchangeName) + environ := bbgo.NewEnvironment() + if err := environ.ConfigureDatabase(ctx); err != nil { + return err + } + + if err := environ.ConfigureExchangeSessions(userConfig); err != nil { + return err + } + + session, ok := environ.Session(sessionName) + if !ok { + return fmt.Errorf("session %s not found", sessionName) + } + + if err := environ.Sync(ctx) ; err != nil { + return err + } + + exchange := session.Exchange + + market, ok := session.Market(symbol) + if !ok { + return fmt.Errorf("market config %s not found", symbol) + } + + since := time.Now().AddDate(-1, 0, 0) + until := time.Now() + + includeTransfer, err := cmd.Flags().GetBool("include-transfer") if err != nil { return err } + if includeTransfer { + transferService, ok := exchange.(types.ExchangeTransferService) + if !ok { + return fmt.Errorf("session exchange %s does not implement transfer service", sessionName) + } - environ := bbgo.NewEnvironment() - if err := environ.ConfigureDatabase(ctx) ; err != nil { - return err + deposits, err := transferService.QueryDepositHistory(ctx, market.BaseCurrency, since, until) + if err != nil { + return err + } + _ = deposits + + withdrawals, err := transferService.QueryWithdrawHistory(ctx, market.BaseCurrency, since, until) + if err != nil { + return err + } + _ = withdrawals + + // we need the backtest klines for the daily prices + backtestService := &service.BacktestService{DB: environ.DatabaseService.DB} + if err := backtestService.SyncKLineByInterval(ctx, exchange, symbol, types.Interval1d, since, until); err != nil { + return err + } } - var trades []types.Trade tradingFeeCurrency := exchange.PlatformFeeCurrency() if strings.HasPrefix(symbol, tradingFeeCurrency) { @@ -71,7 +138,7 @@ var PnLCmd = &cobra.Command{ trades, err = environ.TradeService.Query(service.QueryTradesOptions{ Exchange: exchange.Name(), Symbol: symbol, - Limit: limit, + Limit: limit, }) } diff --git a/pkg/cmd/transfer.go b/pkg/cmd/transfer.go index 8f3bf5657..d8d39770b 100644 --- a/pkg/cmd/transfer.go +++ b/pkg/cmd/transfer.go @@ -101,7 +101,11 @@ var TransferHistoryCmd = &cobra.Command{ var records timeSlice - exchange := session.Exchange + exchange, ok := session.Exchange.(types.ExchangeTransferService) + if !ok { + return fmt.Errorf("exchange session %s does not implement transfer service", sessionName) + } + deposits, err := exchange.QueryDepositHistory(ctx, asset, since, until) if err != nil { return err diff --git a/pkg/service/backtest.go b/pkg/service/backtest.go index f29af189b..d41e0933c 100644 --- a/pkg/service/backtest.go +++ b/pkg/service/backtest.go @@ -17,33 +17,41 @@ type BacktestService struct { DB *sqlx.DB } -func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error { - now := time.Now() - for interval := range types.SupportedIntervals { - log.Infof("synchronizing lastKLine for interval %s from exchange %s", interval, exchange.Name()) +func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error { + log.Infof("synchronizing lastKLine for interval %s from exchange %s", interval, exchange.Name()) - lastKLine, err := s.QueryLast(exchange.Name(), symbol, interval) - if err != nil { + lastKLine, err := s.QueryLast(exchange.Name(), symbol, interval) + if err != nil { + return err + } + + if lastKLine != nil { + log.Infof("found last checkpoint %s", lastKLine.EndTime) + startTime = lastKLine.StartTime.Add(time.Minute) + } + + batch := &batch2.KLineBatchQuery{Exchange: exchange} + + // should use channel here + klineC, errC := batch.Query(ctx, symbol, interval, startTime, endTime) + // var previousKLine types.KLine + for k := range klineC { + if err := s.Insert(k); err != nil { return err } + } - if lastKLine != nil { - log.Infof("found last checkpoint %s", lastKLine.EndTime) - startTime = lastKLine.StartTime.Add(time.Minute) - } + if err := <-errC; err != nil { + return err + } - batch := &batch2.KLineBatchQuery{Exchange: exchange} + return nil +} - // should use channel here - klineC, errC := batch.Query(ctx, symbol, interval, startTime, now) - // var previousKLine types.KLine - for k := range klineC { - if err := s.Insert(k); err != nil { - return err - } - } - - if err := <-errC; err != nil { +func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error { + endTime := time.Now() + for interval := range types.SupportedIntervals { + if err := s.SyncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime) ; err != nil { return err } } diff --git a/pkg/service/sync.go b/pkg/service/sync.go index ab1d578ab..23b2e7970 100644 --- a/pkg/service/sync.go +++ b/pkg/service/sync.go @@ -5,221 +5,33 @@ import ( "errors" "time" - "github.com/sirupsen/logrus" - - "github.com/c9s/bbgo/pkg/exchange/batch" "github.com/c9s/bbgo/pkg/types" ) var ErrNotImplemented = errors.New("exchange does not implement ExchangeRewardService interface") type SyncService struct { - TradeService *TradeService - OrderService *OrderService - RewardService *RewardService -} - -func (s *SyncService) SyncRewards(ctx context.Context, exchange types.Exchange) error { - service, ok := exchange.(types.ExchangeRewardService) - if !ok { - return ErrNotImplemented - } - - var rewardKeys = map[string]struct{}{} - - var startTime time.Time - - records, err := s.RewardService.QueryLast(exchange.Name(), 50) - if err != nil { - return err - } - - if len(records) > 0 { - lastRecord := records[0] - startTime = lastRecord.CreatedAt.Time() - - for _, record := range records { - rewardKeys[record.UUID] = struct{}{} - } - } - - batchQuery := &batch.RewardBatchQuery{Service: service} - rewardsC, errC := batchQuery.Query(ctx, startTime, time.Now()) - - for reward := range rewardsC { - select { - - case <-ctx.Done(): - return ctx.Err() - - case err := <-errC: - if err != nil { - return err - } - - default: - - } - - if _, ok := rewardKeys[reward.UUID]; ok { - continue - } - - logrus.Infof("inserting reward: %s %s %s %f %s", reward.Exchange, reward.Type, reward.Currency, reward.Quantity.Float64(), reward.CreatedAt) - - if err := s.RewardService.Insert(reward); err != nil { - return err - } - } - - return <-errC -} - -func (s *SyncService) SyncOrders(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error { - isMargin := false - isIsolated := false - if marginExchange, ok := exchange.(types.MarginExchange); ok { - marginSettings := marginExchange.GetMarginSettings() - isMargin = marginSettings.IsMargin - isIsolated = marginSettings.IsIsolatedMargin - if marginSettings.IsIsolatedMargin { - symbol = marginSettings.IsolatedMarginSymbol - } - } - - records, err := s.OrderService.QueryLast(exchange.Name(), symbol, isMargin, isIsolated, 50) - if err != nil { - return err - } - - orderKeys := make(map[uint64]struct{}) - - var lastID uint64 = 0 - if len(records) > 0 { - for _, record := range records { - orderKeys[record.OrderID] = struct{}{} - } - - lastID = records[0].OrderID - startTime = records[0].CreationTime.Time() - } - - b := &batch.ClosedOrderBatchQuery{Exchange: exchange} - ordersC, errC := b.Query(ctx, symbol, startTime, time.Now(), lastID) - for order := range ordersC { - select { - - case <-ctx.Done(): - return ctx.Err() - - case err := <-errC: - if err != nil { - return err - } - - default: - - } - - if _, exists := orderKeys[order.OrderID]; exists { - continue - } - - if err := s.OrderService.Insert(order); err != nil { - return err - } - } - - return <-errC -} - -func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, symbol string) error { - isMargin := false - isIsolated := false - if marginExchange, ok := exchange.(types.MarginExchange); ok { - marginSettings := marginExchange.GetMarginSettings() - isMargin = marginSettings.IsMargin - isIsolated = marginSettings.IsIsolatedMargin - if marginSettings.IsIsolatedMargin { - symbol = marginSettings.IsolatedMarginSymbol - } - } - - // records descending ordered - records, err := s.TradeService.QueryLast(exchange.Name(), symbol, isMargin, isIsolated, 50) - if err != nil { - return err - } - - var tradeKeys = map[types.TradeKey]struct{}{} - var lastTradeID int64 = 1 - if len(records) > 0 { - for _, record := range records { - tradeKeys[record.Key()] = struct{}{} - } - - lastTradeID = records[0].ID - } - - b := &batch.TradeBatchQuery{Exchange: exchange} - tradeC, errC := b.Query(ctx, symbol, &types.TradeQueryOptions{ - LastTradeID: lastTradeID, - }) - - for trade := range tradeC { - select { - case <-ctx.Done(): - return ctx.Err() - - case err := <-errC: - if err != nil { - return err - } - - default: - } - - key := trade.Key() - if _, exists := tradeKeys[key]; exists { - continue - } - - tradeKeys[key] = struct{}{} - - logrus.Infof("inserting trade: %s %d %s %-4s price: %-13f volume: %-11f %5s %s", - trade.Exchange, - trade.ID, - trade.Symbol, - trade.Side, - trade.Price, - trade.Quantity, - trade.MakerOrTakerLabel(), - trade.Time.String()) - - if err := s.TradeService.Insert(trade); err != nil { - return err - } - } - - return <-errC + TradeService *TradeService + OrderService *OrderService + RewardService *RewardService + WithdrawService *WithdrawService + DepositService *DepositService } // SyncSessionSymbols syncs the trades from the given exchange session func (s *SyncService) SyncSessionSymbols(ctx context.Context, exchange types.Exchange, startTime time.Time, symbols ...string) error { for _, symbol := range symbols { - if err := s.SyncTrades(ctx, exchange, symbol); err != nil { + if err := s.TradeService.Sync(ctx, exchange, symbol); err != nil { return err } - if err := s.SyncOrders(ctx, exchange, symbol, startTime); err != nil { + if err := s.OrderService.Sync(ctx, exchange, symbol, startTime); err != nil { return err } + } - if err := s.SyncRewards(ctx, exchange); err != nil { - if err == ErrNotImplemented { - continue - } - + if err := s.RewardService.Sync(ctx, exchange); err != nil { + if err != ErrNotImplemented { return err } }