Merge pull request #1667 from c9s/c9s/fix-profit-fixer-batch-query

FIX: [common] fix profit fixer batch query
This commit is contained in:
c9s 2024-07-08 17:57:20 +08:00 committed by GitHub
commit e7a20db048
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -39,9 +39,9 @@ func (f *ProfitFixer) batchQueryTrades(
service types.ExchangeTradeHistoryService, service types.ExchangeTradeHistoryService,
symbol string, symbol string,
since, until time.Time, since, until time.Time,
) ([]types.Trade, error) { ) (chan types.Trade, chan error) {
q := &batch.TradeBatchQuery{ExchangeTradeHistoryService: service} q := &batch.TradeBatchQuery{ExchangeTradeHistoryService: service}
return q.QueryTrades(ctx, symbol, &types.TradeQueryOptions{ return q.Query(ctx, symbol, &types.TradeQueryOptions{
StartTime: &since, StartTime: &since,
EndTime: &until, EndTime: &until,
}) })
@ -58,16 +58,26 @@ func (f *ProfitFixer) aggregateAllTrades(ctx context.Context, symbol string, sin
service := s service := s
g.Go(func() error { g.Go(func() error {
log.Infof("batch querying %s trade history from %s since %s until %s", symbol, sessionName, since.String(), until.String()) log.Infof("batch querying %s trade history from %s since %s until %s", symbol, sessionName, since.String(), until.String())
trades, err := f.batchQueryTrades(subCtx, service, symbol, since, until) tradeC, errC := f.batchQueryTrades(subCtx, service, symbol, since, until)
if err != nil {
log.WithError(err).Errorf("unable to batch query trades for fixer")
return err
}
mu.Lock() for {
allTrades = append(allTrades, trades...) select {
mu.Unlock() case <-ctx.Done():
return nil return ctx.Err()
case err := <-errC:
return err
case trade, ok := <-tradeC:
if !ok {
return nil
}
mu.Lock()
allTrades = append(allTrades, trade)
mu.Unlock()
}
}
}) })
} }
@ -75,7 +85,10 @@ func (f *ProfitFixer) aggregateAllTrades(ctx context.Context, symbol string, sin
return nil, err return nil, err
} }
mu.Lock()
allTrades = types.SortTradesAscending(allTrades) allTrades = types.SortTradesAscending(allTrades)
mu.Unlock()
return allTrades, nil return allTrades, nil
} }