fix empty start time sync issue

This commit is contained in:
c9s 2022-01-01 02:43:08 +08:00
parent 25f01b8837
commit 129f44bbcb
2 changed files with 26 additions and 8 deletions

View File

@ -304,10 +304,10 @@ func (e *Exchange) QueryClosedOrders(ctx context.Context, symbol string, since,
// However, when you query orders in done status, the start and end time range cannot exceed 7* 24 hours.
// An error will occur if the specified time window exceeds the range.
// If you specify the end time only, the system will automatically calculate the start time as end time minus 7*24 hours, and vice versa.
if until.Sub(since) < 7 * 24 * time.Hour {
if until.Sub(since) < 7*24*time.Hour {
req.EndAt(until)
} else {
req.EndAt(since.Add(7 * 24 * time.Hour - time.Minute))
req.EndAt(since.Add(7*24*time.Hour - time.Minute))
}
orderList, err := req.Do(ctx)
@ -323,11 +323,28 @@ func (e *Exchange) QueryClosedOrders(ctx context.Context, symbol string, since,
return orders, err
}
var launchDate = time.Date(2017, 9, 0, 0, 0, 0, 0, nil)
func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *types.TradeQueryOptions) (trades []types.Trade, err error) {
req := e.client.TradeService.NewGetFillsRequest()
req.Symbol(toLocalSymbol(symbol))
if options.StartTime != nil {
// we always sync trades in the ascending order, and kucoin does not support last trade ID query
// hence we need to set the start time here
if options.StartTime != nil && options.StartTime.Before(launchDate) {
// copy the time data object
t := launchDate
options.StartTime = &t
}
if options.StartTime != nil && options.EndTime != nil {
req.StartAt(*options.StartTime)
if options.EndTime.Sub(*options.StartTime) < 7*24*time.Hour {
req.EndAt(*options.EndTime)
} else {
req.StartAt(options.StartTime.Add(7*24*time.Hour - time.Minute))
}
} else if options.StartTime != nil {
req.StartAt(*options.StartTime)
} else if options.EndTime != nil {
req.EndAt(*options.EndTime)

View File

@ -72,7 +72,6 @@ func (s *TradeService) Sync(ctx context.Context, exchange types.Exchange, symbol
symbol = futuresSettings.IsolatedFuturesSymbol
}
}
// records descending ordered, buffer 50 trades and use the trades ID to scan if the new trades are duplicated
records, err := s.QueryLast(exchange.Name(), symbol, isMargin, isFutures, isIsolated, 50)
@ -82,7 +81,8 @@ func (s *TradeService) Sync(ctx context.Context, exchange types.Exchange, symbol
var tradeKeys = map[types.TradeKey]struct{}{}
var lastTradeID uint64 = 1
var startTime time.Time
var lastTradeTime time.Time
var startTime *time.Time
var now = time.Now()
if len(records) > 0 {
for _, record := range records {
@ -90,14 +90,15 @@ func (s *TradeService) Sync(ctx context.Context, exchange types.Exchange, symbol
}
lastTradeID = records[0].ID
startTime = time.Time(records[0].Time)
lastTradeTime = time.Time(records[0].Time)
startTime = &lastTradeTime
}
b := &batch.TradeBatchQuery{Exchange: exchange}
tradeC, errC := b.Query(ctx, symbol, &types.TradeQueryOptions{
LastTradeID: lastTradeID,
StartTime: &startTime,
EndTime: &now,
StartTime: startTime,
EndTime: &now,
})
for trade := range tradeC {