2020-11-06 13:40:48 +00:00
package service
import (
"context"
2022-06-04 17:01:59 +00:00
"database/sql"
2021-12-13 23:15:18 +00:00
"fmt"
2021-12-07 07:21:37 +00:00
"strconv"
2020-11-06 13:40:48 +00:00
"strings"
"time"
2022-06-02 08:40:24 +00:00
sq "github.com/Masterminds/squirrel"
2020-11-06 13:40:48 +00:00
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
2022-05-30 16:59:33 +00:00
"github.com/c9s/bbgo/pkg/exchange/batch"
2020-11-06 13:40:48 +00:00
"github.com/c9s/bbgo/pkg/types"
)
// BacktestService synchronizes, verifies, stores and queries the kline data
// kept in the per-exchange kline tables.
type BacktestService struct {
// DB is the sqlx handle every query and insert of this service goes through.
DB * sqlx . DB
}
2021-03-14 03:04:56 +00:00
func ( s * BacktestService ) SyncKLineByInterval ( ctx context . Context , exchange types . Exchange , symbol string , interval types . Interval , startTime , endTime time . Time ) error {
2022-06-03 18:41:22 +00:00
log . Infof ( "synchronizing %s klines with interval %s: %s <=> %s" , exchange . Name ( ) , interval , startTime , endTime )
2020-11-06 13:40:48 +00:00
2022-06-02 08:40:24 +00:00
// TODO: use isFutures here
_ , _ , isIsolated , isolatedSymbol := getExchangeAttributes ( exchange )
// override symbol if isolatedSymbol is not empty
if isIsolated && len ( isolatedSymbol ) > 0 {
symbol = isolatedSymbol
}
2022-06-06 06:53:37 +00:00
if s . DB . DriverName ( ) == "sqlite3" {
_ , _ = s . DB . Exec ( "PRAGMA journal_mode = WAL" )
_ , _ = s . DB . Exec ( "PRAGMA synchronous = NORMAL" )
}
2022-06-02 08:40:24 +00:00
tasks := [ ] SyncTask {
{
2022-06-03 18:23:23 +00:00
Type : types . KLine { } ,
2022-06-03 18:41:22 +00:00
Select : SelectLastKLines ( exchange . Name ( ) , symbol , interval , startTime , endTime , 100 ) ,
2022-06-02 08:40:24 +00:00
Time : func ( obj interface { } ) time . Time {
2022-06-06 16:48:13 +00:00
return obj . ( types . KLine ) . StartTime . Time ( )
2022-06-02 08:40:24 +00:00
} ,
ID : func ( obj interface { } ) string {
kline := obj . ( types . KLine )
2022-06-06 16:48:13 +00:00
return strconv . FormatInt ( kline . StartTime . UnixMilli ( ) , 10 )
// return kline.Symbol + kline.Interval.String() + strconv.FormatInt(kline.StartTime.UnixMilli(), 10)
2022-06-02 08:40:24 +00:00
} ,
BatchQuery : func ( ctx context . Context , startTime , endTime time . Time ) ( interface { } , chan error ) {
q := & batch . KLineBatchQuery { Exchange : exchange }
return q . Query ( ctx , symbol , interval , startTime , endTime )
} ,
2022-06-06 17:21:27 +00:00
BatchInsertBuffer : 500 ,
2022-06-07 04:27:32 +00:00
BatchInsert : func ( obj interface { } ) error {
kLines := obj . ( [ ] types . KLine )
return s . BatchInsert ( kLines )
} ,
2022-06-06 16:48:13 +00:00
Insert : func ( obj interface { } ) error {
kline := obj . ( types . KLine )
return s . Insert ( kline )
2022-06-02 08:40:24 +00:00
} ,
2022-06-06 04:24:18 +00:00
LogInsert : log . GetLevel ( ) == log . DebugLevel ,
2022-06-02 08:40:24 +00:00
} ,
}
2020-11-06 16:49:17 +00:00
2022-06-02 08:40:24 +00:00
for _ , sel := range tasks {
2022-06-03 18:23:23 +00:00
if err := sel . execute ( ctx , s . DB , startTime , endTime ) ; err != nil {
2021-03-14 03:04:56 +00:00
return err
2020-11-06 13:40:48 +00:00
}
2021-03-14 03:04:56 +00:00
}
2022-06-02 08:40:24 +00:00
return nil
2021-03-14 03:04:56 +00:00
}
2020-11-06 16:49:17 +00:00
2022-06-05 22:03:25 +00:00
func ( s * BacktestService ) Verify ( sourceExchange types . Exchange , symbols [ ] string , startTime time . Time , endTime time . Time ) error {
2021-12-14 16:59:28 +00:00
var corruptCnt = 0
for _ , symbol := range symbols {
for interval := range types . SupportedIntervals {
2022-06-05 22:03:25 +00:00
log . Infof ( "verifying %s %s backtesting data: %s to %s..." , symbol , interval , startTime , endTime )
2022-06-05 22:02:38 +00:00
2022-06-06 09:21:31 +00:00
timeRanges , err := s . FindMissingTimeRanges ( context . Background ( ) , sourceExchange , symbol , interval ,
startTime , endTime )
2022-06-05 22:02:38 +00:00
if err != nil {
return err
2021-12-14 16:59:28 +00:00
}
2021-12-13 23:15:18 +00:00
2022-06-05 22:02:38 +00:00
if len ( timeRanges ) == 0 {
continue
2021-12-14 16:59:28 +00:00
}
2021-12-13 23:15:18 +00:00
2022-06-06 04:15:06 +00:00
log . Warnf ( "%s %s found missing time ranges:" , symbol , interval )
2022-06-05 22:02:38 +00:00
corruptCnt += len ( timeRanges )
for _ , timeRange := range timeRanges {
2022-06-06 04:15:06 +00:00
log . Warnf ( "- %s" , timeRange . String ( ) )
2021-12-14 16:59:28 +00:00
}
}
2021-12-13 23:15:18 +00:00
}
2021-12-14 16:59:28 +00:00
log . Infof ( "backtest verification completed" )
if corruptCnt > 0 {
log . Errorf ( "found %d corruptions" , corruptCnt )
} else {
log . Infof ( "found %d corruptions" , corruptCnt )
2020-11-06 13:40:48 +00:00
}
2022-05-11 05:59:44 +00:00
return nil
2021-12-14 16:59:28 +00:00
}
2022-06-06 09:21:31 +00:00
func ( s * BacktestService ) SyncFresh ( ctx context . Context , exchange types . Exchange , symbol string , interval types . Interval , startTime , endTime time . Time ) error {
2022-06-08 06:38:47 +00:00
log . Infof ( "starting fresh sync %s %s %s: %s <=> %s" , exchange . Name ( ) , symbol , interval , startTime , endTime )
2022-06-06 09:21:31 +00:00
startTime = startTime . Truncate ( time . Minute ) . Add ( - 2 * time . Second )
endTime = endTime . Truncate ( time . Minute ) . Add ( 2 * time . Second )
2021-12-14 16:59:28 +00:00
return s . SyncKLineByInterval ( ctx , exchange , symbol , interval , startTime , endTime )
2020-11-06 13:40:48 +00:00
}
2021-12-07 07:21:37 +00:00
// QueryKLine queries the klines from the database
func ( s * BacktestService ) QueryKLine ( ex types . ExchangeName , symbol string , interval types . Interval , orderBy string , limit int ) ( * types . KLine , error ) {
2020-11-06 13:40:48 +00:00
log . Infof ( "querying last kline exchange = %s AND symbol = %s AND interval = %s" , ex , symbol , interval )
2022-06-03 18:23:23 +00:00
tableName := targetKlineTable ( ex )
2020-11-06 13:40:48 +00:00
// make the SQL syntax IDE friendly, so that it can analyze it.
2022-06-03 18:23:23 +00:00
sql := fmt . Sprintf ( "SELECT * FROM `%s` WHERE `symbol` = :symbol AND `interval` = :interval ORDER BY end_time " + orderBy + " LIMIT " + strconv . Itoa ( limit ) , tableName )
2020-11-06 13:40:48 +00:00
rows , err := s . DB . NamedQuery ( sql , map [ string ] interface { } {
"interval" : interval ,
"symbol" : symbol ,
} )
2022-06-06 09:57:24 +00:00
defer rows . Close ( )
2020-11-06 13:40:48 +00:00
if err != nil {
2021-12-07 07:21:37 +00:00
return nil , errors . Wrap ( err , "query kline error" )
2020-11-06 13:40:48 +00:00
}
if rows . Err ( ) != nil {
return nil , rows . Err ( )
}
if rows . Next ( ) {
var kline types . KLine
err = rows . StructScan ( & kline )
return & kline , err
}
return nil , rows . Err ( )
}
2022-06-02 09:24:54 +00:00
// QueryKLinesForward is used for querying klines to back-testing
2021-05-07 16:57:25 +00:00
func ( s * BacktestService ) QueryKLinesForward ( exchange types . ExchangeName , symbol string , interval types . Interval , startTime time . Time , limit int ) ( [ ] types . KLine , error ) {
2022-06-03 18:23:23 +00:00
tableName := targetKlineTable ( exchange )
2021-12-13 23:15:18 +00:00
sql := "SELECT * FROM `binance_klines` WHERE `end_time` >= :start_time AND `symbol` = :symbol AND `interval` = :interval and exchange = :exchange ORDER BY end_time ASC LIMIT :limit"
sql = strings . ReplaceAll ( sql , "binance_klines" , tableName )
2020-11-08 04:13:34 +00:00
rows , err := s . DB . NamedQuery ( sql , map [ string ] interface { } {
2021-05-07 16:57:25 +00:00
"start_time" : startTime ,
2021-10-16 05:49:00 +00:00
"limit" : limit ,
"symbol" : symbol ,
"interval" : interval ,
2021-12-13 23:15:18 +00:00
"exchange" : exchange . String ( ) ,
2020-11-08 04:13:34 +00:00
} )
if err != nil {
return nil , err
}
return s . scanRows ( rows )
}
2021-05-07 16:57:25 +00:00
func ( s * BacktestService ) QueryKLinesBackward ( exchange types . ExchangeName , symbol string , interval types . Interval , endTime time . Time , limit int ) ( [ ] types . KLine , error ) {
2022-06-03 18:23:23 +00:00
tableName := targetKlineTable ( exchange )
2021-12-13 23:15:18 +00:00
sql := "SELECT * FROM `binance_klines` WHERE `end_time` <= :end_time and exchange = :exchange AND `symbol` = :symbol AND `interval` = :interval ORDER BY end_time DESC LIMIT :limit"
sql = strings . ReplaceAll ( sql , "binance_klines" , tableName )
2021-10-16 05:49:00 +00:00
sql = "SELECT t.* FROM (" + sql + ") AS t ORDER BY t.end_time ASC"
2020-11-08 04:13:34 +00:00
rows , err := s . DB . NamedQuery ( sql , map [ string ] interface { } {
2021-05-07 16:57:25 +00:00
"limit" : limit ,
2021-10-16 05:49:00 +00:00
"end_time" : endTime ,
2020-11-08 04:13:34 +00:00
"symbol" : symbol ,
"interval" : interval ,
2021-12-13 23:15:18 +00:00
"exchange" : exchange . String ( ) ,
2020-11-08 04:13:34 +00:00
} )
if err != nil {
return nil , err
}
return s . scanRows ( rows )
}
2020-11-10 11:06:20 +00:00
func ( s * BacktestService ) QueryKLinesCh ( since , until time . Time , exchange types . Exchange , symbols [ ] string , intervals [ ] types . Interval ) ( chan types . KLine , chan error ) {
2021-12-13 23:15:18 +00:00
if len ( symbols ) == 0 {
2021-12-20 12:28:22 +00:00
return returnError ( errors . Errorf ( "symbols is empty when querying kline, plesae check your strategy setting. " ) )
2021-12-13 23:15:18 +00:00
}
2022-06-03 18:23:23 +00:00
tableName := targetKlineTable ( exchange . Name ( ) )
2022-06-09 07:50:23 +00:00
var query string
if len ( symbols ) == 1 {
query = "SELECT * FROM `binance_klines` WHERE `end_time` BETWEEN :since AND :until AND `symbol` = :symbols AND `interval` IN (:intervals) ORDER BY end_time ASC"
} else {
query = "SELECT * FROM `binance_klines` WHERE `end_time` BETWEEN :since AND :until AND `symbol` IN (:symbols) AND `interval` IN (:intervals) ORDER BY end_time ASC"
}
query = strings . ReplaceAll ( query , "binance_klines" , tableName )
2020-11-06 13:40:48 +00:00
2022-06-09 07:50:23 +00:00
sql , args , err := sqlx . Named ( query , map [ string ] interface { } {
2020-11-06 13:40:48 +00:00
"since" : since ,
2020-11-10 11:06:20 +00:00
"until" : until ,
2022-06-09 07:50:23 +00:00
"symbol" : symbols [ 0 ] ,
2020-11-07 12:11:07 +00:00
"symbols" : symbols ,
2020-11-06 16:49:17 +00:00
"intervals" : types . IntervalSlice ( intervals ) ,
2020-11-06 13:40:48 +00:00
} )
2020-11-10 11:06:20 +00:00
2020-11-06 16:49:17 +00:00
sql , args , err = sqlx . In ( sql , args ... )
2021-12-20 12:28:22 +00:00
if err != nil {
return returnError ( err )
}
2020-11-06 16:49:17 +00:00
sql = s . DB . Rebind ( sql )
rows , err := s . DB . Queryx ( sql , args ... )
2020-11-06 13:40:48 +00:00
if err != nil {
2021-12-20 12:28:22 +00:00
return returnError ( err )
2020-11-06 13:40:48 +00:00
}
2020-11-06 16:49:17 +00:00
return s . scanRowsCh ( rows )
2020-11-06 13:40:48 +00:00
}
2021-12-20 12:28:22 +00:00
func returnError ( err error ) ( chan types . KLine , chan error ) {
2022-06-17 11:19:51 +00:00
ch := make ( chan types . KLine )
2021-12-20 12:28:22 +00:00
close ( ch )
log . WithError ( err ) . Error ( "backtest query error" )
errC := make ( chan error , 1 )
// avoid blocking
go func ( ) {
errC <- err
close ( errC )
} ( )
return ch , errC
}
2020-11-06 13:40:48 +00:00
// scanRowsCh scan rows into channel
2020-11-06 16:49:17 +00:00
func ( s * BacktestService ) scanRowsCh ( rows * sqlx . Rows ) ( chan types . KLine , chan error ) {
2020-11-07 12:11:07 +00:00
ch := make ( chan types . KLine , 500 )
2020-11-06 16:49:17 +00:00
errC := make ( chan error , 1 )
2020-11-06 13:40:48 +00:00
go func ( ) {
2020-11-06 16:49:17 +00:00
defer close ( errC )
2020-11-07 12:11:07 +00:00
defer close ( ch )
2020-11-06 13:40:48 +00:00
defer rows . Close ( )
for rows . Next ( ) {
var kline types . KLine
if err := rows . StructScan ( & kline ) ; err != nil {
2020-11-06 16:49:17 +00:00
errC <- err
return
2020-11-06 13:40:48 +00:00
}
ch <- kline
}
if err := rows . Err ( ) ; err != nil {
2020-11-06 16:49:17 +00:00
errC <- err
return
2020-11-06 13:40:48 +00:00
}
2020-11-07 12:11:07 +00:00
2020-11-06 13:40:48 +00:00
} ( )
2020-11-06 16:49:17 +00:00
return ch , errC
2020-11-06 13:40:48 +00:00
}
func ( s * BacktestService ) scanRows ( rows * sqlx . Rows ) ( klines [ ] types . KLine , err error ) {
2022-06-06 09:57:24 +00:00
defer rows . Close ( )
2020-11-06 13:40:48 +00:00
for rows . Next ( ) {
var kline types . KLine
if err := rows . StructScan ( & kline ) ; err != nil {
return nil , err
}
klines = append ( klines , kline )
}
return klines , rows . Err ( )
}
2022-06-03 18:23:23 +00:00
func targetKlineTable ( exchangeName types . ExchangeName ) string {
2022-06-02 13:27:28 +00:00
return strings . ToLower ( exchangeName . String ( ) ) + "_klines"
2021-12-13 23:15:18 +00:00
}
2022-06-02 13:27:28 +00:00
// errExchangeFieldIsUnset is returned by Insert when the kline carries an
// empty Exchange field, since the target table name is derived from it.
var errExchangeFieldIsUnset = errors . New ( "kline.Exchange field should not be empty" )
2020-11-06 13:40:48 +00:00
func ( s * BacktestService ) Insert ( kline types . KLine ) error {
if len ( kline . Exchange ) == 0 {
2022-06-02 13:27:28 +00:00
return errExchangeFieldIsUnset
2020-11-06 13:40:48 +00:00
}
2022-06-03 18:23:23 +00:00
tableName := targetKlineTable ( kline . Exchange )
2021-12-13 23:15:18 +00:00
sql := fmt . Sprintf ( "INSERT INTO `%s` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)" +
"VALUES (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume)" , tableName )
_ , err := s . DB . NamedExec ( sql , kline )
return err
}
2022-06-06 09:21:31 +00:00
// BatchInsert Note: all kline should be same exchange, or it will cause issue.
func ( s * BacktestService ) BatchInsert ( kline [ ] types . KLine ) error {
if len ( kline ) == 0 {
return nil
}
tableName := targetKlineTable ( kline [ 0 ] . Exchange )
sql := fmt . Sprintf ( "INSERT INTO `%s` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)" +
" VALUES (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume); " , tableName )
tx := s . DB . MustBegin ( )
if _ , err := tx . NamedExec ( sql , kline ) ; err != nil {
2022-06-06 09:57:24 +00:00
if e := tx . Rollback ( ) ; e != nil {
log . WithError ( e ) . Fatalf ( "cannot rollback insertion %v" , err )
}
2022-06-06 09:21:31 +00:00
return err
}
return tx . Commit ( )
}
2022-06-03 18:23:23 +00:00
// TimeRange is a span between two time points.
type TimeRange struct {
// Start is the time point the range begins at.
Start time . Time
// End is the time point the range ends at.
End time . Time
}
2022-06-05 22:27:45 +00:00
// String formats the range as "<start> ~ <end>", using the default
// time.Time string form for both bounds.
func (t *TimeRange) String() string {
	from, to := t.Start.String(), t.End.String()
	return from + " ~ " + to
}
2022-06-08 06:38:47 +00:00
// Sync synchronizes the klines of one symbol and interval between since and
// until. It first looks up the range of data already stored: when nothing is
// stored it falls back to SyncFresh, otherwise it delegates to SyncPartial.
func (s *BacktestService) Sync(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, since, until time.Time) error {
	first, last, err := s.QueryExistingDataRange(ctx, ex, symbol, interval, since, until)

	switch {
	case err != nil && err != sql.ErrNoRows:
		return err
	case err == sql.ErrNoRows, first == nil, last == nil:
		// nothing stored yet: fallback to fresh sync
		return s.SyncFresh(ctx, ex, symbol, interval, since, until)
	default:
		return s.SyncPartial(ctx, ex, symbol, interval, since, until)
	}
}
2022-06-03 18:23:23 +00:00
// SyncPartial
// find the existing data time range (t1, t2)
// scan if there is a missing part
// create a time range slice []TimeRange
// iterate the []TimeRange slice to sync data.
func ( s * BacktestService ) SyncPartial ( ctx context . Context , ex types . Exchange , symbol string , interval types . Interval , since , until time . Time ) error {
2022-06-08 06:38:47 +00:00
log . Infof ( "starting partial sync %s %s %s: %s <=> %s" , ex . Name ( ) , symbol , interval , since , until )
2022-06-03 18:23:23 +00:00
t1 , t2 , err := s . QueryExistingDataRange ( ctx , ex , symbol , interval , since , until )
2022-06-04 17:01:59 +00:00
if err != nil && err != sql . ErrNoRows {
2022-06-03 18:23:23 +00:00
return err
}
2022-06-06 03:46:18 +00:00
if err == sql . ErrNoRows || t1 == nil || t2 == nil {
2022-06-04 17:01:59 +00:00
// fallback to fresh sync
2022-06-06 09:21:31 +00:00
return s . SyncFresh ( ctx , ex , symbol , interval , since , until )
2022-06-04 17:01:59 +00:00
}
2022-06-03 18:23:23 +00:00
timeRanges , err := s . FindMissingTimeRanges ( ctx , ex , symbol , interval , t1 . Time ( ) , t2 . Time ( ) )
if err != nil {
return err
}
2022-06-04 11:15:11 +00:00
2022-06-04 17:01:59 +00:00
if len ( timeRanges ) > 0 {
2022-06-08 06:38:47 +00:00
log . Infof ( "found missing data time ranges: %v" , timeRanges )
2022-06-04 17:01:59 +00:00
}
2022-06-04 11:15:11 +00:00
// there are few cases:
// t1 == since && t2 == until
2022-06-05 22:02:38 +00:00
// [since] ------- [t1] data [t2] ------ [until]
2022-06-06 16:48:13 +00:00
if since . Before ( t1 . Time ( ) ) && t1 . Time ( ) . Sub ( since ) > interval . Duration ( ) {
2022-06-04 11:15:11 +00:00
// shift slice
timeRanges = append ( [ ] TimeRange {
2022-06-06 16:48:13 +00:00
{ Start : since . Add ( - 2 * time . Second ) , End : t1 . Time ( ) } , // we should include since
2022-06-04 11:15:11 +00:00
} , timeRanges ... )
}
2022-06-06 16:48:13 +00:00
if t2 . Time ( ) . Before ( until ) && until . Sub ( t2 . Time ( ) ) > interval . Duration ( ) {
2022-06-04 11:15:11 +00:00
timeRanges = append ( timeRanges , TimeRange {
Start : t2 . Time ( ) ,
2022-06-06 16:48:13 +00:00
End : until . Add ( - interval . Duration ( ) ) , // include until
2022-06-04 11:15:11 +00:00
} )
}
for _ , timeRange := range timeRanges {
2022-06-06 17:21:27 +00:00
err = s . SyncKLineByInterval ( ctx , ex , symbol , interval , timeRange . Start . Add ( time . Second ) , timeRange . End . Add ( - time . Second ) )
2022-06-04 11:15:11 +00:00
if err != nil {
return err
}
}
2022-06-03 18:23:23 +00:00
return nil
}
// FindMissingTimeRanges returns the missing time ranges, the start/end time represents the existing data time points.
// So when sending kline query to the exchange API, we need to add one second to the start time and minus one second to the end time.
func ( s * BacktestService ) FindMissingTimeRanges ( ctx context . Context , ex types . Exchange , symbol string , interval types . Interval , since , until time . Time ) ( [ ] TimeRange , error ) {
query := SelectKLineTimePoints ( ex . Name ( ) , symbol , interval , since , until )
sql , args , err := query . ToSql ( )
if err != nil {
return nil , err
}
rows , err := s . DB . QueryContext ( ctx , sql , args ... )
2022-06-06 09:57:24 +00:00
defer rows . Close ( )
2022-06-03 18:23:23 +00:00
if err != nil {
return nil , err
}
var timeRanges [ ] TimeRange
2022-06-06 05:25:11 +00:00
var lastTime = since
2022-06-03 18:41:22 +00:00
var intervalDuration = interval . Duration ( )
2022-06-03 18:23:23 +00:00
for rows . Next ( ) {
var tt types . Time
if err := rows . Scan ( & tt ) ; err != nil {
return nil , err
}
var t = time . Time ( tt )
2022-06-06 05:25:11 +00:00
if t . Sub ( lastTime ) > intervalDuration {
2022-06-03 18:23:23 +00:00
timeRanges = append ( timeRanges , TimeRange {
2022-06-03 18:41:22 +00:00
Start : lastTime ,
2022-06-03 18:23:23 +00:00
End : t ,
} )
}
lastTime = t
}
2022-06-06 10:15:36 +00:00
if lastTime . Before ( until ) && until . Sub ( lastTime ) > intervalDuration {
2022-06-06 05:25:11 +00:00
timeRanges = append ( timeRanges , TimeRange {
Start : lastTime ,
End : until ,
} )
}
2022-06-03 18:23:23 +00:00
return timeRanges , nil
}
func ( s * BacktestService ) QueryExistingDataRange ( ctx context . Context , ex types . Exchange , symbol string , interval types . Interval , tArgs ... time . Time ) ( start , end * types . Time , err error ) {
sel := SelectKLineTimeRange ( ex . Name ( ) , symbol , interval , tArgs ... )
sql , args , err := sel . ToSql ( )
if err != nil {
return nil , nil , err
}
var t1 , t2 types . Time
row := s . DB . QueryRowContext ( ctx , sql , args ... )
2022-06-04 17:01:59 +00:00
2022-06-03 18:23:23 +00:00
if err := row . Scan ( & t1 , & t2 ) ; err != nil {
return nil , nil , err
}
if err := row . Err ( ) ; err != nil {
return nil , nil , err
}
2022-06-06 03:46:18 +00:00
if t1 == ( types . Time { } ) || t2 == ( types . Time { } ) {
return nil , nil , nil
}
2022-06-03 18:23:23 +00:00
return & t1 , & t2 , nil
}
// SelectKLineTimePoints builds the query that lists the start_time of every
// stored kline of the exchange, symbol and interval, in ascending order.
// When exactly two time arguments are given, they are used as the inclusive
// start_time bounds (since, until); any other argument count adds no filter.
func SelectKLineTimePoints(ex types.ExchangeName, symbol string, interval types.Interval, args ...time.Time) sq.SelectBuilder {
	where := sq.And{
		sq.Eq{"symbol": symbol},
		sq.Eq{"`interval`": interval.String()},
	}
	if len(args) == 2 {
		where = append(where, sq.Expr("`start_time` BETWEEN ? AND ?", args[0], args[1]))
	}

	builder := sq.Select("start_time").From(targetKlineTable(ex))
	return builder.Where(where).OrderBy("start_time ASC")
}
// SelectKLineTimeRange returns the existing klines time range (since < kline.start_time < until)
func SelectKLineTimeRange ( ex types . ExchangeName , symbol string , interval types . Interval , args ... time . Time ) sq . SelectBuilder {
conditions := sq . And {
sq . Eq { "symbol" : symbol } ,
sq . Eq { "`interval`" : interval . String ( ) } ,
}
if len ( args ) == 2 {
2022-06-06 16:48:13 +00:00
// NOTE
// sqlite does not support timezone format, so we are converting to local timezone
// mysql works in this case, so this is a workaround
2022-06-03 18:23:23 +00:00
since := args [ 0 ]
until := args [ 1 ]
conditions = append ( conditions , sq . Expr ( "`start_time` BETWEEN ? AND ?" , since , until ) )
}
tableName := targetKlineTable ( ex )
return sq . Select ( "MIN(start_time) AS t1, MAX(start_time) AS t2" ) .
From ( tableName ) .
Where ( conditions )
}
2022-06-02 08:40:24 +00:00
// TODO: add is_futures column since the klines data is different
2022-06-03 18:41:22 +00:00
func SelectLastKLines ( ex types . ExchangeName , symbol string , interval types . Interval , startTime , endTime time . Time , limit uint64 ) sq . SelectBuilder {
2022-06-03 18:23:23 +00:00
tableName := targetKlineTable ( ex )
2022-06-02 08:40:24 +00:00
return sq . Select ( "*" ) .
2022-06-03 18:23:23 +00:00
From ( tableName ) .
2022-06-02 08:40:24 +00:00
Where ( sq . And {
sq . Eq { "symbol" : symbol } ,
sq . Eq { "`interval`" : interval . String ( ) } ,
2022-06-03 18:41:22 +00:00
sq . Expr ( "start_time BETWEEN ? AND ?" , startTime , endTime ) ,
2022-06-02 08:40:24 +00:00
} ) .
OrderBy ( "start_time DESC" ) .
Limit ( limit )
}