2020-11-06 13:40:48 +00:00
package service
import (
"context"
2022-06-04 17:01:59 +00:00
"database/sql"
2021-12-13 23:15:18 +00:00
"fmt"
2021-12-07 07:21:37 +00:00
"strconv"
2020-11-06 13:40:48 +00:00
"strings"
"time"
2022-06-02 08:40:24 +00:00
sq "github.com/Masterminds/squirrel"
2020-11-06 13:40:48 +00:00
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
2022-07-14 09:36:16 +00:00
exchange2 "github.com/c9s/bbgo/pkg/exchange"
2022-05-30 16:59:33 +00:00
"github.com/c9s/bbgo/pkg/exchange/batch"
2020-11-06 13:40:48 +00:00
"github.com/c9s/bbgo/pkg/types"
)
2023-11-02 17:03:16 +00:00
// BackTestable is the set of kline storage operations a back-testing data
// source has to provide: verification, synchronization and several query
// shapes.
//
// NOTE(review): the query methods below take a types.ExchangeName, whereas the
// BacktestService methods of the same names in this file take a
// types.Exchange — confirm which signature is the intended one.
type BackTestable interface {
	// Verify checks the data of the given symbols between startTime and endTime.
	Verify(sourceExchange types.Exchange, symbols []string, startTime, endTime time.Time) error

	// Sync synchronizes the klines of symbol for each interval between since and until.
	Sync(ctx context.Context, ex types.Exchange, symbol string, intervals []types.Interval, since, until time.Time) error

	// QueryKLine returns a single kline for the symbol and interval.
	QueryKLine(ex types.ExchangeName, symbol string, interval types.Interval, orderBy string, limit int) (*types.KLine, error)

	// QueryKLinesForward returns up to limit klines relative to startTime.
	QueryKLinesForward(exchange types.ExchangeName, symbol string, interval types.Interval, startTime time.Time, limit int) ([]types.KLine, error)

	// QueryKLinesBackward returns up to limit klines relative to endTime.
	QueryKLinesBackward(exchange types.ExchangeName, symbol string, interval types.Interval, endTime time.Time, limit int) ([]types.KLine, error)

	// QueryKLinesCh delivers klines through a channel, with errors reported on
	// the second channel.
	QueryKLinesCh(since, until time.Time, exchange types.Exchange, symbols []string, intervals []types.Interval) (chan types.KLine, chan error)
}
2020-11-06 13:40:48 +00:00
type BacktestService struct {
2023-12-21 10:19:28 +00:00
DB * sqlx . DB
2020-11-06 13:40:48 +00:00
}
2023-11-02 17:03:16 +00:00
// NewBacktestService returns a BacktestService whose DB field is set to db.
func NewBacktestService(db *sqlx.DB) *BacktestService {
	svc := new(BacktestService)
	svc.DB = db
	return svc
}
func ( s * BacktestService ) syncKLineByInterval ( ctx context . Context , exchange types . Exchange , symbol string , interval types . Interval , startTime , endTime time . Time ) error {
2023-12-21 07:47:24 +00:00
_ , isFutures , isIsolated , isolatedSymbol := exchange2 . GetSessionAttributes ( exchange )
2023-11-02 17:03:16 +00:00
log . Infof ( "synchronizing %s klines with interval %s: %s <=> %s" , exchange . Name ( ) , interval , startTime , endTime )
2023-12-21 07:47:24 +00:00
2022-06-02 08:40:24 +00:00
// override symbol if isolatedSymbol is not empty
if isIsolated && len ( isolatedSymbol ) > 0 {
symbol = isolatedSymbol
}
2023-12-21 10:19:28 +00:00
if isFutures {
2023-12-21 08:19:32 +00:00
log . Infof ( "synchronizing %s futures klines with interval %s: %s <=> %s" , exchange . Name ( ) , interval , startTime , endTime )
} else {
log . Infof ( "synchronizing %s klines with interval %s: %s <=> %s" , exchange . Name ( ) , interval , startTime , endTime )
2023-12-21 07:47:24 +00:00
}
2022-06-06 06:53:37 +00:00
if s . DB . DriverName ( ) == "sqlite3" {
_ , _ = s . DB . Exec ( "PRAGMA journal_mode = WAL" )
_ , _ = s . DB . Exec ( "PRAGMA synchronous = NORMAL" )
}
2022-08-19 09:55:48 +00:00
now := time . Now ( )
2022-06-02 08:40:24 +00:00
tasks := [ ] SyncTask {
{
2022-06-03 18:23:23 +00:00
Type : types . KLine { } ,
2023-12-21 10:19:28 +00:00
Select : s . SelectLastKLines ( exchange , symbol , interval , startTime , endTime , 100 ) ,
2022-06-02 08:40:24 +00:00
Time : func ( obj interface { } ) time . Time {
2022-06-06 16:48:13 +00:00
return obj . ( types . KLine ) . StartTime . Time ( )
2022-06-02 08:40:24 +00:00
} ,
2022-08-19 09:28:55 +00:00
Filter : func ( obj interface { } ) bool {
k := obj . ( types . KLine )
2022-08-19 09:55:48 +00:00
if k . EndTime . Before ( k . StartTime . Time ( ) . Add ( k . Interval . Duration ( ) - time . Second ) ) {
return false
}
// Filter klines that has the endTime closed in the future
if k . EndTime . After ( now ) {
return false
}
return true
2022-08-19 09:28:55 +00:00
} ,
2022-06-02 08:40:24 +00:00
ID : func ( obj interface { } ) string {
kline := obj . ( types . KLine )
2022-06-06 16:48:13 +00:00
return strconv . FormatInt ( kline . StartTime . UnixMilli ( ) , 10 )
// return kline.Symbol + kline.Interval.String() + strconv.FormatInt(kline.StartTime.UnixMilli(), 10)
2022-06-02 08:40:24 +00:00
} ,
BatchQuery : func ( ctx context . Context , startTime , endTime time . Time ) ( interface { } , chan error ) {
q := & batch . KLineBatchQuery { Exchange : exchange }
return q . Query ( ctx , symbol , interval , startTime , endTime )
} ,
2022-06-19 09:41:52 +00:00
BatchInsertBuffer : 1000 ,
2022-06-07 04:27:32 +00:00
BatchInsert : func ( obj interface { } ) error {
kLines := obj . ( [ ] types . KLine )
2023-12-21 10:19:28 +00:00
return s . BatchInsert ( kLines , exchange )
2022-06-07 04:27:32 +00:00
} ,
2022-06-06 16:48:13 +00:00
Insert : func ( obj interface { } ) error {
kline := obj . ( types . KLine )
2023-12-21 10:19:28 +00:00
return s . Insert ( kline , exchange )
2022-06-02 08:40:24 +00:00
} ,
2022-06-06 04:24:18 +00:00
LogInsert : log . GetLevel ( ) == log . DebugLevel ,
2022-06-02 08:40:24 +00:00
} ,
}
2020-11-06 16:49:17 +00:00
2022-06-02 08:40:24 +00:00
for _ , sel := range tasks {
2022-06-03 18:23:23 +00:00
if err := sel . execute ( ctx , s . DB , startTime , endTime ) ; err != nil {
2021-03-14 03:04:56 +00:00
return err
2020-11-06 13:40:48 +00:00
}
2021-03-14 03:04:56 +00:00
}
2022-06-02 08:40:24 +00:00
return nil
2021-03-14 03:04:56 +00:00
}
2020-11-06 16:49:17 +00:00
2022-06-05 22:03:25 +00:00
func ( s * BacktestService ) Verify ( sourceExchange types . Exchange , symbols [ ] string , startTime time . Time , endTime time . Time ) error {
2021-12-14 16:59:28 +00:00
var corruptCnt = 0
for _ , symbol := range symbols {
for interval := range types . SupportedIntervals {
2022-06-05 22:03:25 +00:00
log . Infof ( "verifying %s %s backtesting data: %s to %s..." , symbol , interval , startTime , endTime )
2022-06-05 22:02:38 +00:00
2023-11-02 17:03:16 +00:00
timeRanges , err := s . findMissingTimeRanges ( context . Background ( ) , sourceExchange , symbol , interval ,
2022-06-06 09:21:31 +00:00
startTime , endTime )
2022-06-05 22:02:38 +00:00
if err != nil {
return err
2021-12-14 16:59:28 +00:00
}
2021-12-13 23:15:18 +00:00
2022-06-05 22:02:38 +00:00
if len ( timeRanges ) == 0 {
continue
2021-12-14 16:59:28 +00:00
}
2021-12-13 23:15:18 +00:00
2022-06-06 04:15:06 +00:00
log . Warnf ( "%s %s found missing time ranges:" , symbol , interval )
2022-06-05 22:02:38 +00:00
corruptCnt += len ( timeRanges )
for _ , timeRange := range timeRanges {
2022-06-06 04:15:06 +00:00
log . Warnf ( "- %s" , timeRange . String ( ) )
2021-12-14 16:59:28 +00:00
}
}
2021-12-13 23:15:18 +00:00
}
2021-12-14 16:59:28 +00:00
log . Infof ( "backtest verification completed" )
if corruptCnt > 0 {
log . Errorf ( "found %d corruptions" , corruptCnt )
} else {
log . Infof ( "found %d corruptions" , corruptCnt )
2020-11-06 13:40:48 +00:00
}
2022-05-11 05:59:44 +00:00
return nil
2021-12-14 16:59:28 +00:00
}
2023-11-02 17:03:16 +00:00
func ( s * BacktestService ) SyncFresh ( ctx context . Context , exchange types . Exchange , symbol string , interval types . Interval , startTime , endTime time . Time ) error {
2022-06-08 06:38:47 +00:00
log . Infof ( "starting fresh sync %s %s %s: %s <=> %s" , exchange . Name ( ) , symbol , interval , startTime , endTime )
2022-06-06 09:21:31 +00:00
startTime = startTime . Truncate ( time . Minute ) . Add ( - 2 * time . Second )
endTime = endTime . Truncate ( time . Minute ) . Add ( 2 * time . Second )
2023-11-02 17:03:16 +00:00
return s . syncKLineByInterval ( ctx , exchange , symbol , interval , startTime , endTime )
2020-11-06 13:40:48 +00:00
}
2021-12-07 07:21:37 +00:00
// QueryKLine queries the klines from the database
2024-01-18 16:34:02 +00:00
func ( s * BacktestService ) QueryKLine (
ex types . Exchange , symbol string , interval types . Interval , orderBy string , limit int ,
) ( * types . KLine , error ) {
2020-11-06 13:40:48 +00:00
log . Infof ( "querying last kline exchange = %s AND symbol = %s AND interval = %s" , ex , symbol , interval )
2023-12-21 10:19:28 +00:00
tableName := targetKlineTable ( ex )
2020-11-06 13:40:48 +00:00
// make the SQL syntax IDE friendly, so that it can analyze it.
2022-06-03 18:23:23 +00:00
sql := fmt . Sprintf ( "SELECT * FROM `%s` WHERE `symbol` = :symbol AND `interval` = :interval ORDER BY end_time " + orderBy + " LIMIT " + strconv . Itoa ( limit ) , tableName )
2020-11-06 13:40:48 +00:00
rows , err := s . DB . NamedQuery ( sql , map [ string ] interface { } {
"interval" : interval ,
"symbol" : symbol ,
} )
2022-06-06 09:57:24 +00:00
defer rows . Close ( )
2020-11-06 13:40:48 +00:00
if err != nil {
2021-12-07 07:21:37 +00:00
return nil , errors . Wrap ( err , "query kline error" )
2020-11-06 13:40:48 +00:00
}
if rows . Err ( ) != nil {
return nil , rows . Err ( )
}
if rows . Next ( ) {
var kline types . KLine
err = rows . StructScan ( & kline )
return & kline , err
}
return nil , rows . Err ( )
}
2022-06-02 09:24:54 +00:00
// QueryKLinesForward is used for querying klines to back-testing
2024-01-18 16:34:02 +00:00
func ( s * BacktestService ) QueryKLinesForward (
exchange types . Exchange , symbol string , interval types . Interval , startTime time . Time , limit int ,
) ( [ ] types . KLine , error ) {
2023-12-21 10:19:28 +00:00
tableName := targetKlineTable ( exchange )
2021-12-13 23:15:18 +00:00
sql := "SELECT * FROM `binance_klines` WHERE `end_time` >= :start_time AND `symbol` = :symbol AND `interval` = :interval and exchange = :exchange ORDER BY end_time ASC LIMIT :limit"
sql = strings . ReplaceAll ( sql , "binance_klines" , tableName )
2020-11-08 04:13:34 +00:00
rows , err := s . DB . NamedQuery ( sql , map [ string ] interface { } {
2021-05-07 16:57:25 +00:00
"start_time" : startTime ,
2021-10-16 05:49:00 +00:00
"limit" : limit ,
"symbol" : symbol ,
"interval" : interval ,
2023-12-21 10:19:28 +00:00
"exchange" : exchange . Name ( ) . String ( ) ,
2020-11-08 04:13:34 +00:00
} )
if err != nil {
return nil , err
}
return s . scanRows ( rows )
}
2024-01-18 16:34:02 +00:00
func ( s * BacktestService ) QueryKLinesBackward (
exchange types . Exchange , symbol string , interval types . Interval , endTime time . Time , limit int ,
) ( [ ] types . KLine , error ) {
2023-12-21 10:19:28 +00:00
tableName := targetKlineTable ( exchange )
2021-12-13 23:15:18 +00:00
sql := "SELECT * FROM `binance_klines` WHERE `end_time` <= :end_time and exchange = :exchange AND `symbol` = :symbol AND `interval` = :interval ORDER BY end_time DESC LIMIT :limit"
sql = strings . ReplaceAll ( sql , "binance_klines" , tableName )
2021-10-16 05:49:00 +00:00
sql = "SELECT t.* FROM (" + sql + ") AS t ORDER BY t.end_time ASC"
2020-11-08 04:13:34 +00:00
rows , err := s . DB . NamedQuery ( sql , map [ string ] interface { } {
2021-05-07 16:57:25 +00:00
"limit" : limit ,
2021-10-16 05:49:00 +00:00
"end_time" : endTime ,
2020-11-08 04:13:34 +00:00
"symbol" : symbol ,
"interval" : interval ,
2023-12-21 10:19:28 +00:00
"exchange" : exchange . Name ( ) . String ( ) ,
2020-11-08 04:13:34 +00:00
} )
if err != nil {
return nil , err
}
return s . scanRows ( rows )
}
2024-01-18 16:34:02 +00:00
func ( s * BacktestService ) QueryKLinesCh (
since , until time . Time , exchange types . Exchange , symbols [ ] string , intervals [ ] types . Interval ,
) ( chan types . KLine , chan error ) {
2021-12-13 23:15:18 +00:00
if len ( symbols ) == 0 {
2021-12-20 12:28:22 +00:00
return returnError ( errors . Errorf ( "symbols is empty when querying kline, plesae check your strategy setting. " ) )
2021-12-13 23:15:18 +00:00
}
2023-12-21 10:19:28 +00:00
tableName := targetKlineTable ( exchange )
2022-06-09 07:50:23 +00:00
var query string
2022-08-29 05:11:02 +00:00
// need to sort by start_time desc in order to let matching engine process 1m first
// otherwise any other close event could peek on the final close price
2022-06-09 07:50:23 +00:00
if len ( symbols ) == 1 {
2022-08-29 05:11:02 +00:00
query = "SELECT * FROM `binance_klines` WHERE `end_time` BETWEEN :since AND :until AND `symbol` = :symbols AND `interval` IN (:intervals) ORDER BY end_time ASC, start_time DESC"
2022-06-09 07:50:23 +00:00
} else {
2022-08-29 05:11:02 +00:00
query = "SELECT * FROM `binance_klines` WHERE `end_time` BETWEEN :since AND :until AND `symbol` IN (:symbols) AND `interval` IN (:intervals) ORDER BY end_time ASC, start_time DESC"
2022-06-09 07:50:23 +00:00
}
query = strings . ReplaceAll ( query , "binance_klines" , tableName )
2020-11-06 13:40:48 +00:00
2022-06-09 07:50:23 +00:00
sql , args , err := sqlx . Named ( query , map [ string ] interface { } {
2020-11-06 13:40:48 +00:00
"since" : since ,
2020-11-10 11:06:20 +00:00
"until" : until ,
2022-06-09 07:50:23 +00:00
"symbol" : symbols [ 0 ] ,
2020-11-07 12:11:07 +00:00
"symbols" : symbols ,
2020-11-06 16:49:17 +00:00
"intervals" : types . IntervalSlice ( intervals ) ,
2020-11-06 13:40:48 +00:00
} )
2020-11-10 11:06:20 +00:00
2020-11-06 16:49:17 +00:00
sql , args , err = sqlx . In ( sql , args ... )
2021-12-20 12:28:22 +00:00
if err != nil {
return returnError ( err )
}
2020-11-06 16:49:17 +00:00
sql = s . DB . Rebind ( sql )
rows , err := s . DB . Queryx ( sql , args ... )
2020-11-06 13:40:48 +00:00
if err != nil {
2021-12-20 12:28:22 +00:00
return returnError ( err )
2020-11-06 13:40:48 +00:00
}
2020-11-06 16:49:17 +00:00
return s . scanRowsCh ( rows )
2020-11-06 13:40:48 +00:00
}
2021-12-20 12:28:22 +00:00
func returnError ( err error ) ( chan types . KLine , chan error ) {
2022-06-17 11:19:51 +00:00
ch := make ( chan types . KLine )
2021-12-20 12:28:22 +00:00
close ( ch )
log . WithError ( err ) . Error ( "backtest query error" )
errC := make ( chan error , 1 )
// avoid blocking
go func ( ) {
errC <- err
close ( errC )
} ( )
return ch , errC
}
2020-11-06 13:40:48 +00:00
// scanRowsCh scan rows into channel
2020-11-06 16:49:17 +00:00
func ( s * BacktestService ) scanRowsCh ( rows * sqlx . Rows ) ( chan types . KLine , chan error ) {
2020-11-07 12:11:07 +00:00
ch := make ( chan types . KLine , 500 )
2020-11-06 16:49:17 +00:00
errC := make ( chan error , 1 )
2020-11-06 13:40:48 +00:00
go func ( ) {
2020-11-06 16:49:17 +00:00
defer close ( errC )
2020-11-07 12:11:07 +00:00
defer close ( ch )
2020-11-06 13:40:48 +00:00
defer rows . Close ( )
for rows . Next ( ) {
var kline types . KLine
if err := rows . StructScan ( & kline ) ; err != nil {
2020-11-06 16:49:17 +00:00
errC <- err
return
2020-11-06 13:40:48 +00:00
}
ch <- kline
}
if err := rows . Err ( ) ; err != nil {
2020-11-06 16:49:17 +00:00
errC <- err
return
2020-11-06 13:40:48 +00:00
}
2020-11-07 12:11:07 +00:00
2020-11-06 13:40:48 +00:00
} ( )
2020-11-06 16:49:17 +00:00
return ch , errC
2020-11-06 13:40:48 +00:00
}
func ( s * BacktestService ) scanRows ( rows * sqlx . Rows ) ( klines [ ] types . KLine , err error ) {
2022-06-06 09:57:24 +00:00
defer rows . Close ( )
2020-11-06 13:40:48 +00:00
for rows . Next ( ) {
var kline types . KLine
if err := rows . StructScan ( & kline ) ; err != nil {
return nil , err
}
klines = append ( klines , kline )
}
return klines , rows . Err ( )
}
2023-12-21 10:19:28 +00:00
func targetKlineTable ( exchange types . Exchange ) string {
_ , isFutures , _ , _ := exchange2 . GetSessionAttributes ( exchange )
tableName := strings . ToLower ( exchange . Name ( ) . String ( ) )
if isFutures {
2023-12-21 08:19:32 +00:00
return tableName + "_futures_klines"
} else {
return tableName + "_klines"
}
2021-12-13 23:15:18 +00:00
}
2022-06-02 13:27:28 +00:00
var errExchangeFieldIsUnset = errors . New ( "kline.Exchange field should not be empty" )
2023-12-21 10:19:28 +00:00
func ( s * BacktestService ) Insert ( kline types . KLine , ex types . Exchange ) error {
2020-11-06 13:40:48 +00:00
if len ( kline . Exchange ) == 0 {
2022-06-02 13:27:28 +00:00
return errExchangeFieldIsUnset
2020-11-06 13:40:48 +00:00
}
2023-12-21 10:19:28 +00:00
tableName := targetKlineTable ( ex )
2021-12-13 23:15:18 +00:00
sql := fmt . Sprintf ( "INSERT INTO `%s` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)" +
"VALUES (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume)" , tableName )
_ , err := s . DB . NamedExec ( sql , kline )
return err
}
2022-06-06 09:21:31 +00:00
// BatchInsert Note: all kline should be same exchange, or it will cause issue.
2023-12-21 10:19:28 +00:00
func ( s * BacktestService ) BatchInsert ( kline [ ] types . KLine , ex types . Exchange ) error {
2022-06-06 09:21:31 +00:00
if len ( kline ) == 0 {
return nil
}
2023-12-21 10:19:28 +00:00
tableName := targetKlineTable ( ex )
2022-06-06 09:21:31 +00:00
sql := fmt . Sprintf ( "INSERT INTO `%s` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)" +
" VALUES (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume); " , tableName )
tx := s . DB . MustBegin ( )
if _ , err := tx . NamedExec ( sql , kline ) ; err != nil {
2022-06-06 09:57:24 +00:00
if e := tx . Rollback ( ) ; e != nil {
log . WithError ( e ) . Fatalf ( "cannot rollback insertion %v" , err )
}
2022-06-06 09:21:31 +00:00
return err
}
return tx . Commit ( )
}
2022-06-03 18:23:23 +00:00
// TimeRange is a pair of time points marking the start and the end of a span.
type TimeRange struct {
	Start, End time.Time
}
2022-06-05 22:27:45 +00:00
// String renders the range as the Start and End times joined by " ~ ".
func (t *TimeRange) String() string {
	bounds := []string{t.Start.String(), t.End.String()}
	return strings.Join(bounds, " ~ ")
}
2023-11-02 17:03:16 +00:00
func ( s * BacktestService ) Sync ( ctx context . Context , ex types . Exchange , symbol string , intervals [ ] types . Interval , since , until time . Time ) error {
for _ , interval := range intervals {
t1 , t2 , err := s . queryExistingDataRange ( ctx , ex , symbol , interval , since , until )
if err != nil && err != sql . ErrNoRows {
return err
}
2022-06-08 06:38:47 +00:00
2023-11-02 17:03:16 +00:00
if err == sql . ErrNoRows || t1 == nil || t2 == nil {
// fallback to fresh sync
err := s . syncFresh ( ctx , ex , symbol , interval , since , until )
if err != nil {
return err
}
} else {
err := s . syncPartial ( ctx , ex , symbol , interval , since , until )
if err != nil {
return err
}
}
2022-06-08 06:38:47 +00:00
}
2023-11-02 17:03:16 +00:00
return nil
2022-06-08 06:38:47 +00:00
}
2022-06-03 18:23:23 +00:00
// SyncPartial
// find the existing data time range (t1, t2)
// scan if there is a missing part
// create a time range slice []TimeRange
// iterate the []TimeRange slice to sync data.
2023-11-02 17:03:16 +00:00
func ( s * BacktestService ) syncPartial ( ctx context . Context , ex types . Exchange , symbol string , interval types . Interval , since , until time . Time ) error {
2022-06-08 06:38:47 +00:00
log . Infof ( "starting partial sync %s %s %s: %s <=> %s" , ex . Name ( ) , symbol , interval , since , until )
2023-11-02 17:03:16 +00:00
t1 , t2 , err := s . queryExistingDataRange ( ctx , ex , symbol , interval , since , until )
2022-06-04 17:01:59 +00:00
if err != nil && err != sql . ErrNoRows {
2022-06-03 18:23:23 +00:00
return err
}
2022-06-06 03:46:18 +00:00
if err == sql . ErrNoRows || t1 == nil || t2 == nil {
2022-06-04 17:01:59 +00:00
// fallback to fresh sync
2023-11-02 17:03:16 +00:00
return s . syncFresh ( ctx , ex , symbol , interval , since , until )
2022-06-04 17:01:59 +00:00
}
2023-11-02 17:03:16 +00:00
timeRanges , err := s . findMissingTimeRanges ( ctx , ex , symbol , interval , t1 . Time ( ) , t2 . Time ( ) )
2022-06-03 18:23:23 +00:00
if err != nil {
return err
}
2022-06-04 11:15:11 +00:00
2022-06-04 17:01:59 +00:00
if len ( timeRanges ) > 0 {
2022-06-08 06:38:47 +00:00
log . Infof ( "found missing data time ranges: %v" , timeRanges )
2022-06-04 17:01:59 +00:00
}
2022-06-04 11:15:11 +00:00
// there are few cases:
// t1 == since && t2 == until
2022-06-05 22:02:38 +00:00
// [since] ------- [t1] data [t2] ------ [until]
2022-06-06 16:48:13 +00:00
if since . Before ( t1 . Time ( ) ) && t1 . Time ( ) . Sub ( since ) > interval . Duration ( ) {
2022-06-04 11:15:11 +00:00
// shift slice
timeRanges = append ( [ ] TimeRange {
2022-06-06 16:48:13 +00:00
{ Start : since . Add ( - 2 * time . Second ) , End : t1 . Time ( ) } , // we should include since
2022-06-04 11:15:11 +00:00
} , timeRanges ... )
}
2022-06-06 16:48:13 +00:00
if t2 . Time ( ) . Before ( until ) && until . Sub ( t2 . Time ( ) ) > interval . Duration ( ) {
2022-06-04 11:15:11 +00:00
timeRanges = append ( timeRanges , TimeRange {
Start : t2 . Time ( ) ,
2022-06-06 16:48:13 +00:00
End : until . Add ( - interval . Duration ( ) ) , // include until
2022-06-04 11:15:11 +00:00
} )
}
for _ , timeRange := range timeRanges {
2023-11-02 17:03:16 +00:00
err = s . syncKLineByInterval ( ctx , ex , symbol , interval , timeRange . Start . Add ( time . Second ) , timeRange . End . Add ( - time . Second ) )
2022-06-04 11:15:11 +00:00
if err != nil {
return err
}
}
2022-06-03 18:23:23 +00:00
return nil
}
// FindMissingTimeRanges returns the missing time ranges, the start/end time represents the existing data time points.
// So when sending kline query to the exchange API, we need to add one second to the start time and minus one second to the end time.
2023-11-02 17:03:16 +00:00
func ( s * BacktestService ) findMissingTimeRanges ( ctx context . Context , ex types . Exchange , symbol string , interval types . Interval , since , until time . Time ) ( [ ] TimeRange , error ) {
2023-12-21 10:19:28 +00:00
query := s . SelectKLineTimePoints ( ex , symbol , interval , since , until )
2022-06-03 18:23:23 +00:00
sql , args , err := query . ToSql ( )
if err != nil {
return nil , err
}
rows , err := s . DB . QueryContext ( ctx , sql , args ... )
2022-06-06 09:57:24 +00:00
defer rows . Close ( )
2022-06-03 18:23:23 +00:00
if err != nil {
return nil , err
}
var timeRanges [ ] TimeRange
2022-06-06 05:25:11 +00:00
var lastTime = since
2022-06-03 18:41:22 +00:00
var intervalDuration = interval . Duration ( )
2022-06-03 18:23:23 +00:00
for rows . Next ( ) {
var tt types . Time
if err := rows . Scan ( & tt ) ; err != nil {
return nil , err
}
var t = time . Time ( tt )
2022-06-06 05:25:11 +00:00
if t . Sub ( lastTime ) > intervalDuration {
2022-06-03 18:23:23 +00:00
timeRanges = append ( timeRanges , TimeRange {
2022-06-03 18:41:22 +00:00
Start : lastTime ,
2022-06-03 18:23:23 +00:00
End : t ,
} )
}
lastTime = t
}
2022-06-06 10:15:36 +00:00
if lastTime . Before ( until ) && until . Sub ( lastTime ) > intervalDuration {
2022-06-06 05:25:11 +00:00
timeRanges = append ( timeRanges , TimeRange {
Start : lastTime ,
End : until ,
} )
}
2022-06-03 18:23:23 +00:00
return timeRanges , nil
}
2023-11-02 17:03:16 +00:00
func ( s * BacktestService ) queryExistingDataRange ( ctx context . Context , ex types . Exchange , symbol string , interval types . Interval , tArgs ... time . Time ) ( start , end * types . Time , err error ) {
2023-12-21 10:19:28 +00:00
sel := s . SelectKLineTimeRange ( ex , symbol , interval , tArgs ... )
2022-06-03 18:23:23 +00:00
sql , args , err := sel . ToSql ( )
if err != nil {
return nil , nil , err
}
var t1 , t2 types . Time
row := s . DB . QueryRowContext ( ctx , sql , args ... )
2022-06-04 17:01:59 +00:00
2022-06-03 18:23:23 +00:00
if err := row . Scan ( & t1 , & t2 ) ; err != nil {
return nil , nil , err
}
if err := row . Err ( ) ; err != nil {
return nil , nil , err
}
2024-01-18 16:34:02 +00:00
if t1 . Time ( ) . IsZero ( ) || t2 . Time ( ) . IsZero ( ) {
2022-06-06 03:46:18 +00:00
return nil , nil , nil
}
2022-06-03 18:23:23 +00:00
return & t1 , & t2 , nil
}
2024-01-18 16:34:02 +00:00
func ( s * BacktestService ) SelectKLineTimePoints (
ex types . Exchange , symbol string , interval types . Interval , args ... time . Time ,
) sq . SelectBuilder {
2022-06-03 18:23:23 +00:00
conditions := sq . And {
sq . Eq { "symbol" : symbol } ,
sq . Eq { "`interval`" : interval . String ( ) } ,
}
if len ( args ) == 2 {
since := args [ 0 ]
until := args [ 1 ]
conditions = append ( conditions , sq . Expr ( "`start_time` BETWEEN ? AND ?" , since , until ) )
}
2023-12-21 10:19:28 +00:00
tableName := targetKlineTable ( ex )
2022-06-03 18:23:23 +00:00
return sq . Select ( "start_time" ) .
From ( tableName ) .
Where ( conditions ) .
OrderBy ( "start_time ASC" )
}
// SelectKLineTimeRange returns the existing klines time range (since < kline.start_time < until)
2024-01-18 16:34:02 +00:00
func ( s * BacktestService ) SelectKLineTimeRange (
ex types . Exchange , symbol string , interval types . Interval , args ... time . Time ,
) sq . SelectBuilder {
2022-06-03 18:23:23 +00:00
conditions := sq . And {
sq . Eq { "symbol" : symbol } ,
sq . Eq { "`interval`" : interval . String ( ) } ,
}
if len ( args ) == 2 {
2022-06-06 16:48:13 +00:00
// NOTE
// sqlite does not support timezone format, so we are converting to local timezone
// mysql works in this case, so this is a workaround
2022-06-03 18:23:23 +00:00
since := args [ 0 ]
until := args [ 1 ]
conditions = append ( conditions , sq . Expr ( "`start_time` BETWEEN ? AND ?" , since , until ) )
}
2023-12-21 10:19:28 +00:00
tableName := targetKlineTable ( ex )
2022-06-03 18:23:23 +00:00
return sq . Select ( "MIN(start_time) AS t1, MAX(start_time) AS t2" ) .
From ( tableName ) .
Where ( conditions )
}
2022-06-02 08:40:24 +00:00
// TODO: add is_futures column since the klines data is different
2024-01-18 16:34:02 +00:00
func ( s * BacktestService ) SelectLastKLines (
ex types . Exchange , symbol string , interval types . Interval , startTime , endTime time . Time , limit uint64 ,
) sq . SelectBuilder {
2023-12-21 10:19:28 +00:00
tableName := targetKlineTable ( ex )
2022-06-02 08:40:24 +00:00
return sq . Select ( "*" ) .
2022-06-03 18:23:23 +00:00
From ( tableName ) .
2022-06-02 08:40:24 +00:00
Where ( sq . And {
sq . Eq { "symbol" : symbol } ,
sq . Eq { "`interval`" : interval . String ( ) } ,
2022-06-03 18:41:22 +00:00
sq . Expr ( "start_time BETWEEN ? AND ?" , startTime , endTime ) ,
2022-06-02 08:40:24 +00:00
} ) .
OrderBy ( "start_time DESC" ) .
Limit ( limit )
}