bbgo_origin/pkg/service/backtest.go

531 lines
16 KiB
Go
Raw Normal View History

package service
import (
"context"
"database/sql"
2021-12-13 23:15:18 +00:00
"fmt"
"strconv"
"strings"
"time"
2022-06-02 08:40:24 +00:00
sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
2022-05-30 16:59:33 +00:00
"github.com/c9s/bbgo/pkg/exchange/batch"
"github.com/c9s/bbgo/pkg/types"
)
// BacktestService reads and writes kline (candlestick) rows used for
// back-testing. Every method in this file goes through DB, addressing one
// "<exchange>_klines" table per exchange (see targetKlineTable).
type BacktestService struct {
	// DB is the sqlx handle used for all queries and inserts.
	DB *sqlx.DB
}
2021-03-14 03:04:56 +00:00
func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error {
log.Infof("synchronizing %s klines with interval %s: %s <=> %s", exchange.Name(), interval, startTime, endTime)
2022-06-02 08:40:24 +00:00
// TODO: use isFutures here
_, _, isIsolated, isolatedSymbol := getExchangeAttributes(exchange)
// override symbol if isolatedSymbol is not empty
if isIsolated && len(isolatedSymbol) > 0 {
symbol = isolatedSymbol
}
2022-06-06 06:53:37 +00:00
if s.DB.DriverName() == "sqlite3" {
_, _ = s.DB.Exec("PRAGMA journal_mode = WAL")
_, _ = s.DB.Exec("PRAGMA synchronous = NORMAL")
}
2022-06-02 08:40:24 +00:00
tasks := []SyncTask{
{
Type: types.KLine{},
Select: SelectLastKLines(exchange.Name(), symbol, interval, startTime, endTime, 100),
2022-06-02 08:40:24 +00:00
Time: func(obj interface{}) time.Time {
return obj.(types.KLine).StartTime.Time()
2022-06-02 08:40:24 +00:00
},
ID: func(obj interface{}) string {
kline := obj.(types.KLine)
return strconv.FormatInt(kline.StartTime.UnixMilli(), 10)
// return kline.Symbol + kline.Interval.String() + strconv.FormatInt(kline.StartTime.UnixMilli(), 10)
2022-06-02 08:40:24 +00:00
},
BatchQuery: func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) {
q := &batch.KLineBatchQuery{Exchange: exchange}
return q.Query(ctx, symbol, interval, startTime, endTime)
},
2022-06-06 17:21:27 +00:00
BatchInsertBuffer: 500,
2022-06-07 04:27:32 +00:00
BatchInsert: func(obj interface{}) error {
kLines := obj.([]types.KLine)
return s.BatchInsert(kLines)
},
Insert: func(obj interface{}) error {
kline := obj.(types.KLine)
return s.Insert(kline)
2022-06-02 08:40:24 +00:00
},
2022-06-06 04:24:18 +00:00
LogInsert: log.GetLevel() == log.DebugLevel,
2022-06-02 08:40:24 +00:00
},
}
2020-11-06 16:49:17 +00:00
2022-06-02 08:40:24 +00:00
for _, sel := range tasks {
if err := sel.execute(ctx, s.DB, startTime, endTime); err != nil {
2021-03-14 03:04:56 +00:00
return err
}
2021-03-14 03:04:56 +00:00
}
2022-06-02 08:40:24 +00:00
return nil
2021-03-14 03:04:56 +00:00
}
2020-11-06 16:49:17 +00:00
func (s *BacktestService) Verify(sourceExchange types.Exchange, symbols []string, startTime time.Time, endTime time.Time) error {
2021-12-14 16:59:28 +00:00
var corruptCnt = 0
for _, symbol := range symbols {
for interval := range types.SupportedIntervals {
log.Infof("verifying %s %s backtesting data: %s to %s...", symbol, interval, startTime, endTime)
2022-06-05 22:02:38 +00:00
2022-06-06 09:21:31 +00:00
timeRanges, err := s.FindMissingTimeRanges(context.Background(), sourceExchange, symbol, interval,
startTime, endTime)
2022-06-05 22:02:38 +00:00
if err != nil {
return err
2021-12-14 16:59:28 +00:00
}
2021-12-13 23:15:18 +00:00
2022-06-05 22:02:38 +00:00
if len(timeRanges) == 0 {
continue
2021-12-14 16:59:28 +00:00
}
2021-12-13 23:15:18 +00:00
log.Warnf("%s %s found missing time ranges:", symbol, interval)
2022-06-05 22:02:38 +00:00
corruptCnt += len(timeRanges)
for _, timeRange := range timeRanges {
log.Warnf("- %s", timeRange.String())
2021-12-14 16:59:28 +00:00
}
}
2021-12-13 23:15:18 +00:00
}
2021-12-14 16:59:28 +00:00
log.Infof("backtest verification completed")
if corruptCnt > 0 {
log.Errorf("found %d corruptions", corruptCnt)
} else {
log.Infof("found %d corruptions", corruptCnt)
}
2022-05-11 05:59:44 +00:00
return nil
2021-12-14 16:59:28 +00:00
}
2022-06-06 09:21:31 +00:00
func (s *BacktestService) SyncFresh(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error {
2022-06-08 06:38:47 +00:00
log.Infof("starting fresh sync %s %s %s: %s <=> %s", exchange.Name(), symbol, interval, startTime, endTime)
2022-06-06 09:21:31 +00:00
startTime = startTime.Truncate(time.Minute).Add(-2 * time.Second)
endTime = endTime.Truncate(time.Minute).Add(2 * time.Second)
2021-12-14 16:59:28 +00:00
return s.SyncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime)
}
// QueryKLine returns the first kline of the given symbol and interval from the
// exchange's kline table, ordered by end_time in the direction given by
// orderBy and limited to limit rows.
//
// It returns (nil, nil) when no row matches, and (nil, err) when the query or
// the row scan fails.
//
// NOTE(review): orderBy and limit are spliced into the SQL text verbatim, so
// orderBy must be a trusted value such as "ASC" or "DESC" and never user
// input.
func (s *BacktestService) QueryKLine(ex types.ExchangeName, symbol string, interval types.Interval, orderBy string, limit int) (*types.KLine, error) {
	log.Infof("querying last kline exchange = %s AND symbol = %s AND interval = %s", ex, symbol, interval)

	tableName := targetKlineTable(ex)

	// Built by concatenation rather than fmt.Sprintf so that a stray '%' in
	// orderBy cannot be interpreted as a formatting verb.
	query := "SELECT * FROM `" + tableName + "` WHERE `symbol` = :symbol AND `interval` = :interval ORDER BY end_time " + orderBy + " LIMIT " + strconv.Itoa(limit)

	rows, err := s.DB.NamedQuery(query, map[string]interface{}{
		"interval": interval,
		"symbol":   symbol,
	})
	if err != nil {
		return nil, errors.Wrap(err, "query kline error")
	}
	// rows is only valid once the error has been checked; deferring Close
	// earlier would dereference a nil *sqlx.Rows on failure.
	defer rows.Close()

	if rows.Next() {
		var kline types.KLine
		if err := rows.StructScan(&kline); err != nil {
			return nil, err
		}
		return &kline, nil
	}

	// no row: report the iteration error, if any
	return nil, rows.Err()
}
2022-06-02 09:24:54 +00:00
// QueryKLinesForward is used for querying klines to back-testing
2021-05-07 16:57:25 +00:00
func (s *BacktestService) QueryKLinesForward(exchange types.ExchangeName, symbol string, interval types.Interval, startTime time.Time, limit int) ([]types.KLine, error) {
tableName := targetKlineTable(exchange)
2021-12-13 23:15:18 +00:00
sql := "SELECT * FROM `binance_klines` WHERE `end_time` >= :start_time AND `symbol` = :symbol AND `interval` = :interval and exchange = :exchange ORDER BY end_time ASC LIMIT :limit"
sql = strings.ReplaceAll(sql, "binance_klines", tableName)
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{
2021-05-07 16:57:25 +00:00
"start_time": startTime,
"limit": limit,
"symbol": symbol,
"interval": interval,
2021-12-13 23:15:18 +00:00
"exchange": exchange.String(),
})
if err != nil {
return nil, err
}
return s.scanRows(rows)
}
2021-05-07 16:57:25 +00:00
func (s *BacktestService) QueryKLinesBackward(exchange types.ExchangeName, symbol string, interval types.Interval, endTime time.Time, limit int) ([]types.KLine, error) {
tableName := targetKlineTable(exchange)
2021-12-13 23:15:18 +00:00
sql := "SELECT * FROM `binance_klines` WHERE `end_time` <= :end_time and exchange = :exchange AND `symbol` = :symbol AND `interval` = :interval ORDER BY end_time DESC LIMIT :limit"
sql = strings.ReplaceAll(sql, "binance_klines", tableName)
sql = "SELECT t.* FROM (" + sql + ") AS t ORDER BY t.end_time ASC"
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{
2021-05-07 16:57:25 +00:00
"limit": limit,
"end_time": endTime,
"symbol": symbol,
"interval": interval,
2021-12-13 23:15:18 +00:00
"exchange": exchange.String(),
})
if err != nil {
return nil, err
}
return s.scanRows(rows)
}
2020-11-10 11:06:20 +00:00
func (s *BacktestService) QueryKLinesCh(since, until time.Time, exchange types.Exchange, symbols []string, intervals []types.Interval) (chan types.KLine, chan error) {
2021-12-13 23:15:18 +00:00
if len(symbols) == 0 {
return returnError(errors.Errorf("symbols is empty when querying kline, plesae check your strategy setting. "))
2021-12-13 23:15:18 +00:00
}
tableName := targetKlineTable(exchange.Name())
var query string
if len(symbols) == 1 {
query = "SELECT * FROM `binance_klines` WHERE `end_time` BETWEEN :since AND :until AND `symbol` = :symbols AND `interval` IN (:intervals) ORDER BY end_time ASC"
} else {
query = "SELECT * FROM `binance_klines` WHERE `end_time` BETWEEN :since AND :until AND `symbol` IN (:symbols) AND `interval` IN (:intervals) ORDER BY end_time ASC"
}
query = strings.ReplaceAll(query, "binance_klines", tableName)
sql, args, err := sqlx.Named(query, map[string]interface{}{
"since": since,
2020-11-10 11:06:20 +00:00
"until": until,
"symbol": symbols[0],
2020-11-07 12:11:07 +00:00
"symbols": symbols,
2020-11-06 16:49:17 +00:00
"intervals": types.IntervalSlice(intervals),
})
2020-11-10 11:06:20 +00:00
2020-11-06 16:49:17 +00:00
sql, args, err = sqlx.In(sql, args...)
if err != nil {
return returnError(err)
}
2020-11-06 16:49:17 +00:00
sql = s.DB.Rebind(sql)
rows, err := s.DB.Queryx(sql, args...)
if err != nil {
return returnError(err)
}
2020-11-06 16:49:17 +00:00
return s.scanRowsCh(rows)
}
// returnError logs err and produces the channel pair for a failed kline
// query: a kline channel that is already closed, and an error channel that
// yields err once and is then closed.
func returnError(err error) (chan types.KLine, chan error) {
	klineC := make(chan types.KLine)
	close(klineC)

	log.WithError(err).Error("backtest query error")

	errC := make(chan error, 1)
	// deliver from a goroutine so the caller is never blocked here
	go func() {
		defer close(errC)
		errC <- err
	}()

	return klineC, errC
}
// scanRowsCh scan rows into channel
2020-11-06 16:49:17 +00:00
func (s *BacktestService) scanRowsCh(rows *sqlx.Rows) (chan types.KLine, chan error) {
2020-11-07 12:11:07 +00:00
ch := make(chan types.KLine, 500)
2020-11-06 16:49:17 +00:00
errC := make(chan error, 1)
go func() {
2020-11-06 16:49:17 +00:00
defer close(errC)
2020-11-07 12:11:07 +00:00
defer close(ch)
defer rows.Close()
for rows.Next() {
var kline types.KLine
if err := rows.StructScan(&kline); err != nil {
2020-11-06 16:49:17 +00:00
errC <- err
return
}
ch <- kline
}
if err := rows.Err(); err != nil {
2020-11-06 16:49:17 +00:00
errC <- err
return
}
2020-11-07 12:11:07 +00:00
}()
2020-11-06 16:49:17 +00:00
return ch, errC
}
// scanRows drains rows into a slice of klines and closes rows before
// returning. A scan failure aborts with (nil, err); otherwise the collected
// klines are returned together with the iteration error from rows.Err().
func (s *BacktestService) scanRows(rows *sqlx.Rows) (klines []types.KLine, err error) {
	defer rows.Close()

	for rows.Next() {
		var k types.KLine
		if scanErr := rows.StructScan(&k); scanErr != nil {
			return nil, scanErr
		}

		klines = append(klines, k)
	}

	err = rows.Err()
	return klines, err
}
func targetKlineTable(exchangeName types.ExchangeName) string {
2022-06-02 13:27:28 +00:00
return strings.ToLower(exchangeName.String()) + "_klines"
2021-12-13 23:15:18 +00:00
}
2022-06-02 13:27:28 +00:00
var errExchangeFieldIsUnset = errors.New("kline.Exchange field should not be empty")
func (s *BacktestService) Insert(kline types.KLine) error {
if len(kline.Exchange) == 0 {
2022-06-02 13:27:28 +00:00
return errExchangeFieldIsUnset
}
tableName := targetKlineTable(kline.Exchange)
2021-12-13 23:15:18 +00:00
sql := fmt.Sprintf("INSERT INTO `%s` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)"+
"VALUES (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume)", tableName)
_, err := s.DB.NamedExec(sql, kline)
return err
}
2022-06-06 09:21:31 +00:00
// BatchInsert Note: all kline should be same exchange, or it will cause issue.
func (s *BacktestService) BatchInsert(kline []types.KLine) error {
if len(kline) == 0 {
return nil
}
tableName := targetKlineTable(kline[0].Exchange)
sql := fmt.Sprintf("INSERT INTO `%s` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)"+
" VALUES (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume); ", tableName)
tx := s.DB.MustBegin()
if _, err := tx.NamedExec(sql, kline); err != nil {
if e := tx.Rollback(); e != nil {
log.WithError(e).Fatalf("cannot rollback insertion %v", err)
}
2022-06-06 09:21:31 +00:00
return err
}
return tx.Commit()
}
// TimeRange is a pair of time points delimiting a span of time.
type TimeRange struct {
	// Start is the beginning of the range, End is its end.
	Start, End time.Time
}

// String formats the range as "<start> ~ <end>", using time.Time.String for
// both ends.
func (t *TimeRange) String() string {
	from, to := t.Start.String(), t.End.String()
	return from + " ~ " + to
}
2022-06-08 06:38:47 +00:00
// Sync synchronizes the klines of one symbol/interval pair for the window
// [since, until]. When nothing is stored for that window yet it performs a
// fresh sync; otherwise it performs a partial sync that only fills the gaps.
func (s *BacktestService) Sync(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, since, until time.Time) error {
	first, last, err := s.QueryExistingDataRange(ctx, ex, symbol, interval, since, until)

	switch {
	case err == sql.ErrNoRows:
		// no stored data: fall back to a fresh sync
		return s.SyncFresh(ctx, ex, symbol, interval, since, until)

	case err != nil:
		return err

	case first == nil || last == nil:
		// the range query succeeded but yielded no usable bounds
		return s.SyncFresh(ctx, ex, symbol, interval, since, until)
	}

	return s.SyncPartial(ctx, ex, symbol, interval, since, until)
}
// SyncPartial
// find the existing data time range (t1, t2)
// scan if there is a missing part
// create a time range slice []TimeRange
// iterate the []TimeRange slice to sync data.
func (s *BacktestService) SyncPartial(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, since, until time.Time) error {
2022-06-08 06:38:47 +00:00
log.Infof("starting partial sync %s %s %s: %s <=> %s", ex.Name(), symbol, interval, since, until)
t1, t2, err := s.QueryExistingDataRange(ctx, ex, symbol, interval, since, until)
if err != nil && err != sql.ErrNoRows {
return err
}
2022-06-06 03:46:18 +00:00
if err == sql.ErrNoRows || t1 == nil || t2 == nil {
// fallback to fresh sync
2022-06-06 09:21:31 +00:00
return s.SyncFresh(ctx, ex, symbol, interval, since, until)
}
timeRanges, err := s.FindMissingTimeRanges(ctx, ex, symbol, interval, t1.Time(), t2.Time())
if err != nil {
return err
}
2022-06-04 11:15:11 +00:00
if len(timeRanges) > 0 {
2022-06-08 06:38:47 +00:00
log.Infof("found missing data time ranges: %v", timeRanges)
}
2022-06-04 11:15:11 +00:00
// there are few cases:
// t1 == since && t2 == until
2022-06-05 22:02:38 +00:00
// [since] ------- [t1] data [t2] ------ [until]
if since.Before(t1.Time()) && t1.Time().Sub(since) > interval.Duration() {
2022-06-04 11:15:11 +00:00
// shift slice
timeRanges = append([]TimeRange{
{Start: since.Add(-2 * time.Second), End: t1.Time()}, // we should include since
2022-06-04 11:15:11 +00:00
}, timeRanges...)
}
if t2.Time().Before(until) && until.Sub(t2.Time()) > interval.Duration() {
2022-06-04 11:15:11 +00:00
timeRanges = append(timeRanges, TimeRange{
Start: t2.Time(),
End: until.Add(-interval.Duration()), // include until
2022-06-04 11:15:11 +00:00
})
}
for _, timeRange := range timeRanges {
2022-06-06 17:21:27 +00:00
err = s.SyncKLineByInterval(ctx, ex, symbol, interval, timeRange.Start.Add(time.Second), timeRange.End.Add(-time.Second))
2022-06-04 11:15:11 +00:00
if err != nil {
return err
}
}
return nil
}
// FindMissingTimeRanges returns the missing time ranges between since and
// until. The start/end time of each range represents existing data time
// points (or since/until themselves at the edges), so when sending a kline
// query to the exchange API, callers need to add one second to the start time
// and subtract one second from the end time.
//
// A range is reported wherever two consecutive stored start times (or the
// last one and until) are more than one interval apart. It returns an error
// when the query cannot be built or executed, when a row cannot be scanned,
// or when row iteration fails part-way.
func (s *BacktestService) FindMissingTimeRanges(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, since, until time.Time) ([]TimeRange, error) {
	query, args, err := SelectKLineTimePoints(ex.Name(), symbol, interval, since, until).ToSql()
	if err != nil {
		return nil, err
	}

	rows, err := s.DB.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, err
	}
	// rows is nil when the query fails, so Close may only be deferred after
	// the error check.
	defer rows.Close()

	var timeRanges []TimeRange
	var lastTime = since
	var intervalDuration = interval.Duration()

	for rows.Next() {
		var tt types.Time
		if err := rows.Scan(&tt); err != nil {
			return nil, err
		}

		var t = time.Time(tt)
		if t.Sub(lastTime) > intervalDuration {
			timeRanges = append(timeRanges, TimeRange{
				Start: lastTime,
				End:   t,
			})
		}

		lastTime = t
	}

	// An aborted iteration would otherwise look like "data ends here" and be
	// reported as a gap instead of an error.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	// trailing gap between the last stored point and until
	if lastTime.Before(until) && until.Sub(lastTime) > intervalDuration {
		timeRanges = append(timeRanges, TimeRange{
			Start: lastTime,
			End:   until,
		})
	}

	return timeRanges, nil
}
func (s *BacktestService) QueryExistingDataRange(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, tArgs ...time.Time) (start, end *types.Time, err error) {
sel := SelectKLineTimeRange(ex.Name(), symbol, interval, tArgs...)
sql, args, err := sel.ToSql()
if err != nil {
return nil, nil, err
}
var t1, t2 types.Time
row := s.DB.QueryRowContext(ctx, sql, args...)
if err := row.Scan(&t1, &t2); err != nil {
return nil, nil, err
}
if err := row.Err(); err != nil {
return nil, nil, err
}
2022-06-06 03:46:18 +00:00
if t1 == (types.Time{}) || t2 == (types.Time{}) {
return nil, nil, nil
}
return &t1, &t2, nil
}
// SelectKLineTimePoints builds a query returning the start_time of every
// stored kline for the given symbol and interval, in ascending order.
//
// When exactly two times are passed in args they are used as an inclusive
// (SQL BETWEEN) window on start_time; any other number of args is ignored.
func SelectKLineTimePoints(ex types.ExchangeName, symbol string, interval types.Interval, args ...time.Time) sq.SelectBuilder {
	where := sq.And{
		sq.Eq{"symbol": symbol},
		sq.Eq{"`interval`": interval.String()},
	}

	if len(args) == 2 {
		where = append(where, sq.Expr("`start_time` BETWEEN ? AND ?", args[0], args[1]))
	}

	builder := sq.Select("start_time").From(targetKlineTable(ex))
	return builder.Where(where).OrderBy("start_time ASC")
}
// SelectKLineTimeRange builds a query returning the existing klines time
// range for the given symbol and interval as two columns: t1, the smallest
// start_time, and t2, the largest start_time.
//
// When exactly two times are passed in args they are used as an inclusive
// (SQL BETWEEN) window on start_time; any other number of args is ignored.
//
// NOTE(review): an earlier note here said the times are converted to the
// local timezone as a workaround for sqlite's lack of timezone-aware formats.
// No such conversion happens in this function; the values are bound exactly
// as given. Confirm whether the conversion is done elsewhere.
func SelectKLineTimeRange(ex types.ExchangeName, symbol string, interval types.Interval, args ...time.Time) sq.SelectBuilder {
	where := sq.And{
		sq.Eq{"symbol": symbol},
		sq.Eq{"`interval`": interval.String()},
	}

	if len(args) == 2 {
		where = append(where, sq.Expr("`start_time` BETWEEN ? AND ?", args[0], args[1]))
	}

	builder := sq.Select("MIN(start_time) AS t1, MAX(start_time) AS t2").From(targetKlineTable(ex))
	return builder.Where(where)
}
2022-06-02 08:40:24 +00:00
// TODO: add is_futures column since the klines data is different
func SelectLastKLines(ex types.ExchangeName, symbol string, interval types.Interval, startTime, endTime time.Time, limit uint64) sq.SelectBuilder {
tableName := targetKlineTable(ex)
2022-06-02 08:40:24 +00:00
return sq.Select("*").
From(tableName).
2022-06-02 08:40:24 +00:00
Where(sq.And{
sq.Eq{"symbol": symbol},
sq.Eq{"`interval`": interval.String()},
sq.Expr("start_time BETWEEN ? AND ?", startTime, endTime),
2022-06-02 08:40:24 +00:00
}).
OrderBy("start_time DESC").
Limit(limit)
}