From 022775d0a2a821a47b4bddd6ac8e5668afafbbed Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 6 Jun 2022 17:21:31 +0800 Subject: [PATCH] service: use batch insert for kline --- pkg/cmd/backtest.go | 3 +- pkg/service/backtest.go | 36 +++++++++++++++++++----- pkg/service/sync_task.go | 60 ++++++++++++++++++++++++++++++---------- 3 files changed, 76 insertions(+), 23 deletions(-) diff --git a/pkg/cmd/backtest.go b/pkg/cmd/backtest.go index b5fbad7d2..ebe501722 100644 --- a/pkg/cmd/backtest.go +++ b/pkg/cmd/backtest.go @@ -671,13 +671,12 @@ func sync(ctx context.Context, userConfig *bbgo.Config, backtestService *service // if we don't have klines before the start time endpoint, the back-test will fail. // because the last price will be missing. if firstKLine != nil { - if err := backtestService.SyncPartial(ctx, sourceExchange, symbol, interval, syncFrom, syncTo); err != nil { return err } } else { log.Debugf("starting a fresh kline data sync...") - if err := backtestService.Sync(ctx, sourceExchange, symbol, interval, syncFrom, syncTo); err != nil { + if err := backtestService.SyncFresh(ctx, sourceExchange, symbol, interval, syncFrom, syncTo); err != nil { return err } } diff --git a/pkg/service/backtest.go b/pkg/service/backtest.go index a55e874fc..9d445525f 100644 --- a/pkg/service/backtest.go +++ b/pkg/service/backtest.go @@ -51,9 +51,10 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type q := &batch.KLineBatchQuery{Exchange: exchange} return q.Query(ctx, symbol, interval, startTime, endTime) }, - Insert: func(obj interface{}) error { - kline := obj.(types.KLine) - return s.Insert(kline) + BatchInsertBuffer: 500, + BatchInsert: func(obj interface{}) error { + kLines := obj.([]types.KLine) + return s.BatchInsert(kLines) }, LogInsert: log.GetLevel() == log.DebugLevel, }, @@ -74,7 +75,8 @@ func (s *BacktestService) Verify(sourceExchange types.Exchange, symbols []string for interval := range types.SupportedIntervals { log.Infof("verifying %s %s backtesting data: %s to %s...", symbol, interval, startTime, endTime) - timeRanges, err := s.FindMissingTimeRanges(context.Background(), sourceExchange, symbol, interval, startTime, endTime) + timeRanges, err := s.FindMissingTimeRanges(context.Background(), sourceExchange, symbol, interval, + startTime, endTime) if err != nil { return err } @@ -101,7 +103,9 @@ func (s *BacktestService) Verify(sourceExchange types.Exchange, symbols []string return nil } -func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error { +func (s *BacktestService) SyncFresh(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error { + startTime = startTime.Truncate(time.Minute).Add(-2 * time.Second) + endTime = endTime.Truncate(time.Minute).Add(2 * time.Second) return s.SyncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime) } @@ -291,6 +295,24 @@ func (s *BacktestService) Insert(kline types.KLine) error { return err } +// BatchInsert Note: all kline should be same exchange, or it will cause issue. +func (s *BacktestService) BatchInsert(kline []types.KLine) error { + if len(kline) == 0 { + return nil + } + + tableName := targetKlineTable(kline[0].Exchange) + + sql := fmt.Sprintf("INSERT INTO `%s` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)"+ + " VALUES (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume); ", tableName) + + tx := s.DB.MustBegin() + if _, err := tx.NamedExec(sql, kline); err != nil { + return err + } + return tx.Commit() +} + type TimeRange struct { Start time.Time End time.Time @@ -317,7 +339,7 @@ func (s *BacktestService) SyncPartial(ctx context.Context, ex types.Exchange, sy if err == sql.ErrNoRows || t1 == nil || t2 == nil { // fallback to fresh sync - return s.Sync(ctx, ex, symbol, interval, since, until) + return s.SyncFresh(ctx, ex, symbol, interval, since, until) } log.Debugf("found existing kline data, now using partial sync...") @@ -391,7 +413,7 @@ func (s *BacktestService) FindMissingTimeRanges(ctx context.Context, ex types.Ex lastTime = t } - if lastTime.Before(until) { + if lastTime.Before(until) && until.Sub(lastTime) > intervalDuration*2 { timeRanges = append(timeRanges, TimeRange{ Start: lastTime, End: until, diff --git a/pkg/service/sync_task.go b/pkg/service/sync_task.go index 0d49e239e..a04239591 100644 --- a/pkg/service/sync_task.go +++ b/pkg/service/sync_task.go @@ -37,17 +37,24 @@ type SyncTask struct { // Filter is an optional field, which is used for filtering the remote records Filter func(obj interface{}) bool + // BatchQuery is used for querying remote records. + BatchQuery func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) + // Insert is an option field, which is used for customizing the record insert Insert func(obj interface{}) error + // Insert is an option field, which is used for customizing the record batch insert + BatchInsert func(obj interface{}) error + + BatchInsertBuffer int + // LogInsert logs the insert record in INFO level LogInsert bool - - // BatchQuery is used for querying remote records. - BatchQuery func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) } func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Time, args ...time.Time) error { + batchBufferRefVal := reflect.MakeSlice(reflect.SliceOf(reflect.TypeOf(sel.Type)), 0, sel.BatchInsertBuffer) + // query from db recordSlice, err := selectAndScanType(ctx, db, sel.Select, sel.Type) if err != nil { @@ -83,6 +90,14 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim dataC, errC := sel.BatchQuery(ctx, startTime, endTime) dataCRef := reflect.ValueOf(dataC) + defer func() { + if sel.BatchInsert != nil && batchBufferRefVal.Len() > 0 { + if err := sel.BatchInsert(batchBufferRefVal.Interface()); err != nil { + logrus.WithError(err).Errorf("batch insert error") + } + } + }() + for { select { case <-ctx.Done(): @@ -92,6 +107,7 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim default: v, ok := dataCRef.Recv() if !ok { + err := <-errC return err } @@ -108,20 +124,36 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim } } - if sel.LogInsert { - logrus.Infof("inserting %T: %+v", obj, obj) - } else { - logrus.Debugf("inserting %T: %+v", obj, obj) - } + if sel.BatchInsert != nil { + if batchBufferRefVal.Len() >= sel.BatchInsertBuffer { + if sel.LogInsert { + logrus.Infof("batch inserting %d %T", batchBufferRefVal.Len(), obj) + } else { + logrus.Debugf("batch inserting %d %T", batchBufferRefVal.Len(), obj) + } - if sel.Insert != nil { - // for custom insert - if err := sel.Insert(obj); err != nil { - return err + if err := sel.BatchInsert(batchBufferRefVal.Interface()); err != nil { + return err + } + + batchBufferRefVal = reflect.MakeSlice(reflect.SliceOf(reflect.TypeOf(sel.Type)), 0, sel.BatchInsertBuffer) } + batchBufferRefVal = reflect.Append(batchBufferRefVal, v) } else { - if err := insertType(db, obj); err != nil { - return err + if sel.LogInsert { + logrus.Infof("inserting %T: %+v", obj, obj) + } else { + logrus.Debugf("inserting %T: %+v", obj, obj) + } + if sel.Insert != nil { + // for custom insert + if err := sel.Insert(obj); err != nil { + return err + } + } else { + if err := insertType(db, obj); err != nil { + return err + } } } }