service: use batch insert for kline

This commit is contained in:
c9s 2022-06-06 17:21:31 +08:00
parent 5688a79917
commit 022775d0a2
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
3 changed files with 76 additions and 23 deletions

View File

@ -671,13 +671,12 @@ func sync(ctx context.Context, userConfig *bbgo.Config, backtestService *service
// if we don't have klines before the start time endpoint, the back-test will fail.
// because the last price will be missing.
if firstKLine != nil {
if err := backtestService.SyncPartial(ctx, sourceExchange, symbol, interval, syncFrom, syncTo); err != nil {
return err
}
} else {
log.Debugf("starting a fresh kline data sync...")
if err := backtestService.Sync(ctx, sourceExchange, symbol, interval, syncFrom, syncTo); err != nil {
if err := backtestService.SyncFresh(ctx, sourceExchange, symbol, interval, syncFrom, syncTo); err != nil {
return err
}
}

View File

@ -51,9 +51,10 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type
q := &batch.KLineBatchQuery{Exchange: exchange}
return q.Query(ctx, symbol, interval, startTime, endTime)
},
Insert: func(obj interface{}) error {
kline := obj.(types.KLine)
return s.Insert(kline)
BatchInsertBuffer: 500,
BatchInsert: func(obj interface{}) error {
kLines := obj.([]types.KLine)
return s.BatchInsert(kLines)
},
LogInsert: log.GetLevel() == log.DebugLevel,
},
@ -74,7 +75,8 @@ func (s *BacktestService) Verify(sourceExchange types.Exchange, symbols []string
for interval := range types.SupportedIntervals {
log.Infof("verifying %s %s backtesting data: %s to %s...", symbol, interval, startTime, endTime)
timeRanges, err := s.FindMissingTimeRanges(context.Background(), sourceExchange, symbol, interval, startTime, endTime)
timeRanges, err := s.FindMissingTimeRanges(context.Background(), sourceExchange, symbol, interval,
startTime, endTime)
if err != nil {
return err
}
@ -101,7 +103,9 @@ func (s *BacktestService) Verify(sourceExchange types.Exchange, symbols []string
return nil
}
func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error {
func (s *BacktestService) SyncFresh(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error {
startTime = startTime.Truncate(time.Minute).Add(-2 * time.Second)
endTime = endTime.Truncate(time.Minute).Add(2 * time.Second)
return s.SyncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime)
}
@ -291,6 +295,24 @@ func (s *BacktestService) Insert(kline types.KLine) error {
return err
}
// BatchInsert Note: all kline should be same exchange, or it will cause issue.
func (s *BacktestService) BatchInsert(kline []types.KLine) error {
if len(kline) == 0 {
return nil
}
tableName := targetKlineTable(kline[0].Exchange)
sql := fmt.Sprintf("INSERT INTO `%s` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)"+
" VALUES (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume); ", tableName)
tx := s.DB.MustBegin()
if _, err := tx.NamedExec(sql, kline); err != nil {
return err
}
return tx.Commit()
}
type TimeRange struct {
Start time.Time
End time.Time
@ -317,7 +339,7 @@ func (s *BacktestService) SyncPartial(ctx context.Context, ex types.Exchange, sy
if err == sql.ErrNoRows || t1 == nil || t2 == nil {
// fallback to fresh sync
return s.Sync(ctx, ex, symbol, interval, since, until)
return s.SyncFresh(ctx, ex, symbol, interval, since, until)
}
log.Debugf("found existing kline data, now using partial sync...")
@ -391,7 +413,7 @@ func (s *BacktestService) FindMissingTimeRanges(ctx context.Context, ex types.Ex
lastTime = t
}
if lastTime.Before(until) {
if lastTime.Before(until) && until.Sub(lastTime) > intervalDuration*2 {
timeRanges = append(timeRanges, TimeRange{
Start: lastTime,
End: until,

View File

@ -37,17 +37,24 @@ type SyncTask struct {
// Filter is an optional field, which is used for filtering the remote records
Filter func(obj interface{}) bool
// BatchQuery is used for querying remote records.
BatchQuery func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error)
// Insert is an option field, which is used for customizing the record insert
Insert func(obj interface{}) error
// Insert is an option field, which is used for customizing the record batch insert
BatchInsert func(obj interface{}) error
BatchInsertBuffer int
// LogInsert logs the insert record in INFO level
LogInsert bool
// BatchQuery is used for querying remote records.
BatchQuery func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error)
}
func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Time, args ...time.Time) error {
batchBufferRefVal := reflect.MakeSlice(reflect.SliceOf(reflect.TypeOf(sel.Type)), 0, sel.BatchInsertBuffer)
// query from db
recordSlice, err := selectAndScanType(ctx, db, sel.Select, sel.Type)
if err != nil {
@ -83,6 +90,14 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim
dataC, errC := sel.BatchQuery(ctx, startTime, endTime)
dataCRef := reflect.ValueOf(dataC)
defer func() {
if sel.BatchInsert != nil && batchBufferRefVal.Len() > 0 {
if err := sel.BatchInsert(batchBufferRefVal.Interface()); err != nil {
logrus.WithError(err).Errorf("batch insert error")
}
}
}()
for {
select {
case <-ctx.Done():
@ -92,6 +107,7 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim
default:
v, ok := dataCRef.Recv()
if !ok {
err := <-errC
return err
}
@ -108,12 +124,27 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim
}
}
if sel.BatchInsert != nil {
if batchBufferRefVal.Len() >= sel.BatchInsertBuffer {
if sel.LogInsert {
logrus.Infof("batch inserting %d %T", batchBufferRefVal.Len(), obj)
} else {
logrus.Debugf("batch inserting %d %T", batchBufferRefVal.Len(), obj)
}
if err := sel.BatchInsert(batchBufferRefVal.Interface()); err != nil {
return err
}
batchBufferRefVal = reflect.MakeSlice(reflect.SliceOf(reflect.TypeOf(sel.Type)), 0, sel.BatchInsertBuffer)
}
batchBufferRefVal = reflect.Append(batchBufferRefVal, v)
} else {
if sel.LogInsert {
logrus.Infof("inserting %T: %+v", obj, obj)
} else {
logrus.Debugf("inserting %T: %+v", obj, obj)
}
if sel.Insert != nil {
// for custom insert
if err := sel.Insert(obj); err != nil {
@ -127,6 +158,7 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim
}
}
}
}
func lastRecordTime(sel SyncTask, recordSlice reflect.Value, defaultTime time.Time) time.Time {
since := defaultTime