Merge pull request #368 from tony1223/feature/355-update-sync

backtest : auto sync
Yo-An Lin 2021-12-15 01:39:19 +08:00 committed by GitHub
commit 05323f211f
7 changed files with 311 additions and 101 deletions

View File

@@ -194,71 +194,44 @@ var BacktestCmd = &cobra.Command{
         log.Info("starting synchronization...")
         for _, symbol := range userConfig.Backtest.Symbols {
-            firstKLine, err := backtestService.QueryFirstKLine(sourceExchange.Name(), symbol, types.Interval1m)
+            exCustom, ok := sourceExchange.(types.CustomIntervalProvider)
+
+            var supportIntervals map[types.Interval]int
+            if ok {
+                supportIntervals = exCustom.SupportedInterval()
+            } else {
+                supportIntervals = types.SupportedIntervals
+            }
+
+            for interval := range supportIntervals {
+                //if err := s.SyncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime); err != nil {
+                //    return err
+                //}
+                firstKLine, err := backtestService.QueryFirstKLine(sourceExchange.Name(), symbol, interval)
                 if err != nil {
                     return errors.Wrapf(err, "failed to query backtest kline")
                 }

                 // if we don't have klines before the start time endpoint, the back-test will fail.
                 // because the last price will be missing.
-            if firstKLine != nil && syncFromTime.Before(firstKLine.StartTime) {
-                return fmt.Errorf("the sync-from-time you gave %s is earlier than the previous sync-start-time %s. "+
-                    "re-syncing data from the earlier date before your first sync is not support,"+
-                    "please clean up the kline table and restart a new sync",
-                    syncFromTime,
-                    firstKLine.EndTime)
-            }
-
-            if err := backtestService.Sync(ctx, sourceExchange, symbol, syncFromTime); err != nil {
-                return err
-            }
+                if firstKLine != nil {
+                    if err := backtestService.SyncExist(ctx, sourceExchange, symbol, syncFromTime, time.Now(), interval); err != nil {
+                        return err
+                    }
+                } else {
+                    if err := backtestService.Sync(ctx, sourceExchange, symbol, syncFromTime, time.Now(), interval); err != nil {
+                        return err
+                    }
+                }
+            }
         }
         log.Info("synchronization done")

         if shouldVerify {
-            var corruptCnt = 0
-            for _, symbol := range userConfig.Backtest.Symbols {
-                log.Infof("verifying backtesting data...")
-
-                for interval := range types.SupportedIntervals {
-                    log.Infof("verifying %s %s kline data...", symbol, interval)
-
-                    klineC, errC := backtestService.QueryKLinesCh(startTime, time.Now(), sourceExchange, []string{symbol}, []types.Interval{interval})
-                    var emptyKLine types.KLine
-                    var prevKLine types.KLine
-                    for k := range klineC {
-                        if verboseCnt > 1 {
-                            fmt.Fprint(os.Stderr, ".")
-                        }
-
-                        if prevKLine != emptyKLine {
-                            if prevKLine.StartTime.Add(interval.Duration()) != k.StartTime {
-                                corruptCnt++
-                                log.Errorf("found kline data corrupted at time: %s kline: %+v", k.StartTime, k)
-                                log.Errorf("between %d and %d",
-                                    prevKLine.StartTime.Unix(),
-                                    k.StartTime.Unix())
-                            }
-                        }
-
-                        prevKLine = k
-                    }
-
-                    if verboseCnt > 1 {
-                        fmt.Fprintln(os.Stderr)
-                    }
-
-                    if err := <-errC; err != nil {
-                        return err
-                    }
-                }
-            }
-
-            log.Infof("backtest verification completed")
-            if corruptCnt > 0 {
-                log.Errorf("found %d corruptions", corruptCnt)
-            } else {
-                log.Infof("found %d corruptions", corruptCnt)
-            }
+            err2, done := backtestService.Verify(userConfig.Backtest.Symbols, startTime, time.Now(), sourceExchange, verboseCnt)
+            if done {
+                return err2
+            }
         }
     }
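
Note on the flow above: instead of rejecting a sync-from-time that is earlier than the first stored kline, the command now loops over every supported interval and either gap-fills existing data (SyncExist) or runs a full sync. A minimal sketch of that per-interval decision, assuming imports of pkg/service and pkg/types and the surrounding command variables (backtestService, sourceExchange, symbol, syncFromTime); the helper name syncSymbol is hypothetical:

    // syncSymbol mirrors the command logic above for a single symbol (sketch only).
    func syncSymbol(ctx context.Context, backtestService *service.BacktestService, sourceExchange types.Exchange,
        symbol string, syncFromTime time.Time, supportIntervals map[types.Interval]int) error {

        for interval := range supportIntervals {
            firstKLine, err := backtestService.QueryFirstKLine(sourceExchange.Name(), symbol, interval)
            if err != nil {
                return err
            }
            if firstKLine != nil {
                // klines already stored for this interval: scan them and re-sync only the gaps
                err = backtestService.SyncExist(ctx, sourceExchange, symbol, syncFromTime, time.Now(), interval)
            } else {
                // nothing stored yet: sync the whole requested range
                err = backtestService.Sync(ctx, sourceExchange, symbol, syncFromTime, time.Now(), interval)
            }
            if err != nil {
                return err
            }
        }
        return nil
    }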

View File

@@ -85,9 +85,12 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type
         defer close(errC)

         tryQueryKlineTimes := 0
-        for startTime.Before(endTime) {
+        var currentTime = startTime
+        for currentTime.Before(endTime) {
             kLines, err := e.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{
-                StartTime: &startTime,
+                StartTime: &currentTime,
+                EndTime:   &endTime,
             })
             sort.Slice(kLines, func(i, j int) bool { return kLines[i].StartTime.Unix() < kLines[j].StartTime.Unix() })
             tryQueryKlineTimes++
@@ -105,11 +108,15 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type
             var batchKLines = make([]types.KLine, 0, BatchSize)
             for _, kline := range kLines {
                 // ignore any kline before the given start time
-                if kline.StartTime.Before(startTime) {
+                if currentTime.Unix() != startTime.Unix() && kline.StartTime.Unix() <= currentTime.Unix() {
                     continue
                 }

-                if kline.StartTime.After(endTime) {
+                if kline.StartTime.After(endTime) || kline.EndTime.After(endTime) {
+                    if len(batchKLines) != 0 {
+                        c <- batchKLines
+                        batchKLines = nil
+                    }
                     return
                 }
@@ -117,15 +124,18 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type
                 if len(batchKLines) == BatchSize {
                     c <- batchKLines
-                    batchKLines = batchKLines[:0]
+                    batchKLines = nil
                 }

                 //The issue is in FTX, prev endtime = next start time , so if add 1 ms , it would query forever.
-                startTime = kline.EndTime // .Add(time.Millisecond)
+                currentTime = kline.StartTime
                 tryQueryKlineTimes = 0
             }

+            if len(batchKLines) != 0 {
                 c <- batchKLines
+                batchKLines = nil
+            }

             if tryQueryKlineTimes > 10 { // it means loop 10 times
                 errC <- errors.Errorf("There's a dead loop in batch.go#Query , symbol: %s , interval: %s, startTime :%s ", symbol, interval, startTime.String())
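
For context on the batch change: the cursor now advances to the last kline's StartTime rather than its EndTime (on FTX the previous EndTime equals the next StartTime, so adding a millisecond could loop forever), overlapping klines are skipped, and a trailing partial batch is always flushed. A minimal consumer sketch, assuming a constructed types.Exchange `ex`, the batch2 package alias used by the new tests, and a hypothetical helper name drainBatch; symbol and interval are illustrative:

    // drainBatch shows how the kline and error channels of KLineBatchQuery are consumed (sketch only).
    func drainBatch(ctx context.Context, ex types.Exchange, startTime, endTime time.Time) error {
        q := &batch2.KLineBatchQuery{Exchange: ex}
        klineC, errC := q.Query(ctx, "BTCUSDT", types.Interval1m, startTime, endTime)
        for klines := range klineC {
            // each slice arrives sorted by StartTime and does not overlap the previous one
            fmt.Printf("batch of %d klines: %s ~ %s\n",
                len(klines), klines[0].StartTime, klines[len(klines)-1].StartTime)
        }
        // drain the error channel once the kline channel is closed, as the verify code does
        if err := <-errC; err != nil {
            return err
        }
        return nil
    }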

View File

@@ -0,0 +1,58 @@
+package binance
+
+import (
+    "context"
+    batch2 "github.com/c9s/bbgo/pkg/exchange/batch"
+    "github.com/c9s/bbgo/pkg/types"
+    "github.com/stretchr/testify/assert"
+    "os"
+    "testing"
+    "time"
+)
+
+func Test_Batch(t *testing.T) {
+    key := os.Getenv("BINANCE_API_KEY")
+    secret := os.Getenv("BINANCE_API_SECRET")
+    if len(key) == 0 && len(secret) == 0 {
+        t.Skip("api key/secret are not configured")
+    }
+
+    e := New(key, secret)
+    //stream := NewStream(key, secret, subAccount, e)
+
+    batch := &batch2.KLineBatchQuery{Exchange: e}
+
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    // should use channel here
+    starttime, _ := time.Parse("2006-1-2 15:04", "2021-08-01 00:00")
+    endtime, _ := time.Parse("2006-1-2 15:04", "2021-12-14 00:19")
+    klineC, _ := batch.Query(ctx, "XRPUSDT", types.Interval1m, starttime, endtime)
+
+    var lastmintime time.Time
+    var lastmaxtime time.Time
+
+    for klines := range klineC {
+        assert.NotEmpty(t, klines)
+
+        var nowMinTime = klines[0].StartTime
+        var nowMaxTime = klines[0].StartTime
+        for _, item := range klines {
+            if nowMaxTime.Unix() < item.StartTime.Unix() {
+                nowMaxTime = item.StartTime
+            }
+            if nowMinTime.Unix() > item.StartTime.Unix() {
+                nowMinTime = item.StartTime
+            }
+        }
+
+        assert.True(t, nowMinTime.Unix() <= nowMaxTime.Unix())
+        assert.True(t, nowMinTime.Unix() > lastmaxtime.Unix())
+        assert.True(t, nowMaxTime.Unix() > lastmaxtime.Unix())
+
+        lastmintime = nowMinTime
+        lastmaxtime = nowMaxTime
+
+        assert.True(t, lastmintime.Unix() <= lastmaxtime.Unix())
+    }
+}

View File

@@ -227,7 +227,7 @@ func (e *Exchange) IsSupportedInterval(interval types.Interval) bool {
 func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) {
     var klines []types.KLine
-    var since, until, current time.Time
+    var since, until, currentEnd time.Time
     if options.StartTime != nil {
         since = *options.StartTime
     }
@@ -237,13 +237,17 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type
         until = time.Now()
     }

-    current = until
+    currentEnd = until
     for {
-        endTime := current.Add(interval.Duration())
+        //the fetch result is from newest to oldest
+        endTime := currentEnd.Add(interval.Duration())
         options.EndTime = &endTime
-        lines, err := e._queryKLines(ctx, symbol, interval, options)
+        lines, err := e._queryKLines(ctx, symbol, interval, types.KLineQueryOptions{
+            StartTime: &since,
+            EndTime:   &currentEnd,
+        })
         if err != nil {
             return nil, err
@@ -255,21 +259,37 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type
         for _, line := range lines {
-            if line.EndTime.Unix() < current.Unix() {
-                current = line.StartTime
+            if line.StartTime.Unix() < currentEnd.Unix() {
+                currentEnd = line.StartTime
             }
-            if line.EndTime.Unix() > since.Unix() {
+            if line.StartTime.Unix() > since.Unix() {
                 klines = append(klines, line)
             }
         }

-        if since.IsZero() || current.Unix() == since.Unix() {
+        if len(lines) == 1 && lines[0].StartTime.Unix() == currentEnd.Unix() {
+            break
+        }
+
+        outBound := currentEnd.Add(interval.Duration()*-1).Unix() <= since.Unix()
+        if since.IsZero() || currentEnd.Unix() == since.Unix() || outBound {
+            break
+        }
+
+        if options.Limit != 0 && options.Limit <= len(lines) {
             break
         }
     }

     sort.Slice(klines, func(i, j int) bool { return klines[i].StartTime.Unix() < klines[j].StartTime.Unix() })
+
+    if options.Limit != 0 {
+        limitedItems := len(klines) - options.Limit
+        if limitedItems > 0 {
+            return klines[limitedItems:], nil
+        }
+    }
+
     return klines, nil
 }
@@ -294,7 +314,7 @@ func (e *Exchange) _queryKLines(ctx context.Context, symbol string, interval typ
         return nil, err
     }

-    resp, err := e.newRest().HistoricalPrices(ctx, toLocalSymbol(symbol), interval, int64(options.Limit), since, until)
+    resp, err := e.newRest().HistoricalPrices(ctx, toLocalSymbol(symbol), interval, 0, since, until)
     if err != nil {
         return nil, err
     }
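
The FTX QueryKLines above now pages backward from `until` toward `since` (the REST endpoint returns newest-to-oldest), stops once a page makes no further progress or leaves the requested range, and only then applies options.Limit by keeping the newest klines. A small caller-side sketch of the new Limit behaviour, assuming a constructed ftx *Exchange `e`; the helper name lastTwoKLines is hypothetical:

    // lastTwoKLines returns at most the two newest 1m candles, sorted ascending by StartTime (sketch only).
    func lastTwoKLines(ctx context.Context, e *Exchange, symbol string) ([]types.KLine, error) {
        since := time.Now().Add(-3 * time.Minute)
        return e.QueryKLines(ctx, symbol, types.Interval1m, types.KLineQueryOptions{
            StartTime: &since,
            Limit:     2, // the result is trimmed to the two newest candles
        })
    }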

View File

@@ -137,7 +137,7 @@ func (s *Stream) Subscribe(channel types.Channel, symbol string, option types.Su
 func (s *Stream) pollKLines(ctx context.Context) {
     // get current kline candle
     for _, sub := range s.klineSubscriptions {
-        klines := getLastKLine(s.exchange, ctx, sub.symbol, sub.interval)
+        klines := getLastClosedKLine(s.exchange, ctx, sub.symbol, sub.interval)
         if len(klines) > 0 {
             // handle mutiple klines, get the latest one
@@ -166,7 +166,7 @@ func (s *Stream) pollKLines(ctx context.Context) {
                 // not in the checking time slot, check next subscription
                 continue
             }
-            klines := getLastKLine(s.exchange, ctx, sub.symbol, sub.interval)
+            klines := getLastClosedKLine(s.exchange, ctx, sub.symbol, sub.interval)
             if len(klines) > 0 {
                 // handle mutiple klines, get the latest one
@@ -179,18 +179,19 @@ func (s *Stream) pollKLines(ctx context.Context) {
     }
 }

-func getLastKLine(e *Exchange, ctx context.Context, symbol string, interval types.Interval) []types.KLine {
+func getLastClosedKLine(e *Exchange, ctx context.Context, symbol string, interval types.Interval) []types.KLine {
     // set since to more 30s ago to avoid getting no kline candle
-    since := time.Now().Add(time.Duration(-1*(interval.Minutes()*60+30)) * time.Second)
+    since := time.Now().Add(time.Duration(interval.Minutes()*-3) * time.Minute)
     klines, err := e.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{
         StartTime: &since,
+        Limit:     2,
     })
     if err != nil {
         logger.WithError(err).Errorf("failed to get kline data")
         return klines
     }

-    return klines
+    return []types.KLine{klines[0]}
 }

 func (s *Stream) Close() error {
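
With Limit: 2 and ascending ordering, klines[0] is the candle that has already closed and klines[1] (when present) is the one still forming, which is what the rename to getLastClosedKLine reflects. A sketch of the same intent with an explicit empty-result guard (the committed code relies on the three-interval lookback always returning data); the name lastClosedKLine is hypothetical:

    // lastClosedKLine returns only the most recent closed candle (sketch only).
    func lastClosedKLine(ctx context.Context, e *Exchange, symbol string, interval types.Interval) []types.KLine {
        since := time.Now().Add(time.Duration(interval.Minutes()*-3) * time.Minute)
        klines, err := e.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{
            StartTime: &since,
            Limit:     2,
        })
        if err != nil || len(klines) == 0 {
            return nil
        }
        return []types.KLine{klines[0]}
    }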

View File

@@ -0,0 +1,89 @@
+package ftx
+
+import (
+    "context"
+    batch2 "github.com/c9s/bbgo/pkg/exchange/batch"
+    "github.com/c9s/bbgo/pkg/types"
+    "github.com/stretchr/testify/assert"
+    "os"
+    "testing"
+    "time"
+)
+
+func TestLastKline(t *testing.T) {
+    key := os.Getenv("FTX_API_KEY")
+    secret := os.Getenv("FTX_API_SECRET")
+    subAccount := os.Getenv("FTX_SUBACCOUNT")
+    if len(key) == 0 && len(secret) == 0 {
+        t.Skip("api key/secret are not configured")
+    }
+
+    e := NewExchange(key, secret, subAccount)
+    //stream := NewStream(key, secret, subAccount, e)
+
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    klines := getLastClosedKLine(e, ctx, "XRPUSD", types.Interval1m)
+    assert.Equal(t, 1, len(klines))
+}
+
+func Test_Batch(t *testing.T) {
+    key := os.Getenv("FTX_API_KEY")
+    secret := os.Getenv("FTX_API_SECRET")
+    subAccount := os.Getenv("FTX_SUBACCOUNT")
+    if len(key) == 0 && len(secret) == 0 {
+        t.Skip("api key/secret are not configured")
+    }
+
+    e := NewExchange(key, secret, subAccount)
+    //stream := NewStream(key, secret, subAccount, e)
+
+    batch := &batch2.KLineBatchQuery{Exchange: e}
+
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    // should use channel here
+    starttime, err := time.Parse("2006-1-2 15:04", "2021-08-01 00:00")
+    assert.NoError(t, err)
+    endtime, err := time.Parse("2006-1-2 15:04", "2021-08-04 00:19")
+    assert.NoError(t, err)
+
+    klineC, errC := batch.Query(ctx, "XRPUSDT", types.Interval1d, starttime, endtime)
+    if err := <-errC; err != nil {
+        assert.NoError(t, err)
+    }
+
+    var lastmintime time.Time
+    var lastmaxtime time.Time
+
+    for klines := range klineC {
+        assert.NotEmpty(t, klines)
+
+        var nowMinTime = klines[0].StartTime
+        var nowMaxTime = klines[0].StartTime
+        for _, item := range klines {
+            if nowMaxTime.Unix() < item.StartTime.Unix() {
+                nowMaxTime = item.StartTime
+            }
+            if nowMinTime.Unix() > item.StartTime.Unix() {
+                nowMinTime = item.StartTime
+            }
+        }
+
+        if !lastmintime.IsZero() {
+            assert.True(t, nowMinTime.Unix() <= nowMaxTime.Unix())
+            assert.True(t, nowMinTime.Unix() > lastmaxtime.Unix())
+            assert.True(t, nowMaxTime.Unix() > lastmaxtime.Unix())
+        }
+
+        lastmintime = nowMinTime
+        lastmaxtime = nowMaxTime
+
+        assert.True(t, lastmintime.Unix() <= lastmaxtime.Unix())
+    }
+}

View File

@@ -3,6 +3,7 @@ package service
 import (
     "context"
     "fmt"
+    "os"
     "strconv"
     "strings"
     "time"
@@ -22,16 +23,6 @@ type BacktestService struct {
 func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error {
     log.Infof("synchronizing lastKLine for interval %s from exchange %s", interval, exchange.Name())

-    lastKLine, err := s.QueryKLine(exchange.Name(), symbol, interval, "DESC", 1)
-    if err != nil {
-        return err
-    }
-
-    if lastKLine != nil {
-        log.Infof("found the last %s kline data checkpoint %s", symbol, lastKLine.EndTime)
-        startTime = lastKLine.StartTime.Add(time.Minute)
-    }
-
     batch := &batch2.KLineBatchQuery{Exchange: exchange}

     // should use channel here
@@ -54,25 +45,62 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type
     return nil
 }

-func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
-    endTime := time.Now()
-
-    exCustom, ok := exchange.(types.CustomIntervalProvider)
-
-    var supportIntervals map[types.Interval]int
-    if ok {
-        supportIntervals = exCustom.SupportedInterval()
-    } else {
-        supportIntervals = types.SupportedIntervals
-    }
-
-    for interval := range supportIntervals {
-        if err := s.SyncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime); err != nil {
-            return err
-        }
-    }
-
-    return nil
-}
+func (s *BacktestService) Verify(symbols []string, startTime time.Time, endTime time.Time, sourceExchange types.Exchange, verboseCnt int) (error, bool) {
+    var corruptCnt = 0
+    for _, symbol := range symbols {
+        log.Infof("verifying backtesting data...")
+
+        for interval := range types.SupportedIntervals {
+            log.Infof("verifying %s %s kline data...", symbol, interval)
+
+            klineC, errC := s.QueryKLinesCh(startTime, time.Now(), sourceExchange, []string{symbol}, []types.Interval{interval})
+            var emptyKLine types.KLine
+            var prevKLine types.KLine
+            for k := range klineC {
+                if verboseCnt > 1 {
+                    fmt.Fprint(os.Stderr, ".")
+                }
+
+                if prevKLine != emptyKLine {
+                    if prevKLine.StartTime.Unix() == k.StartTime.Unix() {
+                        s._deleteDuplicatedKLine(k)
+                        log.Errorf("found kline data duplicated at time: %s kline: %+v , deleted it", k.StartTime, k)
+                    } else if prevKLine.StartTime.Add(interval.Duration()) != k.StartTime {
+                        corruptCnt++
+                        log.Errorf("found kline data corrupted at time: %s kline: %+v", k.StartTime, k)
+                        log.Errorf("between %d and %d",
+                            prevKLine.StartTime.Unix(),
+                            k.StartTime.Unix())
+                    }
+                }
+
+                prevKLine = k
+            }
+
+            if verboseCnt > 1 {
+                fmt.Fprintln(os.Stderr)
+            }
+
+            if err := <-errC; err != nil {
+                return err, true
+            }
+        }
+    }
+
+    log.Infof("backtest verification completed")
+    if corruptCnt > 0 {
+        log.Errorf("found %d corruptions", corruptCnt)
+    } else {
+        log.Infof("found %d corruptions", corruptCnt)
+    }
+
+    return nil, false
+}
+
+func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string,
+    startTime time.Time, endTime time.Time, interval types.Interval) error {
+
+    return s.SyncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime)
+}

 func (s *BacktestService) QueryFirstKLine(ex types.ExchangeName, symbol string, interval types.Interval) (*types.KLine, error) {
@@ -290,3 +318,34 @@ func (s *BacktestService) BatchInsert(kline []types.KLine) error {
     _, err := s.DB.NamedExec(sql, kline)
     return err
 }
+
+func (s *BacktestService) _deleteDuplicatedKLine(k types.KLine) error {
+    if len(k.Exchange) == 0 {
+        return errors.New("kline.Exchange field should not be empty")
+    }
+
+    tableName := s._targetKlineTable(k.Exchange)
+    sql := fmt.Sprintf("delete from `%s` where gid = :gid ", tableName)
+    _, err := s.DB.NamedExec(sql, k)
+    return err
+}
+
+func (s *BacktestService) SyncExist(ctx context.Context, exchange types.Exchange, symbol string,
+    fromTime time.Time, endTime time.Time, interval types.Interval) error {
+
+    klineC, errC := s.QueryKLinesCh(fromTime, endTime, exchange, []string{symbol}, []types.Interval{interval})
+    nowStartTime := fromTime
+
+    for k := range klineC {
+        if nowStartTime.Add(interval.Duration()).Unix() < k.StartTime.Unix() {
+            log.Infof("syncing %s interval %s syncing %s ~ %s ", symbol, interval, nowStartTime, k.EndTime)
+            s.Sync(ctx, exchange, symbol, nowStartTime.Add(interval.Duration()), k.EndTime.Add(-1*interval.Duration()), interval)
+        }
+        nowStartTime = k.StartTime
+    }
+
+    if err := <-errC; err != nil {
+        return err
+    }
+
+    return nil
+}
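
SyncExist walks the klines already stored for a symbol and interval and re-syncs only the windows where two consecutive klines are more than one interval apart. A self-contained example of that gap check, assuming (as with closed klines in types.KLine) that EndTime equals StartTime plus one interval; the timestamps are illustrative:

    package main

    import (
        "fmt"
        "time"

        "github.com/c9s/bbgo/pkg/types"
    )

    func main() {
        interval := types.Interval1m
        prevStart := time.Date(2021, 12, 1, 10, 0, 0, 0, time.UTC) // last kline seen so far
        nextStart := time.Date(2021, 12, 1, 10, 5, 0, 0, time.UTC) // next kline from QueryKLinesCh

        if prevStart.Add(interval.Duration()).Unix() < nextStart.Unix() {
            // klines for 10:01 ~ 10:04 are missing, so SyncExist re-syncs that window
            from := prevStart.Add(interval.Duration())
            to := nextStart // equals k.EndTime minus one interval when EndTime = StartTime + interval
            fmt.Printf("gap detected, syncing %s ~ %s\n", from, to)
        }
    }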