add error handling

This commit is contained in:
c9s 2022-05-20 18:57:41 +08:00
parent 62de3a43ed
commit b9f0159537
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
3 changed files with 17 additions and 8 deletions

View File

@ -13,6 +13,8 @@ import (
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
var log = logrus.WithField("component", "batch")
type KLineBatchQuery struct { type KLineBatchQuery struct {
types.Exchange types.Exchange
} }
@ -27,6 +29,8 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type
var tryQueryKlineTimes = 0 var tryQueryKlineTimes = 0
for startTime.Before(endTime) { for startTime.Before(endTime) {
log.Debugf("batch query klines %s %s %s <=> %s", symbol, interval, startTime, endTime)
kLines, err := e.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{ kLines, err := e.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{
StartTime: &startTime, StartTime: &startTime,
EndTime: &endTime, EndTime: &endTime,
@ -113,10 +117,10 @@ func (q *RewardBatchQuery) Query(ctx context.Context, startTime, endTime time.Ti
for startTime.Before(endTime) { for startTime.Before(endTime) {
if err := limiter.Wait(ctx); err != nil { if err := limiter.Wait(ctx); err != nil {
logrus.WithError(err).Error("rate limit error") log.WithError(err).Error("rate limit error")
} }
logrus.Infof("batch querying rewards %s <=> %s", startTime, endTime) log.Infof("batch querying rewards %s <=> %s", startTime, endTime)
rewards, err := q.Service.QueryRewards(ctx, startTime) rewards, err := q.Service.QueryRewards(ctx, startTime)
if err != nil { if err != nil {

View File

@ -17,9 +17,9 @@ import (
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
var marketDataLimiter = rate.NewLimiter(rate.Every(5*time.Second), 1) var marketDataLimiter = rate.NewLimiter(rate.Every(6*time.Second), 1)
var queryTradeLimiter = rate.NewLimiter(rate.Every(5*time.Second), 1) var queryTradeLimiter = rate.NewLimiter(rate.Every(6*time.Second), 1)
var queryOrderLimiter = rate.NewLimiter(rate.Every(5*time.Second), 1) var queryOrderLimiter = rate.NewLimiter(rate.Every(6*time.Second), 1)
var ErrMissingSequence = errors.New("sequence is missing") var ErrMissingSequence = errors.New("sequence is missing")

View File

@ -34,9 +34,10 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type
if err := s.BatchInsert(klines); err != nil { if err := s.BatchInsert(klines); err != nil {
return err return err
} }
count += len(klines) count += len(klines)
} }
log.Infof("found %s kline %s data count: %d", symbol, interval.String(), count) log.Debugf("inserted klines %s %s data: %d", symbol, interval.String(), count)
if err := <-errC; err != nil { if err := <-errC; err != nil {
return err return err
@ -344,13 +345,17 @@ func (s *BacktestService) SyncExist(ctx context.Context, exchange types.Exchange
for k := range klineC { for k := range klineC {
if nowStartTime.Unix() < k.StartTime.Unix() { if nowStartTime.Unix() < k.StartTime.Unix() {
log.Infof("syncing %s interval %s syncing %s ~ %s ", symbol, interval, nowStartTime, k.EndTime) log.Infof("syncing %s interval %s syncing %s ~ %s ", symbol, interval, nowStartTime, k.EndTime)
s.Sync(ctx, exchange, symbol, nowStartTime, k.EndTime.Time().Add(-1*interval.Duration()), interval) if err := s.Sync(ctx, exchange, symbol, nowStartTime, k.EndTime.Time().Add(-1*interval.Duration()), interval); err != nil {
log.WithError(err).Errorf("sync error")
}
} }
nowStartTime = k.StartTime.Time().Add(interval.Duration()) nowStartTime = k.StartTime.Time().Add(interval.Duration())
} }
if nowStartTime.Unix() < endTime.Unix() && nowStartTime.Unix() < time.Now().Unix() { if nowStartTime.Unix() < endTime.Unix() && nowStartTime.Unix() < time.Now().Unix() {
s.Sync(ctx, exchange, symbol, nowStartTime, endTime, interval) if err := s.Sync(ctx, exchange, symbol, nowStartTime, endTime, interval); err != nil {
log.WithError(err).Errorf("sync error")
}
} }
if err := <-errC; err != nil { if err := <-errC; err != nil {