implement backtest command, stream and add backtest config

This commit is contained in:
c9s 2020-11-07 02:57:50 +08:00
parent 8823a39fc2
commit 22a214328d
15 changed files with 421 additions and 75 deletions

View File

@ -43,6 +43,20 @@ sessions:
exchange: binance
envVarPrefix: binance
backtest:
# for testing max draw down (MDD) at 03-12
# see here for more details
# https://www.investopedia.com/terms/m/maximum-drawdown-mdd.asp
startTime: "2020-01-01"
account:
makerCommission: 15
takerCommission: 15
buyerCommission: 0
sellerCommission: 0
balances:
BTC: 1.0
USDT: 5000.0
exchangeStrategies:
- on: binance
buyandhold:

106
pkg/backtest/exchange.go Normal file
View File

@ -0,0 +1,106 @@
package backtest
import (
"context"
"time"
"github.com/pkg/errors"
"github.com/c9s/bbgo/pkg/exchange/binance"
"github.com/c9s/bbgo/pkg/exchange/max"
"github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/types"
)
type Exchange struct {
sourceExchange types.ExchangeName
publicExchange types.Exchange
srv *service.BacktestService
startTime time.Time
closedOrders []types.SubmitOrder
openOrders []types.SubmitOrder
}
func NewExchange(sourceExchange types.ExchangeName, srv *service.BacktestService, startTime time.Time) *Exchange {
ex, err := newPublicExchange(sourceExchange)
if err != nil {
panic(err)
}
return &Exchange{
sourceExchange: sourceExchange,
publicExchange: ex,
srv: srv,
startTime: startTime,
}
}
func (e *Exchange) NewStream() types.Stream {
// TODO: return the stream and feed the data
return &Stream{}
}
func (e Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error) {
panic("implement me")
}
func (e Exchange) QueryOpenOrders(ctx context.Context, symbol string) (orders []types.Order, err error) {
panic("implement me")
}
func (e Exchange) QueryClosedOrders(ctx context.Context, symbol string, since, until time.Time, lastOrderID uint64) (orders []types.Order, err error) {
panic("implement me")
}
func (e Exchange) CancelOrders(ctx context.Context, orders ...types.Order) error {
panic("implement me")
}
func (e Exchange) QueryAccount(ctx context.Context) (*types.Account, error) {
panic("implement me")
}
func (e Exchange) QueryAccountBalances(ctx context.Context) (types.BalanceMap, error) {
panic("implement me")
}
func (e Exchange) QueryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) {
return e.publicExchange.QueryKLines(ctx, symbol, interval, options)
}
func (e Exchange) QueryTrades(ctx context.Context, symbol string, options *types.TradeQueryOptions) ([]types.Trade, error) {
// we don't need query trades for backtest
return nil, nil
}
func (e Exchange) Name() types.ExchangeName {
return e.publicExchange.Name()
}
func (e Exchange) PlatformFeeCurrency() string {
return e.publicExchange.PlatformFeeCurrency()
}
func (e Exchange) QueryMarkets(ctx context.Context) (types.MarketMap, error) {
return e.publicExchange.QueryMarkets(ctx)
}
func (e Exchange) QueryDepositHistory(ctx context.Context, asset string, since, until time.Time) (allDeposits []types.Deposit, err error) {
return nil, nil
}
func (e Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (allWithdraws []types.Withdraw, err error) {
return nil, nil
}
func newPublicExchange(sourceExchange types.ExchangeName) (types.Exchange, error) {
switch sourceExchange {
case types.ExchangeBinance:
return binance.New("", ""), nil
case types.ExchangeMax:
return max.New("", ""), nil
}
return nil, errors.Errorf("exchange %s is not supported", sourceExchange)
}

View File

@ -3,16 +3,57 @@ package backtest
import (
"context"
"github.com/c9s/bbgo/pkg/accounting/pnl"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/pkg/errors"
"github.com/c9s/bbgo/pkg/types"
)
type Stream struct {
types.StandardStream
exchange *Exchange
}
func (s *Stream) Connect(ctx context.Context) error {
loadedSymbols := map[string]struct{}{}
loadedIntervals := map[types.Interval]struct{}{}
for _, sub := range s.Subscriptions {
loadedSymbols[sub.Symbol] = struct{}{}
switch sub.Channel {
case types.KLineChannel:
loadedIntervals[types.Interval(sub.Options.Interval)] = struct{}{}
default:
return errors.Errorf("stream channel %s is not supported in backtest", sub.Channel)
}
}
var symbols []string
for symbol := range loadedSymbols {
symbols = append(symbols, symbol)
}
var intervals []types.Interval
for interval := range loadedIntervals {
intervals = append(intervals, interval)
}
// TODO: we can sync before we connect
/*
if err := backtestService.Sync(ctx, exchange, symbol, startTime); err != nil {
return err
}
*/
klineC, errC := s.exchange.srv.QueryKLinesCh(s.exchange.startTime, s.exchange, symbols, intervals)
for k := range klineC {
s.EmitKLineClosed(k)
}
if err := <-errC; err != nil {
return err
}
return nil
}
@ -20,16 +61,6 @@ func (s *Stream) Close() error {
return nil
}
type Trader struct {
// Context is trading Context
Context *bbgo.Context
SourceKLines []types.KLine
ProfitAndLossCalculator *pnl.AverageCostCalculator
doneOrders []types.SubmitOrder
pendingOrders []types.SubmitOrder
}
/*
func (trader *BackTestTrader) RunStrategy(ctx context.Context, strategy SingleExchangeStrategy) (chan struct{}, error) {
logrus.Infof("[regression] number of kline data: %d", len(trader.SourceKLines))

View File

@ -7,6 +7,8 @@ import (
"github.com/pkg/errors"
"gopkg.in/yaml.v3"
"github.com/c9s/bbgo/pkg/fixedpoint"
)
type PnLReporterConfig struct {
@ -50,9 +52,26 @@ type Session struct {
EnvVarPrefix string `json:"envVarPrefix" yaml:"envVarPrefix"`
}
type Backtest struct {
StartTime string `json:"startTime" yaml:"startTime"`
Account BacktestAccount `json:"account" yaml:"account"`
}
type BacktestAccount struct {
MakerCommission int `json:"makerCommission"`
TakerCommission int `json:"takerCommission"`
BuyerCommission int `json:"buyerCommission"`
SellerCommission int `json:"sellerCommission"`
Balances BacktestAccountBalanceMap `json:"balances" yaml:"balances"`
}
type BacktestAccountBalanceMap map[string]fixedpoint.Value
type Config struct {
Imports []string `json:"imports" yaml:"imports"`
Backtest *Backtest `json:"backtest,omitempty" yaml:"backtest,omitempty"`
Notifications *NotificationConfig `json:"notifications,omitempty" yaml:"notifications,omitempty"`
Sessions map[string]Session `json:"sessions,omitempty" yaml:"sessions,omitempty"`

View File

@ -59,6 +59,7 @@ func TestLoadConfig(t *testing.T) {
assert.Len(t, config.ExchangeStrategies, 1)
},
},
{
name: "order_executor",
args: args{configFile: "testdata/order_executor.yaml"},
@ -85,6 +86,19 @@ func TestLoadConfig(t *testing.T) {
assert.NotNil(t, executorConf)
},
},
{
name: "backtest",
args: args{configFile: "testdata/backtest.yaml"},
wantErr: false,
f: func(t *testing.T, config *Config) {
assert.Len(t, config.ExchangeStrategies, 1)
assert.NotNil(t, config.Backtest)
assert.NotNil(t, config.Backtest.Account)
assert.NotNil(t, config.Backtest.Account.Balances)
assert.Len(t, config.Backtest.Account.Balances, 2)
assert.NotEmpty(t, config.Backtest.StartTime)
},
},
}
for _, tt := range tests {
@ -107,4 +121,5 @@ func TestLoadConfig(t *testing.T) {
}
})
}
}

View File

@ -29,6 +29,8 @@ func RegisterStrategy(key string, s interface{}) {
}
}
var emptyTime time.Time
// Environment presents the real exchange data layer
type Environment struct {
// Notifiability here for environment is for the streaming data notification
@ -38,6 +40,8 @@ type Environment struct {
TradeService *service.TradeService
TradeSync *service.SyncService
// startTime is the time of start point (which is used in the backtest)
startTime time.Time
tradeScanTime time.Time
sessions map[string]*ExchangeSession
}
@ -109,13 +113,7 @@ func (environ *Environment) Init(ctx context.Context) (err error) {
}
session.Trades[symbol] = trades
averagePrice, err := session.Exchange.QueryAveragePrice(ctx, symbol)
if err != nil {
return err
}
session.lastPrices[symbol] = averagePrice
session.lastPrices[symbol] = 0.0
marketDataStore := NewMarketDataStore(symbol)
marketDataStore.BindStream(session.Stream)
@ -125,29 +123,6 @@ func (environ *Environment) Init(ctx context.Context) (err error) {
session.standardIndicatorSets[symbol] = standardIndicatorSet
}
now := time.Now()
for symbol := range session.loadedSymbols {
marketDataStore, ok := session.marketDataStores[symbol]
if !ok {
return errors.Errorf("symbol %s is not defined", symbol)
}
for interval := range types.SupportedIntervals {
kLines, err := session.Exchange.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{
EndTime: &now,
Limit: 500, // indicators need at least 100
})
if err != nil {
return err
}
for _, k := range kLines {
// let market data store trigger the update, so that the indicator could be updated too.
marketDataStore.AddKLine(k)
}
}
}
log.Infof("querying balances...")
balances, err := session.Exchange.QueryAccountBalances(ctx)
if err != nil {
@ -164,6 +139,53 @@ func (environ *Environment) Init(ctx context.Context) (err error) {
session.marketDataStores[kline.Symbol].AddKLine(kline)
})
session.Stream.OnTradeUpdate(func(trade types.Trade) {
session.Trades[trade.Symbol] = append(session.Trades[trade.Symbol], trade)
})
// feed klines into the market data store
if environ.startTime == emptyTime {
environ.startTime = time.Now()
}
for symbol := range session.loadedSymbols {
marketDataStore, ok := session.marketDataStores[symbol]
if !ok {
return errors.Errorf("symbol %s is not defined", symbol)
}
var lastPriceTime time.Time
for interval := range types.SupportedIntervals {
kLines, err := session.Exchange.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{
EndTime: &environ.startTime,
Limit: 500, // indicators need at least 100
})
if err != nil {
return err
}
if len(kLines) == 0 {
log.Warnf("no kline data for interval %s", interval)
continue
}
// update last prices by the given kline
lastKLine := kLines[len(kLines) - 1]
if lastPriceTime == emptyTime {
session.lastPrices[symbol] = lastKLine.Close
lastPriceTime = lastKLine.EndTime
} else if lastPriceTime.Before(lastKLine.EndTime) {
session.lastPrices[symbol] = lastKLine.Close
lastPriceTime = lastKLine.EndTime
}
for _, k := range kLines {
// let market data store trigger the update, so that the indicator could be updated too.
marketDataStore.AddKLine(k)
}
}
}
if environ.TradeService != nil {
session.Stream.OnTradeUpdate(func(trade types.Trade) {
if err := environ.TradeService.Insert(trade); err != nil {
@ -172,12 +194,7 @@ func (environ *Environment) Init(ctx context.Context) (err error) {
})
}
session.Stream.OnTradeUpdate(func(trade types.Trade) {
// append trades
session.Trades[trade.Symbol] = append(session.Trades[trade.Symbol], trade)
})
// move market data store dispatch to here, use one callback to dispatch the market data
// TODO: move market data store dispatch to here, use one callback to dispatch the market data
// session.Stream.OnKLineClosed(func(kline types.KLine) { })
}
@ -355,4 +372,3 @@ func (environ *Environment) Connect(ctx context.Context) error {
return nil
}

33
pkg/bbgo/testdata/backtest.yaml vendored Normal file
View File

@ -0,0 +1,33 @@
---
sessions:
max:
exchange: max
envVarPrefix: max
binance:
exchange: binance
envVarPrefix: binance
backtest:
# for testing max draw down (MDD) at 03-12
# see here for more details
# https://www.investopedia.com/terms/m/maximum-drawdown-mdd.asp
startTime: "2020-01-01"
account:
makerCommission: 15
takerCommission: 15
buyerCommission: 0
sellerCommission: 0
balances:
BTC: 1.0
USDT: 5000.0
exchangeStrategies:
- on: binance
test:
symbol: "BTCUSDT"
interval: "1m"
baseQuantity: 0.1
minDropPercentage: -0.05

View File

@ -9,19 +9,10 @@ import (
"github.com/c9s/bbgo/pkg/types"
_ "github.com/go-sql-driver/mysql"
flag "github.com/spf13/pflag"
)
var SupportedExchanges = []types.ExchangeName{"binance", "max"}
// PersistentFlags defines the flags for environments
func PersistentFlags(flags *flag.FlagSet) {
flags.String("binance-api-key", "", "binance api key")
flags.String("binance-api-secret", "", "binance api secret")
flags.String("max-api-key", "", "max api key")
flags.String("max-api-secret", "", "max api secret")
}
// SingleExchangeStrategy represents the single Exchange strategy
type SingleExchangeStrategy interface {
Run(ctx context.Context, orderExecutor OrderExecutor, session *ExchangeSession) error

106
pkg/cmd/backtest.go Normal file
View File

@ -0,0 +1,106 @@
package cmd
import (
"context"
"syscall"
"time"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/c9s/bbgo/pkg/backtest"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
"github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/types"
)
func init() {
BacktestCmd.Flags().String("exchange", "", "target exchange")
BacktestCmd.Flags().String("start", "", "start time")
BacktestCmd.Flags().Bool("backtest", true, "sync backtest data")
BacktestCmd.Flags().String("config", "config/bbgo.yaml", "strategy config file")
RootCmd.AddCommand(BacktestCmd)
}
var BacktestCmd = &cobra.Command{
Use: "backtest",
Short: "backtest your strategies",
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
configFile, err := cmd.Flags().GetString("config")
if err != nil {
return err
}
if len(configFile) == 0 {
return errors.New("--config option is required")
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
userConfig, err := bbgo.Load(configFile)
if err != nil {
return err
}
exchangeNameStr, err := cmd.Flags().GetString("exchange")
if err != nil {
return err
}
exchangeName, err := types.ValidExchangeName(exchangeNameStr)
if err != nil {
return err
}
db, err := cmdutil.ConnectMySQL()
if err != nil {
return err
}
// set default start time to the past 6 months
startTime := time.Now().AddDate(0, -6, 0)
startTimeArg, err := cmd.Flags().GetString("start")
if err != nil {
return err
}
if len(startTimeArg) > 0 {
startTime, err = time.Parse("2006-01-02", startTimeArg)
if err != nil {
return err
}
}
backtestService := &service.BacktestService{DB: db}
exchange := backtest.NewExchange(exchangeName, backtestService, startTime)
environ := bbgo.NewEnvironment()
environ.AddExchange(exchangeName.String(), exchange)
trader := bbgo.NewTrader(environ)
if userConfig.RiskControls != nil {
trader.SetRiskControls(userConfig.RiskControls)
}
for _, entry := range userConfig.ExchangeStrategies {
log.Infof("attaching strategy %T on %s instead of %v", entry.Strategy, exchangeName.String(), entry.Mounts)
trader.AttachStrategyOn(exchangeName.String(), entry.Strategy)
}
if len(userConfig.CrossExchangeStrategies) > 0 {
log.Warnf("backtest does not support CrossExchangeStrategy, strategies won't be added.")
}
if err := trader.Run(ctx) ; err != nil {
return err
}
cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM)
return nil
},
}

11
pkg/cmd/cmdutil/flags.go Normal file
View File

@ -0,0 +1,11 @@
package cmdutil
import "github.com/spf13/pflag"
// PersistentFlags defines the flags for environments
func PersistentFlags(flags *pflag.FlagSet) {
flags.String("binance-api-key", "", "binance api key")
flags.String("binance-api-secret", "", "binance api secret")
flags.String("max-api-key", "", "max api key")
flags.String("max-api-secret", "", "max api secret")
}

View File

@ -5,6 +5,7 @@ import (
"strings"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
@ -118,8 +119,17 @@ var PnLCmd = &cobra.Command{
logrus.Infof("found checkpoints: %+v", checkpoints)
logrus.Infof("stock: %f", stockManager.Stocks.Quantity())
currentPrice, err := exchange.QueryAveragePrice(ctx, symbol)
now := time.Now()
kLines, err := exchange.QueryKLines(ctx, symbol, types.Interval1m, types.KLineQueryOptions{
Limit: 100,
EndTime: &now,
})
if len(kLines) == 0 {
return errors.New("no kline data for current price")
}
currentPrice := kLines[len(kLines) - 1].Close
calculator := &pnl.AverageCostCalculator{
TradingFeeCurrency: tradingFeeCurrency,
}

View File

@ -91,7 +91,7 @@ var SyncCmd = &cobra.Command{
for interval := range types.SupportedIntervals {
log.Infof("verifying %s kline data...", interval)
klineC, errC := backtestService.QueryKLinesCh(startTime, exchange, symbol, interval)
klineC, errC := backtestService.QueryKLinesCh(startTime, exchange, []string{symbol}, []types.Interval{interval})
var emptyKLine types.KLine
var prevKLine types.KLine
for k := range klineC {

View File

@ -410,14 +410,10 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type
limit = options.Limit
}
i, err := maxapi.ParseInterval(string(interval))
if err != nil {
return nil, err
}
// workaround for the kline query
// workaround for the kline query, because MAX does not support query by end time
// so we need to use the given end time and the limit number to calculate the start time
if options.EndTime != nil && options.StartTime == nil {
startTime := options.EndTime.Add(- time.Duration(limit) * time.Minute * time.Duration(i))
startTime := options.EndTime.Add(- time.Duration(limit) * interval.Duration())
options.StartTime = &startTime
}

View File

@ -83,13 +83,13 @@ func (s *BacktestService) QueryLast(ex types.ExchangeName, symbol string, interv
return nil, rows.Err()
}
func (s *BacktestService) QueryKLinesCh(since time.Time, exchange types.Exchange, symbol string, intervals ...types.Interval) (chan types.KLine, chan error) {
sql := "SELECT * FROM `binance_klines` WHERE `end_time` >= :since AND `symbol` = :symbol AND `interval` IN (:intervals) ORDER BY end_time ASC"
func (s *BacktestService) QueryKLinesCh(since time.Time, exchange types.Exchange, symbols []string, intervals []types.Interval) (chan types.KLine, chan error) {
sql := "SELECT * FROM `binance_klines` WHERE `end_time` >= :since AND `symbol` IN (:symbols) AND `interval` IN (:intervals) ORDER BY end_time ASC"
sql = strings.ReplaceAll(sql, "binance_klines", exchange.Name().String()+"_klines")
sql, args, err := sqlx.Named(sql, map[string]interface{}{
"since": since,
"symbol": symbol,
"symbols": symbols,
"intervals": types.IntervalSlice(intervals),
})
sql, args, err = sqlx.In(sql, args...)

View File

@ -48,8 +48,6 @@ type Exchange interface {
QueryTrades(ctx context.Context, symbol string, options *TradeQueryOptions) ([]Trade, error)
QueryAveragePrice(ctx context.Context, symbol string) (float64, error)
QueryDepositHistory(ctx context.Context, asset string, since, until time.Time) (allDeposits []Deposit, err error)
QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (allWithdraws []Withdraw, err error)