backtest : auto sync missing range

This commit is contained in:
TonyQ 2021-12-15 00:59:28 +08:00
parent 20b03fe4a5
commit 16933555b8
5 changed files with 121 additions and 93 deletions

View File

@ -194,74 +194,44 @@ var BacktestCmd = &cobra.Command{
log.Info("starting synchronization...")
for _, symbol := range userConfig.Backtest.Symbols {
firstKLine, err := backtestService.QueryFirstKLine(sourceExchange.Name(), symbol, types.Interval1m)
if err != nil {
return errors.Wrapf(err, "failed to query backtest kline")
exCustom, ok := sourceExchange.(types.CustomIntervalProvider)
var supportIntervals map[types.Interval]int
if ok {
supportIntervals = exCustom.SupportedInterval()
} else {
supportIntervals = types.SupportedIntervals
}
// if we don't have klines before the start time endpoint, the back-test will fail.
// because the last price will be missing.
if firstKLine != nil && syncFromTime.Before(firstKLine.StartTime) {
return fmt.Errorf("the sync-from-time you gave %s is earlier than the previous sync-start-time %s. "+
"re-syncing data from the earlier date before your first sync is not support,"+
"please clean up the kline table and restart a new sync",
syncFromTime,
firstKLine.EndTime)
}
for interval := range supportIntervals {
//if err := s.SyncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime); err != nil {
// return err
//}
firstKLine, err := backtestService.QueryFirstKLine(sourceExchange.Name(), symbol, interval)
if err != nil {
return errors.Wrapf(err, "failed to query backtest kline")
}
if err := backtestService.Sync(ctx, sourceExchange, symbol, syncFromTime); err != nil {
return err
// if we don't have klines before the start time endpoint, the back-test will fail.
// because the last price will be missing.
if firstKLine != nil {
if err := backtestService.SyncExist(ctx, sourceExchange, symbol, syncFromTime, time.Now(), interval); err != nil {
return err
}
} else {
if err := backtestService.Sync(ctx, sourceExchange, symbol, syncFromTime, time.Now(), interval); err != nil {
return err
}
}
}
}
log.Info("synchronization done")
if shouldVerify {
var corruptCnt = 0
for _, symbol := range userConfig.Backtest.Symbols {
log.Infof("verifying backtesting data...")
for interval := range types.SupportedIntervals {
log.Infof("verifying %s %s kline data...", symbol, interval)
klineC, errC := backtestService.QueryKLinesCh(startTime, time.Now(), sourceExchange, []string{symbol}, []types.Interval{interval})
var emptyKLine types.KLine
var prevKLine types.KLine
for k := range klineC {
if verboseCnt > 1 {
fmt.Fprint(os.Stderr, ".")
}
if prevKLine != emptyKLine {
if prevKLine.StartTime.Unix() == k.StartTime.Unix() {
backtestService.DeleteDuplicatedKLine(k)
log.Errorf("found kline data duplicated at time: %s kline: %+v , deleted it", k.StartTime, k)
} else if prevKLine.StartTime.Add(interval.Duration()) != k.StartTime {
corruptCnt++
log.Errorf("found kline data corrupted at time: %s kline: %+v", k.StartTime, k)
log.Errorf("between %d and %d",
prevKLine.StartTime.Unix(),
k.StartTime.Unix())
}
}
prevKLine = k
}
if verboseCnt > 1 {
fmt.Fprintln(os.Stderr)
}
if err := <-errC; err != nil {
return err
}
}
}
log.Infof("backtest verification completed")
if corruptCnt > 0 {
log.Errorf("found %d corruptions", corruptCnt)
} else {
log.Infof("found %d corruptions", corruptCnt)
err2, done := backtestService.Verify(userConfig.Backtest.Symbols, startTime, time.Now(), sourceExchange, verboseCnt)
if done {
return err2
}
}

View File

@ -86,10 +86,10 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type
tryQueryKlineTimes := 0
var nowStartTime = startTime
for nowStartTime.Before(endTime) {
var currentTime = startTime
for currentTime.Before(endTime) {
kLines, err := e.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{
StartTime: &nowStartTime,
StartTime: &currentTime,
EndTime: &endTime,
})
sort.Slice(kLines, func(i, j int) bool { return kLines[i].StartTime.Unix() < kLines[j].StartTime.Unix() })
@ -108,7 +108,7 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type
var batchKLines = make([]types.KLine, 0, BatchSize)
for _, kline := range kLines {
// ignore any kline before the given start time
if nowStartTime.Unix() != startTime.Unix() && kline.StartTime.Unix() <= nowStartTime.Unix() {
if currentTime.Unix() != startTime.Unix() && kline.StartTime.Unix() <= currentTime.Unix() {
continue
}
@ -128,7 +128,7 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type
}
//The issue is in FTX, prev endtime = next start time , so if add 1 ms , it would query forever.
nowStartTime = kline.StartTime
currentTime = kline.StartTime
tryQueryKlineTimes = 0
}

View File

@ -268,7 +268,11 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type
}
}
outBound := currentEnd.Add(interval.Duration()*-1).Unix() < since.Unix()
if len(lines) == 1 && lines[0].StartTime.Unix() == currentEnd.Unix() {
break
}
outBound := currentEnd.Add(interval.Duration()*-1).Unix() <= since.Unix()
if since.IsZero() || currentEnd.Unix() == since.Unix() || outBound {
break
}

View File

@ -45,9 +45,16 @@ func Test_Batch(t *testing.T) {
defer cancel()
// should use channel here
starttime, _ := time.Parse("2006-1-2 15:04", "2021-08-01 00:00")
endtime, _ := time.Parse("2006-1-2 15:04", "2021-08-04 00:19")
klineC, _ := batch.Query(ctx, "XRPUSDT", types.Interval1d, starttime, endtime)
starttime, err := time.Parse("2006-1-2 15:04", "2021-08-01 00:00")
assert.NoError(t, err)
endtime, err := time.Parse("2006-1-2 15:04", "2021-08-04 00:19")
assert.NoError(t, err)
klineC, errC := batch.Query(ctx, "XRPUSDT", types.Interval1d, starttime, endtime)
if err := <-errC; err != nil {
assert.NoError(t, err)
}
var lastmintime time.Time
var lastmaxtime time.Time

View File

@ -3,6 +3,7 @@ package service
import (
"context"
"fmt"
"os"
"strconv"
"strings"
"time"
@ -22,16 +23,6 @@ type BacktestService struct {
func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error {
log.Infof("synchronizing lastKLine for interval %s from exchange %s", interval, exchange.Name())
lastKLine, err := s.QueryKLine(exchange.Name(), symbol, interval, "DESC", 1)
if err != nil {
return err
}
if lastKLine != nil {
log.Infof("found the last %s kline data checkpoint %s", symbol, lastKLine.EndTime)
startTime = lastKLine.StartTime.Add(time.Minute)
}
batch := &batch2.KLineBatchQuery{Exchange: exchange}
// should use channel here
@ -54,25 +45,62 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type
return nil
}
func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
endTime := time.Now()
func (s *BacktestService) Verify(symbols []string, startTime time.Time, endTime time.Time, sourceExchange types.Exchange, verboseCnt int) (error, bool) {
var corruptCnt = 0
for _, symbol := range symbols {
log.Infof("verifying backtesting data...")
exCustom, ok := exchange.(types.CustomIntervalProvider)
for interval := range types.SupportedIntervals {
log.Infof("verifying %s %s kline data...", symbol, interval)
var supportIntervals map[types.Interval]int
if ok {
supportIntervals = exCustom.SupportedInterval()
} else {
supportIntervals = types.SupportedIntervals
}
klineC, errC := s.QueryKLinesCh(startTime, time.Now(), sourceExchange, []string{symbol}, []types.Interval{interval})
var emptyKLine types.KLine
var prevKLine types.KLine
for k := range klineC {
if verboseCnt > 1 {
fmt.Fprint(os.Stderr, ".")
}
for interval := range supportIntervals {
if err := s.SyncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime); err != nil {
return err
if prevKLine != emptyKLine {
if prevKLine.StartTime.Unix() == k.StartTime.Unix() {
s._deleteDuplicatedKLine(k)
log.Errorf("found kline data duplicated at time: %s kline: %+v , deleted it", k.StartTime, k)
} else if prevKLine.StartTime.Add(interval.Duration()) != k.StartTime {
corruptCnt++
log.Errorf("found kline data corrupted at time: %s kline: %+v", k.StartTime, k)
log.Errorf("between %d and %d",
prevKLine.StartTime.Unix(),
k.StartTime.Unix())
}
}
prevKLine = k
}
if verboseCnt > 1 {
fmt.Fprintln(os.Stderr)
}
if err := <-errC; err != nil {
return err, true
}
}
}
return nil
log.Infof("backtest verification completed")
if corruptCnt > 0 {
log.Errorf("found %d corruptions", corruptCnt)
} else {
log.Infof("found %d corruptions", corruptCnt)
}
return nil, false
}
func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string,
startTime time.Time, endTime time.Time, interval types.Interval) error {
return s.SyncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime)
}
func (s *BacktestService) QueryFirstKLine(ex types.ExchangeName, symbol string, interval types.Interval) (*types.KLine, error) {
@ -291,7 +319,7 @@ func (s *BacktestService) BatchInsert(kline []types.KLine) error {
return err
}
func (s *BacktestService) DeleteDuplicatedKLine(k types.KLine) error {
func (s *BacktestService) _deleteDuplicatedKLine(k types.KLine) error {
if len(k.Exchange) == 0 {
return errors.New("kline.Exchange field should not be empty")
@ -302,3 +330,22 @@ func (s *BacktestService) DeleteDuplicatedKLine(k types.KLine) error {
_, err := s.DB.NamedExec(sql, k)
return err
}
func (s *BacktestService) SyncExist(ctx context.Context, exchange types.Exchange, symbol string,
fromTime time.Time, endTime time.Time, interval types.Interval) error {
klineC, errC := s.QueryKLinesCh(fromTime, endTime, exchange, []string{symbol}, []types.Interval{interval})
nowStartTime := fromTime
for k := range klineC {
if nowStartTime.Add(interval.Duration()).Unix() < k.StartTime.Unix() {
log.Infof("syncing %s interval %s syncing %s ~ %s ", symbol, interval, nowStartTime, k.EndTime)
s.Sync(ctx, exchange, symbol, nowStartTime.Add(interval.Duration()), k.EndTime.Add(-1*interval.Duration()), interval)
}
nowStartTime = k.StartTime
}
if err := <-errC; err != nil {
return err
}
return nil
}