fix timezone issue for sqlite and mysql

This commit is contained in:
c9s 2022-06-07 00:48:13 +08:00
parent b32b852303
commit 53e74b6262
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
5 changed files with 58 additions and 25 deletions

View File

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"sort"
"strings" "strings"
"syscall" "syscall"
"time" "time"
@ -214,13 +215,13 @@ var BacktestCmd = &cobra.Command{
if wantSync { if wantSync {
log.Infof("starting synchronization: %v", userConfig.Backtest.Symbols) log.Infof("starting synchronization: %v", userConfig.Backtest.Symbols)
if err := sync(ctx, userConfig, backtestService, sourceExchanges, syncFromTime, endTime); err != nil { if err := sync(ctx, userConfig, backtestService, sourceExchanges, syncFromTime.Local(), endTime.Local()); err != nil {
return err return err
} }
log.Info("synchronization done") log.Info("synchronization done")
if shouldVerify { if shouldVerify {
err := verify(userConfig, backtestService, sourceExchanges, syncFromTime, endTime) err := verify(userConfig, backtestService, sourceExchanges, syncFromTime.Local(), endTime.Local())
if err != nil { if err != nil {
return err return err
} }
@ -662,7 +663,16 @@ func sync(ctx context.Context, userConfig *bbgo.Config, backtestService *service
supportIntervals = types.SupportedIntervals supportIntervals = types.SupportedIntervals
} }
// sort intervals
var intervals []types.Interval
for interval := range supportIntervals { for interval := range supportIntervals {
intervals = append(intervals, interval)
}
sort.Slice(intervals, func(i, j int) bool {
return intervals[i].Duration() < intervals[j].Duration()
})
for _, interval := range intervals {
firstKLine, err := backtestService.QueryFirstKLine(sourceExchange.Name(), symbol, interval) firstKLine, err := backtestService.QueryFirstKLine(sourceExchange.Name(), symbol, interval)
if err != nil { if err != nil {
return errors.Wrapf(err, "failed to query backtest kline") return errors.Wrapf(err, "failed to query backtest kline")

View File

@ -2,6 +2,7 @@ package batch
import ( import (
"context" "context"
"strconv"
"time" "time"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
@ -22,11 +23,11 @@ func (e *KLineBatchQuery) Query(ctx context.Context, symbol string, interval typ
}) })
}, },
T: func(obj interface{}) time.Time { T: func(obj interface{}) time.Time {
return time.Time(obj.(types.KLine).StartTime).UTC() return time.Time(obj.(types.KLine).StartTime)
}, },
ID: func(obj interface{}) string { ID: func(obj interface{}) string {
kline := obj.(types.KLine) kline := obj.(types.KLine)
return kline.StartTime.String() return strconv.FormatInt(kline.StartTime.UnixMilli(), 10)
}, },
} }

View File

@ -1240,11 +1240,18 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type
var kLines []types.KLine var kLines []types.KLine
for _, k := range resp { for _, k := range resp {
startTime := types.NewTimeFromUnix(0, k.OpenTime*int64(time.Millisecond))
if options.EndTime != nil {
if !startTime.Before(*options.EndTime) {
continue
}
}
kLines = append(kLines, types.KLine{ kLines = append(kLines, types.KLine{
Exchange: types.ExchangeBinance, Exchange: types.ExchangeBinance,
Symbol: symbol, Symbol: symbol,
Interval: interval, Interval: interval,
StartTime: types.NewTimeFromUnix(0, k.OpenTime*int64(time.Millisecond)), StartTime: startTime,
EndTime: types.NewTimeFromUnix(0, k.CloseTime*int64(time.Millisecond)), EndTime: types.NewTimeFromUnix(0, k.CloseTime*int64(time.Millisecond)),
Open: fixedpoint.MustNewFromString(k.Open), Open: fixedpoint.MustNewFromString(k.Open),
Close: fixedpoint.MustNewFromString(k.Close), Close: fixedpoint.MustNewFromString(k.Close),
@ -1259,6 +1266,11 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type
Closed: true, Closed: true,
}) })
} }
kLines = types.SortKLinesAscending(kLines)
for _, k := range kLines {
log.Info(k)
}
return kLines, nil return kLines, nil
} }

View File

@ -41,20 +41,25 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type
Type: types.KLine{}, Type: types.KLine{},
Select: SelectLastKLines(exchange.Name(), symbol, interval, startTime, endTime, 100), Select: SelectLastKLines(exchange.Name(), symbol, interval, startTime, endTime, 100),
Time: func(obj interface{}) time.Time { Time: func(obj interface{}) time.Time {
return obj.(types.KLine).StartTime.Time().UTC() return obj.(types.KLine).StartTime.Time()
}, },
ID: func(obj interface{}) string { ID: func(obj interface{}) string {
kline := obj.(types.KLine) kline := obj.(types.KLine)
return kline.Symbol + kline.Interval.String() + strconv.FormatInt(kline.StartTime.UnixMilli(), 10) return strconv.FormatInt(kline.StartTime.UnixMilli(), 10)
// return kline.Symbol + kline.Interval.String() + strconv.FormatInt(kline.StartTime.UnixMilli(), 10)
}, },
BatchQuery: func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) { BatchQuery: func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) {
q := &batch.KLineBatchQuery{Exchange: exchange} q := &batch.KLineBatchQuery{Exchange: exchange}
return q.Query(ctx, symbol, interval, startTime, endTime) return q.Query(ctx, symbol, interval, startTime, endTime)
}, },
BatchInsertBuffer: 500, // BatchInsertBuffer: 500,
BatchInsert: func(obj interface{}) error { // BatchInsert: func(obj interface{}) error {
kLines := obj.([]types.KLine) // kLines := obj.([]types.KLine)
return s.BatchInsert(kLines) // return s.BatchInsert(kLines)
// },
Insert: func(obj interface{}) error {
kline := obj.(types.KLine)
return s.Insert(kline)
}, },
LogInsert: log.GetLevel() == log.DebugLevel, LogInsert: log.GetLevel() == log.DebugLevel,
}, },
@ -331,10 +336,6 @@ func (t *TimeRange) String() string {
// create a time range slice []TimeRange // create a time range slice []TimeRange
// iterate the []TimeRange slice to sync data. // iterate the []TimeRange slice to sync data.
func (s *BacktestService) SyncPartial(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, since, until time.Time) error { func (s *BacktestService) SyncPartial(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, since, until time.Time) error {
// truncate time point to minute
since = since.Truncate(time.Minute)
until = until.Truncate(time.Minute)
t1, t2, err := s.QueryExistingDataRange(ctx, ex, symbol, interval, since, until) t1, t2, err := s.QueryExistingDataRange(ctx, ex, symbol, interval, since, until)
if err != nil && err != sql.ErrNoRows { if err != nil && err != sql.ErrNoRows {
return err return err
@ -345,7 +346,6 @@ func (s *BacktestService) SyncPartial(ctx context.Context, ex types.Exchange, sy
return s.SyncFresh(ctx, ex, symbol, interval, since, until) return s.SyncFresh(ctx, ex, symbol, interval, since, until)
} }
log.Debugf("found existing kline data, now using partial sync...")
timeRanges, err := s.FindMissingTimeRanges(ctx, ex, symbol, interval, t1.Time(), t2.Time()) timeRanges, err := s.FindMissingTimeRanges(ctx, ex, symbol, interval, t1.Time(), t2.Time())
if err != nil { if err != nil {
return err return err
@ -358,22 +358,22 @@ func (s *BacktestService) SyncPartial(ctx context.Context, ex types.Exchange, sy
// there are few cases: // there are few cases:
// t1 == since && t2 == until // t1 == since && t2 == until
// [since] ------- [t1] data [t2] ------ [until] // [since] ------- [t1] data [t2] ------ [until]
if since.Before(t1.Time()) { if since.Before(t1.Time()) && t1.Time().Sub(since) > interval.Duration() {
// shift slice // shift slice
timeRanges = append([]TimeRange{ timeRanges = append([]TimeRange{
{Start: since.Add(-2 * time.Second), End: t1.Time()}, // include since {Start: since.Add(-2 * time.Second), End: t1.Time()}, // we should include since
}, timeRanges...) }, timeRanges...)
} }
if t2.Time().Before(until) { if t2.Time().Before(until) && until.Sub(t2.Time()) > interval.Duration() {
timeRanges = append(timeRanges, TimeRange{ timeRanges = append(timeRanges, TimeRange{
Start: t2.Time(), Start: t2.Time(),
End: until.Add(2 * time.Second), // include until End: until.Add(-interval.Duration()), // include until
}) })
} }
for _, timeRange := range timeRanges { for _, timeRange := range timeRanges {
err = s.SyncKLineByInterval(ctx, ex, symbol, interval, timeRange.Start.Add(time.Second), timeRange.End.Add(-time.Second)) err = s.SyncKLineByInterval(ctx, ex, symbol, interval, timeRange.Start.Add(time.Second), timeRange.End)
if err != nil { if err != nil {
return err return err
} }
@ -481,6 +481,9 @@ func SelectKLineTimeRange(ex types.ExchangeName, symbol string, interval types.I
} }
if len(args) == 2 { if len(args) == 2 {
// NOTE
// sqlite does not support timezone format, so we are converting to local timezone
// mysql works in this case, so this is a workaround
since := args[0] since := args[0]
until := args[1] until := args[1]
conditions = append(conditions, sq.Expr("`start_time` BETWEEN ? AND ?", since, until)) conditions = append(conditions, sq.Expr("`start_time` BETWEEN ? AND ?", since, until))

View File

@ -92,8 +92,9 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim
defer func() { defer func() {
if sel.BatchInsert != nil && batchBufferRefVal.Len() > 0 { if sel.BatchInsert != nil && batchBufferRefVal.Len() > 0 {
if err := sel.BatchInsert(batchBufferRefVal.Interface()); err != nil { slice := batchBufferRefVal.Interface()
logrus.WithError(err).Errorf("batch insert error") if err := sel.BatchInsert(slice); err != nil {
logrus.WithError(err).Errorf("batch insert error: %+v", slice)
} }
} }
}() }()
@ -107,7 +108,6 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim
default: default:
v, ok := dataCRef.Recv() v, ok := dataCRef.Recv()
if !ok { if !ok {
err := <-errC err := <-errC
return err return err
} }
@ -118,6 +118,11 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim
continue continue
} }
tt := sel.Time(obj)
if tt.Before(startTime) || tt.Equal(endTime) || tt.After(endTime) {
continue
}
if sel.Filter != nil { if sel.Filter != nil {
if !sel.Filter(obj) { if !sel.Filter(obj) {
continue continue
@ -125,7 +130,7 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim
} }
if sel.BatchInsert != nil { if sel.BatchInsert != nil {
if batchBufferRefVal.Len() >= sel.BatchInsertBuffer { if batchBufferRefVal.Len() >= sel.BatchInsertBuffer-1 {
if sel.LogInsert { if sel.LogInsert {
logrus.Infof("batch inserting %d %T", batchBufferRefVal.Len(), obj) logrus.Infof("batch inserting %d %T", batchBufferRefVal.Len(), obj)
} else { } else {
@ -148,10 +153,12 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim
if sel.Insert != nil { if sel.Insert != nil {
// for custom insert // for custom insert
if err := sel.Insert(obj); err != nil { if err := sel.Insert(obj); err != nil {
logrus.WithError(err).Errorf("can not insert record: %v", obj)
return err return err
} }
} else { } else {
if err := insertType(db, obj); err != nil { if err := insertType(db, obj); err != nil {
logrus.WithError(err).Errorf("can not insert record: %v", obj)
return err return err
} }
} }