From fa7177426f2f62d9019a2066460c806085965ee9 Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 22 Jun 2022 18:19:11 +0800 Subject: [PATCH] cmd/pnl: fix trade table query --- pkg/cmd/pnl.go | 128 ++++++++++++++++--------------- pkg/service/trade.go | 46 +++++++++-- pkg/strategy/dca/strategy.go | 2 +- pkg/strategy/rsmaker/strategy.go | 2 +- 4 files changed, 108 insertions(+), 70 deletions(-) diff --git a/pkg/cmd/pnl.go b/pkg/cmd/pnl.go index 575405c18..ddebea06a 100644 --- a/pkg/cmd/pnl.go +++ b/pkg/cmd/pnl.go @@ -18,11 +18,11 @@ import ( ) func init() { - PnLCmd.Flags().String("session", "", "target exchange") + PnLCmd.Flags().StringArray("session", []string{}, "target exchange sessions") PnLCmd.Flags().String("symbol", "", "trading symbol") PnLCmd.Flags().Bool("include-transfer", false, "convert transfer records into trades") - PnLCmd.Flags().String("since", "", "query trades from a timepoint") - PnLCmd.Flags().Int("limit", 0, "number of trades") + PnLCmd.Flags().String("since", "", "query trades from a time point") + PnLCmd.Flags().Uint64("limit", 0, "number of trades") RootCmd.AddCommand(PnLCmd) } @@ -34,11 +34,15 @@ var PnLCmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { ctx := context.Background() - sessionName, err := cmd.Flags().GetString("session") + sessionNames, err := cmd.Flags().GetStringArray("session") if err != nil { return err } + if len(sessionNames) == 0 { + return errors.New("--session [SESSION] is required") + } + symbol, err := cmd.Flags().GetString("symbol") if err != nil { return err @@ -48,41 +52,6 @@ var PnLCmd = &cobra.Command{ return errors.New("--symbol [SYMBOL] is required") } - limit, err := cmd.Flags().GetInt("limit") - if err != nil { - return err - } - - environ := bbgo.NewEnvironment() - - if err := environ.ConfigureDatabase(ctx); err != nil { - return err - } - - if err := environ.ConfigureExchangeSessions(userConfig); err != nil { - return err - } - - session, ok := environ.Session(sessionName) - if !ok { - return fmt.Errorf("session %s not found", sessionName) - } - - if err := environ.SyncSession(ctx, session, symbol); err != nil { - return err - } - - if err = environ.Init(ctx); err != nil { - return err - } - - exchange := session.Exchange - - market, ok := session.Market(symbol) - if !ok { - return fmt.Errorf("market config %s not found", symbol) - } - // this is the default since since := time.Now().AddDate(-1, 0, 0) @@ -106,36 +75,71 @@ var PnLCmd = &cobra.Command{ return err } - if includeTransfer { - transferService, ok := exchange.(types.ExchangeTransferService) + limit, err := cmd.Flags().GetUint64("limit") + if err != nil { + return err + } + + environ := bbgo.NewEnvironment() + + if err := environ.ConfigureDatabase(ctx); err != nil { + return err + } + + if err := environ.ConfigureExchangeSessions(userConfig); err != nil { + return err + } + + for _, sessionName := range sessionNames { + session, ok := environ.Session(sessionName) if !ok { - return fmt.Errorf("session exchange %s does not implement transfer service", sessionName) + return fmt.Errorf("session %s not found", sessionName) } - deposits, err := transferService.QueryDepositHistory(ctx, market.BaseCurrency, since, until) - if err != nil { - return err - } - _ = deposits - - withdrawals, err := transferService.QueryWithdrawHistory(ctx, market.BaseCurrency, since, until) - if err != nil { + if err := environ.SyncSession(ctx, session, symbol); err != nil { return err } - sort.Slice(withdrawals, func(i, j int) bool { - a := withdrawals[i].ApplyTime.Time() - b := withdrawals[j].ApplyTime.Time() - return a.Before(b) - }) + if includeTransfer { + exchange := session.Exchange + market, _ := session.Market(symbol) + transferService, ok := exchange.(types.ExchangeTransferService) + if !ok { + return fmt.Errorf("session exchange %s does not implement transfer service", sessionName) + } - // we need the backtest klines for the daily prices - backtestService := &service.BacktestService{DB: environ.DatabaseService.DB} - if err := backtestService.Sync(ctx, exchange, symbol, types.Interval1d, since, until); err != nil { - return err + deposits, err := transferService.QueryDepositHistory(ctx, market.BaseCurrency, since, until) + if err != nil { + return err + } + _ = deposits + + withdrawals, err := transferService.QueryWithdrawHistory(ctx, market.BaseCurrency, since, until) + if err != nil { + return err + } + + sort.Slice(withdrawals, func(i, j int) bool { + a := withdrawals[i].ApplyTime.Time() + b := withdrawals[j].ApplyTime.Time() + return a.Before(b) + }) + + // we need the backtest klines for the daily prices + backtestService := &service.BacktestService{DB: environ.DatabaseService.DB} + if err := backtestService.Sync(ctx, exchange, symbol, types.Interval1d, since, until); err != nil { + return err + } } } + if err = environ.Init(ctx); err != nil { + return err + } + + session, _ := environ.Session(sessionNames[0]) + exchange := session.Exchange + var trades []types.Trade tradingFeeCurrency := exchange.PlatformFeeCurrency() if strings.HasPrefix(symbol, tradingFeeCurrency) { @@ -143,8 +147,10 @@ var PnLCmd = &cobra.Command{ trades, err = environ.TradeService.QueryForTradingFeeCurrency(exchange.Name(), symbol, tradingFeeCurrency) } else { trades, err = environ.TradeService.Query(service.QueryTradesOptions{ - Symbol: symbol, - Limit: limit, + Symbol: symbol, + Limit: limit, + Sessions: sessionNames, + Since: &since, }) } diff --git a/pkg/service/trade.go b/pkg/service/trade.go index 4d07f0ac9..eaf782de1 100644 --- a/pkg/service/trade.go +++ b/pkg/service/trade.go @@ -20,12 +20,14 @@ var ErrTradeNotFound = errors.New("trade not found") type QueryTradesOptions struct { Exchange types.ExchangeName + Sessions []string Symbol string LastGID int64 + Since *time.Time // ASC or DESC Ordering string - Limit int + Limit uint64 } type TradingVolume struct { @@ -295,13 +297,43 @@ func (s *TradeService) QueryForTradingFeeCurrency(ex types.ExchangeName, symbol } func (s *TradeService) Query(options QueryTradesOptions) ([]types.Trade, error) { - sql := queryTradesSQL(options) - args := map[string]interface{}{ - "exchange": options.Exchange, - "symbol": options.Symbol, + sel := sq.Select("*"). + From("trades") + + if options.Since != nil { + sel = sel.Where(sq.GtOrEq{"traded_at": options.Since}) } - rows, err := s.DB.NamedQuery(sql, args) + sel = sel.Where(sq.Eq{"symbol": options.Symbol}) + + if options.Exchange != "" { + sel = sel.Where(sq.Eq{"exchange": options.Exchange}) + } + + if len(options.Sessions) > 0 { + // FIXME: right now we only have the exchange field in the db, we might need to add the session field too. + sel = sel.Where(sq.Eq{"exchange": options.Sessions}) + } + + if options.Ordering != "" { + sel = sel.OrderBy("traded_at " + options.Ordering) + } else { + sel = sel.OrderBy("traded_at ASC") + } + + if options.Limit > 0 { + sel = sel.Limit(options.Limit) + } + + sql, args, err := sel.ToSql() + if err != nil { + return nil, err + } + + log.Debug(sql) + log.Debug(args) + + rows, err := s.DB.Queryx(sql, args...) if err != nil { return nil, err } @@ -408,7 +440,7 @@ func queryTradesSQL(options QueryTradesOptions) string { sql += ` ORDER BY gid ` + ordering if options.Limit > 0 { - sql += ` LIMIT ` + strconv.Itoa(options.Limit) + sql += ` LIMIT ` + strconv.FormatUint(options.Limit, 10) } return sql diff --git a/pkg/strategy/dca/strategy.go b/pkg/strategy/dca/strategy.go index e8dd92774..f0e86aa53 100644 --- a/pkg/strategy/dca/strategy.go +++ b/pkg/strategy/dca/strategy.go @@ -92,7 +92,7 @@ func (s *Strategy) InstanceID() string { return fmt.Sprintf("%s:%s", ID, s.Symbol) } -func (s *Strategy) Run(ctx context.Context, session *bbgo.ExchangeSession) error { +func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { if s.BudgetQuota.IsZero() { s.BudgetQuota = s.Budget } diff --git a/pkg/strategy/rsmaker/strategy.go b/pkg/strategy/rsmaker/strategy.go index b4ad560b3..0d0c5a8fe 100644 --- a/pkg/strategy/rsmaker/strategy.go +++ b/pkg/strategy/rsmaker/strategy.go @@ -402,7 +402,7 @@ func (s *Strategy) adjustOrderQuantity(submitOrder types.SubmitOrder) types.Subm return submitOrder } -func (s *Strategy) Run(ctx context.Context, session *bbgo.ExchangeSession) error { +func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { instanceID := fmt.Sprintf("%s-%s", ID, s.Symbol) s.status = types.StrategyStatusRunning