diff --git a/pkg/cmd/backtest.go b/pkg/cmd/backtest.go index ebe501722..89af3dfd4 100644 --- a/pkg/cmd/backtest.go +++ b/pkg/cmd/backtest.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "path/filepath" + "sort" "strings" "syscall" "time" @@ -214,13 +215,13 @@ var BacktestCmd = &cobra.Command{ if wantSync { log.Infof("starting synchronization: %v", userConfig.Backtest.Symbols) - if err := sync(ctx, userConfig, backtestService, sourceExchanges, syncFromTime, endTime); err != nil { + if err := sync(ctx, userConfig, backtestService, sourceExchanges, syncFromTime.Local(), endTime.Local()); err != nil { return err } log.Info("synchronization done") if shouldVerify { - err := verify(userConfig, backtestService, sourceExchanges, syncFromTime, endTime) + err := verify(userConfig, backtestService, sourceExchanges, syncFromTime.Local(), endTime.Local()) if err != nil { return err } @@ -662,7 +663,16 @@ func sync(ctx context.Context, userConfig *bbgo.Config, backtestService *service supportIntervals = types.SupportedIntervals } + // sort intervals + var intervals []types.Interval for interval := range supportIntervals { + intervals = append(intervals, interval) + } + sort.Slice(intervals, func(i, j int) bool { + return intervals[i].Duration() < intervals[j].Duration() + }) + + for _, interval := range intervals { firstKLine, err := backtestService.QueryFirstKLine(sourceExchange.Name(), symbol, interval) if err != nil { return errors.Wrapf(err, "failed to query backtest kline") diff --git a/pkg/exchange/batch/kline.go b/pkg/exchange/batch/kline.go index 2e9c65981..a4053fe34 100644 --- a/pkg/exchange/batch/kline.go +++ b/pkg/exchange/batch/kline.go @@ -2,6 +2,7 @@ package batch import ( "context" + "strconv" "time" "github.com/c9s/bbgo/pkg/types" @@ -22,11 +23,11 @@ func (e *KLineBatchQuery) Query(ctx context.Context, symbol string, interval typ }) }, T: func(obj interface{}) time.Time { - return time.Time(obj.(types.KLine).StartTime).UTC() + return time.Time(obj.(types.KLine).StartTime) }, ID: func(obj interface{}) string { kline := obj.(types.KLine) - return kline.StartTime.String() + return strconv.FormatInt(kline.StartTime.UnixMilli(), 10) }, } diff --git a/pkg/exchange/binance/exchange.go b/pkg/exchange/binance/exchange.go index 7c9dd96b4..ccfcdc3b9 100644 --- a/pkg/exchange/binance/exchange.go +++ b/pkg/exchange/binance/exchange.go @@ -1240,11 +1240,18 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type var kLines []types.KLine for _, k := range resp { + startTime := types.NewTimeFromUnix(0, k.OpenTime*int64(time.Millisecond)) + if options.EndTime != nil { + if !startTime.Before(*options.EndTime) { + continue + } + } + kLines = append(kLines, types.KLine{ Exchange: types.ExchangeBinance, Symbol: symbol, Interval: interval, - StartTime: types.NewTimeFromUnix(0, k.OpenTime*int64(time.Millisecond)), + StartTime: startTime, EndTime: types.NewTimeFromUnix(0, k.CloseTime*int64(time.Millisecond)), Open: fixedpoint.MustNewFromString(k.Open), Close: fixedpoint.MustNewFromString(k.Close), @@ -1259,6 +1266,11 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type Closed: true, }) } + + kLines = types.SortKLinesAscending(kLines) + for _, k := range kLines { + log.Info(k) + } return kLines, nil } diff --git a/pkg/service/backtest.go b/pkg/service/backtest.go index 7859c38a3..cb9b28d51 100644 --- a/pkg/service/backtest.go +++ b/pkg/service/backtest.go @@ -41,20 +41,25 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type Type: types.KLine{}, Select: SelectLastKLines(exchange.Name(), symbol, interval, startTime, endTime, 100), Time: func(obj interface{}) time.Time { - return obj.(types.KLine).StartTime.Time().UTC() + return obj.(types.KLine).StartTime.Time() }, ID: func(obj interface{}) string { kline := obj.(types.KLine) - return kline.Symbol + kline.Interval.String() + strconv.FormatInt(kline.StartTime.UnixMilli(), 10) + return strconv.FormatInt(kline.StartTime.UnixMilli(), 10) + // return kline.Symbol + kline.Interval.String() + strconv.FormatInt(kline.StartTime.UnixMilli(), 10) }, BatchQuery: func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) { q := &batch.KLineBatchQuery{Exchange: exchange} return q.Query(ctx, symbol, interval, startTime, endTime) }, - BatchInsertBuffer: 500, - BatchInsert: func(obj interface{}) error { - kLines := obj.([]types.KLine) - return s.BatchInsert(kLines) + // BatchInsertBuffer: 500, + // BatchInsert: func(obj interface{}) error { + // kLines := obj.([]types.KLine) + // return s.BatchInsert(kLines) + // }, + Insert: func(obj interface{}) error { + kline := obj.(types.KLine) + return s.Insert(kline) }, LogInsert: log.GetLevel() == log.DebugLevel, }, @@ -331,10 +336,6 @@ func (t *TimeRange) String() string { // create a time range slice []TimeRange // iterate the []TimeRange slice to sync data. func (s *BacktestService) SyncPartial(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, since, until time.Time) error { - // truncate time point to minute - since = since.Truncate(time.Minute) - until = until.Truncate(time.Minute) - t1, t2, err := s.QueryExistingDataRange(ctx, ex, symbol, interval, since, until) if err != nil && err != sql.ErrNoRows { return err @@ -345,7 +346,6 @@ func (s *BacktestService) SyncPartial(ctx context.Context, ex types.Exchange, sy return s.SyncFresh(ctx, ex, symbol, interval, since, until) } - log.Debugf("found existing kline data, now using partial sync...") timeRanges, err := s.FindMissingTimeRanges(ctx, ex, symbol, interval, t1.Time(), t2.Time()) if err != nil { return err @@ -358,22 +358,22 @@ func (s *BacktestService) SyncPartial(ctx context.Context, ex types.Exchange, sy // there are few cases: // t1 == since && t2 == until // [since] ------- [t1] data [t2] ------ [until] - if since.Before(t1.Time()) { + if since.Before(t1.Time()) && t1.Time().Sub(since) > interval.Duration() { // shift slice timeRanges = append([]TimeRange{ - {Start: since.Add(-2 * time.Second), End: t1.Time()}, // include since + {Start: since.Add(-2 * time.Second), End: t1.Time()}, // we should include since }, timeRanges...) } - if t2.Time().Before(until) { + if t2.Time().Before(until) && until.Sub(t2.Time()) > interval.Duration() { timeRanges = append(timeRanges, TimeRange{ Start: t2.Time(), - End: until.Add(2 * time.Second), // include until + End: until.Add(-interval.Duration()), // include until }) } for _, timeRange := range timeRanges { - err = s.SyncKLineByInterval(ctx, ex, symbol, interval, timeRange.Start.Add(time.Second), timeRange.End.Add(-time.Second)) + err = s.SyncKLineByInterval(ctx, ex, symbol, interval, timeRange.Start.Add(time.Second), timeRange.End) if err != nil { return err } @@ -481,6 +481,9 @@ func SelectKLineTimeRange(ex types.ExchangeName, symbol string, interval types.I } if len(args) == 2 { + // NOTE + // sqlite does not support timezone format, so we are converting to local timezone + // mysql works in this case, so this is a workaround since := args[0] until := args[1] conditions = append(conditions, sq.Expr("`start_time` BETWEEN ? AND ?", since, until)) diff --git a/pkg/service/sync_task.go b/pkg/service/sync_task.go index a04239591..796bccfe9 100644 --- a/pkg/service/sync_task.go +++ b/pkg/service/sync_task.go @@ -92,8 +92,9 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim defer func() { if sel.BatchInsert != nil && batchBufferRefVal.Len() > 0 { - if err := sel.BatchInsert(batchBufferRefVal.Interface()); err != nil { - logrus.WithError(err).Errorf("batch insert error") + slice := batchBufferRefVal.Interface() + if err := sel.BatchInsert(slice); err != nil { + logrus.WithError(err).Errorf("batch insert error: %+v", slice) } } }() @@ -107,7 +108,6 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim default: v, ok := dataCRef.Recv() if !ok { - err := <-errC return err } @@ -118,6 +118,11 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim continue } + tt := sel.Time(obj) + if tt.Before(startTime) || tt.Equal(endTime) || tt.After(endTime) { + continue + } + if sel.Filter != nil { if !sel.Filter(obj) { continue @@ -125,7 +130,7 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim } if sel.BatchInsert != nil { - if batchBufferRefVal.Len() >= sel.BatchInsertBuffer { + if batchBufferRefVal.Len() >= sel.BatchInsertBuffer-1 { if sel.LogInsert { logrus.Infof("batch inserting %d %T", batchBufferRefVal.Len(), obj) } else { @@ -148,10 +153,12 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim if sel.Insert != nil { // for custom insert if err := sel.Insert(obj); err != nil { + logrus.WithError(err).Errorf("can not insert record: %v", obj) return err } } else { if err := insertType(db, obj); err != nil { + logrus.WithError(err).Errorf("can not insert record: %v", obj) return err } }