fix batch kline sync

This commit is contained in:
c9s 2021-12-26 03:04:21 +08:00
parent cf6da76ef0
commit 4c263dd205
2 changed files with 43 additions and 31 deletions

View File

@ -2,10 +2,11 @@ package batch
import (
"context"
"github.com/pkg/errors"
"sort"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/time/rate"
@ -94,26 +95,31 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type
StartTime: &currentTime,
EndTime: &endTime,
})
sort.Slice(kLines, func(i, j int) bool { return kLines[i].StartTime.Unix() < kLines[j].StartTime.Unix() })
tryQueryKlineTimes++
if err != nil {
errC <- err
return
}
sort.Slice(kLines, func(i, j int) bool { return kLines[i].StartTime.Unix() < kLines[j].StartTime.Unix() })
if len(kLines) == 0 {
return
} else if len(kLines) == 1 && kLines[0].StartTime.Unix() == currentTime.Unix() {
return
}
tryQueryKlineTimes++
const BatchSize = 200
var batchKLines = make([]types.KLine, 0, BatchSize)
for _, kline := range kLines {
// ignore any kline before the given start time
if currentTime.Unix() != startTime.Unix() && kline.StartTime.Unix() <= currentTime.Unix() {
// ignore any kline before the given start time of the batch query
if currentTime.Unix() != startTime.Unix() && kline.StartTime.Before(currentTime) {
continue
}
// if there is a kline after the endTime of the batch query, it means the data is out of scope, we should exit
if kline.StartTime.After(endTime) || kline.EndTime.After(endTime) {
if len(batchKLines) != 0 {
c <- batchKLines
@ -129,18 +135,19 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type
batchKLines = nil
}
//The issue is in FTX, prev endtime = next start time , so if add 1 ms , it would query forever.
// The issue is in FTX, prev endtime = next start time , so if add 1 ms , it would query forever.
currentTime = kline.StartTime.Time()
tryQueryKlineTimes = 0
}
if len(batchKLines) != 0 {
// push the rest klines in the buffer
if len(batchKLines) > 0 {
c <- batchKLines
batchKLines = nil
}
if tryQueryKlineTimes > 10 { // it means loop 10 times
errC <- errors.Errorf("there's a dead loop in batch.go#Query , symbol: %s , interval: %s, startTime :%s ", symbol, interval, startTime.String())
errC <- errors.Errorf("there's a dead loop in batch.go#Query , symbol: %s , interval: %s, startTime:%s ", symbol, interval, startTime.String())
return
}
}

View File

@ -8,6 +8,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.uber.org/multierr"
"golang.org/x/time/rate"
"github.com/c9s/bbgo/pkg/exchange/kucoin/kucoinapi"
"github.com/c9s/bbgo/pkg/types"
@ -27,7 +28,6 @@ type Exchange struct {
client *kucoinapi.RestClient
}
func New(key, secret, passphrase string) *Exchange {
client := kucoinapi.NewClient()
@ -126,16 +126,15 @@ func (e *Exchange) QueryTickers(ctx context.Context, symbols ...string) (map[str
return tickers, nil
}
var supportedIntervals = map[types.Interval]int{
types.Interval1m: 60,
types.Interval5m: 60 * 5,
types.Interval5m: 60 * 15,
types.Interval1m: 60,
types.Interval5m: 60 * 5,
types.Interval5m: 60 * 15,
types.Interval30m: 60 * 30,
types.Interval1h: 60 * 60,
types.Interval2h: 60 * 60 * 2,
types.Interval4h: 60 * 60 * 4,
types.Interval6h: 60 * 60 * 6,
types.Interval1h: 60 * 60,
types.Interval2h: 60 * 60 * 2,
types.Interval4h: 60 * 60 * 4,
types.Interval6h: 60 * 60 * 6,
// types.Interval8h: 60 * 60 * 8,
types.Interval12h: 60 * 60 * 12,
}
@ -149,14 +148,20 @@ func (e *Exchange) IsSupportedInterval(interval types.Interval) bool {
return ok
}
var marketDataLimiter = rate.NewLimiter(rate.Every(200*time.Millisecond), 1)
func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) {
_ = marketDataLimiter.Wait(ctx)
req := e.client.MarketDataService.NewGetKLinesRequest()
req.Symbol(toLocalSymbol(symbol))
req.Interval(toLocalInterval(interval))
if options.StartTime != nil {
req.StartAt(*options.StartTime)
} else if options.EndTime != nil {
req.StartAt(*options.EndTime)
}
if options.EndTime != nil {
req.EndAt(*options.EndTime)
}
ks, err := req.Do(ctx)
@ -168,18 +173,18 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type
for _, k := range ks {
gi := toGlobalInterval(k.Interval)
klines = append(klines, types.KLine{
Exchange: types.ExchangeKucoin,
Symbol: toGlobalSymbol(k.Symbol),
StartTime: types.Time(k.StartTime),
EndTime: types.Time(k.StartTime.Add(gi.Duration() - time.Millisecond)),
Interval: gi,
Open: k.Open.Float64(),
Close: k.Close.Float64(),
High: k.High.Float64(),
Low: k.Low.Float64(),
Volume: k.Volume.Float64(),
QuoteVolume: k.QuoteVolume.Float64(),
Closed: true,
Exchange: types.ExchangeKucoin,
Symbol: toGlobalSymbol(k.Symbol),
StartTime: types.Time(k.StartTime),
EndTime: types.Time(k.StartTime.Add(gi.Duration() - time.Millisecond)),
Interval: gi,
Open: k.Open.Float64(),
Close: k.Close.Float64(),
High: k.High.Float64(),
Low: k.Low.Float64(),
Volume: k.Volume.Float64(),
QuoteVolume: k.QuoteVolume.Float64(),
Closed: true,
})
}