diff --git a/pkg/strategy/common/profit_fixer.go b/pkg/strategy/common/profit_fixer.go index 91d4b8158..44817acdf 100644 --- a/pkg/strategy/common/profit_fixer.go +++ b/pkg/strategy/common/profit_fixer.go @@ -39,9 +39,9 @@ func (f *ProfitFixer) batchQueryTrades( service types.ExchangeTradeHistoryService, symbol string, since, until time.Time, -) ([]types.Trade, error) { +) (chan types.Trade, chan error) { q := &batch.TradeBatchQuery{ExchangeTradeHistoryService: service} - return q.QueryTrades(ctx, symbol, &types.TradeQueryOptions{ + return q.Query(ctx, symbol, &types.TradeQueryOptions{ StartTime: &since, EndTime: &until, }) @@ -58,16 +58,26 @@ func (f *ProfitFixer) aggregateAllTrades(ctx context.Context, symbol string, sin service := s g.Go(func() error { log.Infof("batch querying %s trade history from %s since %s until %s", symbol, sessionName, since.String(), until.String()) - trades, err := f.batchQueryTrades(subCtx, service, symbol, since, until) - if err != nil { - log.WithError(err).Errorf("unable to batch query trades for fixer") - return err - } + tradeC, errC := f.batchQueryTrades(subCtx, service, symbol, since, until) - mu.Lock() - allTrades = append(allTrades, trades...) - mu.Unlock() - return nil + for { + select { + case <-ctx.Done(): + return ctx.Err() + + case err := <-errC: + return err + + case trade, ok := <-tradeC: + if !ok { + return nil + } + + mu.Lock() + allTrades = append(allTrades, trade) + mu.Unlock() + } + } }) } @@ -75,7 +85,10 @@ func (f *ProfitFixer) aggregateAllTrades(ctx context.Context, symbol string, sin return nil, err } + mu.Lock() allTrades = types.SortTradesAscending(allTrades) + mu.Unlock() + return allTrades, nil }