From f62235b94ef2666be22be80d1692b3c942e5a27e Mon Sep 17 00:00:00 2001 From: TonyQ Date: Mon, 20 Dec 2021 20:28:22 +0800 Subject: [PATCH] backtest : finetune for kline scan logic to prevent hanging for query --- pkg/service/backtest.go | 40 ++++++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/pkg/service/backtest.go b/pkg/service/backtest.go index f239480d9..e6ef7594b 100644 --- a/pkg/service/backtest.go +++ b/pkg/service/backtest.go @@ -188,14 +188,7 @@ func (s *BacktestService) QueryKLinesBackward(exchange types.ExchangeName, symbo func (s *BacktestService) QueryKLinesCh(since, until time.Time, exchange types.Exchange, symbols []string, intervals []types.Interval) (chan types.KLine, chan error) { if len(symbols) == 0 { - - errC := make(chan error, 1) - // avoid blocking - go func() { - errC <- errors.Errorf("symbols is empty when querying kline, plesae check your strategy setting. ") - close(errC) - }() - return nil, errC + return returnError(errors.Errorf("symbols is empty when querying kline, plesae check your strategy setting. ")) } tableName := s._targetKlineTable(exchange.Name()) @@ -211,24 +204,33 @@ func (s *BacktestService) QueryKLinesCh(since, until time.Time, exchange types.E }) sql, args, err = sqlx.In(sql, args...) + if err != nil { + return returnError(err) + } sql = s.DB.Rebind(sql) rows, err := s.DB.Queryx(sql, args...) if err != nil { - log.WithError(err).Error("backtest query error") - - errC := make(chan error, 1) - // avoid blocking - go func() { - errC <- err - close(errC) - }() - return nil, errC + return returnError(err) } return s.scanRowsCh(rows) } +func returnError(err error) (chan types.KLine, chan error) { + ch := make(chan types.KLine, 0) + close(ch) + log.WithError(err).Error("backtest query error") + + errC := make(chan error, 1) + // avoid blocking + go func() { + errC <- err + close(errC) + }() + return ch, errC +} + // scanRowsCh scan rows into channel func (s *BacktestService) scanRowsCh(rows *sqlx.Rows) (chan types.KLine, chan error) { ch := make(chan types.KLine, 500) @@ -344,7 +346,9 @@ func (s *BacktestService) SyncExist(ctx context.Context, exchange types.Exchange nowStartTime = k.StartTime.Time().Add(interval.Duration()) } - s.Sync(ctx, exchange, symbol, nowStartTime, endTime, interval) + if nowStartTime.Unix() < endTime.Unix() && nowStartTime.Unix() < time.Now().Unix() { + s.Sync(ctx, exchange, symbol, nowStartTime, endTime, interval) + } if err := <-errC; err != nil { return err