bbgo_origin/pkg/service/backtest.go

216 lines
6.0 KiB
Go
Raw Normal View History

package service
import (
"context"
"strings"
"time"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
batch2 "github.com/c9s/bbgo/pkg/exchange/batch"
"github.com/c9s/bbgo/pkg/types"
)
type BacktestService struct {
DB *sqlx.DB
}
2021-03-14 03:04:56 +00:00
func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error {
log.Infof("synchronizing lastKLine for interval %s from exchange %s", interval, exchange.Name())
2021-03-14 03:04:56 +00:00
lastKLine, err := s.QueryLast(exchange.Name(), symbol, interval)
if err != nil {
return err
}
2021-03-14 03:04:56 +00:00
if lastKLine != nil {
log.Infof("found last checkpoint %s", lastKLine.EndTime)
startTime = lastKLine.StartTime.Add(time.Minute)
}
2020-11-06 16:49:17 +00:00
2021-03-14 03:04:56 +00:00
batch := &batch2.KLineBatchQuery{Exchange: exchange}
2020-11-06 16:49:17 +00:00
2021-03-14 03:04:56 +00:00
// should use channel here
klineC, errC := batch.Query(ctx, symbol, interval, startTime, endTime)
// var previousKLine types.KLine
for k := range klineC {
if err := s.Insert(k); err != nil {
return err
}
2021-03-14 03:04:56 +00:00
}
if err := <-errC; err != nil {
return err
}
return nil
}
2020-11-06 16:49:17 +00:00
2021-03-14 03:04:56 +00:00
func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
endTime := time.Now()
for interval := range types.SupportedIntervals {
2021-05-07 16:57:25 +00:00
if err := s.SyncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime); err != nil {
2020-11-06 16:49:17 +00:00
return err
}
}
return nil
}
// QueryLast queries the last order from the database
2020-11-06 16:49:17 +00:00
func (s *BacktestService) QueryLast(ex types.ExchangeName, symbol string, interval types.Interval) (*types.KLine, error) {
log.Infof("querying last kline exchange = %s AND symbol = %s AND interval = %s", ex, symbol, interval)
// make the SQL syntax IDE friendly, so that it can analyze it.
2020-11-06 16:49:17 +00:00
sql := "SELECT * FROM binance_klines WHERE `symbol` = :symbol AND `interval` = :interval ORDER BY end_time DESC LIMIT 1"
sql = strings.ReplaceAll(sql, "binance_klines", ex.String()+"_klines")
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{
"exchange": ex,
"interval": interval,
"symbol": symbol,
})
if err != nil {
return nil, errors.Wrap(err, "query last order error")
}
if rows.Err() != nil {
return nil, rows.Err()
}
defer rows.Close()
if rows.Next() {
var kline types.KLine
err = rows.StructScan(&kline)
return &kline, err
}
return nil, rows.Err()
}
2021-05-07 16:57:25 +00:00
func (s *BacktestService) QueryKLinesForward(exchange types.ExchangeName, symbol string, interval types.Interval, startTime time.Time, limit int) ([]types.KLine, error) {
sql := "SELECT * FROM `binance_klines` WHERE `end_time` >= :start_time AND `symbol` = :symbol AND `interval` = :interval ORDER BY end_time ASC LIMIT :limit"
sql = strings.ReplaceAll(sql, "binance_klines", exchange.String()+"_klines")
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{
2021-05-07 16:57:25 +00:00
"start_time": startTime,
"limit": limit,
"symbol": symbol,
"interval": interval,
})
if err != nil {
return nil, err
}
return s.scanRows(rows)
}
2021-05-07 16:57:25 +00:00
func (s *BacktestService) QueryKLinesBackward(exchange types.ExchangeName, symbol string, interval types.Interval, endTime time.Time, limit int) ([]types.KLine, error) {
sql := "SELECT * FROM `binance_klines` WHERE `end_time` <= :end_time AND `symbol` = :symbol AND `interval` = :interval ORDER BY end_time DESC LIMIT :limit"
sql = strings.ReplaceAll(sql, "binance_klines", exchange.String()+"_klines")
sql = "SELECT t.* FROM (" + sql + ") AS t ORDER BY t.end_time ASC"
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{
2021-05-07 16:57:25 +00:00
"limit": limit,
"end_time": endTime,
"symbol": symbol,
"interval": interval,
})
if err != nil {
return nil, err
}
return s.scanRows(rows)
}
2020-11-10 11:06:20 +00:00
func (s *BacktestService) QueryKLinesCh(since, until time.Time, exchange types.Exchange, symbols []string, intervals []types.Interval) (chan types.KLine, chan error) {
sql := "SELECT * FROM `binance_klines` WHERE `end_time` BETWEEN :since AND :until AND `symbol` IN (:symbols) AND `interval` IN (:intervals) ORDER BY end_time ASC"
2020-11-06 16:49:17 +00:00
sql = strings.ReplaceAll(sql, "binance_klines", exchange.Name().String()+"_klines")
2020-11-06 16:49:17 +00:00
sql, args, err := sqlx.Named(sql, map[string]interface{}{
"since": since,
2020-11-10 11:06:20 +00:00
"until": until,
2020-11-07 12:11:07 +00:00
"symbols": symbols,
2020-11-06 16:49:17 +00:00
"intervals": types.IntervalSlice(intervals),
})
2020-11-10 11:06:20 +00:00
2020-11-06 16:49:17 +00:00
sql, args, err = sqlx.In(sql, args...)
sql = s.DB.Rebind(sql)
rows, err := s.DB.Queryx(sql, args...)
if err != nil {
2020-11-06 16:49:17 +00:00
log.WithError(err).Error("query error")
errC := make(chan error, 1)
// avoid blocking
go func() {
errC <- err
close(errC)
}()
return nil, errC
}
2020-11-06 16:49:17 +00:00
return s.scanRowsCh(rows)
}
// scanRowsCh scan rows into channel
2020-11-06 16:49:17 +00:00
func (s *BacktestService) scanRowsCh(rows *sqlx.Rows) (chan types.KLine, chan error) {
2020-11-07 12:11:07 +00:00
ch := make(chan types.KLine, 500)
2020-11-06 16:49:17 +00:00
errC := make(chan error, 1)
go func() {
2020-11-06 16:49:17 +00:00
defer close(errC)
2020-11-07 12:11:07 +00:00
defer close(ch)
defer rows.Close()
for rows.Next() {
var kline types.KLine
if err := rows.StructScan(&kline); err != nil {
2020-11-06 16:49:17 +00:00
errC <- err
return
}
ch <- kline
}
if err := rows.Err(); err != nil {
2020-11-06 16:49:17 +00:00
errC <- err
return
}
2020-11-07 12:11:07 +00:00
}()
2020-11-06 16:49:17 +00:00
return ch, errC
}
func (s *BacktestService) scanRows(rows *sqlx.Rows) (klines []types.KLine, err error) {
for rows.Next() {
var kline types.KLine
if err := rows.StructScan(&kline); err != nil {
return nil, err
}
klines = append(klines, kline)
}
return klines, rows.Err()
}
func (s *BacktestService) Insert(kline types.KLine) error {
if len(kline.Exchange) == 0 {
return errors.New("kline.Exchange field should not be empty")
}
2021-05-31 17:08:02 +00:00
sql := "INSERT INTO `binance_klines` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)" +
"VALUES (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume)"
sql = strings.ReplaceAll(sql, "binance_klines", kline.Exchange.String()+"_klines")
_, err := s.DB.NamedExec(sql, kline)
return err
}