mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-21 22:43:52 +00:00
simplify ftx kline sync call
This commit is contained in:
parent
b9b2b8727a
commit
b8eb036556
|
@ -391,13 +391,7 @@ var BacktestCmd = &cobra.Command{
|
|||
var numOfExchangeSources = len(exchangeSources)
|
||||
if numOfExchangeSources == 1 {
|
||||
exSource := exchangeSources[0]
|
||||
var lastk types.KLine
|
||||
for k := range exSource.C {
|
||||
// avoid duplicated klines
|
||||
if k == lastk {
|
||||
continue
|
||||
}
|
||||
|
||||
exSource.Exchange.ConsumeKLine(k)
|
||||
|
||||
for _, h := range kLineHandlers {
|
||||
|
|
|
@ -25,12 +25,10 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type
|
|||
defer close(c)
|
||||
defer close(errC)
|
||||
|
||||
tryQueryKlineTimes := 0
|
||||
|
||||
var currentTime = startTime
|
||||
for currentTime.Before(endTime) {
|
||||
var tryQueryKlineTimes = 0
|
||||
for startTime.Before(endTime) {
|
||||
kLines, err := e.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{
|
||||
StartTime: ¤tTime,
|
||||
StartTime: &startTime,
|
||||
EndTime: &endTime,
|
||||
})
|
||||
|
||||
|
@ -39,12 +37,13 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type
|
|||
return
|
||||
}
|
||||
|
||||
sort.Slice(kLines, func(i, j int) bool { return kLines[i].StartTime.Unix() < kLines[j].StartTime.Unix() })
|
||||
// ensure the kline is in the right order
|
||||
sort.Slice(kLines, func(i, j int) bool {
|
||||
return kLines[i].StartTime.Unix() < kLines[j].StartTime.Unix()
|
||||
})
|
||||
|
||||
if len(kLines) == 0 {
|
||||
return
|
||||
} else if len(kLines) == 1 && kLines[0].StartTime.Unix() == currentTime.Unix() {
|
||||
return
|
||||
}
|
||||
|
||||
tryQueryKlineTimes++
|
||||
|
@ -53,7 +52,7 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type
|
|||
var batchKLines = make([]types.KLine, 0, BatchSize)
|
||||
for _, kline := range kLines {
|
||||
// ignore any kline before the given start time of the batch query
|
||||
if currentTime.Unix() != startTime.Unix() && kline.StartTime.Before(currentTime) {
|
||||
if kline.StartTime.Before(startTime) {
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -74,7 +73,8 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type
|
|||
}
|
||||
|
||||
// The issue is in FTX, prev endtime = next start time , so if add 1 ms , it would query forever.
|
||||
currentTime = kline.StartTime.Time()
|
||||
// (above comment was written by @tony1223)
|
||||
startTime = kline.EndTime.Time()
|
||||
tryQueryKlineTimes = 0
|
||||
}
|
||||
|
||||
|
|
|
@ -231,85 +231,23 @@ func (e *Exchange) IsSupportedInterval(interval types.Interval) bool {
|
|||
|
||||
func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) {
|
||||
var klines []types.KLine
|
||||
var since, until, currentEnd time.Time
|
||||
if options.StartTime != nil {
|
||||
since = *options.StartTime
|
||||
}
|
||||
if options.EndTime != nil {
|
||||
until = *options.EndTime
|
||||
} else {
|
||||
until = time.Now()
|
||||
|
||||
// the fetch result is from newest to oldest
|
||||
// currentEnd = until
|
||||
// endTime := currentEnd.Add(interval.Duration())
|
||||
klines, err := e._queryKLines(ctx, symbol, interval, options)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
currentEnd = until
|
||||
|
||||
for {
|
||||
|
||||
// the fetch result is from newest to oldest
|
||||
endTime := currentEnd.Add(interval.Duration())
|
||||
options.EndTime = &endTime
|
||||
lines, err := e._queryKLines(ctx, symbol, interval, types.KLineQueryOptions{
|
||||
StartTime: &since,
|
||||
EndTime: ¤tEnd,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(lines) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
for _, line := range lines {
|
||||
|
||||
if line.StartTime.Unix() < currentEnd.Unix() {
|
||||
currentEnd = line.StartTime.Time()
|
||||
}
|
||||
|
||||
if line.StartTime.Unix() > since.Unix() {
|
||||
klines = append(klines, line)
|
||||
}
|
||||
}
|
||||
|
||||
if len(lines) == 1 && lines[0].StartTime.Unix() == currentEnd.Unix() {
|
||||
break
|
||||
}
|
||||
|
||||
outBound := currentEnd.Add(interval.Duration()*-1).Unix() <= since.Unix()
|
||||
if since.IsZero() || currentEnd.Unix() == since.Unix() || outBound {
|
||||
break
|
||||
}
|
||||
|
||||
if options.Limit != 0 && options.Limit <= len(lines) {
|
||||
break
|
||||
}
|
||||
}
|
||||
sort.Slice(klines, func(i, j int) bool { return klines[i].StartTime.Unix() < klines[j].StartTime.Unix() })
|
||||
|
||||
if options.Limit != 0 {
|
||||
limitedItems := len(klines) - options.Limit
|
||||
if limitedItems > 0 {
|
||||
return klines[limitedItems:], nil
|
||||
}
|
||||
}
|
||||
sort.Slice(klines, func(i, j int) bool {
|
||||
return klines[i].StartTime.Unix() < klines[j].StartTime.Unix()
|
||||
})
|
||||
|
||||
return klines, nil
|
||||
}
|
||||
|
||||
func (e *Exchange) _queryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) {
|
||||
var since, until time.Time
|
||||
if options.StartTime != nil {
|
||||
since = *options.StartTime
|
||||
}
|
||||
if options.EndTime != nil {
|
||||
until = *options.EndTime
|
||||
} else {
|
||||
until = time.Now()
|
||||
}
|
||||
if since.After(until) {
|
||||
return nil, fmt.Errorf("invalid query klines time range, since: %+v, until: %+v", since, until)
|
||||
}
|
||||
if !isIntervalSupportedInKLine(interval) {
|
||||
return nil, fmt.Errorf("interval %s is not supported", interval.String())
|
||||
}
|
||||
|
@ -318,7 +256,7 @@ func (e *Exchange) _queryKLines(ctx context.Context, symbol string, interval typ
|
|||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := e.newRest().HistoricalPrices(ctx, toLocalSymbol(symbol), interval, 0, since, until)
|
||||
resp, err := e.newRest().HistoricalPrices(ctx, toLocalSymbol(symbol), interval, int64(options.Limit), options.StartTime, options.EndTime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -621,7 +559,10 @@ func (e *Exchange) QueryTickers(ctx context.Context, symbol ...string) (map[stri
|
|||
}
|
||||
|
||||
// ctx context.Context, market string, interval types.Interval, limit int64, start, end time.Time
|
||||
prices, err := rest.HistoricalPrices(ctx, v.Market.LocalSymbol, types.Interval1h, 1, time.Now().Add(time.Duration(-1)*time.Hour), time.Now())
|
||||
now := time.Now()
|
||||
since := now.Add(time.Duration(-1) * time.Hour)
|
||||
until := now
|
||||
prices, err := rest.HistoricalPrices(ctx, v.Market.LocalSymbol, types.Interval1h, 1, &since, &until)
|
||||
if err != nil || !prices.Success || len(prices.Result) == 0 {
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ type marketRequest struct {
|
|||
supported resolutions: window length in seconds. options: 15, 60, 300, 900, 3600, 14400, 86400
|
||||
doc: https://docs.ftx.com/?javascript#get-historical-prices
|
||||
*/
|
||||
func (r *marketRequest) HistoricalPrices(ctx context.Context, market string, interval types.Interval, limit int64, start, end time.Time) (HistoricalPricesResponse, error) {
|
||||
func (r *marketRequest) HistoricalPrices(ctx context.Context, market string, interval types.Interval, limit int64, start, end *time.Time) (HistoricalPricesResponse, error) {
|
||||
q := map[string]string{
|
||||
"resolution": strconv.FormatInt(int64(interval.Minutes())*60, 10),
|
||||
}
|
||||
|
@ -27,11 +27,11 @@ func (r *marketRequest) HistoricalPrices(ctx context.Context, market string, int
|
|||
q["limit"] = strconv.FormatInt(limit, 10)
|
||||
}
|
||||
|
||||
if start != (time.Time{}) {
|
||||
if start != nil {
|
||||
q["start_time"] = strconv.FormatInt(start.Unix(), 10)
|
||||
}
|
||||
|
||||
if end != (time.Time{}) {
|
||||
if end != nil {
|
||||
q["end_time"] = strconv.FormatInt(end.Unix(), 10)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user