2020-11-06 13:40:48 +00:00
package service
import (
"context"
2021-12-13 23:15:18 +00:00
"fmt"
2021-12-14 16:59:28 +00:00
"os"
2021-12-07 07:21:37 +00:00
"strconv"
2020-11-06 13:40:48 +00:00
"strings"
"time"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
2021-02-18 09:37:49 +00:00
batch2 "github.com/c9s/bbgo/pkg/exchange/batch"
2020-11-06 13:40:48 +00:00
"github.com/c9s/bbgo/pkg/types"
)
type BacktestService struct {
DB * sqlx . DB
}
2021-03-14 03:04:56 +00:00
func ( s * BacktestService ) SyncKLineByInterval ( ctx context . Context , exchange types . Exchange , symbol string , interval types . Interval , startTime , endTime time . Time ) error {
log . Infof ( "synchronizing lastKLine for interval %s from exchange %s" , interval , exchange . Name ( ) )
2020-11-06 13:40:48 +00:00
2021-03-14 03:04:56 +00:00
batch := & batch2 . KLineBatchQuery { Exchange : exchange }
2020-11-06 16:49:17 +00:00
2021-03-14 03:04:56 +00:00
// should use channel here
klineC , errC := batch . Query ( ctx , symbol , interval , startTime , endTime )
2021-12-13 23:15:18 +00:00
2021-03-14 03:04:56 +00:00
// var previousKLine types.KLine
2021-12-13 23:15:18 +00:00
count := 0
for klines := range klineC {
if err := s . BatchInsert ( klines ) ; err != nil {
2021-03-14 03:04:56 +00:00
return err
2020-11-06 13:40:48 +00:00
}
2021-12-13 23:15:18 +00:00
count += len ( klines )
2021-03-14 03:04:56 +00:00
}
2021-12-13 23:15:18 +00:00
log . Infof ( "found %s kline %s data count: %d" , symbol , interval . String ( ) , count )
2021-03-14 03:04:56 +00:00
if err := <- errC ; err != nil {
return err
}
return nil
}
2020-11-06 16:49:17 +00:00
2021-12-14 16:59:28 +00:00
func ( s * BacktestService ) Verify ( symbols [ ] string , startTime time . Time , endTime time . Time , sourceExchange types . Exchange , verboseCnt int ) ( error , bool ) {
var corruptCnt = 0
for _ , symbol := range symbols {
log . Infof ( "verifying backtesting data..." )
for interval := range types . SupportedIntervals {
log . Infof ( "verifying %s %s kline data..." , symbol , interval )
klineC , errC := s . QueryKLinesCh ( startTime , time . Now ( ) , sourceExchange , [ ] string { symbol } , [ ] types . Interval { interval } )
var emptyKLine types . KLine
var prevKLine types . KLine
for k := range klineC {
if verboseCnt > 1 {
fmt . Fprint ( os . Stderr , "." )
}
if prevKLine != emptyKLine {
if prevKLine . StartTime . Unix ( ) == k . StartTime . Unix ( ) {
s . _deleteDuplicatedKLine ( k )
log . Errorf ( "found kline data duplicated at time: %s kline: %+v , deleted it" , k . StartTime , k )
} else if prevKLine . StartTime . Add ( interval . Duration ( ) ) != k . StartTime {
corruptCnt ++
log . Errorf ( "found kline data corrupted at time: %s kline: %+v" , k . StartTime , k )
log . Errorf ( "between %d and %d" ,
prevKLine . StartTime . Unix ( ) ,
k . StartTime . Unix ( ) )
}
}
prevKLine = k
}
2021-12-13 23:15:18 +00:00
2021-12-14 16:59:28 +00:00
if verboseCnt > 1 {
fmt . Fprintln ( os . Stderr )
}
2021-12-13 23:15:18 +00:00
2021-12-14 16:59:28 +00:00
if err := <- errC ; err != nil {
return err , true
}
}
2021-12-13 23:15:18 +00:00
}
2021-12-14 16:59:28 +00:00
log . Infof ( "backtest verification completed" )
if corruptCnt > 0 {
log . Errorf ( "found %d corruptions" , corruptCnt )
} else {
log . Infof ( "found %d corruptions" , corruptCnt )
2020-11-06 13:40:48 +00:00
}
2021-12-14 16:59:28 +00:00
return nil , false
}
func ( s * BacktestService ) Sync ( ctx context . Context , exchange types . Exchange , symbol string ,
startTime time . Time , endTime time . Time , interval types . Interval ) error {
return s . SyncKLineByInterval ( ctx , exchange , symbol , interval , startTime , endTime )
2020-11-06 13:40:48 +00:00
}
2021-12-07 07:21:37 +00:00
func ( s * BacktestService ) QueryFirstKLine ( ex types . ExchangeName , symbol string , interval types . Interval ) ( * types . KLine , error ) {
return s . QueryKLine ( ex , symbol , interval , "ASC" , 1 )
}
// QueryLastKLine queries the last kline from the database
func ( s * BacktestService ) QueryLastKLine ( ex types . ExchangeName , symbol string , interval types . Interval ) ( * types . KLine , error ) {
return s . QueryKLine ( ex , symbol , interval , "DESC" , 1 )
}
// QueryKLine queries the klines from the database
func ( s * BacktestService ) QueryKLine ( ex types . ExchangeName , symbol string , interval types . Interval , orderBy string , limit int ) ( * types . KLine , error ) {
2020-11-06 13:40:48 +00:00
log . Infof ( "querying last kline exchange = %s AND symbol = %s AND interval = %s" , ex , symbol , interval )
2021-12-13 23:15:18 +00:00
tableName := s . _targetKlineTable ( ex )
2020-11-06 13:40:48 +00:00
// make the SQL syntax IDE friendly, so that it can analyze it.
2021-12-13 23:15:18 +00:00
sql := fmt . Sprintf ( "SELECT * FROM `%s` WHERE `symbol` = :symbol AND `interval` = :interval and exchange = :exchange ORDER BY end_time " + orderBy + " LIMIT " + strconv . Itoa ( limit ) , tableName )
2020-11-06 13:40:48 +00:00
rows , err := s . DB . NamedQuery ( sql , map [ string ] interface { } {
2021-12-13 23:15:18 +00:00
"exchange" : ex . String ( ) ,
2020-11-06 13:40:48 +00:00
"interval" : interval ,
"symbol" : symbol ,
} )
if err != nil {
2021-12-07 07:21:37 +00:00
return nil , errors . Wrap ( err , "query kline error" )
2020-11-06 13:40:48 +00:00
}
if rows . Err ( ) != nil {
return nil , rows . Err ( )
}
defer rows . Close ( )
if rows . Next ( ) {
var kline types . KLine
err = rows . StructScan ( & kline )
return & kline , err
}
return nil , rows . Err ( )
}
2021-05-07 16:57:25 +00:00
func ( s * BacktestService ) QueryKLinesForward ( exchange types . ExchangeName , symbol string , interval types . Interval , startTime time . Time , limit int ) ( [ ] types . KLine , error ) {
2021-12-13 23:15:18 +00:00
tableName := s . _targetKlineTable ( exchange )
sql := "SELECT * FROM `binance_klines` WHERE `end_time` >= :start_time AND `symbol` = :symbol AND `interval` = :interval and exchange = :exchange ORDER BY end_time ASC LIMIT :limit"
sql = strings . ReplaceAll ( sql , "binance_klines" , tableName )
2020-11-08 04:13:34 +00:00
rows , err := s . DB . NamedQuery ( sql , map [ string ] interface { } {
2021-05-07 16:57:25 +00:00
"start_time" : startTime ,
2021-10-16 05:49:00 +00:00
"limit" : limit ,
"symbol" : symbol ,
"interval" : interval ,
2021-12-13 23:15:18 +00:00
"exchange" : exchange . String ( ) ,
2020-11-08 04:13:34 +00:00
} )
if err != nil {
return nil , err
}
return s . scanRows ( rows )
}
2021-05-07 16:57:25 +00:00
func ( s * BacktestService ) QueryKLinesBackward ( exchange types . ExchangeName , symbol string , interval types . Interval , endTime time . Time , limit int ) ( [ ] types . KLine , error ) {
2021-12-13 23:15:18 +00:00
tableName := s . _targetKlineTable ( exchange )
sql := "SELECT * FROM `binance_klines` WHERE `end_time` <= :end_time and exchange = :exchange AND `symbol` = :symbol AND `interval` = :interval ORDER BY end_time DESC LIMIT :limit"
sql = strings . ReplaceAll ( sql , "binance_klines" , tableName )
2021-10-16 05:49:00 +00:00
sql = "SELECT t.* FROM (" + sql + ") AS t ORDER BY t.end_time ASC"
2020-11-08 04:13:34 +00:00
rows , err := s . DB . NamedQuery ( sql , map [ string ] interface { } {
2021-05-07 16:57:25 +00:00
"limit" : limit ,
2021-10-16 05:49:00 +00:00
"end_time" : endTime ,
2020-11-08 04:13:34 +00:00
"symbol" : symbol ,
"interval" : interval ,
2021-12-13 23:15:18 +00:00
"exchange" : exchange . String ( ) ,
2020-11-08 04:13:34 +00:00
} )
if err != nil {
return nil , err
}
return s . scanRows ( rows )
}
2020-11-10 11:06:20 +00:00
func ( s * BacktestService ) QueryKLinesCh ( since , until time . Time , exchange types . Exchange , symbols [ ] string , intervals [ ] types . Interval ) ( chan types . KLine , chan error ) {
2021-12-13 23:15:18 +00:00
if len ( symbols ) == 0 {
errC := make ( chan error , 1 )
// avoid blocking
go func ( ) {
errC <- errors . Errorf ( "symbols is empty when querying kline, plesae check your strategy setting. " )
close ( errC )
} ( )
return nil , errC
}
tableName := s . _targetKlineTable ( exchange . Name ( ) )
sql := "SELECT * FROM `binance_klines` WHERE `end_time` BETWEEN :since AND :until AND `symbol` IN (:symbols) AND `interval` IN (:intervals) and exchange = :exchange ORDER BY end_time ASC"
sql = strings . ReplaceAll ( sql , "binance_klines" , tableName )
2020-11-06 13:40:48 +00:00
2020-11-06 16:49:17 +00:00
sql , args , err := sqlx . Named ( sql , map [ string ] interface { } {
2020-11-06 13:40:48 +00:00
"since" : since ,
2020-11-10 11:06:20 +00:00
"until" : until ,
2020-11-07 12:11:07 +00:00
"symbols" : symbols ,
2020-11-06 16:49:17 +00:00
"intervals" : types . IntervalSlice ( intervals ) ,
2021-12-13 23:15:18 +00:00
"exchange" : exchange . Name ( ) . String ( ) ,
2020-11-06 13:40:48 +00:00
} )
2020-11-10 11:06:20 +00:00
2020-11-06 16:49:17 +00:00
sql , args , err = sqlx . In ( sql , args ... )
sql = s . DB . Rebind ( sql )
rows , err := s . DB . Queryx ( sql , args ... )
2020-11-06 13:40:48 +00:00
if err != nil {
2021-12-13 23:15:18 +00:00
log . WithError ( err ) . Error ( "backtest query error" )
2020-11-06 16:49:17 +00:00
errC := make ( chan error , 1 )
// avoid blocking
go func ( ) {
errC <- err
close ( errC )
} ( )
return nil , errC
2020-11-06 13:40:48 +00:00
}
2020-11-06 16:49:17 +00:00
return s . scanRowsCh ( rows )
2020-11-06 13:40:48 +00:00
}
// scanRowsCh scan rows into channel
2020-11-06 16:49:17 +00:00
func ( s * BacktestService ) scanRowsCh ( rows * sqlx . Rows ) ( chan types . KLine , chan error ) {
2020-11-07 12:11:07 +00:00
ch := make ( chan types . KLine , 500 )
2020-11-06 16:49:17 +00:00
errC := make ( chan error , 1 )
2020-11-06 13:40:48 +00:00
go func ( ) {
2020-11-06 16:49:17 +00:00
defer close ( errC )
2020-11-07 12:11:07 +00:00
defer close ( ch )
2020-11-06 13:40:48 +00:00
defer rows . Close ( )
for rows . Next ( ) {
var kline types . KLine
if err := rows . StructScan ( & kline ) ; err != nil {
2020-11-06 16:49:17 +00:00
errC <- err
return
2020-11-06 13:40:48 +00:00
}
ch <- kline
}
if err := rows . Err ( ) ; err != nil {
2020-11-06 16:49:17 +00:00
errC <- err
return
2020-11-06 13:40:48 +00:00
}
2020-11-07 12:11:07 +00:00
2020-11-06 13:40:48 +00:00
} ( )
2020-11-06 16:49:17 +00:00
return ch , errC
2020-11-06 13:40:48 +00:00
}
func ( s * BacktestService ) scanRows ( rows * sqlx . Rows ) ( klines [ ] types . KLine , err error ) {
for rows . Next ( ) {
var kline types . KLine
if err := rows . StructScan ( & kline ) ; err != nil {
return nil , err
}
klines = append ( klines , kline )
}
return klines , rows . Err ( )
}
2021-12-13 23:15:18 +00:00
func ( s * BacktestService ) _targetKlineTable ( exchangeName types . ExchangeName ) string {
switch exchangeName {
case types . ExchangeBinance :
return "binance_klines"
case types . ExchangeFTX :
return "ftx_klines"
case types . ExchangeMax :
return "max_klines"
case types . ExchangeOKEx :
return "okex_klines"
default :
return "klines"
}
}
2020-11-06 13:40:48 +00:00
func ( s * BacktestService ) Insert ( kline types . KLine ) error {
if len ( kline . Exchange ) == 0 {
return errors . New ( "kline.Exchange field should not be empty" )
}
2021-12-13 23:15:18 +00:00
tableName := s . _targetKlineTable ( kline . Exchange )
sql := fmt . Sprintf ( "INSERT INTO `%s` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)" +
"VALUES (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume)" , tableName )
_ , err := s . DB . NamedExec ( sql , kline )
return err
}
// BatchInsert Note: all kline should be same exchange, or it will cause issue.
func ( s * BacktestService ) BatchInsert ( kline [ ] types . KLine ) error {
if len ( kline ) == 0 {
return nil
}
if len ( kline [ 0 ] . Exchange ) == 0 {
return errors . New ( "kline.Exchange field should not be empty" )
}
tableName := s . _targetKlineTable ( kline [ 0 ] . Exchange )
sql := fmt . Sprintf ( "INSERT INTO `%s` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)" +
" values (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume); " , tableName )
2020-11-06 13:40:48 +00:00
_ , err := s . DB . NamedExec ( sql , kline )
return err
}
2021-12-14 11:34:43 +00:00
2021-12-14 16:59:28 +00:00
func ( s * BacktestService ) _deleteDuplicatedKLine ( k types . KLine ) error {
2021-12-14 11:34:43 +00:00
if len ( k . Exchange ) == 0 {
return errors . New ( "kline.Exchange field should not be empty" )
}
tableName := s . _targetKlineTable ( k . Exchange )
sql := fmt . Sprintf ( "delete from `%s` where gid = :gid " , tableName )
_ , err := s . DB . NamedExec ( sql , k )
return err
}
2021-12-14 16:59:28 +00:00
func ( s * BacktestService ) SyncExist ( ctx context . Context , exchange types . Exchange , symbol string ,
fromTime time . Time , endTime time . Time , interval types . Interval ) error {
klineC , errC := s . QueryKLinesCh ( fromTime , endTime , exchange , [ ] string { symbol } , [ ] types . Interval { interval } )
nowStartTime := fromTime
for k := range klineC {
if nowStartTime . Add ( interval . Duration ( ) ) . Unix ( ) < k . StartTime . Unix ( ) {
log . Infof ( "syncing %s interval %s syncing %s ~ %s " , symbol , interval , nowStartTime , k . EndTime )
s . Sync ( ctx , exchange , symbol , nowStartTime . Add ( interval . Duration ( ) ) , k . EndTime . Add ( - 1 * interval . Duration ( ) ) , interval )
}
nowStartTime = k . StartTime
}
if err := <- errC ; err != nil {
return err
}
return nil
}