From 908388144276aba8d7b2ba84eceff411ac880e56 Mon Sep 17 00:00:00 2001 From: c9s Date: Sat, 4 Jun 2022 02:23:23 +0800 Subject: [PATCH] refactor exchange factory and solve the incorrect pkg import dependency from ftx --- examples/create-self-trade/main.go | 3 +- pkg/bbgo/environment.go | 4 +- pkg/bbgo/session.go | 26 +-- pkg/cmd/backtest.go | 3 +- pkg/cmd/cmdutil/exchange.go | 63 ------- pkg/exchange/factory.go | 65 ++++++++ pkg/exchange/ftx/stream.go | 6 +- .../websocketbase/client.go} | 4 +- .../websocketclientbase_callbacks.go | 2 +- pkg/service/backtest.go | 154 ++++++++++++++++-- pkg/service/backtest_test.go | 58 +++++++ 11 files changed, 287 insertions(+), 101 deletions(-) create mode 100644 pkg/exchange/factory.go rename pkg/{service/websocket.go => net/websocketbase/client.go} (93%) rename pkg/{service => net/websocketbase}/websocketclientbase_callbacks.go (98%) create mode 100644 pkg/service/backtest_test.go diff --git a/examples/create-self-trade/main.go b/examples/create-self-trade/main.go index 42c3d236c..d8533b463 100644 --- a/examples/create-self-trade/main.go +++ b/examples/create-self-trade/main.go @@ -14,6 +14,7 @@ import ( "github.com/spf13/viper" "github.com/c9s/bbgo/pkg/cmd/cmdutil" + exchange2 "github.com/c9s/bbgo/pkg/exchange" "github.com/c9s/bbgo/pkg/types" ) @@ -52,7 +53,7 @@ var rootCmd = &cobra.Command{ return err } - exchange, err := cmdutil.NewExchange(exchangeName) + exchange, err := exchange2.New(exchangeName) if err != nil { return err } diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index e60544c7c..76b63fd26 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -21,7 +21,7 @@ import ( "github.com/spf13/viper" "gopkg.in/tucnak/telebot.v2" - "github.com/c9s/bbgo/pkg/cmd/cmdutil" + exchange2 "github.com/c9s/bbgo/pkg/exchange" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/interact" "github.com/c9s/bbgo/pkg/notifier/slacknotifier" @@ -221,7 +221,7 @@ func (environ *Environment) ConfigureExchangeSessions(userConfig *Config) error func (environ *Environment) AddExchangesByViperKeys() error { for _, n := range types.SupportedExchanges { if viper.IsSet(string(n) + "-api-key") { - exchange, err := cmdutil.NewExchangeWithEnvVarPrefix(n, "") + exchange, err := exchange2.NewWithEnvVarPrefix(n, "") if err != nil { return err } diff --git a/pkg/bbgo/session.go b/pkg/bbgo/session.go index d2c0ebad5..1c7ae02d8 100644 --- a/pkg/bbgo/session.go +++ b/pkg/bbgo/session.go @@ -9,13 +9,13 @@ import ( "github.com/slack-go/slack" - "github.com/c9s/bbgo/pkg/cache" - "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "github.com/spf13/viper" - "github.com/c9s/bbgo/pkg/cmd/cmdutil" + "github.com/c9s/bbgo/pkg/cache" + + exchange2 "github.com/c9s/bbgo/pkg/exchange" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/indicator" "github.com/c9s/bbgo/pkg/service" @@ -740,17 +740,17 @@ func (session *ExchangeSession) FindPossibleSymbols() (symbols []string, err err // InitExchange initialize the exchange instance and allocate memory for fields // In this stage, the session var could be loaded from the JSON config, so the pointer fields are still nil // The Init method will be called after this stage, environment.Init will call the session.Init method later. -func (session *ExchangeSession) InitExchange(name string, exchange types.Exchange) error { +func (session *ExchangeSession) InitExchange(name string, ex types.Exchange) error { var err error var exchangeName = session.ExchangeName - if exchange == nil { + if ex == nil { if session.PublicOnly { - exchange, err = cmdutil.NewExchangePublic(exchangeName) + ex, err = exchange2.NewPublic(exchangeName) } else { if session.Key != "" && session.Secret != "" { - exchange, err = cmdutil.NewExchangeStandard(exchangeName, session.Key, session.Secret, session.Passphrase, session.SubAccount) + ex, err = exchange2.NewStandard(exchangeName, session.Key, session.Secret, session.Passphrase, session.SubAccount) } else { - exchange, err = cmdutil.NewExchangeWithEnvVarPrefix(exchangeName, session.EnvVarPrefix) + ex, err = exchange2.NewWithEnvVarPrefix(exchangeName, session.EnvVarPrefix) } } } @@ -761,7 +761,7 @@ func (session *ExchangeSession) InitExchange(name string, exchange types.Exchang // configure exchange if session.Margin { - marginExchange, ok := exchange.(types.MarginExchange) + marginExchange, ok := ex.(types.MarginExchange) if !ok { return fmt.Errorf("exchange %s does not support margin", exchangeName) } @@ -774,7 +774,7 @@ func (session *ExchangeSession) InitExchange(name string, exchange types.Exchang } if session.Futures { - futuresExchange, ok := exchange.(types.FuturesExchange) + futuresExchange, ok := ex.(types.FuturesExchange) if !ok { return fmt.Errorf("exchange %s does not support futures", exchangeName) } @@ -792,9 +792,9 @@ func (session *ExchangeSession) InitExchange(name string, exchange types.Exchang SessionChannelRouter: NewPatternChannelRouter(nil), ObjectChannelRouter: NewObjectChannelRouter(), } - session.Exchange = exchange - session.UserDataStream = exchange.NewStream() - session.MarketDataStream = exchange.NewStream() + session.Exchange = ex + session.UserDataStream = ex.NewStream() + session.MarketDataStream = ex.NewStream() session.MarketDataStream.SetPublicOnly() // pointer fields diff --git a/pkg/cmd/backtest.go b/pkg/cmd/backtest.go index 021c62bd0..27ce52c44 100644 --- a/pkg/cmd/backtest.go +++ b/pkg/cmd/backtest.go @@ -22,6 +22,7 @@ import ( "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/cmd/cmdutil" "github.com/c9s/bbgo/pkg/data/tsv" + "github.com/c9s/bbgo/pkg/exchange" "github.com/c9s/bbgo/pkg/service" "github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/util" @@ -186,7 +187,7 @@ var BacktestCmd = &cobra.Command{ return err } - publicExchange, err := cmdutil.NewExchangePublic(exName) + publicExchange, err := exchange.NewPublic(exName) if err != nil { return err } diff --git a/pkg/cmd/cmdutil/exchange.go b/pkg/cmd/cmdutil/exchange.go index a3c2c5e28..3baca0880 100644 --- a/pkg/cmd/cmdutil/exchange.go +++ b/pkg/cmd/cmdutil/exchange.go @@ -1,65 +1,2 @@ package cmdutil -import ( - "fmt" - "os" - "strings" - - "github.com/c9s/bbgo/pkg/exchange/binance" - "github.com/c9s/bbgo/pkg/exchange/ftx" - "github.com/c9s/bbgo/pkg/exchange/kucoin" - "github.com/c9s/bbgo/pkg/exchange/max" - "github.com/c9s/bbgo/pkg/exchange/okex" - "github.com/c9s/bbgo/pkg/types" -) - -func NewExchangePublic(exchangeName types.ExchangeName) (types.Exchange, error) { - return NewExchangeStandard(exchangeName, "", "", "", "") -} - -func NewExchangeStandard(n types.ExchangeName, key, secret, passphrase, subAccount string) (types.Exchange, error) { - switch n { - - case types.ExchangeFTX: - return ftx.NewExchange(key, secret, subAccount), nil - - case types.ExchangeBinance: - return binance.New(key, secret), nil - - case types.ExchangeMax: - return max.New(key, secret), nil - - case types.ExchangeOKEx: - return okex.New(key, secret, passphrase), nil - - case types.ExchangeKucoin: - return kucoin.New(key, secret, passphrase), nil - - default: - return nil, fmt.Errorf("unsupported exchange: %v", n) - - } -} - -func NewExchangeWithEnvVarPrefix(n types.ExchangeName, varPrefix string) (types.Exchange, error) { - if len(varPrefix) == 0 { - varPrefix = n.String() - } - - varPrefix = strings.ToUpper(varPrefix) - - key := os.Getenv(varPrefix + "_API_KEY") - secret := os.Getenv(varPrefix + "_API_SECRET") - if len(key) == 0 || len(secret) == 0 { - return nil, fmt.Errorf("can not initialize exchange %s: empty key or secret, env var prefix: %s", n, varPrefix) - } - - passphrase := os.Getenv(varPrefix + "_API_PASSPHRASE") - subAccount := os.Getenv(varPrefix + "_SUBACCOUNT") - return NewExchangeStandard(n, key, secret, passphrase, subAccount) -} - -// NewExchange constructor exchange object from viper config. -func NewExchange(n types.ExchangeName) (types.Exchange, error) { - return NewExchangeWithEnvVarPrefix(n, "") -} diff --git a/pkg/exchange/factory.go b/pkg/exchange/factory.go new file mode 100644 index 000000000..d03f8654e --- /dev/null +++ b/pkg/exchange/factory.go @@ -0,0 +1,65 @@ +package exchange + +import ( + "fmt" + "os" + "strings" + + "github.com/c9s/bbgo/pkg/exchange/binance" + "github.com/c9s/bbgo/pkg/exchange/ftx" + "github.com/c9s/bbgo/pkg/exchange/kucoin" + "github.com/c9s/bbgo/pkg/exchange/max" + "github.com/c9s/bbgo/pkg/exchange/okex" + "github.com/c9s/bbgo/pkg/types" +) + +func NewPublic(exchangeName types.ExchangeName) (types.Exchange, error) { + return NewStandard(exchangeName, "", "", "", "") +} + +func NewStandard(n types.ExchangeName, key, secret, passphrase, subAccount string) (types.Exchange, error) { + switch n { + + case types.ExchangeFTX: + return ftx.NewExchange(key, secret, subAccount), nil + + case types.ExchangeBinance: + return binance.New(key, secret), nil + + case types.ExchangeMax: + return max.New(key, secret), nil + + case types.ExchangeOKEx: + return okex.New(key, secret, passphrase), nil + + case types.ExchangeKucoin: + return kucoin.New(key, secret, passphrase), nil + + default: + return nil, fmt.Errorf("unsupported exchange: %v", n) + + } +} + +func NewWithEnvVarPrefix(n types.ExchangeName, varPrefix string) (types.Exchange, error) { + if len(varPrefix) == 0 { + varPrefix = n.String() + } + + varPrefix = strings.ToUpper(varPrefix) + + key := os.Getenv(varPrefix + "_API_KEY") + secret := os.Getenv(varPrefix + "_API_SECRET") + if len(key) == 0 || len(secret) == 0 { + return nil, fmt.Errorf("can not initialize exchange %s: empty key or secret, env var prefix: %s", n, varPrefix) + } + + passphrase := os.Getenv(varPrefix + "_API_PASSPHRASE") + subAccount := os.Getenv(varPrefix + "_SUBACCOUNT") + return NewStandard(n, key, secret, passphrase, subAccount) +} + +// New constructor exchange object from viper config. +func New(n types.ExchangeName) (types.Exchange, error) { + return NewWithEnvVarPrefix(n, "") +} diff --git a/pkg/exchange/ftx/stream.go b/pkg/exchange/ftx/stream.go index 9909da399..4e6650c5d 100644 --- a/pkg/exchange/ftx/stream.go +++ b/pkg/exchange/ftx/stream.go @@ -9,7 +9,7 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" - "github.com/c9s/bbgo/pkg/service" + "github.com/c9s/bbgo/pkg/net/websocketbase" "github.com/c9s/bbgo/pkg/types" ) @@ -18,7 +18,7 @@ const endpoint = "wss://ftx.com/ws/" type Stream struct { *types.StandardStream - ws *service.WebsocketClientBase + ws *websocketbase.WebsocketClientBase exchange *Exchange key string @@ -42,7 +42,7 @@ func NewStream(key, secret string, subAccount string, e *Exchange) *Stream { secret: secret, subAccount: subAccount, StandardStream: &types.StandardStream{}, - ws: service.NewWebsocketClientBase(endpoint, 3*time.Second), + ws: websocketbase.NewWebsocketClientBase(endpoint, 3*time.Second), } s.ws.OnMessage((&messageHandler{StandardStream: s.StandardStream}).handleMessage) diff --git a/pkg/service/websocket.go b/pkg/net/websocketbase/client.go similarity index 93% rename from pkg/service/websocket.go rename to pkg/net/websocketbase/client.go index 45cbfde91..0754777f1 100644 --- a/pkg/service/websocket.go +++ b/pkg/net/websocketbase/client.go @@ -1,4 +1,4 @@ -package service +package websocketbase import ( "context" @@ -8,6 +8,8 @@ import ( "github.com/gorilla/websocket" ) +// WebsocketClientBase is a legacy base client +// Deprecated: please use standard stream instead. //go:generate callbackgen -type WebsocketClientBase type WebsocketClientBase struct { baseURL string diff --git a/pkg/service/websocketclientbase_callbacks.go b/pkg/net/websocketbase/websocketclientbase_callbacks.go similarity index 98% rename from pkg/service/websocketclientbase_callbacks.go rename to pkg/net/websocketbase/websocketclientbase_callbacks.go index b7afc5a0d..444535785 100644 --- a/pkg/service/websocketclientbase_callbacks.go +++ b/pkg/net/websocketbase/websocketclientbase_callbacks.go @@ -1,6 +1,6 @@ // Code generated by "callbackgen -type WebsocketClientBase"; DO NOT EDIT. -package service +package websocketbase import ( "github.com/gorilla/websocket" diff --git a/pkg/service/backtest.go b/pkg/service/backtest.go index fdfb6e43d..cc686525e 100644 --- a/pkg/service/backtest.go +++ b/pkg/service/backtest.go @@ -33,7 +33,8 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type tasks := []SyncTask{ { - Type: types.KLine{}, + Type: types.KLine{}, + Select: SelectLastKLines(exchange.Name(), symbol, interval, startTime, 100), Time: func(obj interface{}) time.Time { return obj.(types.KLine).StartTime.Time().UTC() }, @@ -41,7 +42,6 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type kline := obj.(types.KLine) return kline.Symbol + kline.Interval.String() + strconv.FormatInt(kline.StartTime.UnixMilli(), 10) }, - Select: SelectLastKLines(exchange.Name(), symbol, interval, 100), BatchQuery: func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) { q := &batch.KLineBatchQuery{Exchange: exchange} return q.Query(ctx, symbol, interval, startTime, endTime) @@ -54,13 +54,12 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type } for _, sel := range tasks { - if err := sel.execute(ctx, s.DB, startTime); err != nil { + if err := sel.execute(ctx, s.DB, startTime, endTime); err != nil { return err } } return nil - } func (s *BacktestService) Verify(symbols []string, startTime time.Time, endTime time.Time, sourceExchange types.Exchange, verboseCnt int) error { @@ -129,12 +128,11 @@ func (s *BacktestService) QueryFirstKLine(ex types.ExchangeName, symbol string, func (s *BacktestService) QueryKLine(ex types.ExchangeName, symbol string, interval types.Interval, orderBy string, limit int) (*types.KLine, error) { log.Infof("querying last kline exchange = %s AND symbol = %s AND interval = %s", ex, symbol, interval) - tableName := s._targetKlineTable(ex) + tableName := targetKlineTable(ex) // make the SQL syntax IDE friendly, so that it can analyze it. - sql := fmt.Sprintf("SELECT * FROM `%s` WHERE `symbol` = :symbol AND `interval` = :interval and exchange = :exchange ORDER BY end_time "+orderBy+" LIMIT "+strconv.Itoa(limit), tableName) + sql := fmt.Sprintf("SELECT * FROM `%s` WHERE `symbol` = :symbol AND `interval` = :interval ORDER BY end_time "+orderBy+" LIMIT "+strconv.Itoa(limit), tableName) rows, err := s.DB.NamedQuery(sql, map[string]interface{}{ - "exchange": ex.String(), "interval": interval, "symbol": symbol, }) @@ -160,7 +158,7 @@ func (s *BacktestService) QueryKLine(ex types.ExchangeName, symbol string, inter // QueryKLinesForward is used for querying klines to back-testing func (s *BacktestService) QueryKLinesForward(exchange types.ExchangeName, symbol string, interval types.Interval, startTime time.Time, limit int) ([]types.KLine, error) { - tableName := s._targetKlineTable(exchange) + tableName := targetKlineTable(exchange) sql := "SELECT * FROM `binance_klines` WHERE `end_time` >= :start_time AND `symbol` = :symbol AND `interval` = :interval and exchange = :exchange ORDER BY end_time ASC LIMIT :limit" sql = strings.ReplaceAll(sql, "binance_klines", tableName) @@ -179,7 +177,7 @@ func (s *BacktestService) QueryKLinesForward(exchange types.ExchangeName, symbol } func (s *BacktestService) QueryKLinesBackward(exchange types.ExchangeName, symbol string, interval types.Interval, endTime time.Time, limit int) ([]types.KLine, error) { - tableName := s._targetKlineTable(exchange) + tableName := targetKlineTable(exchange) sql := "SELECT * FROM `binance_klines` WHERE `end_time` <= :end_time and exchange = :exchange AND `symbol` = :symbol AND `interval` = :interval ORDER BY end_time DESC LIMIT :limit" sql = strings.ReplaceAll(sql, "binance_klines", tableName) @@ -205,7 +203,7 @@ func (s *BacktestService) QueryKLinesCh(since, until time.Time, exchange types.E return returnError(errors.Errorf("symbols is empty when querying kline, plesae check your strategy setting. ")) } - tableName := s._targetKlineTable(exchange.Name()) + tableName := targetKlineTable(exchange.Name()) sql := "SELECT * FROM `binance_klines` WHERE `end_time` BETWEEN :since AND :until AND `symbol` IN (:symbols) AND `interval` IN (:intervals) and exchange = :exchange ORDER BY end_time ASC" sql = strings.ReplaceAll(sql, "binance_klines", tableName) @@ -288,7 +286,7 @@ func (s *BacktestService) scanRows(rows *sqlx.Rows) (klines []types.KLine, err e return klines, rows.Err() } -func (s *BacktestService) _targetKlineTable(exchangeName types.ExchangeName) string { +func targetKlineTable(exchangeName types.ExchangeName) string { return strings.ToLower(exchangeName.String()) + "_klines" } @@ -299,7 +297,7 @@ func (s *BacktestService) Insert(kline types.KLine) error { return errExchangeFieldIsUnset } - tableName := s._targetKlineTable(kline.Exchange) + tableName := targetKlineTable(kline.Exchange) sql := fmt.Sprintf("INSERT INTO `%s` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)"+ "VALUES (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume)", tableName) @@ -313,7 +311,7 @@ func (s *BacktestService) _deleteDuplicatedKLine(k types.KLine) error { return errors.New("kline.Exchange field should not be empty") } - tableName := s._targetKlineTable(k.Exchange) + tableName := targetKlineTable(k.Exchange) sql := fmt.Sprintf("DELETE FROM `%s` WHERE gid = :gid ", tableName) _, err := s.DB.NamedExec(sql, k) return err @@ -346,14 +344,138 @@ func (s *BacktestService) SyncExist(ctx context.Context, exchange types.Exchange return nil } +type TimeRange struct { + Start time.Time + End time.Time +} + +// SyncPartial +// find the existing data time range (t1, t2) +// scan if there is a missing part +// create a time range slice []TimeRange +// iterate the []TimeRange slice to sync data. +func (s *BacktestService) SyncPartial(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, since, until time.Time) error { + t1, t2, err := s.QueryExistingDataRange(ctx, ex, symbol, interval, since, until) + if err != nil { + return err + } + + timeRanges, err := s.FindMissingTimeRanges(ctx, ex, symbol, interval, t1.Time(), t2.Time()) + if err != nil { + return err + } + _ = timeRanges + return nil +} + +// FindMissingTimeRanges returns the missing time ranges, the start/end time represents the existing data time points. +// So when sending kline query to the exchange API, we need to add one second to the start time and minus one second to the end time. +func (s *BacktestService) FindMissingTimeRanges(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, since, until time.Time) ([]TimeRange, error) { + query := SelectKLineTimePoints(ex.Name(), symbol, interval, since, until) + sql, args, err := query.ToSql() + if err != nil { + return nil, err + } + + rows, err := s.DB.QueryContext(ctx, sql, args...) + if err != nil { + return nil, err + } + + var timeRanges []TimeRange + var timePoints = make(map[int64]struct{}, 1000) // we can use this to find duplicates + var lastTime time.Time + for rows.Next() { + var tt types.Time + if err := rows.Scan(&tt); err != nil { + return nil, err + } + + var t = time.Time(tt) + if lastTime != (time.Time{}) && t.Sub(lastTime) > interval.Duration() { + timeRanges = append(timeRanges, TimeRange{ + Start: lastTime.Add(interval.Duration()), + End: t, + }) + } + + lastTime = t + timePoints[t.Unix()] = struct{}{} + } + + return timeRanges, nil +} + +func (s *BacktestService) QueryExistingDataRange(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, tArgs ...time.Time) (start, end *types.Time, err error) { + sel := SelectKLineTimeRange(ex.Name(), symbol, interval, tArgs...) + sql, args, err := sel.ToSql() + if err != nil { + return nil, nil, err + } + + var t1, t2 types.Time + + row := s.DB.QueryRowContext(ctx, sql, args...) + if err := row.Scan(&t1, &t2); err != nil { + return nil, nil, err + } + + if err := row.Err(); err != nil { + return nil, nil, err + } + + return &t1, &t2, nil +} + +func SelectKLineTimePoints(ex types.ExchangeName, symbol string, interval types.Interval, args ...time.Time) sq.SelectBuilder { + conditions := sq.And{ + sq.Eq{"symbol": symbol}, + sq.Eq{"`interval`": interval.String()}, + } + + if len(args) == 2 { + since := args[0] + until := args[1] + conditions = append(conditions, sq.Expr("`start_time` BETWEEN ? AND ?", since, until)) + } + + tableName := targetKlineTable(ex) + + return sq.Select("start_time"). + From(tableName). + Where(conditions). + OrderBy("start_time ASC") +} + +// SelectKLineTimeRange returns the existing klines time range (since < kline.start_time < until) +func SelectKLineTimeRange(ex types.ExchangeName, symbol string, interval types.Interval, args ...time.Time) sq.SelectBuilder { + conditions := sq.And{ + sq.Eq{"symbol": symbol}, + sq.Eq{"`interval`": interval.String()}, + } + + if len(args) == 2 { + since := args[0] + until := args[1] + conditions = append(conditions, sq.Expr("`start_time` BETWEEN ? AND ?", since, until)) + } + + tableName := targetKlineTable(ex) + + return sq.Select("MIN(start_time) AS t1, MAX(start_time) AS t2"). + From(tableName). + Where(conditions) +} + // TODO: add is_futures column since the klines data is different -func SelectLastKLines(ex types.ExchangeName, symbol string, interval types.Interval, limit uint64) sq.SelectBuilder { +func SelectLastKLines(ex types.ExchangeName, symbol string, interval types.Interval, startTime time.Time, limit uint64) sq.SelectBuilder { + tableName := targetKlineTable(ex) return sq.Select("*"). - From(strings.ToLower(ex.String()) + "_klines"). + From(tableName). Where(sq.And{ sq.Eq{"symbol": symbol}, - sq.Eq{"exchange": ex}, sq.Eq{"`interval`": interval.String()}, + sq.GtOrEq{"`start_time`": startTime}, }). OrderBy("start_time DESC"). Limit(limit) diff --git a/pkg/service/backtest_test.go b/pkg/service/backtest_test.go new file mode 100644 index 000000000..4d670d40c --- /dev/null +++ b/pkg/service/backtest_test.go @@ -0,0 +1,58 @@ +package service + +import ( + "context" + "testing" + "time" + + "github.com/jmoiron/sqlx" + "github.com/stretchr/testify/assert" + + "github.com/c9s/bbgo/pkg/exchange" + "github.com/c9s/bbgo/pkg/types" +) + +func TestBacktestService(t *testing.T) { + db, err := prepareDB(t) + if err != nil { + t.Fatal(err) + } + + defer db.Close() + + ctx := context.Background() + dbx := sqlx.NewDb(db.DB, "sqlite3") + + ex, err := exchange.NewPublic(types.ExchangeBinance) + assert.NoError(t, err) + + service := &BacktestService{DB: dbx} + + symbol := "BTCUSDT" + now := time.Now() + startTime1 := now.AddDate(0, 0, -6).Truncate(time.Hour) + endTime1 := now.AddDate(0, 0, -5).Truncate(time.Hour) + + startTime2 := now.AddDate(0, 0, -4).Truncate(time.Hour) + endTime2 := now.AddDate(0, 0, -3).Truncate(time.Hour) + + // kline query is exclusive + err = service.SyncKLineByInterval(ctx, ex, symbol, types.Interval1h, startTime1.Add(-time.Second), endTime1.Add(time.Second)) + assert.NoError(t, err) + + err = service.SyncKLineByInterval(ctx, ex, symbol, types.Interval1h, startTime2.Add(-time.Second), endTime2.Add(time.Second)) + assert.NoError(t, err) + + t1, t2, err := service.QueryExistingDataRange(ctx, ex, symbol, types.Interval1h) + if assert.NoError(t, err) { + assert.Equal(t, startTime1, t1.Time(), "start time point should match") + assert.Equal(t, endTime2, t2.Time(), "end time point should match") + } + + timeRanges, err := service.FindMissingTimeRanges(ctx, ex, symbol, types.Interval1h, startTime1, endTime2) + if assert.NoError(t, err) { + assert.NotEmpty(t, timeRanges) + assert.Len(t, timeRanges, 1, "should find one missing time range") + t.Logf("found timeRanges: %+v", timeRanges) + } +}