mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-22 23:05:15 +00:00
improve/db: save futures kilne to futures table
This commit is contained in:
parent
5b0b5428fb
commit
6809efa696
|
@ -19,12 +19,11 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type BacktestService struct {
|
type BacktestService struct {
|
||||||
DB *sqlx.DB
|
DB *sqlx.DB
|
||||||
|
Futures bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error {
|
func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error {
|
||||||
log.Infof("synchronizing %s klines with interval %s: %s <=> %s", exchange.Name(), interval, startTime, endTime)
|
|
||||||
|
|
||||||
_, isFutures, isIsolated, isolatedSymbol := exchange2.GetSessionAttributes(exchange)
|
_, isFutures, isIsolated, isolatedSymbol := exchange2.GetSessionAttributes(exchange)
|
||||||
|
|
||||||
// override symbol if isolatedSymbol is not empty
|
// override symbol if isolatedSymbol is not empty
|
||||||
|
@ -32,7 +31,8 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type
|
||||||
symbol = isolatedSymbol
|
symbol = isolatedSymbol
|
||||||
}
|
}
|
||||||
|
|
||||||
if isFutures {
|
s.Futures = isFutures
|
||||||
|
if s.Futures {
|
||||||
futuresExchange, ok := exchange.(types.FuturesExchange)
|
futuresExchange, ok := exchange.(types.FuturesExchange)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("exchange %s does not support futures", exchange.Name())
|
return fmt.Errorf("exchange %s does not support futures", exchange.Name())
|
||||||
|
@ -43,6 +43,9 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type
|
||||||
} else {
|
} else {
|
||||||
futuresExchange.UseFutures()
|
futuresExchange.UseFutures()
|
||||||
}
|
}
|
||||||
|
log.Infof("synchronizing %s futures klines with interval %s: %s <=> %s", exchange.Name(), interval, startTime, endTime)
|
||||||
|
} else {
|
||||||
|
log.Infof("synchronizing %s klines with interval %s: %s <=> %s", exchange.Name(), interval, startTime, endTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.DB.DriverName() == "sqlite3" {
|
if s.DB.DriverName() == "sqlite3" {
|
||||||
|
@ -54,7 +57,7 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type
|
||||||
tasks := []SyncTask{
|
tasks := []SyncTask{
|
||||||
{
|
{
|
||||||
Type: types.KLine{},
|
Type: types.KLine{},
|
||||||
Select: SelectLastKLines(exchange.Name(), symbol, interval, startTime, endTime, 100),
|
Select: s.SelectLastKLines(exchange.Name(), symbol, interval, startTime, endTime, 100),
|
||||||
Time: func(obj interface{}) time.Time {
|
Time: func(obj interface{}) time.Time {
|
||||||
return obj.(types.KLine).StartTime.Time()
|
return obj.(types.KLine).StartTime.Time()
|
||||||
},
|
},
|
||||||
|
@ -147,7 +150,7 @@ func (s *BacktestService) SyncFresh(ctx context.Context, exchange types.Exchange
|
||||||
func (s *BacktestService) QueryKLine(ex types.ExchangeName, symbol string, interval types.Interval, orderBy string, limit int) (*types.KLine, error) {
|
func (s *BacktestService) QueryKLine(ex types.ExchangeName, symbol string, interval types.Interval, orderBy string, limit int) (*types.KLine, error) {
|
||||||
log.Infof("querying last kline exchange = %s AND symbol = %s AND interval = %s", ex, symbol, interval)
|
log.Infof("querying last kline exchange = %s AND symbol = %s AND interval = %s", ex, symbol, interval)
|
||||||
|
|
||||||
tableName := targetKlineTable(ex)
|
tableName := s.targetKlineTable(ex)
|
||||||
// make the SQL syntax IDE friendly, so that it can analyze it.
|
// make the SQL syntax IDE friendly, so that it can analyze it.
|
||||||
sql := fmt.Sprintf("SELECT * FROM `%s` WHERE `symbol` = :symbol AND `interval` = :interval ORDER BY end_time "+orderBy+" LIMIT "+strconv.Itoa(limit), tableName)
|
sql := fmt.Sprintf("SELECT * FROM `%s` WHERE `symbol` = :symbol AND `interval` = :interval ORDER BY end_time "+orderBy+" LIMIT "+strconv.Itoa(limit), tableName)
|
||||||
|
|
||||||
|
@ -176,7 +179,7 @@ func (s *BacktestService) QueryKLine(ex types.ExchangeName, symbol string, inter
|
||||||
|
|
||||||
// QueryKLinesForward is used for querying klines to back-testing
|
// QueryKLinesForward is used for querying klines to back-testing
|
||||||
func (s *BacktestService) QueryKLinesForward(exchange types.ExchangeName, symbol string, interval types.Interval, startTime time.Time, limit int) ([]types.KLine, error) {
|
func (s *BacktestService) QueryKLinesForward(exchange types.ExchangeName, symbol string, interval types.Interval, startTime time.Time, limit int) ([]types.KLine, error) {
|
||||||
tableName := targetKlineTable(exchange)
|
tableName := s.targetKlineTable(exchange)
|
||||||
sql := "SELECT * FROM `binance_klines` WHERE `end_time` >= :start_time AND `symbol` = :symbol AND `interval` = :interval and exchange = :exchange ORDER BY end_time ASC LIMIT :limit"
|
sql := "SELECT * FROM `binance_klines` WHERE `end_time` >= :start_time AND `symbol` = :symbol AND `interval` = :interval and exchange = :exchange ORDER BY end_time ASC LIMIT :limit"
|
||||||
sql = strings.ReplaceAll(sql, "binance_klines", tableName)
|
sql = strings.ReplaceAll(sql, "binance_klines", tableName)
|
||||||
|
|
||||||
|
@ -195,7 +198,7 @@ func (s *BacktestService) QueryKLinesForward(exchange types.ExchangeName, symbol
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *BacktestService) QueryKLinesBackward(exchange types.ExchangeName, symbol string, interval types.Interval, endTime time.Time, limit int) ([]types.KLine, error) {
|
func (s *BacktestService) QueryKLinesBackward(exchange types.ExchangeName, symbol string, interval types.Interval, endTime time.Time, limit int) ([]types.KLine, error) {
|
||||||
tableName := targetKlineTable(exchange)
|
tableName := s.targetKlineTable(exchange)
|
||||||
|
|
||||||
sql := "SELECT * FROM `binance_klines` WHERE `end_time` <= :end_time and exchange = :exchange AND `symbol` = :symbol AND `interval` = :interval ORDER BY end_time DESC LIMIT :limit"
|
sql := "SELECT * FROM `binance_klines` WHERE `end_time` <= :end_time and exchange = :exchange AND `symbol` = :symbol AND `interval` = :interval ORDER BY end_time DESC LIMIT :limit"
|
||||||
sql = strings.ReplaceAll(sql, "binance_klines", tableName)
|
sql = strings.ReplaceAll(sql, "binance_klines", tableName)
|
||||||
|
@ -220,7 +223,7 @@ func (s *BacktestService) QueryKLinesCh(since, until time.Time, exchange types.E
|
||||||
return returnError(errors.Errorf("symbols is empty when querying kline, plesae check your strategy setting. "))
|
return returnError(errors.Errorf("symbols is empty when querying kline, plesae check your strategy setting. "))
|
||||||
}
|
}
|
||||||
|
|
||||||
tableName := targetKlineTable(exchange.Name())
|
tableName := s.targetKlineTable(exchange.Name())
|
||||||
var query string
|
var query string
|
||||||
|
|
||||||
// need to sort by start_time desc in order to let matching engine process 1m first
|
// need to sort by start_time desc in order to let matching engine process 1m first
|
||||||
|
@ -313,8 +316,13 @@ func (s *BacktestService) scanRows(rows *sqlx.Rows) (klines []types.KLine, err e
|
||||||
return klines, rows.Err()
|
return klines, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
func targetKlineTable(exchangeName types.ExchangeName) string {
|
func (s *BacktestService) targetKlineTable(exchangeName types.ExchangeName) string {
|
||||||
return strings.ToLower(exchangeName.String()) + "_klines"
|
tableName := strings.ToLower(exchangeName.String())
|
||||||
|
if s.Futures {
|
||||||
|
return tableName + "_futures_klines"
|
||||||
|
} else {
|
||||||
|
return tableName + "_klines"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var errExchangeFieldIsUnset = errors.New("kline.Exchange field should not be empty")
|
var errExchangeFieldIsUnset = errors.New("kline.Exchange field should not be empty")
|
||||||
|
@ -324,7 +332,7 @@ func (s *BacktestService) Insert(kline types.KLine) error {
|
||||||
return errExchangeFieldIsUnset
|
return errExchangeFieldIsUnset
|
||||||
}
|
}
|
||||||
|
|
||||||
tableName := targetKlineTable(kline.Exchange)
|
tableName := s.targetKlineTable(kline.Exchange)
|
||||||
|
|
||||||
sql := fmt.Sprintf("INSERT INTO `%s` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)"+
|
sql := fmt.Sprintf("INSERT INTO `%s` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)"+
|
||||||
"VALUES (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume)", tableName)
|
"VALUES (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume)", tableName)
|
||||||
|
@ -339,7 +347,7 @@ func (s *BacktestService) BatchInsert(kline []types.KLine) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
tableName := targetKlineTable(kline[0].Exchange)
|
tableName := s.targetKlineTable(kline[0].Exchange)
|
||||||
|
|
||||||
sql := fmt.Sprintf("INSERT INTO `%s` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)"+
|
sql := fmt.Sprintf("INSERT INTO `%s` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)"+
|
||||||
" VALUES (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume); ", tableName)
|
" VALUES (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume); ", tableName)
|
||||||
|
@ -434,7 +442,7 @@ func (s *BacktestService) SyncPartial(ctx context.Context, ex types.Exchange, sy
|
||||||
// FindMissingTimeRanges returns the missing time ranges, the start/end time represents the existing data time points.
|
// FindMissingTimeRanges returns the missing time ranges, the start/end time represents the existing data time points.
|
||||||
// So when sending kline query to the exchange API, we need to add one second to the start time and minus one second to the end time.
|
// So when sending kline query to the exchange API, we need to add one second to the start time and minus one second to the end time.
|
||||||
func (s *BacktestService) FindMissingTimeRanges(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, since, until time.Time) ([]TimeRange, error) {
|
func (s *BacktestService) FindMissingTimeRanges(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, since, until time.Time) ([]TimeRange, error) {
|
||||||
query := SelectKLineTimePoints(ex.Name(), symbol, interval, since, until)
|
query := s.SelectKLineTimePoints(ex.Name(), symbol, interval, since, until)
|
||||||
sql, args, err := query.ToSql()
|
sql, args, err := query.ToSql()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -477,7 +485,7 @@ func (s *BacktestService) FindMissingTimeRanges(ctx context.Context, ex types.Ex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *BacktestService) QueryExistingDataRange(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, tArgs ...time.Time) (start, end *types.Time, err error) {
|
func (s *BacktestService) QueryExistingDataRange(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, tArgs ...time.Time) (start, end *types.Time, err error) {
|
||||||
sel := SelectKLineTimeRange(ex.Name(), symbol, interval, tArgs...)
|
sel := s.SelectKLineTimeRange(ex.Name(), symbol, interval, tArgs...)
|
||||||
sql, args, err := sel.ToSql()
|
sql, args, err := sel.ToSql()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
|
@ -502,7 +510,7 @@ func (s *BacktestService) QueryExistingDataRange(ctx context.Context, ex types.E
|
||||||
return &t1, &t2, nil
|
return &t1, &t2, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func SelectKLineTimePoints(ex types.ExchangeName, symbol string, interval types.Interval, args ...time.Time) sq.SelectBuilder {
|
func (s *BacktestService) SelectKLineTimePoints(ex types.ExchangeName, symbol string, interval types.Interval, args ...time.Time) sq.SelectBuilder {
|
||||||
conditions := sq.And{
|
conditions := sq.And{
|
||||||
sq.Eq{"symbol": symbol},
|
sq.Eq{"symbol": symbol},
|
||||||
sq.Eq{"`interval`": interval.String()},
|
sq.Eq{"`interval`": interval.String()},
|
||||||
|
@ -514,7 +522,7 @@ func SelectKLineTimePoints(ex types.ExchangeName, symbol string, interval types.
|
||||||
conditions = append(conditions, sq.Expr("`start_time` BETWEEN ? AND ?", since, until))
|
conditions = append(conditions, sq.Expr("`start_time` BETWEEN ? AND ?", since, until))
|
||||||
}
|
}
|
||||||
|
|
||||||
tableName := targetKlineTable(ex)
|
tableName := s.targetKlineTable(ex)
|
||||||
|
|
||||||
return sq.Select("start_time").
|
return sq.Select("start_time").
|
||||||
From(tableName).
|
From(tableName).
|
||||||
|
@ -523,7 +531,7 @@ func SelectKLineTimePoints(ex types.ExchangeName, symbol string, interval types.
|
||||||
}
|
}
|
||||||
|
|
||||||
// SelectKLineTimeRange returns the existing klines time range (since < kline.start_time < until)
|
// SelectKLineTimeRange returns the existing klines time range (since < kline.start_time < until)
|
||||||
func SelectKLineTimeRange(ex types.ExchangeName, symbol string, interval types.Interval, args ...time.Time) sq.SelectBuilder {
|
func (s *BacktestService) SelectKLineTimeRange(ex types.ExchangeName, symbol string, interval types.Interval, args ...time.Time) sq.SelectBuilder {
|
||||||
conditions := sq.And{
|
conditions := sq.And{
|
||||||
sq.Eq{"symbol": symbol},
|
sq.Eq{"symbol": symbol},
|
||||||
sq.Eq{"`interval`": interval.String()},
|
sq.Eq{"`interval`": interval.String()},
|
||||||
|
@ -538,7 +546,7 @@ func SelectKLineTimeRange(ex types.ExchangeName, symbol string, interval types.I
|
||||||
conditions = append(conditions, sq.Expr("`start_time` BETWEEN ? AND ?", since, until))
|
conditions = append(conditions, sq.Expr("`start_time` BETWEEN ? AND ?", since, until))
|
||||||
}
|
}
|
||||||
|
|
||||||
tableName := targetKlineTable(ex)
|
tableName := s.targetKlineTable(ex)
|
||||||
|
|
||||||
return sq.Select("MIN(start_time) AS t1, MAX(start_time) AS t2").
|
return sq.Select("MIN(start_time) AS t1, MAX(start_time) AS t2").
|
||||||
From(tableName).
|
From(tableName).
|
||||||
|
@ -546,8 +554,8 @@ func SelectKLineTimeRange(ex types.ExchangeName, symbol string, interval types.I
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: add is_futures column since the klines data is different
|
// TODO: add is_futures column since the klines data is different
|
||||||
func SelectLastKLines(ex types.ExchangeName, symbol string, interval types.Interval, startTime, endTime time.Time, limit uint64) sq.SelectBuilder {
|
func (s *BacktestService) SelectLastKLines(ex types.ExchangeName, symbol string, interval types.Interval, startTime, endTime time.Time, limit uint64) sq.SelectBuilder {
|
||||||
tableName := targetKlineTable(ex)
|
tableName := s.targetKlineTable(ex)
|
||||||
return sq.Select("*").
|
return sq.Select("*").
|
||||||
From(tableName).
|
From(tableName).
|
||||||
Where(sq.And{
|
Where(sq.And{
|
||||||
|
|
Loading…
Reference in New Issue
Block a user