remove start time query condition for trade sync since starting from trade id = 1 works

This commit is contained in:
c9s 2021-02-19 14:18:50 +08:00
parent 44fa74a4c9
commit dd13b9a8bf
4 changed files with 6 additions and 20 deletions

View File

@ -293,7 +293,7 @@ func (session *ExchangeSession) InitSymbol(ctx context.Context, environ *Environ
var trades []types.Trade var trades []types.Trade
if environ.TradeSync != nil { if environ.TradeSync != nil {
log.Infof("syncing trades from %s for symbol %s...", session.Exchange.Name(), symbol) log.Infof("syncing trades from %s for symbol %s...", session.Exchange.Name(), symbol)
if err := environ.TradeSync.SyncTrades(ctx, session.Exchange, symbol, environ.tradeScanTime); err != nil { if err := environ.TradeSync.SyncTrades(ctx, session.Exchange, symbol); err != nil {
return err return err
} }

View File

@ -1,2 +0,0 @@
package bbgo

View File

@ -118,12 +118,6 @@ func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol str
var lastTradeID = options.LastTradeID var lastTradeID = options.LastTradeID
// last 7 days
var startTime = time.Now().Add(-7 * 24 * time.Hour)
if options.StartTime != nil {
startTime = *options.StartTime
}
go func() { go func() {
limiter := rate.NewLimiter(rate.Every(5*time.Second), 2) // from binance (original 1200, use 1000 for safety) limiter := rate.NewLimiter(rate.Every(5*time.Second), 2) // from binance (original 1200, use 1000 for safety)
@ -137,13 +131,12 @@ func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol str
logrus.WithError(err).Error("rate limit error") logrus.WithError(err).Error("rate limit error")
} }
logrus.Infof("querying %s trades from %s, limit=%d", symbol, startTime, options.Limit) logrus.Debugf("querying %s trades from id=%d limit=%d", symbol, lastTradeID, options.Limit)
var err error var err error
var trades []types.Trade var trades []types.Trade
trades, err = e.Exchange.QueryTrades(ctx, symbol, &types.TradeQueryOptions{ trades, err = e.Exchange.QueryTrades(ctx, symbol, &types.TradeQueryOptions{
StartTime: &startTime,
Limit: options.Limit, Limit: options.Limit,
LastTradeID: lastTradeID, LastTradeID: lastTradeID,
}) })
@ -164,8 +157,6 @@ func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol str
logrus.Debugf("returned %d trades", len(trades)) logrus.Debugf("returned %d trades", len(trades))
// increase the window to the next time frame by adding 1 millisecond
startTime = time.Time(trades[len(trades)-1].Time)
for _, t := range trades { for _, t := range trades {
key := t.Key() key := t.Key()
if _, ok := tradeKeys[key]; ok { if _, ok := tradeKeys[key]; ok {

View File

@ -65,7 +65,7 @@ func (s *SyncService) SyncOrders(ctx context.Context, exchange types.Exchange, s
return <-errC return <-errC
} }
func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error { func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, symbol string) error {
isMargin := false isMargin := false
isIsolated := false isIsolated := false
if marginExchange, ok := exchange.(types.MarginExchange); ok { if marginExchange, ok := exchange.(types.MarginExchange); ok {
@ -83,7 +83,7 @@ func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, s
} }
var tradeKeys = map[types.TradeKey]struct{}{} var tradeKeys = map[types.TradeKey]struct{}{}
var lastTradeID int64 = 0 var lastTradeID int64 = 1
if len(lastTrades) > 0 { if len(lastTrades) > 0 {
for _, t := range lastTrades { for _, t := range lastTrades {
tradeKeys[t.Key()] = struct{}{} tradeKeys[t.Key()] = struct{}{}
@ -91,14 +91,11 @@ func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, s
lastTrade := lastTrades[len(lastTrades)-1] lastTrade := lastTrades[len(lastTrades)-1]
lastTradeID = lastTrade.ID lastTradeID = lastTrade.ID
logrus.Debugf("found last trade, start from lastID = %d", lastTrade.ID)
startTime = time.Time(lastTrade.Time)
logrus.Debugf("found last trade, start from lastID = %d since %s", lastTrade.ID, startTime)
} }
b := &batch.ExchangeBatchProcessor{Exchange: exchange} b := &batch.ExchangeBatchProcessor{Exchange: exchange}
tradeC, errC := b.BatchQueryTrades(ctx, symbol, &types.TradeQueryOptions{ tradeC, errC := b.BatchQueryTrades(ctx, symbol, &types.TradeQueryOptions{
StartTime: &startTime,
LastTradeID: lastTradeID, LastTradeID: lastTradeID,
}) })
@ -135,7 +132,7 @@ func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, s
// SyncSessionSymbols syncs the trades from the given exchange session // SyncSessionSymbols syncs the trades from the given exchange session
func (s *SyncService) SyncSessionSymbols(ctx context.Context, exchange types.Exchange, startTime time.Time, symbols ...string) error { func (s *SyncService) SyncSessionSymbols(ctx context.Context, exchange types.Exchange, startTime time.Time, symbols ...string) error {
for _, symbol := range symbols { for _, symbol := range symbols {
if err := s.SyncTrades(ctx, exchange, symbol, startTime); err != nil { if err := s.SyncTrades(ctx, exchange, symbol); err != nil {
return err return err
} }