service: rewrite backtest verify

This commit is contained in:
c9s 2022-06-06 06:02:38 +08:00
parent 7a3d4e306a
commit 41191c4db5
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"database/sql" "database/sql"
"fmt" "fmt"
"os"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -66,42 +65,23 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type
func (s *BacktestService) Verify(symbols []string, startTime time.Time, endTime time.Time, sourceExchange types.Exchange, verboseCnt int) error { func (s *BacktestService) Verify(symbols []string, startTime time.Time, endTime time.Time, sourceExchange types.Exchange, verboseCnt int) error {
var corruptCnt = 0 var corruptCnt = 0
for _, symbol := range symbols { for _, symbol := range symbols {
log.Infof("verifying backtesting data...")
for interval := range types.SupportedIntervals { for interval := range types.SupportedIntervals {
log.Infof("verifying %s %s kline data...", symbol, interval) log.Infof("verifying %s %s backtesting data...", symbol, interval)
klineC, errC := s.QueryKLinesCh(startTime, endTime, sourceExchange, []string{symbol}, []types.Interval{interval}) timeRanges, err := s.FindMissingTimeRanges(context.Background(), sourceExchange, symbol, interval, startTime, endTime)
var emptyKLine types.KLine if err != nil {
var prevKLine types.KLine
for k := range klineC {
if verboseCnt > 1 {
fmt.Fprint(os.Stderr, ".")
}
if prevKLine != emptyKLine {
if prevKLine.StartTime.Unix() == k.StartTime.Unix() {
s._deleteDuplicatedKLine(k)
log.Errorf("found kline data duplicated at time: %s kline: %+v , deleted it", k.StartTime, k)
} else if prevKLine.StartTime.Time().Add(interval.Duration()).Unix() != k.StartTime.Time().Unix() {
corruptCnt++
log.Errorf("found kline data corrupted at time: %s kline: %+v", k.StartTime, k)
log.Errorf("between %d and %d",
prevKLine.StartTime.Unix(),
k.StartTime.Unix())
}
}
prevKLine = k
}
if verboseCnt > 1 {
fmt.Fprintln(os.Stderr)
}
if err := <-errC; err != nil {
return err return err
} }
if len(timeRanges) == 0 {
continue
}
log.Warnf("found missing time ranges:")
corruptCnt += len(timeRanges)
for _, timeRange := range timeRanges {
log.Warnf("symbol %s interval: %s %v", symbol, interval, timeRange)
}
} }
} }
@ -328,6 +308,10 @@ type TimeRange struct {
// create a time range slice []TimeRange // create a time range slice []TimeRange
// iterate the []TimeRange slice to sync data. // iterate the []TimeRange slice to sync data.
func (s *BacktestService) SyncPartial(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, since, until time.Time) error { func (s *BacktestService) SyncPartial(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, since, until time.Time) error {
// truncate time point to minute
since = since.Truncate(time.Minute)
until = until.Truncate(time.Minute)
t1, t2, err := s.QueryExistingDataRange(ctx, ex, symbol, interval, since, until) t1, t2, err := s.QueryExistingDataRange(ctx, ex, symbol, interval, since, until)
if err != nil && err != sql.ErrNoRows { if err != nil && err != sql.ErrNoRows {
return err return err
@ -350,6 +334,7 @@ func (s *BacktestService) SyncPartial(ctx context.Context, ex types.Exchange, sy
// there are few cases: // there are few cases:
// t1 == since && t2 == until // t1 == since && t2 == until
// [since] ------- [t1] data [t2] ------ [until]
if since.Before(t1.Time()) { if since.Before(t1.Time()) {
// shift slice // shift slice
timeRanges = append([]TimeRange{ timeRanges = append([]TimeRange{