Merge pull request #1467 from andycheng123/feature/sync-futures

WIP: feature: sync futures data and backtest with them
This commit is contained in:
Andy Cheng 2024-01-03 10:41:39 +08:00 committed by GitHub
commit 22a9ab068d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 199 additions and 27 deletions

View File

@ -256,11 +256,11 @@ func (e *Exchange) QueryKLines(
ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions,
) ([]types.KLine, error) {
if options.EndTime != nil {
return e.srv.QueryKLinesBackward(e.sourceName, symbol, interval, *options.EndTime, 1000)
return e.srv.QueryKLinesBackward(e, symbol, interval, *options.EndTime, 1000)
}
if options.StartTime != nil {
return e.srv.QueryKLinesForward(e.sourceName, symbol, interval, *options.StartTime, 1000)
return e.srv.QueryKLinesForward(e, symbol, interval, *options.StartTime, 1000)
}
return nil, errors.New("endTime or startTime can not be nil")
@ -381,7 +381,19 @@ func (e *Exchange) SubscribeMarketData(
intervals = append(intervals, interval)
}
var isFutures bool
if futuresExchange, ok := e.publicExchange.(types.FuturesExchange); ok {
isFutures = futuresExchange.GetFuturesSettings().IsFutures
} else {
isFutures = false
}
if isFutures {
log.Infof("querying futures klines from database with exchange: %v symbols: %v and intervals: %v for back-testing", e.Name(), symbols, intervals)
} else {
log.Infof("querying klines from database with exchange: %v symbols: %v and intervals: %v for back-testing", e.Name(), symbols, intervals)
}
if len(symbols) == 0 {
log.Warnf("empty symbols, will not query kline data from the database")
@ -390,7 +402,7 @@ func (e *Exchange) SubscribeMarketData(
return c, nil
}
klineC, errC := e.srv.QueryKLinesCh(startTime, endTime, e, symbols, intervals)
klineC, errC := e.srv.QueryKLinesCh(startTime, endTime, e.publicExchange, symbols, intervals)
go func() {
if err := <-errC; err != nil {
log.WithError(err).Error("backtest data feed error")

View File

@ -192,6 +192,16 @@ var BacktestCmd = &cobra.Command{
return err
}
sourceExchanges[exName] = publicExchange
// Set exchange to use futures
if userConfig.Sessions[exName.String()].Futures {
futuresExchange, ok := publicExchange.(types.FuturesExchange)
if !ok {
return fmt.Errorf("exchange %s does not support futures", publicExchange.Name())
}
futuresExchange.UseFutures()
}
}
var syncFromTime time.Time
@ -269,6 +279,7 @@ var BacktestCmd = &cobra.Command{
exchangeFromConfig := userConfig.Sessions[name.String()]
if exchangeFromConfig != nil {
session.UseHeikinAshi = exchangeFromConfig.UseHeikinAshi
session.Futures = exchangeFromConfig.Futures
}
}

View File

@ -0,0 +1,64 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
AddMigration(upAddFuturesKlines, downAddFuturesKlines)
}
func upAddFuturesKlines(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE TABLE `binance_futures_klines` LIKE `binance_klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE TABLE `bybit_futures_klines` LIKE `bybit_klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE TABLE `okex_futures_klines` LIKE `okex_klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE TABLE `max_futures_klines` LIKE `max_klines`;")
if err != nil {
return err
}
return err
}
func downAddFuturesKlines(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `binance_futures_klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `bybit_futures_klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `okex_futures_klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `max_futures_klines`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,74 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
AddMigration(upAddFuturesKlines, downAddFuturesKlines)
}
func upAddFuturesKlines(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE TABLE `bybit_futures_klines`\n(\n `gid` INTEGER PRIMARY KEY AUTOINCREMENT,\n `exchange` VARCHAR(10) NOT NULL,\n `start_time` DATETIME(3) NOT NULL,\n `end_time` DATETIME(3) NOT NULL,\n `interval` VARCHAR(3) NOT NULL,\n `symbol` VARCHAR(7) NOT NULL,\n `open` DECIMAL(16, 8) NOT NULL,\n `high` DECIMAL(16, 8) NOT NULL,\n `low` DECIMAL(16, 8) NOT NULL,\n `close` DECIMAL(16, 8) NOT NULL DEFAULT 0.0,\n `volume` DECIMAL(16, 8) NOT NULL DEFAULT 0.0,\n `closed` BOOLEAN NOT NULL DEFAULT TRUE,\n `last_trade_id` INT NOT NULL DEFAULT 0,\n `num_trades` INT NOT NULL DEFAULT 0,\n `quote_volume` DECIMAL NOT NULL DEFAULT 0.0,\n `taker_buy_base_volume` DECIMAL NOT NULL DEFAULT 0.0,\n `taker_buy_quote_volume` DECIMAL NOT NULL DEFAULT 0.0\n);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE TABLE `okex_futures_klines`\n(\n `gid` INTEGER PRIMARY KEY AUTOINCREMENT,\n `exchange` VARCHAR(10) NOT NULL,\n `start_time` DATETIME(3) NOT NULL,\n `end_time` DATETIME(3) NOT NULL,\n `interval` VARCHAR(3) NOT NULL,\n `symbol` VARCHAR(7) NOT NULL,\n `open` DECIMAL(16, 8) NOT NULL,\n `high` DECIMAL(16, 8) NOT NULL,\n `low` DECIMAL(16, 8) NOT NULL,\n `close` DECIMAL(16, 8) NOT NULL DEFAULT 0.0,\n `volume` DECIMAL(16, 8) NOT NULL DEFAULT 0.0,\n `closed` BOOLEAN NOT NULL DEFAULT TRUE,\n `last_trade_id` INT NOT NULL DEFAULT 0,\n `num_trades` INT NOT NULL DEFAULT 0\n);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE TABLE `binance_futures_klines`\n(\n `gid` INTEGER PRIMARY KEY AUTOINCREMENT,\n `exchange` VARCHAR(10) NOT NULL,\n `start_time` DATETIME(3) NOT NULL,\n `end_time` DATETIME(3) NOT NULL,\n `interval` VARCHAR(3) NOT NULL,\n `symbol` VARCHAR(7) NOT NULL,\n `open` DECIMAL(16, 8) NOT NULL,\n `high` DECIMAL(16, 8) NOT NULL,\n `low` DECIMAL(16, 8) NOT NULL,\n `close` DECIMAL(16, 8) NOT NULL DEFAULT 0.0,\n `volume` DECIMAL(16, 8) NOT NULL DEFAULT 0.0,\n `closed` BOOLEAN NOT NULL DEFAULT TRUE,\n `last_trade_id` INT NOT NULL DEFAULT 0,\n `num_trades` INT NOT NULL DEFAULT 0\n);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE TABLE `max_futures_klines`\n(\n `gid` INTEGER PRIMARY KEY AUTOINCREMENT,\n `exchange` VARCHAR(10) NOT NULL,\n `start_time` DATETIME(3) NOT NULL,\n `end_time` DATETIME(3) NOT NULL,\n `interval` VARCHAR(3) NOT NULL,\n `symbol` VARCHAR(7) NOT NULL,\n `open` DECIMAL(16, 8) NOT NULL,\n `high` DECIMAL(16, 8) NOT NULL,\n `low` DECIMAL(16, 8) NOT NULL,\n `close` DECIMAL(16, 8) NOT NULL DEFAULT 0.0,\n `volume` DECIMAL(16, 8) NOT NULL DEFAULT 0.0,\n `closed` BOOLEAN NOT NULL DEFAULT TRUE,\n `last_trade_id` INT NOT NULL DEFAULT 0,\n `num_trades` INT NOT NULL DEFAULT 0\n);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX `bybit_futures_klines_end_time_symbol_interval` ON `bybit_futures_klines` (`end_time`, `symbol`, `interval`);\nCREATE INDEX `okex_futures_klines_end_time_symbol_interval` ON `okex_futures_klines` (`end_time`, `symbol`, `interval`);\nCREATE INDEX `binance_futures_klines_end_time_symbol_interval` ON `binance_futures_klines` (`end_time`, `symbol`, `interval`);\nCREATE INDEX `max_futures_klines_end_time_symbol_interval` ON `max_futures_klines` (`end_time`, `symbol`, `interval`);")
if err != nil {
return err
}
return err
}
func downAddFuturesKlines(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS `bybit_futures_klines_end_time_symbol_interval`;\nDROP INDEX IF EXISTS `okex_futures_klines_end_time_symbol_interval`;\nDROP INDEX IF EXISTS `binance_futures_klines_end_time_symbol_interval`;\nDROP INDEX IF EXISTS `max_futures_klines_end_time_symbol_interval`;\n")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `bybit_futures_klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `okex_futures_klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `binance_futures_klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `max_futures_klines`;")
if err != nil {
return err
}
return err
}

View File

@ -23,15 +23,19 @@ type BacktestService struct {
}
func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error {
log.Infof("synchronizing %s klines with interval %s: %s <=> %s", exchange.Name(), interval, startTime, endTime)
_, isFutures, isIsolated, isolatedSymbol := exchange2.GetSessionAttributes(exchange)
// TODO: use isFutures here
_, _, isIsolated, isolatedSymbol := exchange2.GetSessionAttributes(exchange)
// override symbol if isolatedSymbol is not empty
if isIsolated && len(isolatedSymbol) > 0 {
symbol = isolatedSymbol
}
if isFutures {
log.Infof("synchronizing %s futures klines with interval %s: %s <=> %s", exchange.Name(), interval, startTime, endTime)
} else {
log.Infof("synchronizing %s klines with interval %s: %s <=> %s", exchange.Name(), interval, startTime, endTime)
}
if s.DB.DriverName() == "sqlite3" {
_, _ = s.DB.Exec("PRAGMA journal_mode = WAL")
_, _ = s.DB.Exec("PRAGMA synchronous = NORMAL")
@ -41,7 +45,7 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type
tasks := []SyncTask{
{
Type: types.KLine{},
Select: SelectLastKLines(exchange.Name(), symbol, interval, startTime, endTime, 100),
Select: s.SelectLastKLines(exchange, symbol, interval, startTime, endTime, 100),
Time: func(obj interface{}) time.Time {
return obj.(types.KLine).StartTime.Time()
},
@ -70,11 +74,11 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type
BatchInsertBuffer: 1000,
BatchInsert: func(obj interface{}) error {
kLines := obj.([]types.KLine)
return s.BatchInsert(kLines)
return s.BatchInsert(kLines, exchange)
},
Insert: func(obj interface{}) error {
kline := obj.(types.KLine)
return s.Insert(kline)
return s.Insert(kline, exchange)
},
LogInsert: log.GetLevel() == log.DebugLevel,
},
@ -131,7 +135,7 @@ func (s *BacktestService) SyncFresh(ctx context.Context, exchange types.Exchange
}
// QueryKLine queries the klines from the database
func (s *BacktestService) QueryKLine(ex types.ExchangeName, symbol string, interval types.Interval, orderBy string, limit int) (*types.KLine, error) {
func (s *BacktestService) QueryKLine(ex types.Exchange, symbol string, interval types.Interval, orderBy string, limit int) (*types.KLine, error) {
log.Infof("querying last kline exchange = %s AND symbol = %s AND interval = %s", ex, symbol, interval)
tableName := targetKlineTable(ex)
@ -162,7 +166,7 @@ func (s *BacktestService) QueryKLine(ex types.ExchangeName, symbol string, inter
}
// QueryKLinesForward is used for querying klines to back-testing
func (s *BacktestService) QueryKLinesForward(exchange types.ExchangeName, symbol string, interval types.Interval, startTime time.Time, limit int) ([]types.KLine, error) {
func (s *BacktestService) QueryKLinesForward(exchange types.Exchange, symbol string, interval types.Interval, startTime time.Time, limit int) ([]types.KLine, error) {
tableName := targetKlineTable(exchange)
sql := "SELECT * FROM `binance_klines` WHERE `end_time` >= :start_time AND `symbol` = :symbol AND `interval` = :interval and exchange = :exchange ORDER BY end_time ASC LIMIT :limit"
sql = strings.ReplaceAll(sql, "binance_klines", tableName)
@ -172,7 +176,7 @@ func (s *BacktestService) QueryKLinesForward(exchange types.ExchangeName, symbol
"limit": limit,
"symbol": symbol,
"interval": interval,
"exchange": exchange.String(),
"exchange": exchange.Name().String(),
})
if err != nil {
return nil, err
@ -181,7 +185,7 @@ func (s *BacktestService) QueryKLinesForward(exchange types.ExchangeName, symbol
return s.scanRows(rows)
}
func (s *BacktestService) QueryKLinesBackward(exchange types.ExchangeName, symbol string, interval types.Interval, endTime time.Time, limit int) ([]types.KLine, error) {
func (s *BacktestService) QueryKLinesBackward(exchange types.Exchange, symbol string, interval types.Interval, endTime time.Time, limit int) ([]types.KLine, error) {
tableName := targetKlineTable(exchange)
sql := "SELECT * FROM `binance_klines` WHERE `end_time` <= :end_time and exchange = :exchange AND `symbol` = :symbol AND `interval` = :interval ORDER BY end_time DESC LIMIT :limit"
@ -193,7 +197,7 @@ func (s *BacktestService) QueryKLinesBackward(exchange types.ExchangeName, symbo
"end_time": endTime,
"symbol": symbol,
"interval": interval,
"exchange": exchange.String(),
"exchange": exchange.Name().String(),
})
if err != nil {
return nil, err
@ -207,7 +211,7 @@ func (s *BacktestService) QueryKLinesCh(since, until time.Time, exchange types.E
return returnError(errors.Errorf("symbols is empty when querying kline, plesae check your strategy setting. "))
}
tableName := targetKlineTable(exchange.Name())
tableName := targetKlineTable(exchange)
var query string
// need to sort by start_time desc in order to let matching engine process 1m first
@ -300,18 +304,25 @@ func (s *BacktestService) scanRows(rows *sqlx.Rows) (klines []types.KLine, err e
return klines, rows.Err()
}
func targetKlineTable(exchangeName types.ExchangeName) string {
return strings.ToLower(exchangeName.String()) + "_klines"
func targetKlineTable(exchange types.Exchange) string {
_, isFutures, _, _ := exchange2.GetSessionAttributes(exchange)
tableName := strings.ToLower(exchange.Name().String())
if isFutures {
return tableName + "_futures_klines"
} else {
return tableName + "_klines"
}
}
var errExchangeFieldIsUnset = errors.New("kline.Exchange field should not be empty")
func (s *BacktestService) Insert(kline types.KLine) error {
func (s *BacktestService) Insert(kline types.KLine, ex types.Exchange) error {
if len(kline.Exchange) == 0 {
return errExchangeFieldIsUnset
}
tableName := targetKlineTable(kline.Exchange)
tableName := targetKlineTable(ex)
sql := fmt.Sprintf("INSERT INTO `%s` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)"+
"VALUES (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume)", tableName)
@ -321,12 +332,12 @@ func (s *BacktestService) Insert(kline types.KLine) error {
}
// BatchInsert Note: all kline should be same exchange, or it will cause issue.
func (s *BacktestService) BatchInsert(kline []types.KLine) error {
func (s *BacktestService) BatchInsert(kline []types.KLine, ex types.Exchange) error {
if len(kline) == 0 {
return nil
}
tableName := targetKlineTable(kline[0].Exchange)
tableName := targetKlineTable(ex)
sql := fmt.Sprintf("INSERT INTO `%s` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)"+
" VALUES (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume); ", tableName)
@ -421,7 +432,7 @@ func (s *BacktestService) SyncPartial(ctx context.Context, ex types.Exchange, sy
// FindMissingTimeRanges returns the missing time ranges, the start/end time represents the existing data time points.
// So when sending kline query to the exchange API, we need to add one second to the start time and minus one second to the end time.
func (s *BacktestService) FindMissingTimeRanges(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, since, until time.Time) ([]TimeRange, error) {
query := SelectKLineTimePoints(ex.Name(), symbol, interval, since, until)
query := s.SelectKLineTimePoints(ex, symbol, interval, since, until)
sql, args, err := query.ToSql()
if err != nil {
return nil, err
@ -464,7 +475,7 @@ func (s *BacktestService) FindMissingTimeRanges(ctx context.Context, ex types.Ex
}
func (s *BacktestService) QueryExistingDataRange(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, tArgs ...time.Time) (start, end *types.Time, err error) {
sel := SelectKLineTimeRange(ex.Name(), symbol, interval, tArgs...)
sel := s.SelectKLineTimeRange(ex, symbol, interval, tArgs...)
sql, args, err := sel.ToSql()
if err != nil {
return nil, nil, err
@ -489,7 +500,7 @@ func (s *BacktestService) QueryExistingDataRange(ctx context.Context, ex types.E
return &t1, &t2, nil
}
func SelectKLineTimePoints(ex types.ExchangeName, symbol string, interval types.Interval, args ...time.Time) sq.SelectBuilder {
func (s *BacktestService) SelectKLineTimePoints(ex types.Exchange, symbol string, interval types.Interval, args ...time.Time) sq.SelectBuilder {
conditions := sq.And{
sq.Eq{"symbol": symbol},
sq.Eq{"`interval`": interval.String()},
@ -510,7 +521,7 @@ func SelectKLineTimePoints(ex types.ExchangeName, symbol string, interval types.
}
// SelectKLineTimeRange returns the existing klines time range (since < kline.start_time < until)
func SelectKLineTimeRange(ex types.ExchangeName, symbol string, interval types.Interval, args ...time.Time) sq.SelectBuilder {
func (s *BacktestService) SelectKLineTimeRange(ex types.Exchange, symbol string, interval types.Interval, args ...time.Time) sq.SelectBuilder {
conditions := sq.And{
sq.Eq{"symbol": symbol},
sq.Eq{"`interval`": interval.String()},
@ -533,7 +544,7 @@ func SelectKLineTimeRange(ex types.ExchangeName, symbol string, interval types.I
}
// TODO: add is_futures column since the klines data is different
func SelectLastKLines(ex types.ExchangeName, symbol string, interval types.Interval, startTime, endTime time.Time, limit uint64) sq.SelectBuilder {
func (s *BacktestService) SelectLastKLines(ex types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time, limit uint64) sq.SelectBuilder {
tableName := targetKlineTable(ex)
return sq.Select("*").
From(tableName).