async def _build_trades_dl_jobs(
    self, pairwt: PairWithTimeframe, data_handler, cache: bool
) -> Tuple[PairWithTimeframe, Optional[DataFrame]]:
    """
    Refresh the trades of a single pair.
    One such coroutine is created per pair and they are awaited together via asyncio.gather.

    :param pairwt: Tuple of (pair, timeframe, candle_type) to refresh
    :param data_handler: Handler used to load / store the "<pair>-cached" trades
    :param cache: If False, always refresh. Also forwarded to _process_trades_df
    :return: Tuple of (pairwt, trades dataframe). The dataframe is None if no refresh
        was necessary, if the download failed or if no new trades were received.
    """
    pair, timeframe, candle_type = pairwt
    cache_key = (pair, timeframe, candle_type)
    first_candle_ms = self.needed_candle_for_trades_ms(timeframe, candle_type)

    is_in_cache = cache_key in self._trades
    # A refresh is only skipped when the pair is already held in _trades,
    # caching is enabled and it is not yet time to refresh.
    if (
        is_in_cache
        and cache
        and not self._now_is_time_to_refresh_trades(pair, timeframe, candle_type)
    ):
        return pairwt, None

    logger.debug(f"Refreshing TRADES data for {pair}")
    stored_ticks_df = DataFrame(columns=DEFAULT_TRADES_COLUMNS + ["date"])
    new_ticks: List = []
    since_ms = None
    from_id = None
    try:
        if is_in_cache:
            # Continue behind the last trade held in _trades, up to now.
            from_id = self._trades[cache_key].iloc[-1]["id"]
            until = dt_ts()
        else:
            until = int(timeframe_to_prev_date(timeframe).timestamp()) * 1000
            stored_ticks_df = data_handler.trades_load(f"{pair}-cached", self.trading_mode)

            if not stored_ticks_df.empty:
                newest_row = stored_ticks_df.iloc[-1]
                if (
                    newest_row["timestamp"] > first_candle_ms
                    and stored_ticks_df.iloc[0]["timestamp"] <= first_candle_ms
                ):
                    # Stored trades span the first required candle - extend them.
                    last_cached_ms = newest_row["timestamp"]
                    from_id = newest_row["id"]
                    # Only start from the stored data if it is newer than first_candle_ms
                    if last_cached_ms > first_candle_ms:
                        since_ms = last_cached_ms
                    else:
                        since_ms = first_candle_ms
                else:
                    # Stored trades are too old - start again from an empty frame.
                    stored_ticks_df = DataFrame(columns=DEFAULT_TRADES_COLUMNS + ["date"])

        # from_id overrules since when the exchange is set to paginate by id
        _, new_ticks = await self._async_get_trade_history(
            pair,
            since=since_ms or first_candle_ms,
            until=until,
            from_id=from_id,
        )
    except Exception:
        logger.exception(f"Refreshing TRADES data for {pair} failed")
        return pairwt, None

    if not new_ticks:
        logger.error(f"No new ticks for {pair}")
        return pairwt, None

    # Append the downloaded trades to the stored ones, then process and persist the result.
    combined_ticks = stored_ticks_df[DEFAULT_TRADES_COLUMNS].values.tolist()
    combined_ticks.extend(new_ticks)
    trades_df = self._process_trades_df(
        pair,
        timeframe,
        candle_type,
        combined_ticks,
        cache,
        first_required_candle_date=first_candle_ms,
    )
    data_handler.trades_store(
        f"{pair}-cached", trades_df[DEFAULT_TRADES_COLUMNS], self.trading_mode
    )
    return pairwt, trades_df
need new data - is_in_cache = (pair, timeframe, candle_type) in self._trades - if ( - not is_in_cache - or not cache - or self._now_is_time_to_refresh_trades(pair, timeframe, candle_type) - ): - logger.debug(f"Refreshing TRADES data for {pair}") - # fetch trades since latest _trades and - # store together with existing trades - try: - until = None - from_id = None - if is_in_cache: - from_id = self._trades[(pair, timeframe, candle_type)].iloc[-1]["id"] - until = dt_ts() # now + coros = [] + for pairwt in set(pair_list): + coros.append(self._build_trades_dl_jobs(pairwt, data_handler, cache)) - else: - until = int(timeframe_to_prev_date(timeframe).timestamp()) * 1000 - all_stored_ticks_df = data_handler.trades_load( - f"{pair}-cached", self.trading_mode - ) + async def gather_stuff(coro): + return await asyncio.gather(*coro, return_exceptions=True) - if not all_stored_ticks_df.empty: - if ( - all_stored_ticks_df.iloc[-1]["timestamp"] > first_candle_ms - and all_stored_ticks_df.iloc[0]["timestamp"] <= first_candle_ms - ): - # Use cache and populate further - last_cached_ms = all_stored_ticks_df.iloc[-1]["timestamp"] - from_id = all_stored_ticks_df.iloc[-1]["id"] - # only use cached if it's closer than first_candle_ms - since_ms = ( - last_cached_ms - if last_cached_ms > first_candle_ms - else first_candle_ms - ) - else: - # Skip cache, it's too old - all_stored_ticks_df = DataFrame( - columns=DEFAULT_TRADES_COLUMNS + ["date"] - ) + for input_coro in chunks(coros, 100): + with self._loop_lock: + results = self.loop.run_until_complete(gather_stuff(input_coro)) - # from_id overrules with exchange set to id paginate - [_, new_ticks] = self.get_historic_trades( - pair, - since=since_ms if since_ms else first_candle_ms, - until=until, - from_id=from_id, - ) - - except Exception: - logger.exception(f"Refreshing TRADES data for {pair} failed") + for res in results: + if isinstance(res, Exception): + logger.warning(f"Async code raised an exception: {repr(res)}") continue - - 
if new_ticks: - all_stored_ticks_list = all_stored_ticks_df[ - DEFAULT_TRADES_COLUMNS - ].values.tolist() - all_stored_ticks_list.extend(new_ticks) - trades_df = self._process_trades_df( - pair, - timeframe, - candle_type, - all_stored_ticks_list, - cache, - first_required_candle_date=first_candle_ms, - ) - results_df[(pair, timeframe, candle_type)] = trades_df - data_handler.trades_store( - f"{pair}-cached", trades_df[DEFAULT_TRADES_COLUMNS], self.trading_mode - ) - - else: - logger.error(f"No new ticks for {pair}") + pairwt, trades_df = res + if trades_df is not None: + results_df[pairwt] = trades_df return results_df