mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-10 09:11:55 +00:00
Merge pull request #380 from tony1223/bug/kline-scan
backtest : finetune for kline scan logic to prevent hanging for
This commit is contained in:
commit
e0844459b9
|
@ -188,14 +188,7 @@ func (s *BacktestService) QueryKLinesBackward(exchange types.ExchangeName, symbo
|
|||
func (s *BacktestService) QueryKLinesCh(since, until time.Time, exchange types.Exchange, symbols []string, intervals []types.Interval) (chan types.KLine, chan error) {
|
||||
|
||||
if len(symbols) == 0 {
|
||||
|
||||
errC := make(chan error, 1)
|
||||
// avoid blocking
|
||||
go func() {
|
||||
errC <- errors.Errorf("symbols is empty when querying kline, plesae check your strategy setting. ")
|
||||
close(errC)
|
||||
}()
|
||||
return nil, errC
|
||||
return returnError(errors.Errorf("symbols is empty when querying kline, plesae check your strategy setting. "))
|
||||
}
|
||||
|
||||
tableName := s._targetKlineTable(exchange.Name())
|
||||
|
@ -211,24 +204,33 @@ func (s *BacktestService) QueryKLinesCh(since, until time.Time, exchange types.E
|
|||
})
|
||||
|
||||
sql, args, err = sqlx.In(sql, args...)
|
||||
if err != nil {
|
||||
return returnError(err)
|
||||
}
|
||||
sql = s.DB.Rebind(sql)
|
||||
|
||||
rows, err := s.DB.Queryx(sql, args...)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("backtest query error")
|
||||
|
||||
errC := make(chan error, 1)
|
||||
// avoid blocking
|
||||
go func() {
|
||||
errC <- err
|
||||
close(errC)
|
||||
}()
|
||||
return nil, errC
|
||||
return returnError(err)
|
||||
}
|
||||
|
||||
return s.scanRowsCh(rows)
|
||||
}
|
||||
|
||||
func returnError(err error) (chan types.KLine, chan error) {
|
||||
ch := make(chan types.KLine, 0)
|
||||
close(ch)
|
||||
log.WithError(err).Error("backtest query error")
|
||||
|
||||
errC := make(chan error, 1)
|
||||
// avoid blocking
|
||||
go func() {
|
||||
errC <- err
|
||||
close(errC)
|
||||
}()
|
||||
return ch, errC
|
||||
}
|
||||
|
||||
// scanRowsCh scan rows into channel
|
||||
func (s *BacktestService) scanRowsCh(rows *sqlx.Rows) (chan types.KLine, chan error) {
|
||||
ch := make(chan types.KLine, 500)
|
||||
|
@ -344,7 +346,9 @@ func (s *BacktestService) SyncExist(ctx context.Context, exchange types.Exchange
|
|||
nowStartTime = k.StartTime.Time().Add(interval.Duration())
|
||||
}
|
||||
|
||||
s.Sync(ctx, exchange, symbol, nowStartTime, endTime, interval)
|
||||
if nowStartTime.Unix() < endTime.Unix() && nowStartTime.Unix() < time.Now().Unix() {
|
||||
s.Sync(ctx, exchange, symbol, nowStartTime, endTime, interval)
|
||||
}
|
||||
|
||||
if err := <-errC; err != nil {
|
||||
return err
|
||||
|
|
Loading…
Reference in New Issue
Block a user