2020-11-06 13:40:48 +00:00
package service
import (
"context"
"strings"
"time"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
2021-02-18 09:37:49 +00:00
batch2 "github.com/c9s/bbgo/pkg/exchange/batch"
2020-11-06 13:40:48 +00:00
"github.com/c9s/bbgo/pkg/types"
)
// BacktestService stores and queries KLine records through the wrapped
// database handle.
type BacktestService struct {
	// DB is the sqlx handle every query and insert of this service runs on.
	DB *sqlx.DB
}
2021-03-14 03:04:56 +00:00
func ( s * BacktestService ) SyncKLineByInterval ( ctx context . Context , exchange types . Exchange , symbol string , interval types . Interval , startTime , endTime time . Time ) error {
log . Infof ( "synchronizing lastKLine for interval %s from exchange %s" , interval , exchange . Name ( ) )
2020-11-06 13:40:48 +00:00
2021-03-14 03:04:56 +00:00
lastKLine , err := s . QueryLast ( exchange . Name ( ) , symbol , interval )
if err != nil {
return err
}
2020-11-06 13:40:48 +00:00
2021-03-14 03:04:56 +00:00
if lastKLine != nil {
log . Infof ( "found last checkpoint %s" , lastKLine . EndTime )
startTime = lastKLine . StartTime . Add ( time . Minute )
}
2020-11-06 16:49:17 +00:00
2021-03-14 03:04:56 +00:00
batch := & batch2 . KLineBatchQuery { Exchange : exchange }
2020-11-06 16:49:17 +00:00
2021-03-14 03:04:56 +00:00
// should use channel here
klineC , errC := batch . Query ( ctx , symbol , interval , startTime , endTime )
// var previousKLine types.KLine
for k := range klineC {
if err := s . Insert ( k ) ; err != nil {
return err
2020-11-06 13:40:48 +00:00
}
2021-03-14 03:04:56 +00:00
}
if err := <- errC ; err != nil {
return err
}
return nil
}
2020-11-06 16:49:17 +00:00
2021-03-14 03:04:56 +00:00
func ( s * BacktestService ) Sync ( ctx context . Context , exchange types . Exchange , symbol string , startTime time . Time ) error {
endTime := time . Now ( )
for interval := range types . SupportedIntervals {
2021-05-07 16:57:25 +00:00
if err := s . SyncKLineByInterval ( ctx , exchange , symbol , interval , startTime , endTime ) ; err != nil {
2020-11-06 16:49:17 +00:00
return err
}
2020-11-06 13:40:48 +00:00
}
return nil
}
// QueryLast queries the last order from the database
2020-11-06 16:49:17 +00:00
func ( s * BacktestService ) QueryLast ( ex types . ExchangeName , symbol string , interval types . Interval ) ( * types . KLine , error ) {
2020-11-06 13:40:48 +00:00
log . Infof ( "querying last kline exchange = %s AND symbol = %s AND interval = %s" , ex , symbol , interval )
// make the SQL syntax IDE friendly, so that it can analyze it.
2020-11-06 16:49:17 +00:00
sql := "SELECT * FROM binance_klines WHERE `symbol` = :symbol AND `interval` = :interval ORDER BY end_time DESC LIMIT 1"
2020-11-06 13:40:48 +00:00
sql = strings . ReplaceAll ( sql , "binance_klines" , ex . String ( ) + "_klines" )
rows , err := s . DB . NamedQuery ( sql , map [ string ] interface { } {
"exchange" : ex ,
"interval" : interval ,
"symbol" : symbol ,
} )
if err != nil {
return nil , errors . Wrap ( err , "query last order error" )
}
if rows . Err ( ) != nil {
return nil , rows . Err ( )
}
defer rows . Close ( )
if rows . Next ( ) {
var kline types . KLine
err = rows . StructScan ( & kline )
return & kline , err
}
return nil , rows . Err ( )
}
2021-05-07 16:57:25 +00:00
func ( s * BacktestService ) QueryKLinesForward ( exchange types . ExchangeName , symbol string , interval types . Interval , startTime time . Time , limit int ) ( [ ] types . KLine , error ) {
sql := "SELECT * FROM `binance_klines` WHERE `end_time` >= :start_time AND `symbol` = :symbol AND `interval` = :interval ORDER BY end_time ASC LIMIT :limit"
2020-11-08 04:13:34 +00:00
sql = strings . ReplaceAll ( sql , "binance_klines" , exchange . String ( ) + "_klines" )
rows , err := s . DB . NamedQuery ( sql , map [ string ] interface { } {
2021-05-07 16:57:25 +00:00
"start_time" : startTime ,
"limit" : limit ,
2020-11-08 04:13:34 +00:00
"symbol" : symbol ,
"interval" : interval ,
} )
if err != nil {
return nil , err
}
return s . scanRows ( rows )
}
2021-05-07 16:57:25 +00:00
func ( s * BacktestService ) QueryKLinesBackward ( exchange types . ExchangeName , symbol string , interval types . Interval , endTime time . Time , limit int ) ( [ ] types . KLine , error ) {
sql := "SELECT * FROM `binance_klines` WHERE `end_time` <= :end_time AND `symbol` = :symbol AND `interval` = :interval ORDER BY end_time ASC LIMIT :limit"
2020-11-08 04:13:34 +00:00
sql = strings . ReplaceAll ( sql , "binance_klines" , exchange . String ( ) + "_klines" )
rows , err := s . DB . NamedQuery ( sql , map [ string ] interface { } {
2021-05-07 16:57:25 +00:00
"limit" : limit ,
"end_time" : endTime ,
2020-11-08 04:13:34 +00:00
"symbol" : symbol ,
"interval" : interval ,
} )
if err != nil {
return nil , err
}
return s . scanRows ( rows )
}
2020-11-10 11:06:20 +00:00
func ( s * BacktestService ) QueryKLinesCh ( since , until time . Time , exchange types . Exchange , symbols [ ] string , intervals [ ] types . Interval ) ( chan types . KLine , chan error ) {
sql := "SELECT * FROM `binance_klines` WHERE `end_time` BETWEEN :since AND :until AND `symbol` IN (:symbols) AND `interval` IN (:intervals) ORDER BY end_time ASC"
2020-11-06 16:49:17 +00:00
sql = strings . ReplaceAll ( sql , "binance_klines" , exchange . Name ( ) . String ( ) + "_klines" )
2020-11-06 13:40:48 +00:00
2020-11-06 16:49:17 +00:00
sql , args , err := sqlx . Named ( sql , map [ string ] interface { } {
2020-11-06 13:40:48 +00:00
"since" : since ,
2020-11-10 11:06:20 +00:00
"until" : until ,
2020-11-07 12:11:07 +00:00
"symbols" : symbols ,
2020-11-06 16:49:17 +00:00
"intervals" : types . IntervalSlice ( intervals ) ,
2020-11-06 13:40:48 +00:00
} )
2020-11-10 11:06:20 +00:00
2020-11-06 16:49:17 +00:00
sql , args , err = sqlx . In ( sql , args ... )
sql = s . DB . Rebind ( sql )
rows , err := s . DB . Queryx ( sql , args ... )
2020-11-06 13:40:48 +00:00
if err != nil {
2020-11-06 16:49:17 +00:00
log . WithError ( err ) . Error ( "query error" )
errC := make ( chan error , 1 )
// avoid blocking
go func ( ) {
errC <- err
close ( errC )
} ( )
return nil , errC
2020-11-06 13:40:48 +00:00
}
2020-11-06 16:49:17 +00:00
return s . scanRowsCh ( rows )
2020-11-06 13:40:48 +00:00
}
// scanRowsCh scan rows into channel
2020-11-06 16:49:17 +00:00
func ( s * BacktestService ) scanRowsCh ( rows * sqlx . Rows ) ( chan types . KLine , chan error ) {
2020-11-07 12:11:07 +00:00
ch := make ( chan types . KLine , 500 )
2020-11-06 16:49:17 +00:00
errC := make ( chan error , 1 )
2020-11-06 13:40:48 +00:00
go func ( ) {
2020-11-06 16:49:17 +00:00
defer close ( errC )
2020-11-07 12:11:07 +00:00
defer close ( ch )
2020-11-06 13:40:48 +00:00
defer rows . Close ( )
for rows . Next ( ) {
var kline types . KLine
if err := rows . StructScan ( & kline ) ; err != nil {
2020-11-06 16:49:17 +00:00
errC <- err
return
2020-11-06 13:40:48 +00:00
}
ch <- kline
}
if err := rows . Err ( ) ; err != nil {
2020-11-06 16:49:17 +00:00
errC <- err
return
2020-11-06 13:40:48 +00:00
}
2020-11-07 12:11:07 +00:00
2020-11-06 13:40:48 +00:00
} ( )
2020-11-06 16:49:17 +00:00
return ch , errC
2020-11-06 13:40:48 +00:00
}
// scanRows decodes every remaining row into a KLine and returns them in
// iteration order. The rows are always closed before scanRows returns.
//
// It returns (nil, err) when a row cannot be scanned, and otherwise the
// collected klines together with any error reported by the row iteration.
func (s *BacktestService) scanRows(rows *sqlx.Rows) (klines []types.KLine, err error) {
	// Without this, an early return on a scan error would leave the rows open.
	defer rows.Close()

	for rows.Next() {
		var kline types.KLine
		if err := rows.StructScan(&kline); err != nil {
			return nil, err
		}
		klines = append(klines, kline)
	}

	return klines, rows.Err()
}
func ( s * BacktestService ) Insert ( kline types . KLine ) error {
if len ( kline . Exchange ) == 0 {
return errors . New ( "kline.Exchange field should not be empty" )
}
2020-11-08 04:13:34 +00:00
sql := "INSERT INTO `binance_klines` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`)" +
"VALUES (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume)"
2021-05-28 12:51:10 +00:00
sql = strings . ReplaceAll ( sql , "binance_klines" , kline . Exchange . String ( ) + "_klines" )
2020-11-06 13:40:48 +00:00
_ , err := s . DB . NamedExec ( sql , kline )
return err
}