From 21bca95b6a5e8a04c7efad306076f3faf1ce841a Mon Sep 17 00:00:00 2001 From: Matthias Date: Sat, 16 Mar 2024 17:04:48 +0100 Subject: [PATCH] Group things logically in exchange class --- freqtrade/exchange/exchange.py | 326 +++++++++++++++++---------------- 1 file changed, 164 insertions(+), 162 deletions(-) diff --git a/freqtrade/exchange/exchange.py b/freqtrade/exchange/exchange.py index e2d806080..432d5e47d 100644 --- a/freqtrade/exchange/exchange.py +++ b/freqtrade/exchange/exchange.py @@ -2041,13 +2041,6 @@ class Exchange: data = sorted(data, key=lambda x: x[0]) return pair, timeframe, candle_type, data, self._ohlcv_partial_candle - def needed_candle_ms(self, timeframe: str, candle_type: CandleType): - one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit( - timeframe, candle_type) - move_to = one_call * self.required_candle_call_count - now = timeframe_to_next_date(timeframe) - return int((now - timedelta(seconds=move_to // 1000)).timestamp() * 1000) - def _build_coroutine( self, pair: str, timeframe: str, candle_type: CandleType, since_ms: Optional[int], cache: bool) -> Coroutine[Any, Any, OHLCVResponse]: @@ -2136,37 +2129,6 @@ class Exchange: self._klines[(pair, timeframe, c_type)] = ohlcv_df return ohlcv_df - def _process_trades_df(self, - pair: str, - timeframe: str, - c_type: CandleType, - ticks: List[List], - cache: bool, - first_required_candle_date: Optional[int]) -> DataFrame: - # keeping parsed dataframe in cache - trades_df = trades_list_to_df(ticks, True) - # keeping last candle time as last refreshed time of the pair - if ticks and cache: - idx = -1 - # NOTE: // is floor: divides and rounds to nearest int - self._trades_last_refresh_time[(pair, timeframe, c_type)] = trades_df['timestamp'].iat[idx] // 1000 # noqa - if cache: - if (pair, timeframe, c_type) in self._trades: - old = self._trades[(pair, timeframe, c_type)] - # Reassign so we return the updated, combined df - combined_df = concat([old, trades_df], axis=0) - logger.debug(f"Clean duplicated ticks from Trades data {pair}") - trades_df = DataFrame(trades_df_remove_duplicates(combined_df), - columns=combined_df.columns) - # Age out old candles - if first_required_candle_date: - # slice of older dates - trades_df = trades_df[ - first_required_candle_date < trades_df['timestamp']] - trades_df = trades_df.reset_index(drop=True) - self._trades[(pair, timeframe, c_type)] = trades_df - return trades_df - def refresh_latest_ohlcv(self, pair_list: ListPairsWithTimeframes, *, since_ms: Optional[int] = None, cache: bool = True, drop_incomplete: Optional[bool] = None @@ -2217,6 +2179,170 @@ class Exchange: return results_df + def refresh_ohlcv_with_cache( + self, + pairs: List[PairWithTimeframe], + since_ms: int + ) -> Dict[PairWithTimeframe, DataFrame]: + """ + Refresh ohlcv data for all pairs in needed_pairs if necessary. + Caches data with expiring per timeframe. + Should only be used for pairlists which need "on time" expirarion, and no longer cache. + """ + + timeframes = {p[1] for p in pairs} + for timeframe in timeframes: + if (timeframe, since_ms) not in self._expiring_candle_cache: + timeframe_in_sec = timeframe_to_seconds(timeframe) + # Initialise cache + self._expiring_candle_cache[(timeframe, since_ms)] = PeriodicCache( + ttl=timeframe_in_sec, maxsize=1000) + + # Get candles from cache + candles = { + c: self._expiring_candle_cache[(c[1], since_ms)].get(c, None) for c in pairs + if c in self._expiring_candle_cache[(c[1], since_ms)] + } + pairs_to_download = [p for p in pairs if p not in candles] + if pairs_to_download: + candles = self.refresh_latest_ohlcv( + pairs_to_download, since_ms=since_ms, cache=False + ) + for c, val in candles.items(): + self._expiring_candle_cache[(c[1], since_ms)][c] = val + return candles + + def _now_is_time_to_refresh(self, pair: str, timeframe: str, candle_type: CandleType) -> bool: + # Timeframe in seconds + interval_in_sec = timeframe_to_seconds(timeframe) + plr = self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0) + interval_in_sec + # current,active candle open date + now = int(timeframe_to_prev_date(timeframe).timestamp()) + return plr < now + + @retrier_async + async def _async_get_candle_history( + self, + pair: str, + timeframe: str, + candle_type: CandleType, + since_ms: Optional[int] = None, + ) -> OHLCVResponse: + """ + Asynchronously get candle history data using fetch_ohlcv + :param candle_type: '', mark, index, premiumIndex, or funding_rate + returns tuple: (pair, timeframe, ohlcv_list) + """ + try: + # Fetch OHLCV asynchronously + s = '(' + dt_from_ts(since_ms).isoformat() + ') ' if since_ms is not None else '' + logger.debug( + "Fetching pair %s, %s, interval %s, since %s %s...", + pair, candle_type, timeframe, since_ms, s + ) + params = deepcopy(self._ft_has.get('ohlcv_params', {})) + candle_limit = self.ohlcv_candle_limit( + timeframe, candle_type=candle_type, since_ms=since_ms) + + if candle_type and candle_type != CandleType.SPOT: + params.update({'price': candle_type.value}) + if candle_type != CandleType.FUNDING_RATE: + data = await self._api_async.fetch_ohlcv( + pair, timeframe=timeframe, since=since_ms, + limit=candle_limit, params=params) + else: + # Funding rate + data = await self._fetch_funding_rate_history( + pair=pair, + timeframe=timeframe, + limit=candle_limit, + since_ms=since_ms, + ) + # Some exchanges sort OHLCV in ASC order and others in DESC. + # Ex: Bittrex returns the list of OHLCV in ASC order (oldest first, newest last) + # while GDAX returns the list of OHLCV in DESC order (newest first, oldest last) + # Only sort if necessary to save computing time + try: + if data and data[0][0] > data[-1][0]: + data = sorted(data, key=lambda x: x[0]) + except IndexError: + logger.exception("Error loading %s. Result was %s.", pair, data) + return pair, timeframe, candle_type, [], self._ohlcv_partial_candle + logger.debug("Done fetching pair %s, %s interval %s...", pair, candle_type, timeframe) + return pair, timeframe, candle_type, data, self._ohlcv_partial_candle + + except ccxt.NotSupported as e: + raise OperationalException( + f'Exchange {self._api.name} does not support fetching historical ' + f'candle (OHLCV) data. Message: {e}') from e + except ccxt.DDoSProtection as e: + raise DDosProtection(e) from e + except (ccxt.NetworkError, ccxt.ExchangeError) as e: + raise TemporaryError(f'Could not fetch historical candle (OHLCV) data ' + f'for pair {pair} due to {e.__class__.__name__}. ' + f'Message: {e}') from e + except ccxt.BaseError as e: + raise OperationalException(f'Could not fetch historical candle (OHLCV) data ' + f'for pair {pair}. Message: {e}') from e + + async def _fetch_funding_rate_history( + self, + pair: str, + timeframe: str, + limit: int, + since_ms: Optional[int] = None, + ) -> List[List]: + """ + Fetch funding rate history - used to selectively override this by subclasses. + """ + # Funding rate + data = await self._api_async.fetch_funding_rate_history( + pair, since=since_ms, + limit=limit) + # Convert funding rate to candle pattern + data = [[x['timestamp'], x['fundingRate'], 0, 0, 0, 0] for x in data] + return data + + # fetch Trade data stuff + + def needed_candle_ms(self, timeframe: str, candle_type: CandleType): + one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit( + timeframe, candle_type) + move_to = one_call * self.required_candle_call_count + now = timeframe_to_next_date(timeframe) + return int((now - timedelta(seconds=move_to // 1000)).timestamp() * 1000) + + def _process_trades_df(self, + pair: str, + timeframe: str, + c_type: CandleType, + ticks: List[List], + cache: bool, + first_required_candle_date: Optional[int]) -> DataFrame: + # keeping parsed dataframe in cache + trades_df = trades_list_to_df(ticks, True) + # keeping last candle time as last refreshed time of the pair + if ticks and cache: + idx = -1 + # NOTE: // is floor: divides and rounds to nearest int + self._trades_last_refresh_time[(pair, timeframe, c_type)] = trades_df['timestamp'].iat[idx] // 1000 # noqa + if cache: + if (pair, timeframe, c_type) in self._trades: + old = self._trades[(pair, timeframe, c_type)] + # Reassign so we return the updated, combined df + combined_df = concat([old, trades_df], axis=0) + logger.debug(f"Clean duplicated ticks from Trades data {pair}") + trades_df = DataFrame(trades_df_remove_duplicates(combined_df), + columns=combined_df.columns) + # Age out old candles + if first_required_candle_date: + # slice of older dates + trades_df = trades_df[ + first_required_candle_date < trades_df['timestamp']] + trades_df = trades_df.reset_index(drop=True) + self._trades[(pair, timeframe, c_type)] = trades_df + return trades_df + def refresh_latest_trades(self, pair_list: ListPairsWithTimeframes, data_handler: Any, # using IDataHandler ends with circular import @@ -2305,47 +2431,6 @@ class Exchange: return results_df - def refresh_ohlcv_with_cache( - self, - pairs: List[PairWithTimeframe], - since_ms: int - ) -> Dict[PairWithTimeframe, DataFrame]: - """ - Refresh ohlcv data for all pairs in needed_pairs if necessary. - Caches data with expiring per timeframe. - Should only be used for pairlists which need "on time" expirarion, and no longer cache. - """ - - timeframes = {p[1] for p in pairs} - for timeframe in timeframes: - if (timeframe, since_ms) not in self._expiring_candle_cache: - timeframe_in_sec = timeframe_to_seconds(timeframe) - # Initialise cache - self._expiring_candle_cache[(timeframe, since_ms)] = PeriodicCache( - ttl=timeframe_in_sec, maxsize=1000) - - # Get candles from cache - candles = { - c: self._expiring_candle_cache[(c[1], since_ms)].get(c, None) for c in pairs - if c in self._expiring_candle_cache[(c[1], since_ms)] - } - pairs_to_download = [p for p in pairs if p not in candles] - if pairs_to_download: - candles = self.refresh_latest_ohlcv( - pairs_to_download, since_ms=since_ms, cache=False - ) - for c, val in candles.items(): - self._expiring_candle_cache[(c[1], since_ms)][c] = val - return candles - - def _now_is_time_to_refresh(self, pair: str, timeframe: str, candle_type: CandleType) -> bool: - # Timeframe in seconds - interval_in_sec = timeframe_to_seconds(timeframe) - plr = self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0) + interval_in_sec - # current,active candle open date - now = int(timeframe_to_prev_date(timeframe).timestamp()) - return plr < now - def _now_is_time_to_refresh_trades(self, pair: str, timeframe: str, @@ -2358,89 +2443,6 @@ class Exchange: now = int(timeframe_to_prev_date(timeframe).timestamp()) return plr < now - @retrier_async - async def _async_get_candle_history( - self, - pair: str, - timeframe: str, - candle_type: CandleType, - since_ms: Optional[int] = None, - ) -> OHLCVResponse: - """ - Asynchronously get candle history data using fetch_ohlcv - :param candle_type: '', mark, index, premiumIndex, or funding_rate - returns tuple: (pair, timeframe, ohlcv_list) - """ - try: - # Fetch OHLCV asynchronously - s = '(' + dt_from_ts(since_ms).isoformat() + ') ' if since_ms is not None else '' - logger.debug( - "Fetching pair %s, %s, interval %s, since %s %s...", - pair, candle_type, timeframe, since_ms, s - ) - params = deepcopy(self._ft_has.get('ohlcv_params', {})) - candle_limit = self.ohlcv_candle_limit( - timeframe, candle_type=candle_type, since_ms=since_ms) - - if candle_type and candle_type != CandleType.SPOT: - params.update({'price': candle_type.value}) - if candle_type != CandleType.FUNDING_RATE: - data = await self._api_async.fetch_ohlcv( - pair, timeframe=timeframe, since=since_ms, - limit=candle_limit, params=params) - else: - # Funding rate - data = await self._fetch_funding_rate_history( - pair=pair, - timeframe=timeframe, - limit=candle_limit, - since_ms=since_ms, - ) - # Some exchanges sort OHLCV in ASC order and others in DESC. - # Ex: Bittrex returns the list of OHLCV in ASC order (oldest first, newest last) - # while GDAX returns the list of OHLCV in DESC order (newest first, oldest last) - # Only sort if necessary to save computing time - try: - if data and data[0][0] > data[-1][0]: - data = sorted(data, key=lambda x: x[0]) - except IndexError: - logger.exception("Error loading %s. Result was %s.", pair, data) - return pair, timeframe, candle_type, [], self._ohlcv_partial_candle - logger.debug("Done fetching pair %s, %s interval %s...", pair, candle_type, timeframe) - return pair, timeframe, candle_type, data, self._ohlcv_partial_candle - - except ccxt.NotSupported as e: - raise OperationalException( - f'Exchange {self._api.name} does not support fetching historical ' - f'candle (OHLCV) data. Message: {e}') from e - except ccxt.DDoSProtection as e: - raise DDosProtection(e) from e - except (ccxt.NetworkError, ccxt.ExchangeError) as e: - raise TemporaryError(f'Could not fetch historical candle (OHLCV) data ' - f'for pair {pair} due to {e.__class__.__name__}. ' - f'Message: {e}') from e - except ccxt.BaseError as e: - raise OperationalException(f'Could not fetch historical candle (OHLCV) data ' - f'for pair {pair}. Message: {e}') from e - - async def _fetch_funding_rate_history( - self, - pair: str, - timeframe: str, - limit: int, - since_ms: Optional[int] = None, - ) -> List[List]: - """ - Fetch funding rate history - used to selectively override this by subclasses. - """ - # Funding rate - data = await self._api_async.fetch_funding_rate_history( - pair, since=since_ms, - limit=limit) - # Convert funding rate to candle pattern - data = [[x['timestamp'], x['fundingRate'], 0, 0, 0, 0] for x in data] - return data - # Fetch historic trades @retrier_async