avoid using last trade id for syncing data

This commit is contained in:
c9s 2021-02-18 16:40:47 +08:00
parent 43c7da59f8
commit c3dbb1b204
3 changed files with 6 additions and 19 deletions

View File

@ -703,6 +703,7 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type
if options.EndTime != nil { if options.EndTime != nil {
req.EndTime(options.EndTime.UnixNano() / int64(time.Millisecond)) req.EndTime(options.EndTime.UnixNano() / int64(time.Millisecond))
} }
if options.LastTradeID > 0 { if options.LastTradeID > 0 {
req.FromID(options.LastTradeID) req.FromID(options.LastTradeID)
} }

View File

@ -81,18 +81,14 @@ func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, s
return err return err
} }
var lastID int64 = 0
if lastTrade != nil { if lastTrade != nil {
lastID = lastTrade.ID startTime = time.Time(lastTrade.Time).Add(time.Millisecond)
startTime = time.Time(lastTrade.Time) logrus.Infof("found last trade, start from lastID = %d since %s", lastTrade.ID, startTime)
logrus.Infof("found last trade, start from lastID = %d since %s", lastID, startTime)
} }
batch := &types.ExchangeBatchProcessor{Exchange: exchange} batch := &types.ExchangeBatchProcessor{Exchange: exchange}
tradeC, errC := batch.BatchQueryTrades(ctx, symbol, &types.TradeQueryOptions{ tradeC, errC := batch.BatchQueryTrades(ctx, symbol, &types.TradeQueryOptions{
StartTime: &startTime, StartTime: &startTime,
LastTradeID: lastID,
}) })
for trade := range tradeC { for trade := range tradeC {

View File

@ -120,8 +120,6 @@ func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol str
startTime = *options.StartTime startTime = *options.StartTime
} }
var lastTradeID = options.LastTradeID
go func() { go func() {
limiter := rate.NewLimiter(rate.Every(5*time.Second), 2) // from binance (original 1200, use 1000 for safety) limiter := rate.NewLimiter(rate.Every(5*time.Second), 2) // from binance (original 1200, use 1000 for safety)
@ -138,7 +136,7 @@ func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol str
trades, err := e.QueryTrades(ctx, symbol, &TradeQueryOptions{ trades, err := e.QueryTrades(ctx, symbol, &TradeQueryOptions{
StartTime: &startTime, StartTime: &startTime,
Limit: options.Limit, Limit: options.Limit,
LastTradeID: lastTradeID, // LastTradeID: lastTradeID,
}) })
if err != nil { if err != nil {
errC <- err errC <- err
@ -149,21 +147,13 @@ func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol str
break break
} }
if len(trades) == 1 && trades[0].ID == lastTradeID {
break
}
logrus.Infof("returned %d trades", len(trades)) logrus.Infof("returned %d trades", len(trades))
startTime = time.Time(trades[len(trades)-1].Time) // increase the window to the next time frame by adding 1 millisecond
startTime = time.Time(trades[len(trades)-1].Time).Add(time.Millisecond)
for _, t := range trades { for _, t := range trades {
// ignore the first trade if last TradeID is given // ignore the first trade if last TradeID is given
if t.ID == lastTradeID {
continue
}
c <- t c <- t
lastTradeID = t.ID
} }
} }
}() }()