diff --git a/pkg/cmd/backtest.go b/pkg/cmd/backtest.go index 89f7f1dd7..133ceffbd 100644 --- a/pkg/cmd/backtest.go +++ b/pkg/cmd/backtest.go @@ -194,71 +194,44 @@ var BacktestCmd = &cobra.Command{ log.Info("starting synchronization...") for _, symbol := range userConfig.Backtest.Symbols { - firstKLine, err := backtestService.QueryFirstKLine(sourceExchange.Name(), symbol, types.Interval1m) - if err != nil { - return errors.Wrapf(err, "failed to query backtest kline") + + exCustom, ok := sourceExchange.(types.CustomIntervalProvider) + + var supportIntervals map[types.Interval]int + if ok { + supportIntervals = exCustom.SupportedInterval() + } else { + supportIntervals = types.SupportedIntervals } - // if we don't have klines before the start time endpoint, the back-test will fail. - // because the last price will be missing. - if firstKLine != nil && syncFromTime.Before(firstKLine.StartTime) { - return fmt.Errorf("the sync-from-time you gave %s is earlier than the previous sync-start-time %s. "+ - "re-syncing data from the earlier date before your first sync is not support,"+ - "please clean up the kline table and restart a new sync", - syncFromTime, - firstKLine.EndTime) - } + for interval := range supportIntervals { + //if err := s.SyncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime); err != nil { + // return err + //} + firstKLine, err := backtestService.QueryFirstKLine(sourceExchange.Name(), symbol, interval) + if err != nil { + return errors.Wrapf(err, "failed to query backtest kline") + } - if err := backtestService.Sync(ctx, sourceExchange, symbol, syncFromTime); err != nil { - return err + // if we don't have klines before the start time endpoint, the back-test will fail. + // because the last price will be missing. + if firstKLine != nil { + if err := backtestService.SyncExist(ctx, sourceExchange, symbol, syncFromTime, time.Now(), interval); err != nil { + return err + } + } else { + if err := backtestService.Sync(ctx, sourceExchange, symbol, syncFromTime, time.Now(), interval); err != nil { + return err + } + } } } log.Info("synchronization done") if shouldVerify { - var corruptCnt = 0 - for _, symbol := range userConfig.Backtest.Symbols { - log.Infof("verifying backtesting data...") - - for interval := range types.SupportedIntervals { - log.Infof("verifying %s %s kline data...", symbol, interval) - - klineC, errC := backtestService.QueryKLinesCh(startTime, time.Now(), sourceExchange, []string{symbol}, []types.Interval{interval}) - var emptyKLine types.KLine - var prevKLine types.KLine - for k := range klineC { - if verboseCnt > 1 { - fmt.Fprint(os.Stderr, ".") - } - - if prevKLine != emptyKLine { - if prevKLine.StartTime.Add(interval.Duration()) != k.StartTime { - corruptCnt++ - log.Errorf("found kline data corrupted at time: %s kline: %+v", k.StartTime, k) - log.Errorf("between %d and %d", - prevKLine.StartTime.Unix(), - k.StartTime.Unix()) - } - } - - prevKLine = k - } - - if verboseCnt > 1 { - fmt.Fprintln(os.Stderr) - } - - if err := <-errC; err != nil { - return err - } - } - } - - log.Infof("backtest verification completed") - if corruptCnt > 0 { - log.Errorf("found %d corruptions", corruptCnt) - } else { - log.Infof("found %d corruptions", corruptCnt) + err2, done := backtestService.Verify(userConfig.Backtest.Symbols, startTime, time.Now(), sourceExchange, verboseCnt) + if done { + return err2 } } diff --git a/pkg/exchange/batch/batch.go b/pkg/exchange/batch/batch.go index 5e8fc293e..93d34a036 100644 --- a/pkg/exchange/batch/batch.go +++ b/pkg/exchange/batch/batch.go @@ -85,9 +85,12 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type defer close(errC) tryQueryKlineTimes := 0 - for startTime.Before(endTime) { + + var currentTime = startTime + for currentTime.Before(endTime) { kLines, err := e.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{ - StartTime: &startTime, + StartTime: ¤tTime, + EndTime: &endTime, }) sort.Slice(kLines, func(i, j int) bool { return kLines[i].StartTime.Unix() < kLines[j].StartTime.Unix() }) tryQueryKlineTimes++ @@ -105,11 +108,15 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type var batchKLines = make([]types.KLine, 0, BatchSize) for _, kline := range kLines { // ignore any kline before the given start time - if kline.StartTime.Before(startTime) { + if currentTime.Unix() != startTime.Unix() && kline.StartTime.Unix() <= currentTime.Unix() { continue } - if kline.StartTime.After(endTime) { + if kline.StartTime.After(endTime) || kline.EndTime.After(endTime) { + if len(batchKLines) != 0 { + c <- batchKLines + batchKLines = nil + } return } @@ -117,15 +124,18 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type if len(batchKLines) == BatchSize { c <- batchKLines - batchKLines = batchKLines[:0] + batchKLines = nil } //The issue is in FTX, prev endtime = next start time , so if add 1 ms , it would query forever. - startTime = kline.EndTime // .Add(time.Millisecond) + currentTime = kline.StartTime tryQueryKlineTimes = 0 } - c <- batchKLines + if len(batchKLines) != 0 { + c <- batchKLines + batchKLines = nil + } if tryQueryKlineTimes > 10 { // it means loop 10 times errC <- errors.Errorf("There's a dead loop in batch.go#Query , symbol: %s , interval: %s, startTime :%s ", symbol, interval, startTime.String()) diff --git a/pkg/exchange/binance/stream_test.go b/pkg/exchange/binance/stream_test.go new file mode 100644 index 000000000..3a82f3d3b --- /dev/null +++ b/pkg/exchange/binance/stream_test.go @@ -0,0 +1,58 @@ +package binance + +import ( + "context" + batch2 "github.com/c9s/bbgo/pkg/exchange/batch" + "github.com/c9s/bbgo/pkg/types" + "github.com/stretchr/testify/assert" + "os" + "testing" + "time" +) + +func Test_Batch(t *testing.T) { + key := os.Getenv("BINANCE_API_KEY") + secret := os.Getenv("BINANCE_API_SECRET") + if len(key) == 0 && len(secret) == 0 { + t.Skip("api key/secret are not configured") + } + + e := New(key, secret) + //stream := NewStream(key, secret, subAccount, e) + + batch := &batch2.KLineBatchQuery{Exchange: e} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // should use channel here + + starttime, _ := time.Parse("2006-1-2 15:04", "2021-08-01 00:00") + endtime, _ := time.Parse("2006-1-2 15:04", "2021-12-14 00:19") + klineC, _ := batch.Query(ctx, "XRPUSDT", types.Interval1m, starttime, endtime) + + var lastmintime time.Time + var lastmaxtime time.Time + for klines := range klineC { + assert.NotEmpty(t, klines) + + var nowMinTime = klines[0].StartTime + var nowMaxTime = klines[0].StartTime + for _, item := range klines { + if nowMaxTime.Unix() < item.StartTime.Unix() { + nowMaxTime = item.StartTime + } + if nowMinTime.Unix() > item.StartTime.Unix() { + nowMinTime = item.StartTime + } + } + assert.True(t, nowMinTime.Unix() <= nowMaxTime.Unix()) + assert.True(t, nowMinTime.Unix() > lastmaxtime.Unix()) + assert.True(t, nowMaxTime.Unix() > lastmaxtime.Unix()) + + lastmintime = nowMinTime + lastmaxtime = nowMaxTime + assert.True(t, lastmintime.Unix() <= lastmaxtime.Unix()) + + } + +} diff --git a/pkg/exchange/ftx/exchange.go b/pkg/exchange/ftx/exchange.go index f1e0cb8b9..fc578332b 100644 --- a/pkg/exchange/ftx/exchange.go +++ b/pkg/exchange/ftx/exchange.go @@ -227,7 +227,7 @@ func (e *Exchange) IsSupportedInterval(interval types.Interval) bool { func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) { var klines []types.KLine - var since, until, current time.Time + var since, until, currentEnd time.Time if options.StartTime != nil { since = *options.StartTime } @@ -237,13 +237,17 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type until = time.Now() } - current = until + currentEnd = until for { - endTime := current.Add(interval.Duration()) + //the fetch result is from newest to oldest + endTime := currentEnd.Add(interval.Duration()) options.EndTime = &endTime - lines, err := e._queryKLines(ctx, symbol, interval, options) + lines, err := e._queryKLines(ctx, symbol, interval, types.KLineQueryOptions{ + StartTime: &since, + EndTime: ¤tEnd, + }) if err != nil { return nil, err @@ -255,21 +259,37 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type for _, line := range lines { - if line.EndTime.Unix() < current.Unix() { - current = line.StartTime + if line.StartTime.Unix() < currentEnd.Unix() { + currentEnd = line.StartTime } - if line.EndTime.Unix() > since.Unix() { + if line.StartTime.Unix() > since.Unix() { klines = append(klines, line) } } - if since.IsZero() || current.Unix() == since.Unix() { + if len(lines) == 1 && lines[0].StartTime.Unix() == currentEnd.Unix() { + break + } + + outBound := currentEnd.Add(interval.Duration()*-1).Unix() <= since.Unix() + if since.IsZero() || currentEnd.Unix() == since.Unix() || outBound { + break + } + + if options.Limit != 0 && options.Limit <= len(lines) { break } } sort.Slice(klines, func(i, j int) bool { return klines[i].StartTime.Unix() < klines[j].StartTime.Unix() }) + if options.Limit != 0 { + limitedItems := len(klines) - options.Limit + if limitedItems > 0 { + return klines[limitedItems:], nil + } + } + return klines, nil } @@ -294,7 +314,7 @@ func (e *Exchange) _queryKLines(ctx context.Context, symbol string, interval typ return nil, err } - resp, err := e.newRest().HistoricalPrices(ctx, toLocalSymbol(symbol), interval, int64(options.Limit), since, until) + resp, err := e.newRest().HistoricalPrices(ctx, toLocalSymbol(symbol), interval, 0, since, until) if err != nil { return nil, err } diff --git a/pkg/exchange/ftx/stream.go b/pkg/exchange/ftx/stream.go index f372a5fd2..dec0f5d33 100644 --- a/pkg/exchange/ftx/stream.go +++ b/pkg/exchange/ftx/stream.go @@ -137,7 +137,7 @@ func (s *Stream) Subscribe(channel types.Channel, symbol string, option types.Su func (s *Stream) pollKLines(ctx context.Context) { // get current kline candle for _, sub := range s.klineSubscriptions { - klines := getLastKLine(s.exchange, ctx, sub.symbol, sub.interval) + klines := getLastClosedKLine(s.exchange, ctx, sub.symbol, sub.interval) if len(klines) > 0 { // handle mutiple klines, get the latest one @@ -166,7 +166,7 @@ func (s *Stream) pollKLines(ctx context.Context) { // not in the checking time slot, check next subscription continue } - klines := getLastKLine(s.exchange, ctx, sub.symbol, sub.interval) + klines := getLastClosedKLine(s.exchange, ctx, sub.symbol, sub.interval) if len(klines) > 0 { // handle mutiple klines, get the latest one @@ -179,18 +179,19 @@ func (s *Stream) pollKLines(ctx context.Context) { } } -func getLastKLine(e *Exchange, ctx context.Context, symbol string, interval types.Interval) []types.KLine { +func getLastClosedKLine(e *Exchange, ctx context.Context, symbol string, interval types.Interval) []types.KLine { // set since to more 30s ago to avoid getting no kline candle - since := time.Now().Add(time.Duration(-1*(interval.Minutes()*60+30)) * time.Second) + since := time.Now().Add(time.Duration(interval.Minutes()*-3) * time.Minute) klines, err := e.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{ StartTime: &since, + Limit: 2, }) if err != nil { logger.WithError(err).Errorf("failed to get kline data") return klines } - return klines + return []types.KLine{klines[0]} } func (s *Stream) Close() error { diff --git a/pkg/exchange/ftx/stream_test.go b/pkg/exchange/ftx/stream_test.go new file mode 100644 index 000000000..4a4949aaf --- /dev/null +++ b/pkg/exchange/ftx/stream_test.go @@ -0,0 +1,89 @@ +package ftx + +import ( + "context" + batch2 "github.com/c9s/bbgo/pkg/exchange/batch" + "github.com/c9s/bbgo/pkg/types" + "github.com/stretchr/testify/assert" + "os" + "testing" + "time" +) + +func TestLastKline(t *testing.T) { + key := os.Getenv("FTX_API_KEY") + secret := os.Getenv("FTX_API_SECRET") + subAccount := os.Getenv("FTX_SUBACCOUNT") + if len(key) == 0 && len(secret) == 0 { + t.Skip("api key/secret are not configured") + } + + e := NewExchange(key, secret, subAccount) + //stream := NewStream(key, secret, subAccount, e) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + klines := getLastClosedKLine(e, ctx, "XRPUSD", types.Interval1m) + assert.Equal(t, 1, len(klines)) + +} + +func Test_Batch(t *testing.T) { + key := os.Getenv("FTX_API_KEY") + secret := os.Getenv("FTX_API_SECRET") + subAccount := os.Getenv("FTX_SUBACCOUNT") + if len(key) == 0 && len(secret) == 0 { + t.Skip("api key/secret are not configured") + } + + e := NewExchange(key, secret, subAccount) + //stream := NewStream(key, secret, subAccount, e) + + batch := &batch2.KLineBatchQuery{Exchange: e} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // should use channel here + + starttime, err := time.Parse("2006-1-2 15:04", "2021-08-01 00:00") + assert.NoError(t, err) + endtime, err := time.Parse("2006-1-2 15:04", "2021-08-04 00:19") + assert.NoError(t, err) + + klineC, errC := batch.Query(ctx, "XRPUSDT", types.Interval1d, starttime, endtime) + + if err := <-errC; err != nil { + assert.NoError(t, err) + } + + var lastmintime time.Time + var lastmaxtime time.Time + + for klines := range klineC { + assert.NotEmpty(t, klines) + + var nowMinTime = klines[0].StartTime + var nowMaxTime = klines[0].StartTime + for _, item := range klines { + + if nowMaxTime.Unix() < item.StartTime.Unix() { + nowMaxTime = item.StartTime + } + if nowMinTime.Unix() > item.StartTime.Unix() { + nowMinTime = item.StartTime + } + + } + + if !lastmintime.IsZero() { + assert.True(t, nowMinTime.Unix() <= nowMaxTime.Unix()) + assert.True(t, nowMinTime.Unix() > lastmaxtime.Unix()) + assert.True(t, nowMaxTime.Unix() > lastmaxtime.Unix()) + } + lastmintime = nowMinTime + lastmaxtime = nowMaxTime + assert.True(t, lastmintime.Unix() <= lastmaxtime.Unix()) + + } + +} diff --git a/pkg/service/backtest.go b/pkg/service/backtest.go index 7ebfa919f..890b1fe72 100644 --- a/pkg/service/backtest.go +++ b/pkg/service/backtest.go @@ -3,6 +3,7 @@ package service import ( "context" "fmt" + "os" "strconv" "strings" "time" @@ -22,16 +23,6 @@ type BacktestService struct { func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error { log.Infof("synchronizing lastKLine for interval %s from exchange %s", interval, exchange.Name()) - lastKLine, err := s.QueryKLine(exchange.Name(), symbol, interval, "DESC", 1) - if err != nil { - return err - } - - if lastKLine != nil { - log.Infof("found the last %s kline data checkpoint %s", symbol, lastKLine.EndTime) - startTime = lastKLine.StartTime.Add(time.Minute) - } - batch := &batch2.KLineBatchQuery{Exchange: exchange} // should use channel here @@ -54,25 +45,62 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type return nil } -func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error { - endTime := time.Now() +func (s *BacktestService) Verify(symbols []string, startTime time.Time, endTime time.Time, sourceExchange types.Exchange, verboseCnt int) (error, bool) { + var corruptCnt = 0 + for _, symbol := range symbols { + log.Infof("verifying backtesting data...") - exCustom, ok := exchange.(types.CustomIntervalProvider) + for interval := range types.SupportedIntervals { + log.Infof("verifying %s %s kline data...", symbol, interval) - var supportIntervals map[types.Interval]int - if ok { - supportIntervals = exCustom.SupportedInterval() - } else { - supportIntervals = types.SupportedIntervals - } + klineC, errC := s.QueryKLinesCh(startTime, time.Now(), sourceExchange, []string{symbol}, []types.Interval{interval}) + var emptyKLine types.KLine + var prevKLine types.KLine + for k := range klineC { + if verboseCnt > 1 { + fmt.Fprint(os.Stderr, ".") + } - for interval := range supportIntervals { - if err := s.SyncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime); err != nil { - return err + if prevKLine != emptyKLine { + if prevKLine.StartTime.Unix() == k.StartTime.Unix() { + s._deleteDuplicatedKLine(k) + log.Errorf("found kline data duplicated at time: %s kline: %+v , deleted it", k.StartTime, k) + } else if prevKLine.StartTime.Add(interval.Duration()) != k.StartTime { + corruptCnt++ + log.Errorf("found kline data corrupted at time: %s kline: %+v", k.StartTime, k) + log.Errorf("between %d and %d", + prevKLine.StartTime.Unix(), + k.StartTime.Unix()) + } + } + + prevKLine = k + } + + if verboseCnt > 1 { + fmt.Fprintln(os.Stderr) + } + + if err := <-errC; err != nil { + return err, true + } } } - return nil + log.Infof("backtest verification completed") + if corruptCnt > 0 { + log.Errorf("found %d corruptions", corruptCnt) + } else { + log.Infof("found %d corruptions", corruptCnt) + } + return nil, false +} + +func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string, + startTime time.Time, endTime time.Time, interval types.Interval) error { + + return s.SyncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime) + } func (s *BacktestService) QueryFirstKLine(ex types.ExchangeName, symbol string, interval types.Interval) (*types.KLine, error) { @@ -290,3 +318,34 @@ func (s *BacktestService) BatchInsert(kline []types.KLine) error { _, err := s.DB.NamedExec(sql, kline) return err } + +func (s *BacktestService) _deleteDuplicatedKLine(k types.KLine) error { + + if len(k.Exchange) == 0 { + return errors.New("kline.Exchange field should not be empty") + } + + tableName := s._targetKlineTable(k.Exchange) + sql := fmt.Sprintf("delete from `%s` where gid = :gid ", tableName) + _, err := s.DB.NamedExec(sql, k) + return err +} + +func (s *BacktestService) SyncExist(ctx context.Context, exchange types.Exchange, symbol string, + fromTime time.Time, endTime time.Time, interval types.Interval) error { + klineC, errC := s.QueryKLinesCh(fromTime, endTime, exchange, []string{symbol}, []types.Interval{interval}) + + nowStartTime := fromTime + for k := range klineC { + if nowStartTime.Add(interval.Duration()).Unix() < k.StartTime.Unix() { + log.Infof("syncing %s interval %s syncing %s ~ %s ", symbol, interval, nowStartTime, k.EndTime) + s.Sync(ctx, exchange, symbol, nowStartTime.Add(interval.Duration()), k.EndTime.Add(-1*interval.Duration()), interval) + } + nowStartTime = k.StartTime + } + + if err := <-errC; err != nil { + return err + } + return nil +}