service/backtest: implement backfill and time range scanner

This commit is contained in:
c9s 2022-06-04 02:41:22 +08:00
parent 9083881442
commit bf4d8d345e
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
2 changed files with 19 additions and 6 deletions

View File

@ -22,7 +22,7 @@ type BacktestService struct {
} }
func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error { func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error {
log.Infof("synchronizing lastKLine for interval %s from exchange %s", interval, exchange.Name()) log.Infof("synchronizing %s klines with interval %s: %s <=> %s", exchange.Name(), interval, startTime, endTime)
// TODO: use isFutures here // TODO: use isFutures here
_, _, isIsolated, isolatedSymbol := getExchangeAttributes(exchange) _, _, isIsolated, isolatedSymbol := getExchangeAttributes(exchange)
@ -34,7 +34,7 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type
tasks := []SyncTask{ tasks := []SyncTask{
{ {
Type: types.KLine{}, Type: types.KLine{},
Select: SelectLastKLines(exchange.Name(), symbol, interval, startTime, 100), Select: SelectLastKLines(exchange.Name(), symbol, interval, startTime, endTime, 100),
Time: func(obj interface{}) time.Time { Time: func(obj interface{}) time.Time {
return obj.(types.KLine).StartTime.Time().UTC() return obj.(types.KLine).StartTime.Time().UTC()
}, },
@ -385,6 +385,7 @@ func (s *BacktestService) FindMissingTimeRanges(ctx context.Context, ex types.Ex
var timeRanges []TimeRange var timeRanges []TimeRange
var timePoints = make(map[int64]struct{}, 1000) // we can use this to find duplicates var timePoints = make(map[int64]struct{}, 1000) // we can use this to find duplicates
var lastTime time.Time var lastTime time.Time
var intervalDuration = interval.Duration()
for rows.Next() { for rows.Next() {
var tt types.Time var tt types.Time
if err := rows.Scan(&tt); err != nil { if err := rows.Scan(&tt); err != nil {
@ -392,9 +393,9 @@ func (s *BacktestService) FindMissingTimeRanges(ctx context.Context, ex types.Ex
} }
var t = time.Time(tt) var t = time.Time(tt)
if lastTime != (time.Time{}) && t.Sub(lastTime) > interval.Duration() { if lastTime != (time.Time{}) && t.Sub(lastTime) > intervalDuration {
timeRanges = append(timeRanges, TimeRange{ timeRanges = append(timeRanges, TimeRange{
Start: lastTime.Add(interval.Duration()), Start: lastTime,
End: t, End: t,
}) })
} }
@ -468,14 +469,14 @@ func SelectKLineTimeRange(ex types.ExchangeName, symbol string, interval types.I
} }
// TODO: add is_futures column since the klines data is different // TODO: add is_futures column since the klines data is different
func SelectLastKLines(ex types.ExchangeName, symbol string, interval types.Interval, startTime time.Time, limit uint64) sq.SelectBuilder { func SelectLastKLines(ex types.ExchangeName, symbol string, interval types.Interval, startTime, endTime time.Time, limit uint64) sq.SelectBuilder {
tableName := targetKlineTable(ex) tableName := targetKlineTable(ex)
return sq.Select("*"). return sq.Select("*").
From(tableName). From(tableName).
Where(sq.And{ Where(sq.And{
sq.Eq{"symbol": symbol}, sq.Eq{"symbol": symbol},
sq.Eq{"`interval`": interval.String()}, sq.Eq{"`interval`": interval.String()},
sq.GtOrEq{"`start_time`": startTime}, sq.Expr("start_time BETWEEN ? AND ?", startTime, endTime),
}). }).
OrderBy("start_time DESC"). OrderBy("start_time DESC").
Limit(limit) Limit(limit)

View File

@ -6,6 +6,7 @@ import (
"time" "time"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/exchange" "github.com/c9s/bbgo/pkg/exchange"
@ -54,5 +55,16 @@ func TestBacktestService(t *testing.T) {
assert.NotEmpty(t, timeRanges) assert.NotEmpty(t, timeRanges)
assert.Len(t, timeRanges, 1, "should find one missing time range") assert.Len(t, timeRanges, 1, "should find one missing time range")
t.Logf("found timeRanges: %+v", timeRanges) t.Logf("found timeRanges: %+v", timeRanges)
log.SetLevel(log.DebugLevel)
for _, timeRange := range timeRanges {
err = service.SyncKLineByInterval(ctx, ex, symbol, types.Interval1h, timeRange.Start.Add(time.Second), timeRange.End.Add(-time.Second))
assert.NoError(t, err)
}
timeRanges, err = service.FindMissingTimeRanges(ctx, ex, symbol, types.Interval1h, startTime1, endTime2)
assert.NoError(t, err)
assert.Empty(t, timeRanges, "after partial sync, missing time ranges should be back-filled")
} }
} }