common: fix profit fixer batch query

This commit is contained in:
c9s 2024-07-08 17:43:22 +08:00
parent d41de325f8
commit 22c154f9cd
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54

View File

@ -39,9 +39,9 @@ func (f *ProfitFixer) batchQueryTrades(
service types.ExchangeTradeHistoryService,
symbol string,
since, until time.Time,
) ([]types.Trade, error) {
) (chan types.Trade, chan error) {
q := &batch.TradeBatchQuery{ExchangeTradeHistoryService: service}
return q.QueryTrades(ctx, symbol, &types.TradeQueryOptions{
return q.Query(ctx, symbol, &types.TradeQueryOptions{
StartTime: &since,
EndTime: &until,
})
@ -58,16 +58,26 @@ func (f *ProfitFixer) aggregateAllTrades(ctx context.Context, symbol string, sin
service := s
g.Go(func() error {
log.Infof("batch querying %s trade history from %s since %s until %s", symbol, sessionName, since.String(), until.String())
trades, err := f.batchQueryTrades(subCtx, service, symbol, since, until)
if err != nil {
log.WithError(err).Errorf("unable to batch query trades for fixer")
return err
}
tradeC, errC := f.batchQueryTrades(subCtx, service, symbol, since, until)
mu.Lock()
allTrades = append(allTrades, trades...)
mu.Unlock()
return nil
for {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
return err
case trade, ok := <-tradeC:
if !ok {
return nil
}
mu.Lock()
allTrades = append(allTrades, trade)
mu.Unlock()
}
}
})
}
@ -75,7 +85,10 @@ func (f *ProfitFixer) aggregateAllTrades(ctx context.Context, symbol string, sin
return nil, err
}
mu.Lock()
allTrades = types.SortTradesAscending(allTrades)
mu.Unlock()
return allTrades, nil
}