From 5eb4ad2208479bbb839d59caa9ad7024911b3d58 Mon Sep 17 00:00:00 2001 From: Matthias Date: Sun, 12 May 2024 16:24:43 +0200 Subject: [PATCH] Ruff format edge --- freqtrade/edge/edge_positioning.py | 295 ++++++++++++++++------------- 1 file changed, 164 insertions(+), 131 deletions(-) diff --git a/freqtrade/edge/edge_positioning.py b/freqtrade/edge/edge_positioning.py index d863be03b..b6cc1a7df 100644 --- a/freqtrade/edge/edge_positioning.py +++ b/freqtrade/edge/edge_positioning.py @@ -1,5 +1,6 @@ # pragma pylint: disable=W0603 -""" Edge positioning package """ +"""Edge positioning package""" + import logging from collections import defaultdict from copy import deepcopy @@ -46,48 +47,49 @@ class Edge: _cached_pairs: Dict[str, Any] = {} # Keeps a list of pairs def __init__(self, config: Config, exchange, strategy) -> None: - self.config = config self.exchange = exchange self.strategy: IStrategy = strategy - self.edge_config = self.config.get('edge', {}) + self.edge_config = self.config.get("edge", {}) self._cached_pairs: Dict[str, Any] = {} # Keeps a list of pairs self._final_pairs: list = [] # checking max_open_trades. it should be -1 as with Edge # the number of trades is determined by position size - if self.config['max_open_trades'] != float('inf'): - logger.critical('max_open_trades should be -1 in config !') + if self.config["max_open_trades"] != float("inf"): + logger.critical("max_open_trades should be -1 in config !") - if self.config['stake_amount'] != UNLIMITED_STAKE_AMOUNT: - raise OperationalException('Edge works only with unlimited stake amount') + if self.config["stake_amount"] != UNLIMITED_STAKE_AMOUNT: + raise OperationalException("Edge works only with unlimited stake amount") - self._capital_ratio: float = self.config['tradable_balance_ratio'] - self._allowed_risk: float = self.edge_config.get('allowed_risk') - self._since_number_of_days: int = self.edge_config.get('calculate_since_number_of_days', 14) + self._capital_ratio: float = self.config["tradable_balance_ratio"] + self._allowed_risk: float = self.edge_config.get("allowed_risk") + self._since_number_of_days: int = self.edge_config.get("calculate_since_number_of_days", 14) self._last_updated: int = 0 # Timestamp of pairs last updated time self._refresh_pairs = True - self._stoploss_range_min = float(self.edge_config.get('stoploss_range_min', -0.01)) - self._stoploss_range_max = float(self.edge_config.get('stoploss_range_max', -0.05)) - self._stoploss_range_step = float(self.edge_config.get('stoploss_range_step', -0.001)) + self._stoploss_range_min = float(self.edge_config.get("stoploss_range_min", -0.01)) + self._stoploss_range_max = float(self.edge_config.get("stoploss_range_max", -0.05)) + self._stoploss_range_step = float(self.edge_config.get("stoploss_range_step", -0.001)) # calculating stoploss range self._stoploss_range = np.arange( - self._stoploss_range_min, - self._stoploss_range_max, - self._stoploss_range_step + self._stoploss_range_min, self._stoploss_range_max, self._stoploss_range_step ) self._timerange: TimeRange = TimeRange.parse_timerange( - f"{(dt_now() - timedelta(days=self._since_number_of_days)).strftime('%Y%m%d')}-") - if config.get('fee'): - self.fee = config['fee'] + f"{(dt_now() - timedelta(days=self._since_number_of_days)).strftime('%Y%m%d')}-" + ) + if config.get("fee"): + self.fee = config["fee"] else: try: - self.fee = self.exchange.get_fee(symbol=expand_pairlist( - self.config['exchange']['pair_whitelist'], list(self.exchange.markets))[0]) + self.fee = self.exchange.get_fee( + symbol=expand_pairlist( + self.config["exchange"]["pair_whitelist"], list(self.exchange.markets) + )[0] + ) except IndexError: self.fee = None @@ -95,28 +97,30 @@ class Edge: if self.fee is None and pairs: self.fee = self.exchange.get_fee(pairs[0]) - heartbeat = self.edge_config.get('process_throttle_secs') + heartbeat = self.edge_config.get("process_throttle_secs") if (self._last_updated > 0) and ( - self._last_updated + heartbeat > int(dt_now().timestamp())): + self._last_updated + heartbeat > int(dt_now().timestamp()) + ): return False data: Dict[str, Any] = {} - logger.info('Using stake_currency: %s ...', self.config['stake_currency']) - logger.info('Using local backtesting data (using whitelist in given config) ...') + logger.info("Using stake_currency: %s ...", self.config["stake_currency"]) + logger.info("Using local backtesting data (using whitelist in given config) ...") if self._refresh_pairs: timerange_startup = deepcopy(self._timerange) - timerange_startup.subtract_start(timeframe_to_seconds( - self.strategy.timeframe) * self.strategy.startup_candle_count) + timerange_startup.subtract_start( + timeframe_to_seconds(self.strategy.timeframe) * self.strategy.startup_candle_count + ) refresh_data( - datadir=self.config['datadir'], + datadir=self.config["datadir"], pairs=pairs, exchange=self.exchange, timeframe=self.strategy.timeframe, timerange=timerange_startup, - data_format=self.config['dataformat_ohlcv'], - candle_type=self.config.get('candle_type_def', CandleType.SPOT), + data_format=self.config["dataformat_ohlcv"], + candle_type=self.config.get("candle_type_def", CandleType.SPOT), ) # Download informative pairs too res = defaultdict(list) @@ -124,26 +128,27 @@ class Edge: res[timeframe].append(pair) for timeframe, inf_pairs in res.items(): timerange_startup = deepcopy(self._timerange) - timerange_startup.subtract_start(timeframe_to_seconds( - timeframe) * self.strategy.startup_candle_count) + timerange_startup.subtract_start( + timeframe_to_seconds(timeframe) * self.strategy.startup_candle_count + ) refresh_data( - datadir=self.config['datadir'], + datadir=self.config["datadir"], pairs=inf_pairs, exchange=self.exchange, timeframe=timeframe, timerange=timerange_startup, - data_format=self.config['dataformat_ohlcv'], - candle_type=self.config.get('candle_type_def', CandleType.SPOT), + data_format=self.config["dataformat_ohlcv"], + candle_type=self.config.get("candle_type_def", CandleType.SPOT), ) data = load_data( - datadir=self.config['datadir'], + datadir=self.config["datadir"], pairs=pairs, timeframe=self.strategy.timeframe, timerange=self._timerange, startup_candles=self.strategy.startup_candle_count, - data_format=self.config['dataformat_ohlcv'], - candle_type=self.config.get('candle_type_def', CandleType.SPOT), + data_format=self.config["dataformat_ohlcv"], + candle_type=self.config.get("candle_type_def", CandleType.SPOT), ) if not data: @@ -152,27 +157,29 @@ class Edge: logger.critical("No data found. Edge is stopped ...") return False # Fake run-mode to Edge - prior_rm = self.config['runmode'] - self.config['runmode'] = RunMode.EDGE + prior_rm = self.config["runmode"] + self.config["runmode"] = RunMode.EDGE preprocessed = self.strategy.advise_all_indicators(data) - self.config['runmode'] = prior_rm + self.config["runmode"] = prior_rm # Print timeframe min_date, max_date = get_timerange(preprocessed) - logger.info(f'Measuring data from {min_date.strftime(DATETIME_PRINT_FORMAT)} ' - f'up to {max_date.strftime(DATETIME_PRINT_FORMAT)} ' - f'({(max_date - min_date).days} days)..') + logger.info( + f"Measuring data from {min_date.strftime(DATETIME_PRINT_FORMAT)} " + f"up to {max_date.strftime(DATETIME_PRINT_FORMAT)} " + f"({(max_date - min_date).days} days).." + ) # TODO: Should edge support shorts? needs to be investigated further # * (add enter_short exit_short) - headers = ['date', 'open', 'high', 'low', 'close', 'enter_long', 'exit_long'] + headers = ["date", "open", "high", "low", "close", "enter_long", "exit_long"] trades: list = [] for pair, pair_data in preprocessed.items(): # Sorting dataframe by date and reset index - pair_data = pair_data.sort_values(by=['date']) + pair_data = pair_data.sort_values(by=["date"]) pair_data = pair_data.reset_index(drop=True) - df_analyzed = self.strategy.ft_advise_signals(pair_data, {'pair': pair})[headers].copy() + df_analyzed = self.strategy.ft_advise_signals(pair_data, {"pair": pair})[headers].copy() trades += self._find_trades_for_stoploss_range(df_analyzed, pair, self._stoploss_range) @@ -188,8 +195,9 @@ class Edge: return True - def stake_amount(self, pair: str, free_capital: float, - total_capital: float, capital_in_trade: float) -> float: + def stake_amount( + self, pair: str, free_capital: float, total_capital: float, capital_in_trade: float + ) -> float: stoploss = self.get_stoploss(pair) available_capital = (total_capital + capital_in_trade) * self._capital_ratio allowed_capital_at_risk = available_capital * self._allowed_risk @@ -198,14 +206,18 @@ class Edge: position_size = min(min(max_position_size, free_capital), available_capital) if pair in self._cached_pairs: logger.info( - 'winrate: %s, expectancy: %s, position size: %s, pair: %s,' - ' capital in trade: %s, free capital: %s, total capital: %s,' - ' stoploss: %s, available capital: %s.', + "winrate: %s, expectancy: %s, position size: %s, pair: %s," + " capital in trade: %s, free capital: %s, total capital: %s," + " stoploss: %s, available capital: %s.", self._cached_pairs[pair].winrate, self._cached_pairs[pair].expectancy, - position_size, pair, - capital_in_trade, free_capital, total_capital, - stoploss, available_capital + position_size, + pair, + capital_in_trade, + free_capital, + total_capital, + stoploss, + available_capital, ) return round(position_size, 15) @@ -213,8 +225,10 @@ class Edge: if pair in self._cached_pairs: return self._cached_pairs[pair].stoploss else: - logger.warning(f'Tried to access stoploss of non-existing pair {pair}, ' - 'strategy stoploss is returned instead.') + logger.warning( + f"Tried to access stoploss of non-existing pair {pair}, " + "strategy stoploss is returned instead." + ) return self.strategy.stoploss def adjust(self, pairs: List[str]) -> list: @@ -224,8 +238,8 @@ class Edge: final = [] for pair, info in self._cached_pairs.items(): if ( - info.expectancy > float(self.edge_config.get('minimum_expectancy', 0.2)) - and info.winrate > float(self.edge_config.get('minimum_winrate', 0.60)) + info.expectancy > float(self.edge_config.get("minimum_expectancy", 0.2)) + and info.winrate > float(self.edge_config.get("minimum_winrate", 0.60)) and pair in pairs ): final.append(pair) @@ -234,14 +248,14 @@ class Edge: self._final_pairs = final if self._final_pairs: logger.info( - 'Minimum expectancy and minimum winrate are met only for %s,' - ' so other pairs are filtered out.', - self._final_pairs + "Minimum expectancy and minimum winrate are met only for %s," + " so other pairs are filtered out.", + self._final_pairs, ) else: logger.info( - 'Edge removed all pairs as no pair with minimum expectancy ' - 'and minimum winrate was found !' + "Edge removed all pairs as no pair with minimum expectancy " + "and minimum winrate was found !" ) return self._final_pairs @@ -252,14 +266,17 @@ class Edge: """ final = [] for pair, info in self._cached_pairs.items(): - if (info.expectancy > float(self.edge_config.get('minimum_expectancy', 0.2)) and - info.winrate > float(self.edge_config.get('minimum_winrate', 0.60))): - final.append({ - 'Pair': pair, - 'Winrate': info.winrate, - 'Expectancy': info.expectancy, - 'Stoploss': info.stoploss, - }) + if info.expectancy > float( + self.edge_config.get("minimum_expectancy", 0.2) + ) and info.winrate > float(self.edge_config.get("minimum_winrate", 0.60)): + final.append( + { + "Pair": pair, + "Winrate": info.winrate, + "Expectancy": info.expectancy, + "Stoploss": info.stoploss, + } + ) return final def _fill_calculable_fields(self, result: DataFrame) -> DataFrame: @@ -279,28 +296,29 @@ class Edge: # All returned values are relative, they are defined as ratios. stake = 0.015 - result['trade_duration'] = result['close_date'] - result['open_date'] + result["trade_duration"] = result["close_date"] - result["open_date"] - result['trade_duration'] = result['trade_duration'].map( - lambda x: int(x.total_seconds() / 60)) + result["trade_duration"] = result["trade_duration"].map( + lambda x: int(x.total_seconds() / 60) + ) # Spends, Takes, Profit, Absolute Profit # Buy Price - result['buy_vol'] = stake / result['open_rate'] # How many target are we buying - result['buy_fee'] = stake * self.fee - result['buy_spend'] = stake + result['buy_fee'] # How much we're spending + result["buy_vol"] = stake / result["open_rate"] # How many target are we buying + result["buy_fee"] = stake * self.fee + result["buy_spend"] = stake + result["buy_fee"] # How much we're spending # Sell price - result['sell_sum'] = result['buy_vol'] * result['close_rate'] - result['sell_fee'] = result['sell_sum'] * self.fee - result['sell_take'] = result['sell_sum'] - result['sell_fee'] + result["sell_sum"] = result["buy_vol"] * result["close_rate"] + result["sell_fee"] = result["sell_sum"] * self.fee + result["sell_take"] = result["sell_sum"] - result["sell_fee"] # profit_ratio - result['profit_ratio'] = (result['sell_take'] - result['buy_spend']) / result['buy_spend'] + result["profit_ratio"] = (result["sell_take"] - result["buy_spend"]) / result["buy_spend"] # Absolute profit - result['profit_abs'] = result['sell_take'] - result['buy_spend'] + result["profit_abs"] = result["sell_take"] - result["buy_spend"] return result @@ -310,8 +328,8 @@ class Edge: The calculation will be done per pair and per strategy. """ # Removing pairs having less than min_trades_number - min_trades_number = self.edge_config.get('min_trade_number', 10) - results = results.groupby(['pair', 'stoploss']).filter(lambda x: len(x) > min_trades_number) + min_trades_number = self.edge_config.get("min_trade_number", 10) + results = results.groupby(["pair", "stoploss"]).filter(lambda x: len(x) > min_trades_number) ################################### # Removing outliers (Only Pumps) from the dataset @@ -319,13 +337,15 @@ class Edge: # Then every value more than (standard deviation + 2*average) is out (pump) # # Removing Pumps - if self.edge_config.get('remove_pumps', False): - results = results[results['profit_abs'] < 2 * results['profit_abs'].std() - + results['profit_abs'].mean()] + if self.edge_config.get("remove_pumps", False): + results = results[ + results["profit_abs"] + < 2 * results["profit_abs"].std() + results["profit_abs"].mean() + ] ########################################################################## # Removing trades having a duration more than X minutes (set in config) - max_trade_duration = self.edge_config.get('max_trade_duration_minute', 1440) + max_trade_duration = self.edge_config.get("max_trade_duration_minute", 1440) results = results[results.trade_duration < max_trade_duration] ####################################################################### @@ -333,44 +353,54 @@ class Edge: return {} groupby_aggregator = { - 'profit_abs': [ - ('nb_trades', 'count'), # number of all trades - ('profit_sum', lambda x: x[x > 0].sum()), # cumulative profit of all winning trades - ('loss_sum', lambda x: abs(x[x < 0].sum())), # cumulative loss of all losing trades - ('nb_win_trades', lambda x: x[x > 0].count()) # number of winning trades + "profit_abs": [ + ("nb_trades", "count"), # number of all trades + ("profit_sum", lambda x: x[x > 0].sum()), # cumulative profit of all winning trades + ("loss_sum", lambda x: abs(x[x < 0].sum())), # cumulative loss of all losing trades + ("nb_win_trades", lambda x: x[x > 0].count()), # number of winning trades ], - 'trade_duration': [('avg_trade_duration', 'mean')] + "trade_duration": [("avg_trade_duration", "mean")], } # Group by (pair and stoploss) by applying above aggregator - df = results.groupby(['pair', 'stoploss'])[['profit_abs', 'trade_duration']].agg( - groupby_aggregator).reset_index(col_level=1) + df = ( + results.groupby(["pair", "stoploss"])[["profit_abs", "trade_duration"]] + .agg(groupby_aggregator) + .reset_index(col_level=1) + ) # Dropping level 0 as we don't need it df.columns = df.columns.droplevel(0) # Calculating number of losing trades, average win and average loss - df['nb_loss_trades'] = df['nb_trades'] - df['nb_win_trades'] - df['average_win'] = np.where(df['nb_win_trades'] == 0, 0.0, - df['profit_sum'] / df['nb_win_trades']) - df['average_loss'] = np.where(df['nb_loss_trades'] == 0, 0.0, - df['loss_sum'] / df['nb_loss_trades']) + df["nb_loss_trades"] = df["nb_trades"] - df["nb_win_trades"] + df["average_win"] = np.where( + df["nb_win_trades"] == 0, 0.0, df["profit_sum"] / df["nb_win_trades"] + ) + df["average_loss"] = np.where( + df["nb_loss_trades"] == 0, 0.0, df["loss_sum"] / df["nb_loss_trades"] + ) # Win rate = number of profitable trades / number of trades - df['winrate'] = df['nb_win_trades'] / df['nb_trades'] + df["winrate"] = df["nb_win_trades"] / df["nb_trades"] # risk_reward_ratio = average win / average loss - df['risk_reward_ratio'] = df['average_win'] / df['average_loss'] + df["risk_reward_ratio"] = df["average_win"] / df["average_loss"] # required_risk_reward = (1 / winrate) - 1 - df['required_risk_reward'] = (1 / df['winrate']) - 1 + df["required_risk_reward"] = (1 / df["winrate"]) - 1 # expectancy = (risk_reward_ratio * winrate) - (lossrate) - df['expectancy'] = (df['risk_reward_ratio'] * df['winrate']) - (1 - df['winrate']) + df["expectancy"] = (df["risk_reward_ratio"] * df["winrate"]) - (1 - df["winrate"]) # sort by expectancy and stoploss - df = df.sort_values(by=['expectancy', 'stoploss'], ascending=False).groupby( - 'pair').first().sort_values(by=['expectancy'], ascending=False).reset_index() + df = ( + df.sort_values(by=["expectancy", "stoploss"], ascending=False) + .groupby("pair") + .first() + .sort_values(by=["expectancy"], ascending=False) + .reset_index() + ) final = {} for x in df.itertuples(): @@ -381,17 +411,17 @@ class Edge: x.required_risk_reward, x.expectancy, x.nb_trades, - x.avg_trade_duration + x.avg_trade_duration, ) # Returning a list of pairs in order of "expectancy" return final def _find_trades_for_stoploss_range(self, df, pair: str, stoploss_range) -> list: - buy_column = df['enter_long'].values - sell_column = df['exit_long'].values - date_column = df['date'].values - ohlc_columns = df[['open', 'high', 'low', 'close']].values + buy_column = df["enter_long"].values + sell_column = df["exit_long"].values + date_column = df["date"].values + ohlc_columns = df[["open", "high", "low", "close"]].values result: list = [] for stoploss in stoploss_range: @@ -401,8 +431,9 @@ class Edge: return result - def _detect_next_stop_or_sell_point(self, buy_column, sell_column, date_column, - ohlc_columns, stoploss, pair: str): + def _detect_next_stop_or_sell_point( + self, buy_column, sell_column, date_column, ohlc_columns, stoploss, pair: str + ): """ Iterate through ohlc_columns in order to find the next trade Next trade opens from the first buy signal noticed to @@ -429,27 +460,28 @@ class Edge: open_trade_index += 1 open_price = ohlc_columns[open_trade_index, 0] - stop_price = (open_price * (stoploss + 1)) + stop_price = open_price * (stoploss + 1) # Searching for the index where stoploss is hit stop_index = utf1st.find_1st( - ohlc_columns[open_trade_index:, 2], stop_price, utf1st.cmp_smaller) + ohlc_columns[open_trade_index:, 2], stop_price, utf1st.cmp_smaller + ) # If we don't find it then we assume stop_index will be far in future (infinite number) if stop_index == -1: - stop_index = float('inf') + stop_index = float("inf") # Searching for the index where sell is hit sell_index = utf1st.find_1st(sell_column[open_trade_index:], 1, utf1st.cmp_equal) # If we don't find it then we assume sell_index will be far in future (infinite number) if sell_index == -1: - sell_index = float('inf') + sell_index = float("inf") # Check if we don't find any stop or sell point (in that case trade remains open) # It is not interesting for Edge to consider it so we simply ignore the trade # And stop iterating there is no more entry - if stop_index == sell_index == float('inf'): + if stop_index == sell_index == float("inf"): break if stop_index <= sell_index: @@ -467,17 +499,18 @@ class Edge: exit_type = ExitType.EXIT_SIGNAL exit_price = ohlc_columns[exit_index, 0] - trade = {'pair': pair, - 'stoploss': stoploss, - 'profit_ratio': '', - 'profit_abs': '', - 'open_date': date_column[open_trade_index], - 'close_date': date_column[exit_index], - 'trade_duration': '', - 'open_rate': round(open_price, 15), - 'close_rate': round(exit_price, 15), - 'exit_type': exit_type - } + trade = { + "pair": pair, + "stoploss": stoploss, + "profit_ratio": "", + "profit_abs": "", + "open_date": date_column[open_trade_index], + "close_date": date_column[exit_index], + "trade_duration": "", + "open_rate": round(open_price, 15), + "close_rate": round(exit_price, 15), + "exit_type": exit_type, + } result.append(trade)