From fea1653e31410e055f8f27865425e4a7df6db8fb Mon Sep 17 00:00:00 2001 From: Matthias Date: Sun, 12 May 2024 17:41:55 +0200 Subject: [PATCH] ruff format: freqtrade.data --- freqtrade/data/__init__.py | 4 +- freqtrade/data/btanalysis.py | 214 ++++--- freqtrade/data/converter/__init__.py | 30 +- freqtrade/data/converter/converter.py | 193 +++--- freqtrade/data/converter/trade_converter.py | 49 +- .../data/converter/trade_converter_kraken.py | 33 +- freqtrade/data/dataprovider.py | 184 +++--- freqtrade/data/entryexitanalysis.py | 301 +++++----- freqtrade/data/history/__init__.py | 1 + .../datahandlers/featherdatahandler.py | 42 +- .../history/datahandlers/hdf5datahandler.py | 68 ++- .../data/history/datahandlers/idatahandler.py | 207 ++++--- .../history/datahandlers/jsondatahandler.py | 44 +- .../datahandlers/parquetdatahandler.py | 35 +- freqtrade/data/history/history_utils.py | 550 +++++++++++------- freqtrade/data/metrics.py | 157 ++--- 16 files changed, 1195 insertions(+), 917 deletions(-) diff --git a/freqtrade/data/__init__.py b/freqtrade/data/__init__.py index 0e7eea0d0..f716abfc5 100644 --- a/freqtrade/data/__init__.py +++ b/freqtrade/data/__init__.py @@ -3,6 +3,4 @@ Module to handle data operations for freqtrade """ # limit what's imported when using `from freqtrade.data import *` -__all__ = [ - 'converter' -] +__all__ = ["converter"] diff --git a/freqtrade/data/btanalysis.py b/freqtrade/data/btanalysis.py index 07417b27f..eef415879 100644 --- a/freqtrade/data/btanalysis.py +++ b/freqtrade/data/btanalysis.py @@ -1,6 +1,7 @@ """ Helpers when analyzing backtest data """ + import logging from copy import copy from datetime import datetime, timezone @@ -21,14 +22,35 @@ from freqtrade.types import BacktestHistoryEntryType, BacktestResultType logger = logging.getLogger(__name__) # Newest format -BT_DATA_COLUMNS = ['pair', 'stake_amount', 'max_stake_amount', 'amount', - 'open_date', 'close_date', 'open_rate', 'close_rate', - 'fee_open', 'fee_close', 'trade_duration', - 'profit_ratio', 'profit_abs', 'exit_reason', - 'initial_stop_loss_abs', 'initial_stop_loss_ratio', 'stop_loss_abs', - 'stop_loss_ratio', 'min_rate', 'max_rate', 'is_open', 'enter_tag', - 'leverage', 'is_short', 'open_timestamp', 'close_timestamp', 'orders' - ] +BT_DATA_COLUMNS = [ + "pair", + "stake_amount", + "max_stake_amount", + "amount", + "open_date", + "close_date", + "open_rate", + "close_rate", + "fee_open", + "fee_close", + "trade_duration", + "profit_ratio", + "profit_abs", + "exit_reason", + "initial_stop_loss_abs", + "initial_stop_loss_ratio", + "stop_loss_abs", + "stop_loss_ratio", + "min_rate", + "max_rate", + "is_open", + "enter_tag", + "leverage", + "is_short", + "open_timestamp", + "close_timestamp", + "orders", +] def get_latest_optimize_filename(directory: Union[Path, str], variant: str) -> str: @@ -50,15 +72,16 @@ def get_latest_optimize_filename(directory: Union[Path, str], variant: str) -> s if not filename.is_file(): raise ValueError( - f"Directory '{directory}' does not seem to contain backtest statistics yet.") + f"Directory '{directory}' does not seem to contain backtest statistics yet." 
+ ) with filename.open() as file: data = json_load(file) - if f'latest_{variant}' not in data: + if f"latest_{variant}" not in data: raise ValueError(f"Invalid '{LAST_BT_RESULT_FN}' format.") - return data[f'latest_{variant}'] + return data[f"latest_{variant}"] def get_latest_backtest_filename(directory: Union[Path, str]) -> str: @@ -71,7 +94,7 @@ def get_latest_backtest_filename(directory: Union[Path, str]) -> str: * `directory/.last_result.json` does not exist * `directory/.last_result.json` has the wrong content """ - return get_latest_optimize_filename(directory, 'backtest') + return get_latest_optimize_filename(directory, "backtest") def get_latest_hyperopt_filename(directory: Union[Path, str]) -> str: @@ -85,14 +108,15 @@ def get_latest_hyperopt_filename(directory: Union[Path, str]) -> str: * `directory/.last_result.json` has the wrong content """ try: - return get_latest_optimize_filename(directory, 'hyperopt') + return get_latest_optimize_filename(directory, "hyperopt") except ValueError: # Return default (legacy) pickle filename - return 'hyperopt_results.pickle' + return "hyperopt_results.pickle" def get_latest_hyperopt_file( - directory: Union[Path, str], predef_filename: Optional[str] = None) -> Path: + directory: Union[Path, str], predef_filename: Optional[str] = None +) -> Path: """ Get latest hyperopt export based on '.last_result.json'. :param directory: Directory to search for last result @@ -107,7 +131,8 @@ def get_latest_hyperopt_file( if predef_filename: if Path(predef_filename).is_absolute(): raise ConfigurationError( - "--hyperopt-filename expects only the filename, not an absolute path.") + "--hyperopt-filename expects only the filename, not an absolute path." + ) return directory / predef_filename return directory / get_latest_hyperopt_filename(directory) @@ -126,7 +151,7 @@ def load_backtest_metadata(filename: Union[Path, str]) -> Dict[str, Any]: except FileNotFoundError: return {} except Exception as e: - raise OperationalException('Unexpected error while loading backtest metadata.') from e + raise OperationalException("Unexpected error while loading backtest metadata.") from e def load_backtest_stats(filename: Union[Path, str]) -> BacktestResultType: @@ -147,7 +172,7 @@ def load_backtest_stats(filename: Union[Path, str]) -> BacktestResultType: # Legacy list format does not contain metadata. if isinstance(data, dict): - data['metadata'] = load_backtest_metadata(filename) + data["metadata"] = load_backtest_metadata(filename) return data @@ -159,38 +184,39 @@ def load_and_merge_backtest_result(strategy_name: str, filename: Path, results: :param results: dict to merge the result to. """ bt_data = load_backtest_stats(filename) - k: Literal['metadata', 'strategy'] - for k in ('metadata', 'strategy'): # type: ignore + k: Literal["metadata", "strategy"] + for k in ("metadata", "strategy"): # type: ignore results[k][strategy_name] = bt_data[k][strategy_name] - results['metadata'][strategy_name]['filename'] = filename.stem - comparison = bt_data['strategy_comparison'] + results["metadata"][strategy_name]["filename"] = filename.stem + comparison = bt_data["strategy_comparison"] for i in range(len(comparison)): - if comparison[i]['key'] == strategy_name: - results['strategy_comparison'].append(comparison[i]) + if comparison[i]["key"] == strategy_name: + results["strategy_comparison"].append(comparison[i]) break def _get_backtest_files(dirname: Path) -> List[Path]: # Weird glob expression here avoids including .meta.json files. 
- return list(reversed(sorted(dirname.glob('backtest-result-*-[0-9][0-9].json')))) + return list(reversed(sorted(dirname.glob("backtest-result-*-[0-9][0-9].json")))) def _extract_backtest_result(filename: Path) -> List[BacktestHistoryEntryType]: metadata = load_backtest_metadata(filename) return [ { - 'filename': filename.stem, - 'strategy': s, - 'run_id': v['run_id'], - 'notes': v.get('notes', ''), + "filename": filename.stem, + "strategy": s, + "run_id": v["run_id"], + "notes": v.get("notes", ""), # Backtest "run" time - 'backtest_start_time': v['backtest_start_time'], + "backtest_start_time": v["backtest_start_time"], # Backtest timerange - 'backtest_start_ts': v.get('backtest_start_ts', None), - 'backtest_end_ts': v.get('backtest_end_ts', None), - 'timeframe': v.get('timeframe', None), - 'timeframe_detail': v.get('timeframe_detail', None), - } for s, v in metadata.items() + "backtest_start_ts": v.get("backtest_start_ts", None), + "backtest_end_ts": v.get("backtest_end_ts", None), + "timeframe": v.get("timeframe", None), + "timeframe_detail": v.get("timeframe_detail", None), + } + for s, v in metadata.items() ] @@ -218,7 +244,7 @@ def delete_backtest_result(file_abs: Path): """ # *.meta.json logger.info(f"Deleting backtest result file: {file_abs.name}") - file_abs_meta = file_abs.with_suffix('.meta.json') + file_abs_meta = file_abs.with_suffix(".meta.json") file_abs.unlink() file_abs_meta.unlink() @@ -244,12 +270,13 @@ def get_backtest_market_change(filename: Path, include_ts: bool = True) -> pd.Da """ df = pd.read_feather(filename) if include_ts: - df.loc[:, '__date_ts'] = df.loc[:, 'date'].astype(np.int64) // 1000 // 1000 + df.loc[:, "__date_ts"] = df.loc[:, "date"].astype(np.int64) // 1000 // 1000 return df -def find_existing_backtest_stats(dirname: Union[Path, str], run_ids: Dict[str, str], - min_backtest_date: Optional[datetime] = None) -> Dict[str, Any]: +def find_existing_backtest_stats( + dirname: Union[Path, str], run_ids: Dict[str, str], min_backtest_date: Optional[datetime] = None +) -> Dict[str, Any]: """ Find existing backtest stats that match specified run IDs and load them. :param dirname: pathlib.Path object, or string pointing to the file. @@ -261,9 +288,9 @@ def find_existing_backtest_stats(dirname: Union[Path, str], run_ids: Dict[str, s run_ids = copy(run_ids) dirname = Path(dirname) results: Dict[str, Any] = { - 'metadata': {}, - 'strategy': {}, - 'strategy_comparison': [], + "metadata": {}, + "strategy": {}, + "strategy_comparison": [], } for filename in _get_backtest_files(dirname): @@ -280,14 +307,14 @@ def find_existing_backtest_stats(dirname: Union[Path, str], run_ids: Dict[str, s continue if min_backtest_date is not None: - backtest_date = strategy_metadata['backtest_start_time'] + backtest_date = strategy_metadata["backtest_start_time"] backtest_date = datetime.fromtimestamp(backtest_date, tz=timezone.utc) if backtest_date < min_backtest_date: # Do not use a cached result for this strategy as first result is too old. del run_ids[strategy_name] continue - if strategy_metadata['run_id'] == run_id: + if strategy_metadata["run_id"] == run_id: del run_ids[strategy_name] load_and_merge_backtest_result(strategy_name, filename, results) @@ -300,20 +327,20 @@ def _load_backtest_data_df_compatibility(df: pd.DataFrame) -> pd.DataFrame: """ Compatibility support for older backtest data. 
""" - df['open_date'] = pd.to_datetime(df['open_date'], utc=True) - df['close_date'] = pd.to_datetime(df['close_date'], utc=True) + df["open_date"] = pd.to_datetime(df["open_date"], utc=True) + df["close_date"] = pd.to_datetime(df["close_date"], utc=True) # Compatibility support for pre short Columns - if 'is_short' not in df.columns: - df['is_short'] = False - if 'leverage' not in df.columns: - df['leverage'] = 1.0 - if 'enter_tag' not in df.columns: - df['enter_tag'] = df['buy_tag'] - df = df.drop(['buy_tag'], axis=1) - if 'max_stake_amount' not in df.columns: - df['max_stake_amount'] = df['stake_amount'] - if 'orders' not in df.columns: - df['orders'] = None + if "is_short" not in df.columns: + df["is_short"] = False + if "leverage" not in df.columns: + df["leverage"] = 1.0 + if "enter_tag" not in df.columns: + df["enter_tag"] = df["buy_tag"] + df = df.drop(["buy_tag"], axis=1) + if "max_stake_amount" not in df.columns: + df["max_stake_amount"] = df["stake_amount"] + if "orders" not in df.columns: + df["orders"] = None return df @@ -329,23 +356,25 @@ def load_backtest_data(filename: Union[Path, str], strategy: Optional[str] = Non data = load_backtest_stats(filename) if not isinstance(data, list): # new, nested format - if 'strategy' not in data: + if "strategy" not in data: raise ValueError("Unknown dataformat.") if not strategy: - if len(data['strategy']) == 1: - strategy = list(data['strategy'].keys())[0] + if len(data["strategy"]) == 1: + strategy = list(data["strategy"].keys())[0] else: - raise ValueError("Detected backtest result with more than one strategy. " - "Please specify a strategy.") + raise ValueError( + "Detected backtest result with more than one strategy. " + "Please specify a strategy." + ) - if strategy not in data['strategy']: + if strategy not in data["strategy"]: raise ValueError( f"Strategy {strategy} not available in the backtest result. " f"Available strategies are '{','.join(data['strategy'].keys())}'" - ) + ) - data = data['strategy'][strategy]['trades'] + data = data["strategy"][strategy]["trades"] df = pd.DataFrame(data) if not df.empty: df = _load_backtest_data_df_compatibility(df) @@ -353,7 +382,8 @@ def load_backtest_data(filename: Union[Path, str], strategy: Optional[str] = Non else: # old format - only with lists. raise OperationalException( - "Backtest-results with only trades data are no longer supported.") + "Backtest-results with only trades data are no longer supported." 
+ ) if not df.empty: df = df.sort_values("open_date").reset_index(drop=True) return df @@ -368,23 +398,26 @@ def analyze_trade_parallelism(results: pd.DataFrame, timeframe: str) -> pd.DataF :return: dataframe with open-counts per time-period in timeframe """ from freqtrade.exchange import timeframe_to_resample_freq + timeframe_freq = timeframe_to_resample_freq(timeframe) - dates = [pd.Series(pd.date_range(row[1]['open_date'], row[1]['close_date'], - freq=timeframe_freq)) - for row in results[['open_date', 'close_date']].iterrows()] + dates = [ + pd.Series(pd.date_range(row[1]["open_date"], row[1]["close_date"], freq=timeframe_freq)) + for row in results[["open_date", "close_date"]].iterrows() + ] deltas = [len(x) for x in dates] - dates = pd.Series(pd.concat(dates).values, name='date') + dates = pd.Series(pd.concat(dates).values, name="date") df2 = pd.DataFrame(np.repeat(results.values, deltas, axis=0), columns=results.columns) df2 = pd.concat([dates, df2], axis=1) - df2 = df2.set_index('date') - df_final = df2.resample(timeframe_freq)[['pair']].count() - df_final = df_final.rename({'pair': 'open_trades'}, axis=1) + df2 = df2.set_index("date") + df_final = df2.resample(timeframe_freq)[["pair"]].count() + df_final = df_final.rename({"pair": "open_trades"}, axis=1) return df_final -def evaluate_result_multi(results: pd.DataFrame, timeframe: str, - max_open_trades: IntOrInf) -> pd.DataFrame: +def evaluate_result_multi( + results: pd.DataFrame, timeframe: str, max_open_trades: IntOrInf +) -> pd.DataFrame: """ Find overlapping trades by expanding each trade once per period it was open and then counting overlaps @@ -394,7 +427,7 @@ def evaluate_result_multi(results: pd.DataFrame, timeframe: str, :return: dataframe with open-counts per time-period in freq """ df_final = analyze_trade_parallelism(results, timeframe) - return df_final[df_final['open_trades'] > max_open_trades] + return df_final[df_final["open_trades"] > max_open_trades] def trade_list_to_dataframe(trades: Union[List[Trade], List[LocalTrade]]) -> pd.DataFrame: @@ -405,9 +438,9 @@ def trade_list_to_dataframe(trades: Union[List[Trade], List[LocalTrade]]) -> pd. 
""" df = pd.DataFrame.from_records([t.to_json(True) for t in trades], columns=BT_DATA_COLUMNS) if len(df) > 0: - df['close_date'] = pd.to_datetime(df['close_date'], utc=True) - df['open_date'] = pd.to_datetime(df['open_date'], utc=True) - df['close_rate'] = df['close_rate'].astype('float64') + df["close_date"] = pd.to_datetime(df["close_date"], utc=True) + df["open_date"] = pd.to_datetime(df["open_date"], utc=True) + df["close_rate"] = df["close_rate"].astype("float64") return df @@ -429,8 +462,13 @@ def load_trades_from_db(db_url: str, strategy: Optional[str] = None) -> pd.DataF return trades -def load_trades(source: str, db_url: str, exportfilename: Path, - no_trades: bool = False, strategy: Optional[str] = None) -> pd.DataFrame: +def load_trades( + source: str, + db_url: str, + exportfilename: Path, + no_trades: bool = False, + strategy: Optional[str] = None, +) -> pd.DataFrame: """ Based on configuration option 'trade_source': * loads data from DB (using `db_url`) @@ -451,8 +489,9 @@ def load_trades(source: str, db_url: str, exportfilename: Path, return load_backtest_data(exportfilename, strategy) -def extract_trades_of_period(dataframe: pd.DataFrame, trades: pd.DataFrame, - date_index=False) -> pd.DataFrame: +def extract_trades_of_period( + dataframe: pd.DataFrame, trades: pd.DataFrame, date_index=False +) -> pd.DataFrame: """ Compare trades and backtested pair DataFrames to get trades performed on backtested period :return: the DataFrame of a trades of period @@ -461,8 +500,9 @@ def extract_trades_of_period(dataframe: pd.DataFrame, trades: pd.DataFrame, trades_start = dataframe.index[0] trades_stop = dataframe.index[-1] else: - trades_start = dataframe.iloc[0]['date'] - trades_stop = dataframe.iloc[-1]['date'] - trades = trades.loc[(trades['open_date'] >= trades_start) & - (trades['close_date'] <= trades_stop)] + trades_start = dataframe.iloc[0]["date"] + trades_stop = dataframe.iloc[-1]["date"] + trades = trades.loc[ + (trades["open_date"] >= trades_start) & (trades["close_date"] <= trades_stop) + ] return trades diff --git a/freqtrade/data/converter/__init__.py b/freqtrade/data/converter/__init__.py index 36b7f2f87..76c8a7edc 100644 --- a/freqtrade/data/converter/__init__.py +++ b/freqtrade/data/converter/__init__.py @@ -20,19 +20,19 @@ from freqtrade.data.converter.trade_converter import ( __all__ = [ - 'clean_ohlcv_dataframe', - 'convert_ohlcv_format', - 'ohlcv_fill_up_missing_data', - 'ohlcv_to_dataframe', - 'order_book_to_dataframe', - 'reduce_dataframe_footprint', - 'trim_dataframe', - 'trim_dataframes', - 'convert_trades_format', - 'convert_trades_to_ohlcv', - 'trades_convert_types', - 'trades_df_remove_duplicates', - 'trades_dict_to_list', - 'trades_list_to_df', - 'trades_to_ohlcv', + "clean_ohlcv_dataframe", + "convert_ohlcv_format", + "ohlcv_fill_up_missing_data", + "ohlcv_to_dataframe", + "order_book_to_dataframe", + "reduce_dataframe_footprint", + "trim_dataframe", + "trim_dataframes", + "convert_trades_format", + "convert_trades_to_ohlcv", + "trades_convert_types", + "trades_df_remove_duplicates", + "trades_dict_to_list", + "trades_list_to_df", + "trades_to_ohlcv", ] diff --git a/freqtrade/data/converter/converter.py b/freqtrade/data/converter/converter.py index 0ebf24a4f..33a507740 100644 --- a/freqtrade/data/converter/converter.py +++ b/freqtrade/data/converter/converter.py @@ -1,6 +1,7 @@ """ Functions to convert data from one format to another """ + import logging from typing import Dict @@ -15,8 +16,14 @@ from freqtrade.enums import CandleType, TradingMode logger = 
logging.getLogger(__name__) -def ohlcv_to_dataframe(ohlcv: list, timeframe: str, pair: str, *, - fill_missing: bool = True, drop_incomplete: bool = True) -> DataFrame: +def ohlcv_to_dataframe( + ohlcv: list, + timeframe: str, + pair: str, + *, + fill_missing: bool = True, + drop_incomplete: bool = True, +) -> DataFrame: """ Converts a list with candle (OHLCV) data (in format returned by ccxt.fetch_ohlcv) to a Dataframe @@ -32,20 +39,28 @@ def ohlcv_to_dataframe(ohlcv: list, timeframe: str, pair: str, *, cols = DEFAULT_DATAFRAME_COLUMNS df = DataFrame(ohlcv, columns=cols) - df['date'] = to_datetime(df['date'], unit='ms', utc=True) + df["date"] = to_datetime(df["date"], unit="ms", utc=True) # Some exchanges return int values for Volume and even for OHLC. # Convert them since TA-LIB indicators used in the strategy assume floats # and fail with exception... - df = df.astype(dtype={'open': 'float', 'high': 'float', 'low': 'float', 'close': 'float', - 'volume': 'float'}) - return clean_ohlcv_dataframe(df, timeframe, pair, - fill_missing=fill_missing, - drop_incomplete=drop_incomplete) + df = df.astype( + dtype={ + "open": "float", + "high": "float", + "low": "float", + "close": "float", + "volume": "float", + } + ) + return clean_ohlcv_dataframe( + df, timeframe, pair, fill_missing=fill_missing, drop_incomplete=drop_incomplete + ) -def clean_ohlcv_dataframe(data: DataFrame, timeframe: str, pair: str, *, - fill_missing: bool, drop_incomplete: bool) -> DataFrame: +def clean_ohlcv_dataframe( + data: DataFrame, timeframe: str, pair: str, *, fill_missing: bool, drop_incomplete: bool +) -> DataFrame: """ Cleanse a OHLCV dataframe by * Grouping it by date (removes duplicate tics) @@ -60,17 +75,19 @@ def clean_ohlcv_dataframe(data: DataFrame, timeframe: str, pair: str, *, :return: DataFrame """ # group by index and aggregate results to eliminate duplicate ticks - data = data.groupby(by='date', as_index=False, sort=True).agg({ - 'open': 'first', - 'high': 'max', - 'low': 'min', - 'close': 'last', - 'volume': 'max', - }) + data = data.groupby(by="date", as_index=False, sort=True).agg( + { + "open": "first", + "high": "max", + "low": "min", + "close": "last", + "volume": "max", + } + ) # eliminate partial candle if drop_incomplete: data.drop(data.tail(1).index, inplace=True) - logger.debug('Dropping last candle') + logger.debug("Dropping last candle") if fill_missing: return ohlcv_fill_up_missing_data(data, timeframe, pair) @@ -86,32 +103,30 @@ def ohlcv_fill_up_missing_data(dataframe: DataFrame, timeframe: str, pair: str) """ from freqtrade.exchange import timeframe_to_resample_freq - ohlcv_dict = { - 'open': 'first', - 'high': 'max', - 'low': 'min', - 'close': 'last', - 'volume': 'sum' - } + ohlcv_dict = {"open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum"} resample_interval = timeframe_to_resample_freq(timeframe) # Resample to create "NAN" values - df = dataframe.resample(resample_interval, on='date').agg(ohlcv_dict) + df = dataframe.resample(resample_interval, on="date").agg(ohlcv_dict) # Forwardfill close for missing columns - df['close'] = df['close'].ffill() + df["close"] = df["close"].ffill() # Use close for "open, high, low" - df.loc[:, ['open', 'high', 'low']] = df[['open', 'high', 'low']].fillna( - value={'open': df['close'], - 'high': df['close'], - 'low': df['close'], - }) + df.loc[:, ["open", "high", "low"]] = df[["open", "high", "low"]].fillna( + value={ + "open": df["close"], + "high": df["close"], + "low": df["close"], + } + ) df.reset_index(inplace=True) 
len_before = len(dataframe) len_after = len(df) pct_missing = (len_after - len_before) / len_before if len_before > 0 else 0 if len_before != len_after: - message = (f"Missing data fillup for {pair}, {timeframe}: " - f"before: {len_before} - after: {len_after} - {pct_missing:.2%}") + message = ( + f"Missing data fillup for {pair}, {timeframe}: " + f"before: {len_before} - after: {len_after} - {pct_missing:.2%}" + ) if pct_missing > 0.01: logger.info(message) else: @@ -120,8 +135,9 @@ def ohlcv_fill_up_missing_data(dataframe: DataFrame, timeframe: str, pair: str) return df -def trim_dataframe(df: DataFrame, timerange, *, df_date_col: str = 'date', - startup_candles: int = 0) -> DataFrame: +def trim_dataframe( + df: DataFrame, timerange, *, df_date_col: str = "date", startup_candles: int = 0 +) -> DataFrame: """ Trim dataframe based on given timerange :param df: Dataframe to trim @@ -134,15 +150,16 @@ def trim_dataframe(df: DataFrame, timerange, *, df_date_col: str = 'date', # Trim candles instead of timeframe in case of given startup_candle count df = df.iloc[startup_candles:, :] else: - if timerange.starttype == 'date': + if timerange.starttype == "date": df = df.loc[df[df_date_col] >= timerange.startdt, :] - if timerange.stoptype == 'date': + if timerange.stoptype == "date": df = df.loc[df[df_date_col] <= timerange.stopdt, :] return df -def trim_dataframes(preprocessed: Dict[str, DataFrame], timerange, - startup_candles: int) -> Dict[str, DataFrame]: +def trim_dataframes( + preprocessed: Dict[str, DataFrame], timerange, startup_candles: int +) -> Dict[str, DataFrame]: """ Trim startup period from analyzed dataframes :param preprocessed: Dict of pair: dataframe @@ -157,8 +174,9 @@ def trim_dataframes(preprocessed: Dict[str, DataFrame], timerange, if not trimed_df.empty: processed[pair] = trimed_df else: - logger.warning(f'{pair} has no data left after adjusting for startup candles, ' - f'skipping.') + logger.warning( + f"{pair} has no data left after adjusting for startup candles, " f"skipping." 
+ ) return processed @@ -170,19 +188,28 @@ def order_book_to_dataframe(bids: list, asks: list) -> DataFrame: b_sum b_size bids asks a_size a_sum ------------------------------------------------------------------- """ - cols = ['bids', 'b_size'] + cols = ["bids", "b_size"] bids_frame = DataFrame(bids, columns=cols) # add cumulative sum column - bids_frame['b_sum'] = bids_frame['b_size'].cumsum() - cols2 = ['asks', 'a_size'] + bids_frame["b_sum"] = bids_frame["b_size"].cumsum() + cols2 = ["asks", "a_size"] asks_frame = DataFrame(asks, columns=cols2) # add cumulative sum column - asks_frame['a_sum'] = asks_frame['a_size'].cumsum() + asks_frame["a_sum"] = asks_frame["a_size"].cumsum() - frame = pd.concat([bids_frame['b_sum'], bids_frame['b_size'], bids_frame['bids'], - asks_frame['asks'], asks_frame['a_size'], asks_frame['a_sum']], axis=1, - keys=['b_sum', 'b_size', 'bids', 'asks', 'a_size', 'a_sum']) + frame = pd.concat( + [ + bids_frame["b_sum"], + bids_frame["b_size"], + bids_frame["bids"], + asks_frame["asks"], + asks_frame["a_size"], + asks_frame["a_sum"], + ], + axis=1, + keys=["b_sum", "b_size", "bids", "asks", "a_size", "a_sum"], + ) # logger.info('order book %s', frame ) return frame @@ -201,47 +228,51 @@ def convert_ohlcv_format( :param erase: Erase source data (does not apply if source and target format are identical) """ from freqtrade.data.history import get_datahandler - src = get_datahandler(config['datadir'], convert_from) - trg = get_datahandler(config['datadir'], convert_to) - timeframes = config.get('timeframes', [config.get('timeframe')]) + + src = get_datahandler(config["datadir"], convert_from) + trg = get_datahandler(config["datadir"], convert_to) + timeframes = config.get("timeframes", [config.get("timeframe")]) logger.info(f"Converting candle (OHLCV) for timeframe {timeframes}") - candle_types = [CandleType.from_string(ct) for ct in config.get('candle_types', [ - c.value for c in CandleType])] + candle_types = [ + CandleType.from_string(ct) + for ct in config.get("candle_types", [c.value for c in CandleType]) + ] logger.info(candle_types) - paircombs = src.ohlcv_get_available_data(config['datadir'], TradingMode.SPOT) - paircombs.extend(src.ohlcv_get_available_data(config['datadir'], TradingMode.FUTURES)) + paircombs = src.ohlcv_get_available_data(config["datadir"], TradingMode.SPOT) + paircombs.extend(src.ohlcv_get_available_data(config["datadir"], TradingMode.FUTURES)) - if 'pairs' in config: + if "pairs" in config: # Filter pairs - paircombs = [comb for comb in paircombs if comb[0] in config['pairs']] + paircombs = [comb for comb in paircombs if comb[0] in config["pairs"]] - if 'timeframes' in config: - paircombs = [comb for comb in paircombs if comb[1] in config['timeframes']] + if "timeframes" in config: + paircombs = [comb for comb in paircombs if comb[1] in config["timeframes"]] paircombs = [comb for comb in paircombs if comb[2] in candle_types] paircombs = sorted(paircombs, key=lambda x: (x[0], x[1], x[2].value)) - formatted_paircombs = '\n'.join([f"{pair}, {timeframe}, {candle_type}" - for pair, timeframe, candle_type in paircombs]) + formatted_paircombs = "\n".join( + [f"{pair}, {timeframe}, {candle_type}" for pair, timeframe, candle_type in paircombs] + ) - logger.info(f"Converting candle (OHLCV) data for the following pair combinations:\n" - f"{formatted_paircombs}") + logger.info( + f"Converting candle (OHLCV) data for the following pair combinations:\n" + f"{formatted_paircombs}" + ) for pair, timeframe, candle_type in paircombs: - data = 
src.ohlcv_load(pair=pair, timeframe=timeframe, - timerange=None, - fill_missing=False, - drop_incomplete=False, - startup_candles=0, - candle_type=candle_type) + data = src.ohlcv_load( + pair=pair, + timeframe=timeframe, + timerange=None, + fill_missing=False, + drop_incomplete=False, + startup_candles=0, + candle_type=candle_type, + ) logger.info(f"Converting {len(data)} {timeframe} {candle_type} candles for {pair}") if len(data) > 0: - trg.ohlcv_store( - pair=pair, - timeframe=timeframe, - data=data, - candle_type=candle_type - ) + trg.ohlcv_store(pair=pair, timeframe=timeframe, data=data, candle_type=candle_type) if erase and convert_from != convert_to: logger.info(f"Deleting source data for {pair} / {timeframe}") src.ohlcv_purge(pair=pair, timeframe=timeframe, candle_type=candle_type) @@ -254,12 +285,11 @@ def reduce_dataframe_footprint(df: DataFrame) -> DataFrame: :return: Dataframe converted to float/int 32s """ - logger.debug(f"Memory usage of dataframe is " - f"{df.memory_usage().sum() / 1024**2:.2f} MB") + logger.debug(f"Memory usage of dataframe is " f"{df.memory_usage().sum() / 1024**2:.2f} MB") df_dtypes = df.dtypes for column, dtype in df_dtypes.items(): - if column in ['open', 'high', 'low', 'close', 'volume']: + if column in ["open", "high", "low", "close", "volume"]: continue if dtype == np.float64: df_dtypes[column] = np.float32 @@ -267,7 +297,8 @@ def reduce_dataframe_footprint(df: DataFrame) -> DataFrame: df_dtypes[column] = np.int32 df = df.astype(df_dtypes) - logger.debug(f"Memory usage after optimization is: " - f"{df.memory_usage().sum() / 1024**2:.2f} MB") + logger.debug( + f"Memory usage after optimization is: " f"{df.memory_usage().sum() / 1024**2:.2f} MB" + ) return df diff --git a/freqtrade/data/converter/trade_converter.py b/freqtrade/data/converter/trade_converter.py index a94cf284b..9b8fe718e 100644 --- a/freqtrade/data/converter/trade_converter.py +++ b/freqtrade/data/converter/trade_converter.py @@ -1,6 +1,7 @@ """ Functions to convert data from one format to another """ + import logging from pathlib import Path from typing import Dict, List @@ -30,7 +31,7 @@ def trades_df_remove_duplicates(trades: pd.DataFrame) -> pd.DataFrame: :param trades: DataFrame with the columns constants.DEFAULT_TRADES_COLUMNS :return: DataFrame with duplicates removed based on the 'timestamp' column """ - return trades.drop_duplicates(subset=['timestamp', 'id']) + return trades.drop_duplicates(subset=["timestamp", "id"]) def trades_dict_to_list(trades: List[Dict]) -> TradeList: @@ -47,7 +48,7 @@ def trades_convert_types(trades: DataFrame) -> DataFrame: Convert Trades dtypes and add 'date' column """ trades = trades.astype(TRADES_DTYPES) - trades['date'] = to_datetime(trades['timestamp'], unit='ms', utc=True) + trades["date"] = to_datetime(trades["timestamp"], unit="ms", utc=True) return trades @@ -76,13 +77,14 @@ def trades_to_ohlcv(trades: DataFrame, timeframe: str) -> DataFrame: :raises: ValueError if no trades are provided """ from freqtrade.exchange import timeframe_to_resample_freq + if trades.empty: - raise ValueError('Trade-list empty.') - df = trades.set_index('date', drop=True) + raise ValueError("Trade-list empty.") + df = trades.set_index("date", drop=True) resample_interval = timeframe_to_resample_freq(timeframe) - df_new = df['price'].resample(resample_interval).ohlc() - df_new['volume'] = df['amount'].resample(resample_interval).sum() - df_new['date'] = df_new.index + df_new = df["price"].resample(resample_interval).ohlc() + df_new["volume"] = 
df["amount"].resample(resample_interval).sum() + df_new["date"] = df_new.index # Drop 0 volume rows df_new = df_new.dropna() return df_new.loc[:, DEFAULT_DATAFRAME_COLUMNS] @@ -102,24 +104,27 @@ def convert_trades_to_ohlcv( Convert stored trades data to ohlcv data """ from freqtrade.data.history import get_datahandler + data_handler_trades = get_datahandler(datadir, data_format=data_format_trades) data_handler_ohlcv = get_datahandler(datadir, data_format=data_format_ohlcv) - logger.info(f"About to convert pairs: '{', '.join(pairs)}', " - f"intervals: '{', '.join(timeframes)}' to {datadir}") + logger.info( + f"About to convert pairs: '{', '.join(pairs)}', " + f"intervals: '{', '.join(timeframes)}' to {datadir}" + ) trading_mode = TradingMode.FUTURES if candle_type != CandleType.SPOT else TradingMode.SPOT for pair in pairs: trades = data_handler_trades.trades_load(pair, trading_mode) for timeframe in timeframes: if erase: if data_handler_ohlcv.ohlcv_purge(pair, timeframe, candle_type=candle_type): - logger.info(f'Deleting existing data for pair {pair}, interval {timeframe}.') + logger.info(f"Deleting existing data for pair {pair}, interval {timeframe}.") try: ohlcv = trades_to_ohlcv(trades, timeframe) # Store ohlcv data_handler_ohlcv.ohlcv_store(pair, timeframe, data=ohlcv, candle_type=candle_type) except ValueError: - logger.warning(f'Could not convert {pair} to OHLCV.') + logger.warning(f"Could not convert {pair} to OHLCV.") def convert_trades_format(config: Config, convert_from: str, convert_to: str, erase: bool): @@ -130,25 +135,27 @@ def convert_trades_format(config: Config, convert_from: str, convert_to: str, er :param convert_to: Target format :param erase: Erase source data (does not apply if source and target format are identical) """ - if convert_from == 'kraken_csv': - if config['exchange']['name'] != 'kraken': + if convert_from == "kraken_csv": + if config["exchange"]["name"] != "kraken": raise OperationalException( - 'Converting from csv is only supported for kraken.' - 'Please refer to the documentation for details about this special mode.' + "Converting from csv is only supported for kraken." + "Please refer to the documentation for details about this special mode." 
) from freqtrade.data.converter.trade_converter_kraken import import_kraken_trades_from_csv + import_kraken_trades_from_csv(config, convert_to) return from freqtrade.data.history import get_datahandler - src = get_datahandler(config['datadir'], convert_from) - trg = get_datahandler(config['datadir'], convert_to) - if 'pairs' not in config: - config['pairs'] = src.trades_get_pairs(config['datadir']) + src = get_datahandler(config["datadir"], convert_from) + trg = get_datahandler(config["datadir"], convert_to) + + if "pairs" not in config: + config["pairs"] = src.trades_get_pairs(config["datadir"]) logger.info(f"Converting trades for {config['pairs']}") - trading_mode: TradingMode = config.get('trading_mode', TradingMode.SPOT) - for pair in config['pairs']: + trading_mode: TradingMode = config.get("trading_mode", TradingMode.SPOT) + for pair in config["pairs"]: data = src.trades_load(pair, trading_mode) logger.info(f"Converting {len(data)} trades for {pair}") trg.trades_store(pair, data, trading_mode) diff --git a/freqtrade/data/converter/trade_converter_kraken.py b/freqtrade/data/converter/trade_converter_kraken.py index 332352d47..911fcd17b 100644 --- a/freqtrade/data/converter/trade_converter_kraken.py +++ b/freqtrade/data/converter/trade_converter_kraken.py @@ -17,32 +17,33 @@ from freqtrade.resolvers import ExchangeResolver logger = logging.getLogger(__name__) -KRAKEN_CSV_TRADE_COLUMNS = ['timestamp', 'price', 'amount'] +KRAKEN_CSV_TRADE_COLUMNS = ["timestamp", "price", "amount"] def import_kraken_trades_from_csv(config: Config, convert_to: str): """ Import kraken trades from csv """ - if config['exchange']['name'] != 'kraken': - raise OperationalException('This function is only for the kraken exchange.') + if config["exchange"]["name"] != "kraken": + raise OperationalException("This function is only for the kraken exchange.") - datadir: Path = config['datadir'] + datadir: Path = config["datadir"] data_handler = get_datahandler(datadir, data_format=convert_to) - tradesdir: Path = config['datadir'] / 'trades_csv' + tradesdir: Path = config["datadir"] / "trades_csv" exchange = ExchangeResolver.load_exchange(config, validate=False) # iterate through directories in this directory - data_symbols = {p.stem for p in tradesdir.rglob('*.csv')} + data_symbols = {p.stem for p in tradesdir.rglob("*.csv")} # create pair/filename mapping markets = { - (m['symbol'], m['altname']) for m in exchange.markets.values() - if m.get('altname') in data_symbols + (m["symbol"], m["altname"]) + for m in exchange.markets.values() + if m.get("altname") in data_symbols } logger.info(f"Found csv files for {', '.join(data_symbols)}.") - if pairs_raw := config.get('pairs'): + if pairs_raw := config.get("pairs"): pairs = expand_pairlist(pairs_raw, [m[0] for m in markets]) markets = {m for m in markets if m[0] in pairs} if not markets: @@ -68,18 +69,20 @@ def import_kraken_trades_from_csv(config: Config, convert_to: str): trades = pd.concat(dfs, ignore_index=True) del dfs - trades.loc[:, 'timestamp'] = trades['timestamp'] * 1e3 - trades.loc[:, 'cost'] = trades['price'] * trades['amount'] + trades.loc[:, "timestamp"] = trades["timestamp"] * 1e3 + trades.loc[:, "cost"] = trades["price"] * trades["amount"] for col in DEFAULT_TRADES_COLUMNS: if col not in trades.columns: - trades.loc[:, col] = '' + trades.loc[:, col] = "" trades = trades[DEFAULT_TRADES_COLUMNS] trades = trades_convert_types(trades) trades_df = trades_df_remove_duplicates(trades) del trades - logger.info(f"{pair}: {len(trades_df)} trades, from " - 
f"{trades_df['date'].min():{DATETIME_PRINT_FORMAT}} to " - f"{trades_df['date'].max():{DATETIME_PRINT_FORMAT}}") + logger.info( + f"{pair}: {len(trades_df)} trades, from " + f"{trades_df['date'].min():{DATETIME_PRINT_FORMAT}} to " + f"{trades_df['date'].max():{DATETIME_PRINT_FORMAT}}" + ) data_handler.trades_store(pair, trades_df, TradingMode.SPOT) diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index fc906b123..777f99895 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -4,6 +4,7 @@ Responsible to provide data to the bot including ticker and orderbook data, live and historical candle (OHLCV) data Common Interface for bot and strategy to access data. """ + import logging from collections import deque from datetime import datetime, timezone @@ -31,18 +32,17 @@ from freqtrade.util import PeriodicCache logger = logging.getLogger(__name__) -NO_EXCHANGE_EXCEPTION = 'Exchange is not available to DataProvider.' +NO_EXCHANGE_EXCEPTION = "Exchange is not available to DataProvider." MAX_DATAFRAME_CANDLES = 1000 class DataProvider: - def __init__( self, config: Config, exchange: Optional[Exchange], pairlists=None, - rpc: Optional[RPCManager] = None + rpc: Optional[RPCManager] = None, ) -> None: self._config = config self._exchange = exchange @@ -53,18 +53,20 @@ class DataProvider: self.__slice_date: Optional[datetime] = None self.__cached_pairs_backtesting: Dict[PairWithTimeframe, DataFrame] = {} - self.__producer_pairs_df: Dict[str, - Dict[PairWithTimeframe, Tuple[DataFrame, datetime]]] = {} + self.__producer_pairs_df: Dict[ + str, Dict[PairWithTimeframe, Tuple[DataFrame, datetime]] + ] = {} self.__producer_pairs: Dict[str, List[str]] = {} self._msg_queue: deque = deque() - self._default_candle_type = self._config.get('candle_type_def', CandleType.SPOT) - self._default_timeframe = self._config.get('timeframe', '1h') + self._default_candle_type = self._config.get("candle_type_def", CandleType.SPOT) + self._default_timeframe = self._config.get("timeframe", "1h") self.__msg_cache = PeriodicCache( - maxsize=1000, ttl=timeframe_to_seconds(self._default_timeframe)) + maxsize=1000, ttl=timeframe_to_seconds(self._default_timeframe) + ) - self.producers = self._config.get('external_message_consumer', {}).get('producers', []) + self.producers = self._config.get("external_message_consumer", {}).get("producers", []) self.external_data_enabled = len(self.producers) > 0 def _set_dataframe_max_index(self, limit_index: int): @@ -84,11 +86,7 @@ class DataProvider: self.__slice_date = limit_date def _set_cached_df( - self, - pair: str, - timeframe: str, - dataframe: DataFrame, - candle_type: CandleType + self, pair: str, timeframe: str, dataframe: DataFrame, candle_type: CandleType ) -> None: """ Store cached Dataframe. @@ -100,8 +98,7 @@ class DataProvider: :param candle_type: Any of the enum CandleType (must match trading mode!) 
""" pair_key = (pair, timeframe, candle_type) - self.__cached_pairs[pair_key] = ( - dataframe, datetime.now(timezone.utc)) + self.__cached_pairs[pair_key] = (dataframe, datetime.now(timezone.utc)) # For multiple producers we will want to merge the pairlists instead of overwriting def _set_producer_pairs(self, pairlist: List[str], producer_name: str = "default"): @@ -120,12 +117,7 @@ class DataProvider: """ return self.__producer_pairs.get(producer_name, []).copy() - def _emit_df( - self, - pair_key: PairWithTimeframe, - dataframe: DataFrame, - new_candle: bool - ) -> None: + def _emit_df(self, pair_key: PairWithTimeframe, dataframe: DataFrame, new_candle: bool) -> None: """ Send this dataframe as an ANALYZED_DF message to RPC @@ -135,19 +127,21 @@ class DataProvider: """ if self.__rpc: msg: RPCAnalyzedDFMsg = { - 'type': RPCMessageType.ANALYZED_DF, - 'data': { - 'key': pair_key, - 'df': dataframe.tail(1), - 'la': datetime.now(timezone.utc) - } - } + "type": RPCMessageType.ANALYZED_DF, + "data": { + "key": pair_key, + "df": dataframe.tail(1), + "la": datetime.now(timezone.utc), + }, + } self.__rpc.send_msg(msg) if new_candle: - self.__rpc.send_msg({ - 'type': RPCMessageType.NEW_CANDLE, - 'data': pair_key, - }) + self.__rpc.send_msg( + { + "type": RPCMessageType.NEW_CANDLE, + "data": pair_key, + } + ) def _replace_external_df( self, @@ -156,7 +150,7 @@ class DataProvider: last_analyzed: datetime, timeframe: str, candle_type: CandleType, - producer_name: str = "default" + producer_name: str = "default", ) -> None: """ Add the pair data to this class from an external source. @@ -182,7 +176,7 @@ class DataProvider: last_analyzed: datetime, timeframe: str, candle_type: CandleType, - producer_name: str = "default" + producer_name: str = "default", ) -> Tuple[bool, int]: """ Append a candle to the existing external dataframe. 
The incoming dataframe @@ -208,12 +202,14 @@ class DataProvider: last_analyzed=last_analyzed, timeframe=timeframe, candle_type=candle_type, - producer_name=producer_name + producer_name=producer_name, ) return (True, 0) - if (producer_name not in self.__producer_pairs_df - or pair_key not in self.__producer_pairs_df[producer_name]): + if ( + producer_name not in self.__producer_pairs_df + or pair_key not in self.__producer_pairs_df[producer_name] + ): # We don't have data from this producer yet, # or we don't have data for this pair_key # return False and 1000 for the full df @@ -224,12 +220,12 @@ class DataProvider: # CHECK FOR MISSING CANDLES # Convert the timeframe to a timedelta for pandas timeframe_delta: Timedelta = to_timedelta(timeframe) - local_last: Timestamp = existing_df.iloc[-1]['date'] # We want the last date from our copy + local_last: Timestamp = existing_df.iloc[-1]["date"] # We want the last date from our copy # We want the first date from the incoming - incoming_first: Timestamp = dataframe.iloc[0]['date'] + incoming_first: Timestamp = dataframe.iloc[0]["date"] # Remove existing candles that are newer than the incoming first candle - existing_df1 = existing_df[existing_df['date'] < incoming_first] + existing_df1 = existing_df[existing_df["date"] < incoming_first] candle_difference = (incoming_first - local_last) / timeframe_delta @@ -247,13 +243,13 @@ class DataProvider: # Everything is good, we appended self._replace_external_df( - pair, - appended_df, - last_analyzed=last_analyzed, - timeframe=timeframe, - candle_type=candle_type, - producer_name=producer_name - ) + pair, + appended_df, + last_analyzed=last_analyzed, + timeframe=timeframe, + candle_type=candle_type, + producer_name=producer_name, + ) return (True, 0) def get_producer_df( @@ -261,7 +257,7 @@ class DataProvider: pair: str, timeframe: Optional[str] = None, candle_type: Optional[CandleType] = None, - producer_name: str = "default" + producer_name: str = "default", ) -> Tuple[DataFrame, datetime]: """ Get the pair data from producers. 
@@ -296,64 +292,64 @@ class DataProvider: """ self._pairlists = pairlists - def historic_ohlcv( - self, - pair: str, - timeframe: str, - candle_type: str = '' - ) -> DataFrame: + def historic_ohlcv(self, pair: str, timeframe: str, candle_type: str = "") -> DataFrame: """ Get stored historical candle (OHLCV) data :param pair: pair to get the data for :param timeframe: timeframe to get data for :param candle_type: '', mark, index, premiumIndex, or funding_rate """ - _candle_type = CandleType.from_string( - candle_type) if candle_type != '' else self._config['candle_type_def'] + _candle_type = ( + CandleType.from_string(candle_type) + if candle_type != "" + else self._config["candle_type_def"] + ) saved_pair: PairWithTimeframe = (pair, str(timeframe), _candle_type) if saved_pair not in self.__cached_pairs_backtesting: - timerange = TimeRange.parse_timerange(None if self._config.get( - 'timerange') is None else str(self._config.get('timerange'))) + timerange = TimeRange.parse_timerange( + None + if self._config.get("timerange") is None + else str(self._config.get("timerange")) + ) startup_candles = self.get_required_startup(str(timeframe)) tf_seconds = timeframe_to_seconds(str(timeframe)) timerange.subtract_start(tf_seconds * startup_candles) - logger.info(f"Loading data for {pair} {timeframe} " - f"from {timerange.start_fmt} to {timerange.stop_fmt}") + logger.info( + f"Loading data for {pair} {timeframe} " + f"from {timerange.start_fmt} to {timerange.stop_fmt}" + ) self.__cached_pairs_backtesting[saved_pair] = load_pair_history( pair=pair, timeframe=timeframe, - datadir=self._config['datadir'], + datadir=self._config["datadir"], timerange=timerange, - data_format=self._config['dataformat_ohlcv'], + data_format=self._config["dataformat_ohlcv"], candle_type=_candle_type, - ) return self.__cached_pairs_backtesting[saved_pair].copy() def get_required_startup(self, timeframe: str) -> int: - freqai_config = self._config.get('freqai', {}) - if not freqai_config.get('enabled', False): - return self._config.get('startup_candle_count', 0) + freqai_config = self._config.get("freqai", {}) + if not freqai_config.get("enabled", False): + return self._config.get("startup_candle_count", 0) else: - startup_candles = self._config.get('startup_candle_count', 0) - indicator_periods = freqai_config['feature_parameters']['indicator_periods_candles'] + startup_candles = self._config.get("startup_candle_count", 0) + indicator_periods = freqai_config["feature_parameters"]["indicator_periods_candles"] # make sure the startupcandles is at least the set maximum indicator periods - self._config['startup_candle_count'] = max(startup_candles, max(indicator_periods)) + self._config["startup_candle_count"] = max(startup_candles, max(indicator_periods)) tf_seconds = timeframe_to_seconds(timeframe) - train_candles = freqai_config['train_period_days'] * 86400 / tf_seconds - total_candles = int(self._config['startup_candle_count'] + train_candles) + train_candles = freqai_config["train_period_days"] * 86400 / tf_seconds + total_candles = int(self._config["startup_candle_count"] + train_candles) logger.info( - f'Increasing startup_candle_count for freqai on {timeframe} to {total_candles}') + f"Increasing startup_candle_count for freqai on {timeframe} to {total_candles}" + ) return total_candles def get_pair_dataframe( - self, - pair: str, - timeframe: Optional[str] = None, - candle_type: str = '' + self, pair: str, timeframe: Optional[str] = None, candle_type: str = "" ) -> DataFrame: """ Return pair candle (OHLCV) data, either 
live or cached historical -- depending @@ -370,13 +366,13 @@ class DataProvider: data = self.ohlcv(pair=pair, timeframe=timeframe, candle_type=candle_type) else: # Get historical OHLCV data (cached on disk). - timeframe = timeframe or self._config['timeframe'] + timeframe = timeframe or self._config["timeframe"] data = self.historic_ohlcv(pair=pair, timeframe=timeframe, candle_type=candle_type) # Cut date to timeframe-specific date. # This is necessary to prevent lookahead bias in callbacks through informative pairs. if self.__slice_date: cutoff_date = timeframe_to_prev_date(timeframe, self.__slice_date) - data = data.loc[data['date'] < cutoff_date] + data = data.loc[data["date"] < cutoff_date] if len(data) == 0: logger.warning(f"No data found for ({pair}, {timeframe}, {candle_type}).") return data @@ -391,7 +387,7 @@ class DataProvider: combination. Returns empty dataframe and Epoch 0 (1970-01-01) if no dataframe was cached. """ - pair_key = (pair, timeframe, self._config.get('candle_type_def', CandleType.SPOT)) + pair_key = (pair, timeframe, self._config.get("candle_type_def", CandleType.SPOT)) if pair_key in self.__cached_pairs: if self.runmode in (RunMode.DRY_RUN, RunMode.LIVE): df, date = self.__cached_pairs[pair_key] @@ -399,7 +395,7 @@ class DataProvider: df, date = self.__cached_pairs[pair_key] if self.__slice_index is not None: max_index = self.__slice_index - df = df.iloc[max(0, max_index - MAX_DATAFRAME_CANDLES):max_index] + df = df.iloc[max(0, max_index - MAX_DATAFRAME_CANDLES) : max_index] return df, date else: return (DataFrame(), datetime.fromtimestamp(0, tz=timezone.utc)) @@ -410,7 +406,7 @@ class DataProvider: Get runmode of the bot can be "live", "dry-run", "backtest", "edgecli", "hyperopt" or "other". """ - return RunMode(self._config.get('runmode', RunMode.OTHER)) + return RunMode(self._config.get("runmode", RunMode.OTHER)) def current_whitelist(self) -> List[str]: """ @@ -438,9 +434,11 @@ class DataProvider: # Exchange functions - def refresh(self, - pairlist: ListPairsWithTimeframes, - helping_pairs: Optional[ListPairsWithTimeframes] = None) -> None: + def refresh( + self, + pairlist: ListPairsWithTimeframes, + helping_pairs: Optional[ListPairsWithTimeframes] = None, + ) -> None: """ Refresh data, called with each cycle """ @@ -460,11 +458,7 @@ class DataProvider: return list(self._exchange._klines.keys()) def ohlcv( - self, - pair: str, - timeframe: Optional[str] = None, - copy: bool = True, - candle_type: str = '' + self, pair: str, timeframe: Optional[str] = None, copy: bool = True, candle_type: str = "" ) -> DataFrame: """ Get candle (OHLCV) data for the given pair as DataFrame @@ -478,11 +472,13 @@ class DataProvider: if self._exchange is None: raise OperationalException(NO_EXCHANGE_EXCEPTION) if self.runmode in (RunMode.DRY_RUN, RunMode.LIVE): - _candle_type = CandleType.from_string( - candle_type) if candle_type != '' else self._config['candle_type_def'] + _candle_type = ( + CandleType.from_string(candle_type) + if candle_type != "" + else self._config["candle_type_def"] + ) return self._exchange.klines( - (pair, timeframe or self._config['timeframe'], _candle_type), - copy=copy + (pair, timeframe or self._config["timeframe"], _candle_type), copy=copy ) else: return DataFrame() diff --git a/freqtrade/data/entryexitanalysis.py b/freqtrade/data/entryexitanalysis.py index c85a72834..9d936d295 100644 --- a/freqtrade/data/entryexitanalysis.py +++ b/freqtrade/data/entryexitanalysis.py @@ -21,9 +21,10 @@ logger = logging.getLogger(__name__) def 
_load_backtest_analysis_data(backtest_dir: Path, name: str): if backtest_dir.is_dir(): - scpf = Path(backtest_dir, - Path(get_latest_backtest_filename(backtest_dir)).stem + "_" + name + ".pkl" - ) + scpf = Path( + backtest_dir, + Path(get_latest_backtest_filename(backtest_dir)).stem + "_" + name + ".pkl", + ) else: scpf = Path(backtest_dir.parent / f"{backtest_dir.stem}_{name}.pkl") @@ -56,7 +57,8 @@ def _process_candles_and_indicators(pairlist, strategy_name, trades, signal_cand for pair in pairlist: if pair in signal_candles[strategy_name]: analysed_trades_dict[strategy_name][pair] = _analyze_candles_and_indicators( - pair, trades, signal_candles[strategy_name][pair]) + pair, trades, signal_candles[strategy_name][pair] + ) except Exception as e: print(f"Cannot process entry/exit reasons for {strategy_name}: ", e) @@ -67,28 +69,28 @@ def _analyze_candles_and_indicators(pair, trades: pd.DataFrame, signal_candles: buyf = signal_candles if len(buyf) > 0: - buyf = buyf.set_index('date', drop=False) - trades_red = trades.loc[trades['pair'] == pair].copy() + buyf = buyf.set_index("date", drop=False) + trades_red = trades.loc[trades["pair"] == pair].copy() trades_inds = pd.DataFrame() if trades_red.shape[0] > 0 and buyf.shape[0] > 0: for t, v in trades_red.open_date.items(): - allinds = buyf.loc[(buyf['date'] < v)] + allinds = buyf.loc[(buyf["date"] < v)] if allinds.shape[0] > 0: tmp_inds = allinds.iloc[[-1]] - trades_red.loc[t, 'signal_date'] = tmp_inds['date'].values[0] - trades_red.loc[t, 'enter_reason'] = trades_red.loc[t, 'enter_tag'] - tmp_inds.index.rename('signal_date', inplace=True) + trades_red.loc[t, "signal_date"] = tmp_inds["date"].values[0] + trades_red.loc[t, "enter_reason"] = trades_red.loc[t, "enter_tag"] + tmp_inds.index.rename("signal_date", inplace=True) trades_inds = pd.concat([trades_inds, tmp_inds]) - if 'signal_date' in trades_red: - trades_red['signal_date'] = pd.to_datetime(trades_red['signal_date'], utc=True) - trades_red.set_index('signal_date', inplace=True) + if "signal_date" in trades_red: + trades_red["signal_date"] = pd.to_datetime(trades_red["signal_date"], utc=True) + trades_red.set_index("signal_date", inplace=True) try: - trades_red = pd.merge(trades_red, trades_inds, on='signal_date', how='outer') + trades_red = pd.merge(trades_red, trades_inds, on="signal_date", how="outer") except Exception as e: raise e return trades_red @@ -96,138 +98,166 @@ def _analyze_candles_and_indicators(pair, trades: pd.DataFrame, signal_candles: return pd.DataFrame() -def _do_group_table_output(bigdf, glist, csv_path: Path, to_csv=False, ): +def _do_group_table_output( + bigdf, + glist, + csv_path: Path, + to_csv=False, +): for g in glist: # 0: summary wins/losses grouped by enter tag if g == "0": - group_mask = ['enter_reason'] - wins = bigdf.loc[bigdf['profit_abs'] >= 0] \ - .groupby(group_mask) \ - .agg({'profit_abs': ['sum']}) + group_mask = ["enter_reason"] + wins = ( + bigdf.loc[bigdf["profit_abs"] >= 0].groupby(group_mask).agg({"profit_abs": ["sum"]}) + ) - wins.columns = ['profit_abs_wins'] - loss = bigdf.loc[bigdf['profit_abs'] < 0] \ - .groupby(group_mask) \ - .agg({'profit_abs': ['sum']}) - loss.columns = ['profit_abs_loss'] + wins.columns = ["profit_abs_wins"] + loss = ( + bigdf.loc[bigdf["profit_abs"] < 0].groupby(group_mask).agg({"profit_abs": ["sum"]}) + ) + loss.columns = ["profit_abs_loss"] - new = bigdf.groupby(group_mask).agg({'profit_abs': [ - 'count', - lambda x: sum(x > 0), - lambda x: sum(x <= 0)]}) + new = bigdf.groupby(group_mask).agg( + {"profit_abs": 
["count", lambda x: sum(x > 0), lambda x: sum(x <= 0)]} + ) new = pd.concat([new, wins, loss], axis=1).fillna(0) - new['profit_tot'] = new['profit_abs_wins'] - abs(new['profit_abs_loss']) - new['wl_ratio_pct'] = (new.iloc[:, 1] / new.iloc[:, 0] * 100).fillna(0) - new['avg_win'] = (new['profit_abs_wins'] / new.iloc[:, 1]).fillna(0) - new['avg_loss'] = (new['profit_abs_loss'] / new.iloc[:, 2]).fillna(0) + new["profit_tot"] = new["profit_abs_wins"] - abs(new["profit_abs_loss"]) + new["wl_ratio_pct"] = (new.iloc[:, 1] / new.iloc[:, 0] * 100).fillna(0) + new["avg_win"] = (new["profit_abs_wins"] / new.iloc[:, 1]).fillna(0) + new["avg_loss"] = (new["profit_abs_loss"] / new.iloc[:, 2]).fillna(0) - new['exp_ratio'] = ( - ( - (1 + (new['avg_win'] / abs(new['avg_loss']))) * (new['wl_ratio_pct'] / 100) - ) - 1).fillna(0) + new["exp_ratio"] = ( + ((1 + (new["avg_win"] / abs(new["avg_loss"]))) * (new["wl_ratio_pct"] / 100)) - 1 + ).fillna(0) - new.columns = ['total_num_buys', 'wins', 'losses', - 'profit_abs_wins', 'profit_abs_loss', - 'profit_tot', 'wl_ratio_pct', - 'avg_win', 'avg_loss', 'exp_ratio'] + new.columns = [ + "total_num_buys", + "wins", + "losses", + "profit_abs_wins", + "profit_abs_loss", + "profit_tot", + "wl_ratio_pct", + "avg_win", + "avg_loss", + "exp_ratio", + ] - sortcols = ['total_num_buys'] + sortcols = ["total_num_buys"] - _print_table(new, sortcols, show_index=True, name="Group 0:", - to_csv=to_csv, csv_path=csv_path) + _print_table( + new, sortcols, show_index=True, name="Group 0:", to_csv=to_csv, csv_path=csv_path + ) else: - agg_mask = {'profit_abs': ['count', 'sum', 'median', 'mean'], - 'profit_ratio': ['median', 'mean', 'sum']} - agg_cols = ['num_buys', 'profit_abs_sum', 'profit_abs_median', - 'profit_abs_mean', 'median_profit_pct', 'mean_profit_pct', - 'total_profit_pct'] - sortcols = ['profit_abs_sum', 'enter_reason'] + agg_mask = { + "profit_abs": ["count", "sum", "median", "mean"], + "profit_ratio": ["median", "mean", "sum"], + } + agg_cols = [ + "num_buys", + "profit_abs_sum", + "profit_abs_median", + "profit_abs_mean", + "median_profit_pct", + "mean_profit_pct", + "total_profit_pct", + ] + sortcols = ["profit_abs_sum", "enter_reason"] # 1: profit summaries grouped by enter_tag if g == "1": - group_mask = ['enter_reason'] + group_mask = ["enter_reason"] # 2: profit summaries grouped by enter_tag and exit_tag if g == "2": - group_mask = ['enter_reason', 'exit_reason'] + group_mask = ["enter_reason", "exit_reason"] # 3: profit summaries grouped by pair and enter_tag if g == "3": - group_mask = ['pair', 'enter_reason'] + group_mask = ["pair", "enter_reason"] # 4: profit summaries grouped by pair, enter_ and exit_tag (this can get quite large) if g == "4": - group_mask = ['pair', 'enter_reason', 'exit_reason'] + group_mask = ["pair", "enter_reason", "exit_reason"] # 5: profit summaries grouped by exit_tag if g == "5": - group_mask = ['exit_reason'] - sortcols = ['exit_reason'] + group_mask = ["exit_reason"] + sortcols = ["exit_reason"] if group_mask: new = bigdf.groupby(group_mask).agg(agg_mask).reset_index() new.columns = group_mask + agg_cols - new['median_profit_pct'] = new['median_profit_pct'] * 100 - new['mean_profit_pct'] = new['mean_profit_pct'] * 100 - new['total_profit_pct'] = new['total_profit_pct'] * 100 + new["median_profit_pct"] = new["median_profit_pct"] * 100 + new["mean_profit_pct"] = new["mean_profit_pct"] * 100 + new["total_profit_pct"] = new["total_profit_pct"] * 100 - _print_table(new, sortcols, name=f"Group {g}:", - to_csv=to_csv, csv_path=csv_path) + 
_print_table(new, sortcols, name=f"Group {g}:", to_csv=to_csv, csv_path=csv_path) else: logger.warning("Invalid group mask specified.") -def _do_rejected_signals_output(rejected_signals_df: pd.DataFrame, - to_csv: bool = False, csv_path=None) -> None: - cols = ['pair', 'date', 'enter_tag'] - sortcols = ['date', 'pair', 'enter_tag'] - _print_table(rejected_signals_df[cols], - sortcols, - show_index=False, - name="Rejected Signals:", - to_csv=to_csv, - csv_path=csv_path) +def _do_rejected_signals_output( + rejected_signals_df: pd.DataFrame, to_csv: bool = False, csv_path=None +) -> None: + cols = ["pair", "date", "enter_tag"] + sortcols = ["date", "pair", "enter_tag"] + _print_table( + rejected_signals_df[cols], + sortcols, + show_index=False, + name="Rejected Signals:", + to_csv=to_csv, + csv_path=csv_path, + ) -def _select_rows_within_dates(df, timerange=None, df_date_col: str = 'date'): +def _select_rows_within_dates(df, timerange=None, df_date_col: str = "date"): if timerange: - if timerange.starttype == 'date': + if timerange.starttype == "date": df = df.loc[(df[df_date_col] >= timerange.startdt)] - if timerange.stoptype == 'date': + if timerange.stoptype == "date": df = df.loc[(df[df_date_col] < timerange.stopdt)] return df def _select_rows_by_tags(df, enter_reason_list, exit_reason_list): if enter_reason_list and "all" not in enter_reason_list: - df = df.loc[(df['enter_reason'].isin(enter_reason_list))] + df = df.loc[(df["enter_reason"].isin(enter_reason_list))] if exit_reason_list and "all" not in exit_reason_list: - df = df.loc[(df['exit_reason'].isin(exit_reason_list))] + df = df.loc[(df["exit_reason"].isin(exit_reason_list))] return df -def prepare_results(analysed_trades, stratname, - enter_reason_list, exit_reason_list, - timerange=None): +def prepare_results( + analysed_trades, stratname, enter_reason_list, exit_reason_list, timerange=None +): res_df = pd.DataFrame() for pair, trades in analysed_trades[stratname].items(): - if (trades.shape[0] > 0): - trades.dropna(subset=['close_date'], inplace=True) + if trades.shape[0] > 0: + trades.dropna(subset=["close_date"], inplace=True) res_df = pd.concat([res_df, trades], ignore_index=True) res_df = _select_rows_within_dates(res_df, timerange) - if res_df is not None and res_df.shape[0] > 0 and ('enter_reason' in res_df.columns): + if res_df is not None and res_df.shape[0] > 0 and ("enter_reason" in res_df.columns): res_df = _select_rows_by_tags(res_df, enter_reason_list, exit_reason_list) return res_df -def print_results(res_df: pd.DataFrame, analysis_groups: List[str], indicator_list: List[str], - csv_path: Path, rejected_signals=None, to_csv=False): +def print_results( + res_df: pd.DataFrame, + analysis_groups: List[str], + indicator_list: List[str], + csv_path: Path, + rejected_signals=None, + to_csv=False, +): if res_df.shape[0] > 0: if analysis_groups: _do_group_table_output(res_df, analysis_groups, to_csv=to_csv, csv_path=csv_path) @@ -240,30 +270,31 @@ def print_results(res_df: pd.DataFrame, analysis_groups: List[str], indicator_li # NB this can be large for big dataframes! 
if "all" in indicator_list: - _print_table(res_df, - show_index=False, - name="Indicators:", - to_csv=to_csv, - csv_path=csv_path) + _print_table( + res_df, show_index=False, name="Indicators:", to_csv=to_csv, csv_path=csv_path + ) elif indicator_list is not None and indicator_list: available_inds = [] for ind in indicator_list: if ind in res_df: available_inds.append(ind) ilist = ["pair", "enter_reason", "exit_reason"] + available_inds - _print_table(res_df[ilist], - sortcols=['exit_reason'], - show_index=False, - name="Indicators:", - to_csv=to_csv, - csv_path=csv_path) + _print_table( + res_df[ilist], + sortcols=["exit_reason"], + show_index=False, + name="Indicators:", + to_csv=to_csv, + csv_path=csv_path, + ) else: print("\\No trades to show") -def _print_table(df: pd.DataFrame, sortcols=None, *, show_index=False, name=None, - to_csv=False, csv_path: Path): - if (sortcols is not None): +def _print_table( + df: pd.DataFrame, sortcols=None, *, show_index=False, name=None, to_csv=False, csv_path: Path +): + if sortcols is not None: data = df.sort_values(sortcols) else: data = df @@ -276,60 +307,64 @@ def _print_table(df: pd.DataFrame, sortcols=None, *, show_index=False, name=None if name is not None: print(name) - print( - tabulate( - data, - headers='keys', - tablefmt='psql', - showindex=show_index - ) - ) + print(tabulate(data, headers="keys", tablefmt="psql", showindex=show_index)) def process_entry_exit_reasons(config: Config): try: - analysis_groups = config.get('analysis_groups', []) - enter_reason_list = config.get('enter_reason_list', ["all"]) - exit_reason_list = config.get('exit_reason_list', ["all"]) - indicator_list = config.get('indicator_list', []) - do_rejected = config.get('analysis_rejected', False) - to_csv = config.get('analysis_to_csv', False) - csv_path = Path(config.get('analysis_csv_path', config['exportfilename'])) + analysis_groups = config.get("analysis_groups", []) + enter_reason_list = config.get("enter_reason_list", ["all"]) + exit_reason_list = config.get("exit_reason_list", ["all"]) + indicator_list = config.get("indicator_list", []) + do_rejected = config.get("analysis_rejected", False) + to_csv = config.get("analysis_to_csv", False) + csv_path = Path(config.get("analysis_csv_path", config["exportfilename"])) if to_csv and not csv_path.is_dir(): raise OperationalException(f"Specified directory {csv_path} does not exist.") - timerange = TimeRange.parse_timerange(None if config.get( - 'timerange') is None else str(config.get('timerange'))) + timerange = TimeRange.parse_timerange( + None if config.get("timerange") is None else str(config.get("timerange")) + ) - backtest_stats = load_backtest_stats(config['exportfilename']) + backtest_stats = load_backtest_stats(config["exportfilename"]) - for strategy_name, results in backtest_stats['strategy'].items(): - trades = load_backtest_data(config['exportfilename'], strategy_name) + for strategy_name, results in backtest_stats["strategy"].items(): + trades = load_backtest_data(config["exportfilename"], strategy_name) if trades is not None and not trades.empty: - signal_candles = _load_signal_candles(config['exportfilename']) + signal_candles = _load_signal_candles(config["exportfilename"]) rej_df = None if do_rejected: - rejected_signals_dict = _load_rejected_signals(config['exportfilename']) - rej_df = prepare_results(rejected_signals_dict, strategy_name, - enter_reason_list, exit_reason_list, - timerange=timerange) + rejected_signals_dict = _load_rejected_signals(config["exportfilename"]) + rej_df = 
prepare_results( + rejected_signals_dict, + strategy_name, + enter_reason_list, + exit_reason_list, + timerange=timerange, + ) analysed_trades_dict = _process_candles_and_indicators( - config['exchange']['pair_whitelist'], strategy_name, - trades, signal_candles) + config["exchange"]["pair_whitelist"], strategy_name, trades, signal_candles + ) - res_df = prepare_results(analysed_trades_dict, strategy_name, - enter_reason_list, exit_reason_list, - timerange=timerange) + res_df = prepare_results( + analysed_trades_dict, + strategy_name, + enter_reason_list, + exit_reason_list, + timerange=timerange, + ) - print_results(res_df, - analysis_groups, - indicator_list, - rejected_signals=rej_df, - to_csv=to_csv, - csv_path=csv_path) + print_results( + res_df, + analysis_groups, + indicator_list, + rejected_signals=rej_df, + to_csv=to_csv, + csv_path=csv_path, + ) except ValueError as e: raise OperationalException(e) from e diff --git a/freqtrade/data/history/__init__.py b/freqtrade/data/history/__init__.py index 88dddb112..bce4bc284 100644 --- a/freqtrade/data/history/__init__.py +++ b/freqtrade/data/history/__init__.py @@ -5,6 +5,7 @@ Includes: * load data for a pair (or a list of pairs) from disk * download data from exchange and store to disk """ + # flake8: noqa: F401 from .datahandlers import get_datahandler from .history_utils import ( diff --git a/freqtrade/data/history/datahandlers/featherdatahandler.py b/freqtrade/data/history/datahandlers/featherdatahandler.py index 6d57dbed7..8b1acb09c 100644 --- a/freqtrade/data/history/datahandlers/featherdatahandler.py +++ b/freqtrade/data/history/datahandlers/featherdatahandler.py @@ -14,11 +14,11 @@ logger = logging.getLogger(__name__) class FeatherDataHandler(IDataHandler): - _columns = DEFAULT_DATAFRAME_COLUMNS def ohlcv_store( - self, pair: str, timeframe: str, data: DataFrame, candle_type: CandleType) -> None: + self, pair: str, timeframe: str, data: DataFrame, candle_type: CandleType + ) -> None: """ Store data in json format "values". format looks as follows: @@ -33,11 +33,12 @@ class FeatherDataHandler(IDataHandler): self.create_dir_if_needed(filename) data.reset_index(drop=True).loc[:, self._columns].to_feather( - filename, compression_level=9, compression='lz4') + filename, compression_level=9, compression="lz4" + ) - def _ohlcv_load(self, pair: str, timeframe: str, - timerange: Optional[TimeRange], candle_type: CandleType - ) -> DataFrame: + def _ohlcv_load( + self, pair: str, timeframe: str, timerange: Optional[TimeRange], candle_type: CandleType + ) -> DataFrame: """ Internal method used to load data for one pair from disk. Implements the loading and conversion to a Pandas dataframe. @@ -50,28 +51,31 @@ class FeatherDataHandler(IDataHandler): :param candle_type: Any of the enum CandleType (must match trading mode!) 
:return: DataFrame with ohlcv data, or empty DataFrame """ - filename = self._pair_data_filename( - self._datadir, pair, timeframe, candle_type=candle_type) + filename = self._pair_data_filename(self._datadir, pair, timeframe, candle_type=candle_type) if not filename.exists(): # Fallback mode for 1M files filename = self._pair_data_filename( - self._datadir, pair, timeframe, candle_type=candle_type, no_timeframe_modify=True) + self._datadir, pair, timeframe, candle_type=candle_type, no_timeframe_modify=True + ) if not filename.exists(): return DataFrame(columns=self._columns) pairdata = read_feather(filename) pairdata.columns = self._columns - pairdata = pairdata.astype(dtype={'open': 'float', 'high': 'float', - 'low': 'float', 'close': 'float', 'volume': 'float'}) - pairdata['date'] = to_datetime(pairdata['date'], unit='ms', utc=True) + pairdata = pairdata.astype( + dtype={ + "open": "float", + "high": "float", + "low": "float", + "close": "float", + "volume": "float", + } + ) + pairdata["date"] = to_datetime(pairdata["date"], unit="ms", utc=True) return pairdata def ohlcv_append( - self, - pair: str, - timeframe: str, - data: DataFrame, - candle_type: CandleType + self, pair: str, timeframe: str, data: DataFrame, candle_type: CandleType ) -> None: """ Append data to existing data structures @@ -92,7 +96,7 @@ class FeatherDataHandler(IDataHandler): """ filename = self._pair_trades_filename(self._datadir, pair, trading_mode) self.create_dir_if_needed(filename) - data.reset_index(drop=True).to_feather(filename, compression_level=9, compression='lz4') + data.reset_index(drop=True).to_feather(filename, compression_level=9, compression="lz4") def trades_append(self, pair: str, data: DataFrame): """ @@ -104,7 +108,7 @@ class FeatherDataHandler(IDataHandler): raise NotImplementedError() def _trades_load( - self, pair: str, trading_mode: TradingMode, timerange: Optional[TimeRange] = None + self, pair: str, trading_mode: TradingMode, timerange: Optional[TimeRange] = None ) -> DataFrame: """ Load a pair from file, either .json.gz or .json diff --git a/freqtrade/data/history/datahandlers/hdf5datahandler.py b/freqtrade/data/history/datahandlers/hdf5datahandler.py index cb2cdd884..99d0a98a6 100644 --- a/freqtrade/data/history/datahandlers/hdf5datahandler.py +++ b/freqtrade/data/history/datahandlers/hdf5datahandler.py @@ -15,11 +15,11 @@ logger = logging.getLogger(__name__) class HDF5DataHandler(IDataHandler): - _columns = DEFAULT_DATAFRAME_COLUMNS def ohlcv_store( - self, pair: str, timeframe: str, data: pd.DataFrame, candle_type: CandleType) -> None: + self, pair: str, timeframe: str, data: pd.DataFrame, candle_type: CandleType + ) -> None: """ Store data in hdf5 file. :param pair: Pair - used to generate filename @@ -35,13 +35,18 @@ class HDF5DataHandler(IDataHandler): self.create_dir_if_needed(filename) _data.loc[:, self._columns].to_hdf( - filename, key=key, mode='a', complevel=9, complib='blosc', - format='table', data_columns=['date'] + filename, + key=key, + mode="a", + complevel=9, + complib="blosc", + format="table", + data_columns=["date"], ) - def _ohlcv_load(self, pair: str, timeframe: str, - timerange: Optional[TimeRange], candle_type: CandleType - ) -> pd.DataFrame: + def _ohlcv_load( + self, pair: str, timeframe: str, timerange: Optional[TimeRange], candle_type: CandleType + ) -> pd.DataFrame: """ Internal method used to load data for one pair from disk. Implements the loading and conversion to a Pandas dataframe. 
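Side note for this family of handlers: they are normally obtained through get_datahandler() (reformatted further down in this patch) rather than instantiated directly, and candles are read through the public ohlcv_load(). A minimal usage sketch, not part of the patch; the data directory, pair and import paths are illustrative assumptions:

    from pathlib import Path

    from freqtrade.data.history import get_datahandler
    from freqtrade.enums import CandleType

    # Obtain a handler for the configured storage format and load cached candles.
    dh = get_datahandler(Path("user_data/data/binance"), data_format="hdf5")
    candles = dh.ohlcv_load("BTC/USDT", timeframe="5m", candle_type=CandleType.SPOT)
    # Returns an empty DataFrame when no cached data exists for this pair/timeframe.
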
@@ -55,41 +60,40 @@ class HDF5DataHandler(IDataHandler): :return: DataFrame with ohlcv data, or empty DataFrame """ key = self._pair_ohlcv_key(pair, timeframe) - filename = self._pair_data_filename( - self._datadir, - pair, - timeframe, - candle_type=candle_type - ) + filename = self._pair_data_filename(self._datadir, pair, timeframe, candle_type=candle_type) if not filename.exists(): # Fallback mode for 1M files filename = self._pair_data_filename( - self._datadir, pair, timeframe, candle_type=candle_type, no_timeframe_modify=True) + self._datadir, pair, timeframe, candle_type=candle_type, no_timeframe_modify=True + ) if not filename.exists(): return pd.DataFrame(columns=self._columns) where = [] if timerange: - if timerange.starttype == 'date': + if timerange.starttype == "date": where.append(f"date >= Timestamp({timerange.startts * 1e9})") - if timerange.stoptype == 'date': + if timerange.stoptype == "date": where.append(f"date <= Timestamp({timerange.stopts * 1e9})") pairdata = pd.read_hdf(filename, key=key, mode="r", where=where) if list(pairdata.columns) != self._columns: raise ValueError("Wrong dataframe format") - pairdata = pairdata.astype(dtype={'open': 'float', 'high': 'float', - 'low': 'float', 'close': 'float', 'volume': 'float'}) + pairdata = pairdata.astype( + dtype={ + "open": "float", + "high": "float", + "low": "float", + "close": "float", + "volume": "float", + } + ) pairdata = pairdata.reset_index(drop=True) return pairdata def ohlcv_append( - self, - pair: str, - timeframe: str, - data: pd.DataFrame, - candle_type: CandleType + self, pair: str, timeframe: str, data: pd.DataFrame, candle_type: CandleType ) -> None: """ Append data to existing data structures @@ -111,9 +115,13 @@ class HDF5DataHandler(IDataHandler): key = self._pair_trades_key(pair) data.to_hdf( - self._pair_trades_filename(self._datadir, pair, trading_mode), key=key, - mode='a', complevel=9, complib='blosc', - format='table', data_columns=['timestamp'] + self._pair_trades_filename(self._datadir, pair, trading_mode), + key=key, + mode="a", + complevel=9, + complib="blosc", + format="table", + data_columns=["timestamp"], ) def trades_append(self, pair: str, data: pd.DataFrame): @@ -142,13 +150,13 @@ class HDF5DataHandler(IDataHandler): return pd.DataFrame(columns=DEFAULT_TRADES_COLUMNS) where = [] if timerange: - if timerange.starttype == 'date': + if timerange.starttype == "date": where.append(f"timestamp >= {timerange.startts * 1e3}") - if timerange.stoptype == 'date': + if timerange.stoptype == "date": where.append(f"timestamp < {timerange.stopts * 1e3}") trades: pd.DataFrame = pd.read_hdf(filename, key=key, mode="r", where=where) - trades[['id', 'type']] = trades[['id', 'type']].replace({np.nan: None}) + trades[["id", "type"]] = trades[["id", "type"]].replace({np.nan: None}) return trades @classmethod @@ -158,7 +166,7 @@ class HDF5DataHandler(IDataHandler): @classmethod def _pair_ohlcv_key(cls, pair: str, timeframe: str) -> str: # Escape futures pairs to avoid warnings - pair_esc = pair.replace(':', '_') + pair_esc = pair.replace(":", "_") return f"{pair_esc}/ohlcv/tf_{timeframe}" @classmethod diff --git a/freqtrade/data/history/datahandlers/idatahandler.py b/freqtrade/data/history/datahandlers/idatahandler.py index d7c34b7fe..e335ea770 100644 --- a/freqtrade/data/history/datahandlers/idatahandler.py +++ b/freqtrade/data/history/datahandlers/idatahandler.py @@ -3,6 +3,7 @@ Abstract datahandler interface. It's subclasses handle and storing data from disk. 
""" + import logging import re from abc import ABC, abstractmethod @@ -30,8 +31,7 @@ logger = logging.getLogger(__name__) class IDataHandler(ABC): - - _OHLCV_REGEX = r'^([a-zA-Z_\d-]+)\-(\d+[a-zA-Z]{1,2})\-?([a-zA-Z_]*)?(?=\.)' + _OHLCV_REGEX = r"^([a-zA-Z_\d-]+)\-(\d+[a-zA-Z]{1,2})\-?([a-zA-Z_]*)?(?=\.)" def __init__(self, datadir: Path) -> None: self._datadir = datadir @@ -45,7 +45,8 @@ class IDataHandler(ABC): @classmethod def ohlcv_get_available_data( - cls, datadir: Path, trading_mode: TradingMode) -> ListPairsWithTimeframes: + cls, datadir: Path, trading_mode: TradingMode + ) -> ListPairsWithTimeframes: """ Returns a list of all pairs with ohlcv data available in this datadir :param datadir: Directory to search for ohlcv files @@ -53,17 +54,20 @@ class IDataHandler(ABC): :return: List of Tuples of (pair, timeframe, CandleType) """ if trading_mode == TradingMode.FUTURES: - datadir = datadir.joinpath('futures') + datadir = datadir.joinpath("futures") _tmp = [ - re.search( - cls._OHLCV_REGEX, p.name - ) for p in datadir.glob(f"*.{cls._get_file_extension()}")] + re.search(cls._OHLCV_REGEX, p.name) + for p in datadir.glob(f"*.{cls._get_file_extension()}") + ] return [ ( cls.rebuild_pair_from_filename(match[1]), cls.rebuild_timeframe_from_filename(match[2]), - CandleType.from_string(match[3]) - ) for match in _tmp if match and len(match.groups()) > 1] + CandleType.from_string(match[3]), + ) + for match in _tmp + if match and len(match.groups()) > 1 + ] @classmethod def ohlcv_get_pairs(cls, datadir: Path, timeframe: str, candle_type: CandleType) -> List[str]: @@ -77,17 +81,20 @@ class IDataHandler(ABC): """ candle = "" if candle_type != CandleType.SPOT: - datadir = datadir.joinpath('futures') + datadir = datadir.joinpath("futures") candle = f"-{candle_type}" ext = cls._get_file_extension() - _tmp = [re.search(r'^(\S+)(?=\-' + timeframe + candle + f'.{ext})', p.name) - for p in datadir.glob(f"*{timeframe}{candle}.{ext}")] + _tmp = [ + re.search(r"^(\S+)(?=\-" + timeframe + candle + f".{ext})", p.name) + for p in datadir.glob(f"*{timeframe}{candle}.{ext}") + ] # Check if regex found something and only return these results return [cls.rebuild_pair_from_filename(match[0]) for match in _tmp if match] @abstractmethod def ohlcv_store( - self, pair: str, timeframe: str, data: DataFrame, candle_type: CandleType) -> None: + self, pair: str, timeframe: str, data: DataFrame, candle_type: CandleType + ) -> None: """ Store ohlcv data. :param pair: Pair - used to generate filename @@ -97,8 +104,9 @@ class IDataHandler(ABC): :return: None """ - def ohlcv_data_min_max(self, pair: str, timeframe: str, - candle_type: CandleType) -> Tuple[datetime, datetime, int]: + def ohlcv_data_min_max( + self, pair: str, timeframe: str, candle_type: CandleType + ) -> Tuple[datetime, datetime, int]: """ Returns the min and max timestamp for the given pair and timeframe. :param pair: Pair to get min/max for @@ -113,12 +121,12 @@ class IDataHandler(ABC): datetime.fromtimestamp(0, tz=timezone.utc), 0, ) - return df.iloc[0]['date'].to_pydatetime(), df.iloc[-1]['date'].to_pydatetime(), len(df) + return df.iloc[0]["date"].to_pydatetime(), df.iloc[-1]["date"].to_pydatetime(), len(df) @abstractmethod - def _ohlcv_load(self, pair: str, timeframe: str, timerange: Optional[TimeRange], - candle_type: CandleType - ) -> DataFrame: + def _ohlcv_load( + self, pair: str, timeframe: str, timerange: Optional[TimeRange], candle_type: CandleType + ) -> DataFrame: """ Internal method used to load data for one pair from disk. 
Implements the loading and conversion to a Pandas dataframe. @@ -148,11 +156,7 @@ class IDataHandler(ABC): @abstractmethod def ohlcv_append( - self, - pair: str, - timeframe: str, - data: DataFrame, - candle_type: CandleType + self, pair: str, timeframe: str, data: DataFrame, candle_type: CandleType ) -> None: """ Append data to existing data structures @@ -170,8 +174,10 @@ class IDataHandler(ABC): :return: List of Pairs """ _ext = cls._get_file_extension() - _tmp = [re.search(r'^(\S+)(?=\-trades.' + _ext + ')', p.name) - for p in datadir.glob(f"*trades.{_ext}")] + _tmp = [ + re.search(r"^(\S+)(?=\-trades." + _ext + ")", p.name) + for p in datadir.glob(f"*trades.{_ext}") + ] # Check if regex found something and only return these results to avoid exceptions. return [cls.rebuild_pair_from_filename(match[0]) for match in _tmp if match] @@ -231,7 +237,7 @@ class IDataHandler(ABC): return False def trades_load( - self, pair: str, trading_mode: TradingMode, timerange: Optional[TimeRange] = None + self, pair: str, trading_mode: TradingMode, timerange: Optional[TimeRange] = None ) -> DataFrame: """ Load a pair from file, either .json.gz or .json @@ -264,7 +270,7 @@ class IDataHandler(ABC): pair: str, timeframe: str, candle_type: CandleType, - no_timeframe_modify: bool = False + no_timeframe_modify: bool = False, ) -> Path: pair_s = misc.pair_to_filename(pair) candle = "" @@ -272,10 +278,9 @@ class IDataHandler(ABC): timeframe = cls.timeframe_to_file(timeframe) if candle_type != CandleType.SPOT: - datadir = datadir.joinpath('futures') + datadir = datadir.joinpath("futures") candle = f"-{candle_type}" - filename = datadir.joinpath( - f'{pair_s}-{timeframe}{candle}.{cls._get_file_extension()}') + filename = datadir.joinpath(f"{pair_s}-{timeframe}{candle}.{cls._get_file_extension()}") return filename @classmethod @@ -283,14 +288,14 @@ class IDataHandler(ABC): pair_s = misc.pair_to_filename(pair) if trading_mode == TradingMode.FUTURES: # Futures pair ... - datadir = datadir.joinpath('futures') + datadir = datadir.joinpath("futures") - filename = datadir.joinpath(f'{pair_s}-trades.{cls._get_file_extension()}') + filename = datadir.joinpath(f"{pair_s}-trades.{cls._get_file_extension()}") return filename @staticmethod def timeframe_to_file(timeframe: str): - return timeframe.replace('M', 'Mo') + return timeframe.replace("M", "Mo") @staticmethod def rebuild_timeframe_from_filename(timeframe: str) -> str: @@ -298,7 +303,7 @@ class IDataHandler(ABC): converts timeframe from disk to file Replaces mo with M (to avoid problems on case-insensitive filesystems) """ - return re.sub('1mo', '1M', timeframe, flags=re.IGNORECASE) + return re.sub("1mo", "1M", timeframe, flags=re.IGNORECASE) @staticmethod def rebuild_pair_from_filename(pair: str) -> str: @@ -306,18 +311,22 @@ class IDataHandler(ABC): Rebuild pair name from filename Assumes a asset name of max. 7 length to also support BTC-PERP and BTC-PERP:USD names. 
""" - res = re.sub(r'^(([A-Za-z\d]{1,10})|^([A-Za-z\-]{1,6}))(_)', r'\g<1>/', pair, count=1) - res = re.sub('_', ':', res, count=1) + res = re.sub(r"^(([A-Za-z\d]{1,10})|^([A-Za-z\-]{1,6}))(_)", r"\g<1>/", pair, count=1) + res = re.sub("_", ":", res, count=1) return res - def ohlcv_load(self, pair, timeframe: str, - candle_type: CandleType, *, - timerange: Optional[TimeRange] = None, - fill_missing: bool = True, - drop_incomplete: bool = False, - startup_candles: int = 0, - warn_no_data: bool = True, - ) -> DataFrame: + def ohlcv_load( + self, + pair, + timeframe: str, + candle_type: CandleType, + *, + timerange: Optional[TimeRange] = None, + fill_missing: bool = True, + drop_incomplete: bool = False, + startup_candles: int = 0, + warn_no_data: bool = True, + ) -> DataFrame: """ Load cached candle (OHLCV) data for the given pair. @@ -337,15 +346,12 @@ class IDataHandler(ABC): timerange_startup.subtract_start(timeframe_to_seconds(timeframe) * startup_candles) pairdf = self._ohlcv_load( - pair, - timeframe, - timerange=timerange_startup, - candle_type=candle_type + pair, timeframe, timerange=timerange_startup, candle_type=candle_type ) if self._check_empty_df(pairdf, pair, timeframe, candle_type, warn_no_data): return pairdf else: - enddate = pairdf.iloc[-1]['date'] + enddate = pairdf.iloc[-1]["date"] if timerange_startup: self._validate_pairdata(pair, pairdf, timeframe, candle_type, timerange_startup) @@ -354,17 +360,25 @@ class IDataHandler(ABC): return pairdf # incomplete candles should only be dropped if we didn't trim the end beforehand. - pairdf = clean_ohlcv_dataframe(pairdf, timeframe, - pair=pair, - fill_missing=fill_missing, - drop_incomplete=(drop_incomplete and - enddate == pairdf.iloc[-1]['date'])) + pairdf = clean_ohlcv_dataframe( + pairdf, + timeframe, + pair=pair, + fill_missing=fill_missing, + drop_incomplete=(drop_incomplete and enddate == pairdf.iloc[-1]["date"]), + ) self._check_empty_df(pairdf, pair, timeframe, candle_type, warn_no_data) return pairdf def _check_empty_df( - self, pairdf: DataFrame, pair: str, timeframe: str, candle_type: CandleType, - warn_no_data: bool, warn_price: bool = False) -> bool: + self, + pairdf: DataFrame, + pair: str, + timeframe: str, + candle_type: CandleType, + warn_no_data: bool, + warn_price: bool = False, + ) -> bool: """ Warn on empty dataframe """ @@ -377,39 +391,55 @@ class IDataHandler(ABC): return True elif warn_price: candle_price_gap = 0 - if (candle_type in (CandleType.SPOT, CandleType.FUTURES) and - not pairdf.empty - and 'close' in pairdf.columns and 'open' in pairdf.columns): + if ( + candle_type in (CandleType.SPOT, CandleType.FUTURES) + and not pairdf.empty + and "close" in pairdf.columns + and "open" in pairdf.columns + ): # Detect gaps between prior close and open - gaps = ((pairdf['open'] - pairdf['close'].shift(1)) / pairdf['close'].shift(1)) + gaps = (pairdf["open"] - pairdf["close"].shift(1)) / pairdf["close"].shift(1) gaps = gaps.dropna() if len(gaps): candle_price_gap = max(abs(gaps)) if candle_price_gap > 0.1: - logger.info(f"Price jump in {pair}, {timeframe}, {candle_type} between two candles " - f"of {candle_price_gap:.2%} detected.") + logger.info( + f"Price jump in {pair}, {timeframe}, {candle_type} between two candles " + f"of {candle_price_gap:.2%} detected." 
+ ) return False - def _validate_pairdata(self, pair, pairdata: DataFrame, timeframe: str, - candle_type: CandleType, timerange: TimeRange): + def _validate_pairdata( + self, + pair, + pairdata: DataFrame, + timeframe: str, + candle_type: CandleType, + timerange: TimeRange, + ): """ Validates pairdata for missing data at start end end and logs warnings. :param pairdata: Dataframe to validate :param timerange: Timerange specified for start and end dates """ - if timerange.starttype == 'date': - if pairdata.iloc[0]['date'] > timerange.startdt: - logger.warning(f"{pair}, {candle_type}, {timeframe}, " - f"data starts at {pairdata.iloc[0]['date']:%Y-%m-%d %H:%M:%S}") - if timerange.stoptype == 'date': - if pairdata.iloc[-1]['date'] < timerange.stopdt: - logger.warning(f"{pair}, {candle_type}, {timeframe}, " - f"data ends at {pairdata.iloc[-1]['date']:%Y-%m-%d %H:%M:%S}") + if timerange.starttype == "date": + if pairdata.iloc[0]["date"] > timerange.startdt: + logger.warning( + f"{pair}, {candle_type}, {timeframe}, " + f"data starts at {pairdata.iloc[0]['date']:%Y-%m-%d %H:%M:%S}" + ) + if timerange.stoptype == "date": + if pairdata.iloc[-1]["date"] < timerange.stopdt: + logger.warning( + f"{pair}, {candle_type}, {timeframe}, " + f"data ends at {pairdata.iloc[-1]['date']:%Y-%m-%d %H:%M:%S}" + ) def rename_futures_data( - self, pair: str, new_pair: str, timeframe: str, candle_type: CandleType): + self, pair: str, new_pair: str, timeframe: str, candle_type: CandleType + ): """ Temporary method to migrate data from old naming to new naming (BTC/USDT -> BTC/USDT:USDT) Only used for binance to support the binance futures naming unification. @@ -435,18 +465,19 @@ class IDataHandler(ABC): if funding_rate_combs: logger.warning( - f'Migrating {len(funding_rate_combs)} funding fees to correct timeframe.') + f"Migrating {len(funding_rate_combs)} funding fees to correct timeframe." 
+ ) for pair, timeframe, candletype in funding_rate_combs: old_name = self._pair_data_filename(self._datadir, pair, timeframe, candletype) new_name = self._pair_data_filename(self._datadir, pair, ff_timeframe, candletype) if not Path(old_name).exists(): - logger.warning(f'{old_name} does not exist, skipping.') + logger.warning(f"{old_name} does not exist, skipping.") continue if Path(new_name).exists(): - logger.warning(f'{new_name} already exists, Removing.') + logger.warning(f"{new_name} already exists, Removing.") Path(new_name).unlink() Path(old_name).rename(new_name) @@ -461,27 +492,33 @@ def get_datahandlerclass(datatype: str) -> Type[IDataHandler]: :return: Datahandler class """ - if datatype == 'json': + if datatype == "json": from .jsondatahandler import JsonDataHandler + return JsonDataHandler - elif datatype == 'jsongz': + elif datatype == "jsongz": from .jsondatahandler import JsonGzDataHandler + return JsonGzDataHandler - elif datatype == 'hdf5': + elif datatype == "hdf5": from .hdf5datahandler import HDF5DataHandler + return HDF5DataHandler - elif datatype == 'feather': + elif datatype == "feather": from .featherdatahandler import FeatherDataHandler + return FeatherDataHandler - elif datatype == 'parquet': + elif datatype == "parquet": from .parquetdatahandler import ParquetDataHandler + return ParquetDataHandler else: raise ValueError(f"No datahandler for datatype {datatype} available.") -def get_datahandler(datadir: Path, data_format: Optional[str] = None, - data_handler: Optional[IDataHandler] = None) -> IDataHandler: +def get_datahandler( + datadir: Path, data_format: Optional[str] = None, data_handler: Optional[IDataHandler] = None +) -> IDataHandler: """ :param datadir: Folder to save data :param data_format: dataformat to use @@ -489,6 +526,6 @@ def get_datahandler(datadir: Path, data_format: Optional[str] = None, """ if not data_handler: - HandlerClass = get_datahandlerclass(data_format or 'feather') + HandlerClass = get_datahandlerclass(data_format or "feather") data_handler = HandlerClass(datadir) return data_handler diff --git a/freqtrade/data/history/datahandlers/jsondatahandler.py b/freqtrade/data/history/datahandlers/jsondatahandler.py index 2d0333fed..b97b4b867 100644 --- a/freqtrade/data/history/datahandlers/jsondatahandler.py +++ b/freqtrade/data/history/datahandlers/jsondatahandler.py @@ -17,12 +17,12 @@ logger = logging.getLogger(__name__) class JsonDataHandler(IDataHandler): - _use_zip = False _columns = DEFAULT_DATAFRAME_COLUMNS def ohlcv_store( - self, pair: str, timeframe: str, data: DataFrame, candle_type: CandleType) -> None: + self, pair: str, timeframe: str, data: DataFrame, candle_type: CandleType + ) -> None: """ Store data in json format "values". 
format looks as follows: @@ -37,16 +37,16 @@ class JsonDataHandler(IDataHandler): self.create_dir_if_needed(filename) _data = data.copy() # Convert date to int - _data['date'] = _data['date'].astype(np.int64) // 1000 // 1000 + _data["date"] = _data["date"].astype(np.int64) // 1000 // 1000 # Reset index, select only appropriate columns and save as json _data.reset_index(drop=True).loc[:, self._columns].to_json( - filename, orient="values", - compression='gzip' if self._use_zip else None) + filename, orient="values", compression="gzip" if self._use_zip else None + ) - def _ohlcv_load(self, pair: str, timeframe: str, - timerange: Optional[TimeRange], candle_type: CandleType - ) -> DataFrame: + def _ohlcv_load( + self, pair: str, timeframe: str, timerange: Optional[TimeRange], candle_type: CandleType + ) -> DataFrame: """ Internal method used to load data for one pair from disk. Implements the loading and conversion to a Pandas dataframe. @@ -59,31 +59,34 @@ class JsonDataHandler(IDataHandler): :param candle_type: Any of the enum CandleType (must match trading mode!) :return: DataFrame with ohlcv data, or empty DataFrame """ - filename = self._pair_data_filename( - self._datadir, pair, timeframe, candle_type=candle_type) + filename = self._pair_data_filename(self._datadir, pair, timeframe, candle_type=candle_type) if not filename.exists(): # Fallback mode for 1M files filename = self._pair_data_filename( - self._datadir, pair, timeframe, candle_type=candle_type, no_timeframe_modify=True) + self._datadir, pair, timeframe, candle_type=candle_type, no_timeframe_modify=True + ) if not filename.exists(): return DataFrame(columns=self._columns) try: - pairdata = read_json(filename, orient='values') + pairdata = read_json(filename, orient="values") pairdata.columns = self._columns except ValueError: logger.error(f"Could not load data for {pair}.") return DataFrame(columns=self._columns) - pairdata = pairdata.astype(dtype={'open': 'float', 'high': 'float', - 'low': 'float', 'close': 'float', 'volume': 'float'}) - pairdata['date'] = to_datetime(pairdata['date'], unit='ms', utc=True) + pairdata = pairdata.astype( + dtype={ + "open": "float", + "high": "float", + "low": "float", + "close": "float", + "volume": "float", + } + ) + pairdata["date"] = to_datetime(pairdata["date"], unit="ms", utc=True) return pairdata def ohlcv_append( - self, - pair: str, - timeframe: str, - data: DataFrame, - candle_type: CandleType + self, pair: str, timeframe: str, data: DataFrame, candle_type: CandleType ) -> None: """ Append data to existing data structures @@ -145,5 +148,4 @@ class JsonDataHandler(IDataHandler): class JsonGzDataHandler(JsonDataHandler): - _use_zip = True diff --git a/freqtrade/data/history/datahandlers/parquetdatahandler.py b/freqtrade/data/history/datahandlers/parquetdatahandler.py index 01becdc84..e226d4749 100644 --- a/freqtrade/data/history/datahandlers/parquetdatahandler.py +++ b/freqtrade/data/history/datahandlers/parquetdatahandler.py @@ -14,11 +14,11 @@ logger = logging.getLogger(__name__) class ParquetDataHandler(IDataHandler): - _columns = DEFAULT_DATAFRAME_COLUMNS def ohlcv_store( - self, pair: str, timeframe: str, data: DataFrame, candle_type: CandleType) -> None: + self, pair: str, timeframe: str, data: DataFrame, candle_type: CandleType + ) -> None: """ Store data in json format "values". 
format looks as follows: @@ -34,9 +34,9 @@ class ParquetDataHandler(IDataHandler): data.reset_index(drop=True).loc[:, self._columns].to_parquet(filename) - def _ohlcv_load(self, pair: str, timeframe: str, - timerange: Optional[TimeRange], candle_type: CandleType - ) -> DataFrame: + def _ohlcv_load( + self, pair: str, timeframe: str, timerange: Optional[TimeRange], candle_type: CandleType + ) -> DataFrame: """ Internal method used to load data for one pair from disk. Implements the loading and conversion to a Pandas dataframe. @@ -49,28 +49,31 @@ class ParquetDataHandler(IDataHandler): :param candle_type: Any of the enum CandleType (must match trading mode!) :return: DataFrame with ohlcv data, or empty DataFrame """ - filename = self._pair_data_filename( - self._datadir, pair, timeframe, candle_type=candle_type) + filename = self._pair_data_filename(self._datadir, pair, timeframe, candle_type=candle_type) if not filename.exists(): # Fallback mode for 1M files filename = self._pair_data_filename( - self._datadir, pair, timeframe, candle_type=candle_type, no_timeframe_modify=True) + self._datadir, pair, timeframe, candle_type=candle_type, no_timeframe_modify=True + ) if not filename.exists(): return DataFrame(columns=self._columns) pairdata = read_parquet(filename) pairdata.columns = self._columns - pairdata = pairdata.astype(dtype={'open': 'float', 'high': 'float', - 'low': 'float', 'close': 'float', 'volume': 'float'}) - pairdata['date'] = to_datetime(pairdata['date'], unit='ms', utc=True) + pairdata = pairdata.astype( + dtype={ + "open": "float", + "high": "float", + "low": "float", + "close": "float", + "volume": "float", + } + ) + pairdata["date"] = to_datetime(pairdata["date"], unit="ms", utc=True) return pairdata def ohlcv_append( - self, - pair: str, - timeframe: str, - data: DataFrame, - candle_type: CandleType + self, pair: str, timeframe: str, data: DataFrame, candle_type: CandleType ) -> None: """ Append data to existing data structures diff --git a/freqtrade/data/history/history_utils.py b/freqtrade/data/history/history_utils.py index 370c47467..c14263e0c 100644 --- a/freqtrade/data/history/history_utils.py +++ b/freqtrade/data/history/history_utils.py @@ -34,17 +34,19 @@ from freqtrade.util.migrations import migrate_data logger = logging.getLogger(__name__) -def load_pair_history(pair: str, - timeframe: str, - datadir: Path, *, - timerange: Optional[TimeRange] = None, - fill_up_missing: bool = True, - drop_incomplete: bool = False, - startup_candles: int = 0, - data_format: Optional[str] = None, - data_handler: Optional[IDataHandler] = None, - candle_type: CandleType = CandleType.SPOT - ) -> DataFrame: +def load_pair_history( + pair: str, + timeframe: str, + datadir: Path, + *, + timerange: Optional[TimeRange] = None, + fill_up_missing: bool = True, + drop_incomplete: bool = False, + startup_candles: int = 0, + data_format: Optional[str] = None, + data_handler: Optional[IDataHandler] = None, + candle_type: CandleType = CandleType.SPOT, +) -> DataFrame: """ Load cached ohlcv history for the given pair. 
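The hunk above only reflows the load_pair_history() signature; behaviour is unchanged. A minimal usage sketch, not part of the patch, assuming a local data directory and the usual freqtrade import paths:

    from pathlib import Path

    from freqtrade.data.history import load_pair_history
    from freqtrade.enums import CandleType

    candles = load_pair_history(
        pair="BTC/USDT",
        timeframe="5m",
        datadir=Path("user_data/data/binance"),
        candle_type=CandleType.SPOT,
    )
    # Empty DataFrame if no cached data is found for the pair/timeframe.
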
@@ -63,27 +65,30 @@ def load_pair_history(pair: str, """ data_handler = get_datahandler(datadir, data_format, data_handler) - return data_handler.ohlcv_load(pair=pair, - timeframe=timeframe, - timerange=timerange, - fill_missing=fill_up_missing, - drop_incomplete=drop_incomplete, - startup_candles=startup_candles, - candle_type=candle_type, - ) + return data_handler.ohlcv_load( + pair=pair, + timeframe=timeframe, + timerange=timerange, + fill_missing=fill_up_missing, + drop_incomplete=drop_incomplete, + startup_candles=startup_candles, + candle_type=candle_type, + ) -def load_data(datadir: Path, - timeframe: str, - pairs: List[str], *, - timerange: Optional[TimeRange] = None, - fill_up_missing: bool = True, - startup_candles: int = 0, - fail_without_data: bool = False, - data_format: str = 'feather', - candle_type: CandleType = CandleType.SPOT, - user_futures_funding_rate: Optional[int] = None, - ) -> Dict[str, DataFrame]: +def load_data( + datadir: Path, + timeframe: str, + pairs: List[str], + *, + timerange: Optional[TimeRange] = None, + fill_up_missing: bool = True, + startup_candles: int = 0, + fail_without_data: bool = False, + data_format: str = "feather", + candle_type: CandleType = CandleType.SPOT, + user_futures_funding_rate: Optional[int] = None, +) -> Dict[str, DataFrame]: """ Load ohlcv history data for a list of pairs. @@ -100,18 +105,21 @@ def load_data(datadir: Path, """ result: Dict[str, DataFrame] = {} if startup_candles > 0 and timerange: - logger.info(f'Using indicator startup period: {startup_candles} ...') + logger.info(f"Using indicator startup period: {startup_candles} ...") data_handler = get_datahandler(datadir, data_format) for pair in pairs: - hist = load_pair_history(pair=pair, timeframe=timeframe, - datadir=datadir, timerange=timerange, - fill_up_missing=fill_up_missing, - startup_candles=startup_candles, - data_handler=data_handler, - candle_type=candle_type, - ) + hist = load_pair_history( + pair=pair, + timeframe=timeframe, + datadir=datadir, + timerange=timerange, + fill_up_missing=fill_up_missing, + startup_candles=startup_candles, + data_handler=data_handler, + candle_type=candle_type, + ) if not hist.empty: result[pair] = hist else: @@ -125,14 +133,16 @@ def load_data(datadir: Path, return result -def refresh_data(*, datadir: Path, - timeframe: str, - pairs: List[str], - exchange: Exchange, - data_format: Optional[str] = None, - timerange: Optional[TimeRange] = None, - candle_type: CandleType, - ) -> None: +def refresh_data( + *, + datadir: Path, + timeframe: str, + pairs: List[str], + exchange: Exchange, + data_format: Optional[str] = None, + timerange: Optional[TimeRange] = None, + candle_type: CandleType, +) -> None: """ Refresh ohlcv history data for a list of pairs. 
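load_data() above keeps its keyword-only signature; only the layout changes. A short sketch of loading several pairs for a bounded timerange (pair names, dates and import paths are assumptions, not part of the patch):

    from pathlib import Path

    from freqtrade.configuration import TimeRange
    from freqtrade.data.history import load_data

    timerange = TimeRange.parse_timerange("20240101-20240301")
    data = load_data(
        datadir=Path("user_data/data/binance"),
        timeframe="1h",
        pairs=["BTC/USDT", "ETH/USDT"],
        timerange=timerange,
        startup_candles=100,  # extra candles before the timerange for indicator warm-up
    )
    # -> Dict[str, DataFrame] keyed by pair; pairs without cached data are skipped
    #    (or raise OperationalException when fail_without_data=True).
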
@@ -146,11 +156,17 @@ def refresh_data(*, datadir: Path, """ data_handler = get_datahandler(datadir, data_format) for idx, pair in enumerate(pairs): - process = f'{idx}/{len(pairs)}' - _download_pair_history(pair=pair, process=process, - timeframe=timeframe, datadir=datadir, - timerange=timerange, exchange=exchange, data_handler=data_handler, - candle_type=candle_type) + process = f"{idx}/{len(pairs)}" + _download_pair_history( + pair=pair, + process=process, + timeframe=timeframe, + datadir=datadir, + timerange=timerange, + exchange=exchange, + data_handler=data_handler, + candle_type=candle_type, + ) def _load_cached_data_for_updating( @@ -172,42 +188,49 @@ def _load_cached_data_for_updating( start = None end = None if timerange: - if timerange.starttype == 'date': + if timerange.starttype == "date": start = timerange.startdt - if timerange.stoptype == 'date': + if timerange.stoptype == "date": end = timerange.stopdt # Intentionally don't pass timerange in - since we need to load the full dataset. - data = data_handler.ohlcv_load(pair, timeframe=timeframe, - timerange=None, fill_missing=False, - drop_incomplete=True, warn_no_data=False, - candle_type=candle_type) + data = data_handler.ohlcv_load( + pair, + timeframe=timeframe, + timerange=None, + fill_missing=False, + drop_incomplete=True, + warn_no_data=False, + candle_type=candle_type, + ) if not data.empty: - if not prepend and start and start < data.iloc[0]['date']: + if not prepend and start and start < data.iloc[0]["date"]: # Earlier data than existing data requested, redownload all data = DataFrame(columns=DEFAULT_DATAFRAME_COLUMNS) else: if prepend: - end = data.iloc[0]['date'] + end = data.iloc[0]["date"] else: - start = data.iloc[-1]['date'] + start = data.iloc[-1]["date"] start_ms = int(start.timestamp() * 1000) if start else None end_ms = int(end.timestamp() * 1000) if end else None return data, start_ms, end_ms -def _download_pair_history(pair: str, *, - datadir: Path, - exchange: Exchange, - timeframe: str = '5m', - process: str = '', - new_pairs_days: int = 30, - data_handler: Optional[IDataHandler] = None, - timerange: Optional[TimeRange] = None, - candle_type: CandleType, - erase: bool = False, - prepend: bool = False, - ) -> bool: +def _download_pair_history( + pair: str, + *, + datadir: Path, + exchange: Exchange, + timeframe: str = "5m", + process: str = "", + new_pairs_days: int = 30, + data_handler: Optional[IDataHandler] = None, + timerange: Optional[TimeRange] = None, + candle_type: CandleType, + erase: bool = False, + prepend: bool = False, +) -> bool: """ Download latest candles from the exchange for the pair and timeframe passed in parameters The data is downloaded starting from the last correct data that @@ -226,54 +249,69 @@ def _download_pair_history(pair: str, *, try: if erase: if data_handler.ohlcv_purge(pair, timeframe, candle_type=candle_type): - logger.info(f'Deleting existing data for pair {pair}, {timeframe}, {candle_type}.') + logger.info(f"Deleting existing data for pair {pair}, {timeframe}, {candle_type}.") data, since_ms, until_ms = _load_cached_data_for_updating( - pair, timeframe, timerange, + pair, + timeframe, + timerange, data_handler=data_handler, candle_type=candle_type, - prepend=prepend) + prepend=prepend, + ) - logger.info(f'({process}) - Download history data for "{pair}", {timeframe}, ' - f'{candle_type} and store in {datadir}. 
' - f'From {format_ms_time(since_ms) if since_ms else "start"} to ' - f'{format_ms_time(until_ms) if until_ms else "now"}' - ) + logger.info( + f'({process}) - Download history data for "{pair}", {timeframe}, ' + f'{candle_type} and store in {datadir}. ' + f'From {format_ms_time(since_ms) if since_ms else "start"} to ' + f'{format_ms_time(until_ms) if until_ms else "now"}' + ) - logger.debug("Current Start: %s", - f"{data.iloc[0]['date']:{DATETIME_PRINT_FORMAT}}" - if not data.empty else 'None') - logger.debug("Current End: %s", - f"{data.iloc[-1]['date']:{DATETIME_PRINT_FORMAT}}" - if not data.empty else 'None') + logger.debug( + "Current Start: %s", + f"{data.iloc[0]['date']:{DATETIME_PRINT_FORMAT}}" if not data.empty else "None", + ) + logger.debug( + "Current End: %s", + f"{data.iloc[-1]['date']:{DATETIME_PRINT_FORMAT}}" if not data.empty else "None", + ) # Default since_ms to 30 days if nothing is given - new_data = exchange.get_historic_ohlcv(pair=pair, - timeframe=timeframe, - since_ms=since_ms if since_ms else - int((datetime.now() - timedelta(days=new_pairs_days) - ).timestamp()) * 1000, - is_new_pair=data.empty, - candle_type=candle_type, - until_ms=until_ms if until_ms else None - ) + new_data = exchange.get_historic_ohlcv( + pair=pair, + timeframe=timeframe, + since_ms=since_ms + if since_ms + else int((datetime.now() - timedelta(days=new_pairs_days)).timestamp()) * 1000, + is_new_pair=data.empty, + candle_type=candle_type, + until_ms=until_ms if until_ms else None, + ) # TODO: Maybe move parsing to exchange class (?) - new_dataframe = ohlcv_to_dataframe(new_data, timeframe, pair, - fill_missing=False, drop_incomplete=True) + new_dataframe = ohlcv_to_dataframe( + new_data, timeframe, pair, fill_missing=False, drop_incomplete=True + ) if data.empty: data = new_dataframe else: # Run cleaning again to ensure there were no duplicate candles # Especially between existing and new data. - data = clean_ohlcv_dataframe(concat([data, new_dataframe], axis=0), timeframe, pair, - fill_missing=False, drop_incomplete=False) + data = clean_ohlcv_dataframe( + concat([data, new_dataframe], axis=0), + timeframe, + pair, + fill_missing=False, + drop_incomplete=False, + ) - logger.debug("New Start: %s", - f"{data.iloc[0]['date']:{DATETIME_PRINT_FORMAT}}" - if not data.empty else 'None') - logger.debug("New End: %s", - f"{data.iloc[-1]['date']:{DATETIME_PRINT_FORMAT}}" - if not data.empty else 'None') + logger.debug( + "New Start: %s", + f"{data.iloc[0]['date']:{DATETIME_PRINT_FORMAT}}" if not data.empty else "None", + ) + logger.debug( + "New End: %s", + f"{data.iloc[-1]['date']:{DATETIME_PRINT_FORMAT}}" if not data.empty else "None", + ) data_handler.ohlcv_store(pair, timeframe, data=data, candle_type=candle_type) return True @@ -285,13 +323,18 @@ def _download_pair_history(pair: str, *, return False -def refresh_backtest_ohlcv_data(exchange: Exchange, pairs: List[str], timeframes: List[str], - datadir: Path, trading_mode: str, - timerange: Optional[TimeRange] = None, - new_pairs_days: int = 30, erase: bool = False, - data_format: Optional[str] = None, - prepend: bool = False, - ) -> List[str]: +def refresh_backtest_ohlcv_data( + exchange: Exchange, + pairs: List[str], + timeframes: List[str], + datadir: Path, + trading_mode: str, + timerange: Optional[TimeRange] = None, + new_pairs_days: int = 30, + erase: bool = False, + data_format: Optional[str] = None, + prepend: bool = False, +) -> List[str]: """ Refresh stored ohlcv data for backtesting and hyperopt operations. 
Used by freqtrade download-data subcommand. @@ -300,63 +343,77 @@ def refresh_backtest_ohlcv_data(exchange: Exchange, pairs: List[str], timeframes pairs_not_available = [] data_handler = get_datahandler(datadir, data_format) candle_type = CandleType.get_default(trading_mode) - process = '' + process = "" for idx, pair in enumerate(pairs, start=1): if pair not in exchange.markets: pairs_not_available.append(pair) logger.info(f"Skipping pair {pair}...") continue for timeframe in timeframes: - - logger.debug(f'Downloading pair {pair}, {candle_type}, interval {timeframe}.') - process = f'{idx}/{len(pairs)}' - _download_pair_history(pair=pair, process=process, - datadir=datadir, exchange=exchange, - timerange=timerange, data_handler=data_handler, - timeframe=str(timeframe), new_pairs_days=new_pairs_days, - candle_type=candle_type, - erase=erase, prepend=prepend) - if trading_mode == 'futures': + logger.debug(f"Downloading pair {pair}, {candle_type}, interval {timeframe}.") + process = f"{idx}/{len(pairs)}" + _download_pair_history( + pair=pair, + process=process, + datadir=datadir, + exchange=exchange, + timerange=timerange, + data_handler=data_handler, + timeframe=str(timeframe), + new_pairs_days=new_pairs_days, + candle_type=candle_type, + erase=erase, + prepend=prepend, + ) + if trading_mode == "futures": # Predefined candletype (and timeframe) depending on exchange # Downloads what is necessary to backtest based on futures data. - tf_mark = exchange.get_option('mark_ohlcv_timeframe') - tf_funding_rate = exchange.get_option('funding_fee_timeframe') + tf_mark = exchange.get_option("mark_ohlcv_timeframe") + tf_funding_rate = exchange.get_option("funding_fee_timeframe") - fr_candle_type = CandleType.from_string(exchange.get_option('mark_ohlcv_price')) + fr_candle_type = CandleType.from_string(exchange.get_option("mark_ohlcv_price")) # All exchanges need FundingRate for futures trading. # The timeframe is aligned to the mark-price timeframe. combs = ((CandleType.FUNDING_RATE, tf_funding_rate), (fr_candle_type, tf_mark)) for candle_type_f, tf in combs: - logger.debug(f'Downloading pair {pair}, {candle_type_f}, interval {tf}.') - _download_pair_history(pair=pair, process=process, - datadir=datadir, exchange=exchange, - timerange=timerange, data_handler=data_handler, - timeframe=str(tf), new_pairs_days=new_pairs_days, - candle_type=candle_type_f, - erase=erase, prepend=prepend) + logger.debug(f"Downloading pair {pair}, {candle_type_f}, interval {tf}.") + _download_pair_history( + pair=pair, + process=process, + datadir=datadir, + exchange=exchange, + timerange=timerange, + data_handler=data_handler, + timeframe=str(tf), + new_pairs_days=new_pairs_days, + candle_type=candle_type_f, + erase=erase, + prepend=prepend, + ) return pairs_not_available -def _download_trades_history(exchange: Exchange, - pair: str, *, - new_pairs_days: int = 30, - timerange: Optional[TimeRange] = None, - data_handler: IDataHandler, - trading_mode: TradingMode, - ) -> bool: +def _download_trades_history( + exchange: Exchange, + pair: str, + *, + new_pairs_days: int = 30, + timerange: Optional[TimeRange] = None, + data_handler: IDataHandler, + trading_mode: TradingMode, +) -> bool: """ Download trade history from the exchange. Appends to previously downloaded trades data. 
""" try: - until = None since = 0 if timerange: - if timerange.starttype == 'date': + if timerange.starttype == "date": since = timerange.startts * 1000 - if timerange.stoptype == 'date': + if timerange.stoptype == "date": until = timerange.stopts * 1000 trades = data_handler.trades_load(pair, trading_mode) @@ -365,60 +422,76 @@ def _download_trades_history(exchange: Exchange, # DEFAULT_TRADES_COLUMNS: 0 -> timestamp # DEFAULT_TRADES_COLUMNS: 1 -> id - if not trades.empty and since > 0 and since < trades.iloc[0]['timestamp']: + if not trades.empty and since > 0 and since < trades.iloc[0]["timestamp"]: # since is before the first trade - logger.info(f"Start ({trades.iloc[0]['date']:{DATETIME_PRINT_FORMAT}}) earlier than " - f"available data. Redownloading trades for {pair}...") + logger.info( + f"Start ({trades.iloc[0]['date']:{DATETIME_PRINT_FORMAT}}) earlier than " + f"available data. Redownloading trades for {pair}..." + ) trades = trades_list_to_df([]) - from_id = trades.iloc[-1]['id'] if not trades.empty else None - if not trades.empty and since < trades.iloc[-1]['timestamp']: + from_id = trades.iloc[-1]["id"] if not trades.empty else None + if not trades.empty and since < trades.iloc[-1]["timestamp"]: # Reset since to the last available point # - 5 seconds (to ensure we're getting all trades) - since = trades.iloc[-1]['timestamp'] - (5 * 1000) - logger.info(f"Using last trade date -5s - Downloading trades for {pair} " - f"since: {format_ms_time(since)}.") + since = trades.iloc[-1]["timestamp"] - (5 * 1000) + logger.info( + f"Using last trade date -5s - Downloading trades for {pair} " + f"since: {format_ms_time(since)}." + ) if not since: since = dt_ts(dt_now() - timedelta(days=new_pairs_days)) - logger.debug("Current Start: %s", 'None' if trades.empty else - f"{trades.iloc[0]['date']:{DATETIME_PRINT_FORMAT}}") - logger.debug("Current End: %s", 'None' if trades.empty else - f"{trades.iloc[-1]['date']:{DATETIME_PRINT_FORMAT}}") + logger.debug( + "Current Start: %s", + "None" if trades.empty else f"{trades.iloc[0]['date']:{DATETIME_PRINT_FORMAT}}", + ) + logger.debug( + "Current End: %s", + "None" if trades.empty else f"{trades.iloc[-1]['date']:{DATETIME_PRINT_FORMAT}}", + ) logger.info(f"Current Amount of trades: {len(trades)}") # Default since_ms to 30 days if nothing is given - new_trades = exchange.get_historic_trades(pair=pair, - since=since, - until=until, - from_id=from_id, - ) + new_trades = exchange.get_historic_trades( + pair=pair, + since=since, + until=until, + from_id=from_id, + ) new_trades_df = trades_list_to_df(new_trades[1]) trades = concat([trades, new_trades_df], axis=0) # Remove duplicates to make sure we're not storing data we don't need trades = trades_df_remove_duplicates(trades) data_handler.trades_store(pair, trades, trading_mode) - logger.debug("New Start: %s", 'None' if trades.empty else - f"{trades.iloc[0]['date']:{DATETIME_PRINT_FORMAT}}") - logger.debug("New End: %s", 'None' if trades.empty else - f"{trades.iloc[-1]['date']:{DATETIME_PRINT_FORMAT}}") + logger.debug( + "New Start: %s", + "None" if trades.empty else f"{trades.iloc[0]['date']:{DATETIME_PRINT_FORMAT}}", + ) + logger.debug( + "New End: %s", + "None" if trades.empty else f"{trades.iloc[-1]['date']:{DATETIME_PRINT_FORMAT}}", + ) logger.info(f"New Amount of trades: {len(trades)}") return True except Exception: - logger.exception( - f'Failed to download historic trades for pair: "{pair}". ' - ) + logger.exception(f'Failed to download historic trades for pair: "{pair}". 
') return False -def refresh_backtest_trades_data(exchange: Exchange, pairs: List[str], datadir: Path, - timerange: TimeRange, trading_mode: TradingMode, - new_pairs_days: int = 30, - erase: bool = False, data_format: str = 'feather', - ) -> List[str]: +def refresh_backtest_trades_data( + exchange: Exchange, + pairs: List[str], + datadir: Path, + timerange: TimeRange, + trading_mode: TradingMode, + new_pairs_days: int = 30, + erase: bool = False, + data_format: str = "feather", +) -> List[str]: """ Refresh stored trades data for backtesting and hyperopt operations. Used by freqtrade download-data subcommand. @@ -434,15 +507,17 @@ def refresh_backtest_trades_data(exchange: Exchange, pairs: List[str], datadir: if erase: if data_handler.trades_purge(pair, trading_mode): - logger.info(f'Deleting existing data for pair {pair}.') + logger.info(f"Deleting existing data for pair {pair}.") - logger.info(f'Downloading trades for pair {pair}.') - _download_trades_history(exchange=exchange, - pair=pair, - new_pairs_days=new_pairs_days, - timerange=timerange, - data_handler=data_handler, - trading_mode=trading_mode) + logger.info(f"Downloading trades for pair {pair}.") + _download_trades_history( + exchange=exchange, + pair=pair, + new_pairs_days=new_pairs_days, + timerange=timerange, + data_handler=data_handler, + trading_mode=trading_mode, + ) return pairs_not_available @@ -454,15 +529,18 @@ def get_timerange(data: Dict[str, DataFrame]) -> Tuple[datetime, datetime]: :return: tuple containing min_date, max_date """ timeranges = [ - (frame['date'].min().to_pydatetime(), frame['date'].max().to_pydatetime()) + (frame["date"].min().to_pydatetime(), frame["date"].max().to_pydatetime()) for frame in data.values() ] - return (min(timeranges, key=operator.itemgetter(0))[0], - max(timeranges, key=operator.itemgetter(1))[1]) + return ( + min(timeranges, key=operator.itemgetter(0))[0], + max(timeranges, key=operator.itemgetter(1))[1], + ) -def validate_backtest_data(data: DataFrame, pair: str, min_date: datetime, - max_date: datetime, timeframe_min: int) -> bool: +def validate_backtest_data( + data: DataFrame, pair: str, min_date: datetime, max_date: datetime, timeframe_min: int +) -> bool: """ Validates preprocessed backtesting data for missing values and shows warnings about it that. 
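get_timerange() and validate_backtest_data(), reformatted above, are typically used together on the dict returned by load_data(); a brief sketch, not part of the patch (timeframe_min=60 assumes 1h candles, import paths assumed):

    from freqtrade.data.history import get_timerange, validate_backtest_data

    # `data` is a Dict[str, DataFrame] as produced by load_data() in the sketch above.
    min_date, max_date = get_timerange(data)
    for pair, df in data.items():
        missing = validate_backtest_data(df, pair, min_date, max_date, timeframe_min=60)
        # `missing` is True when candles are absent between min_date and max_date
        # (a warning with the missing-frame count is logged).
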
@@ -478,89 +556,111 @@ def validate_backtest_data(data: DataFrame, pair: str, min_date: datetime, dflen = len(data) if dflen < expected_frames: found_missing = True - logger.warning("%s has missing frames: expected %s, got %s, that's %s missing values", - pair, expected_frames, dflen, expected_frames - dflen) + logger.warning( + "%s has missing frames: expected %s, got %s, that's %s missing values", + pair, + expected_frames, + dflen, + expected_frames - dflen, + ) return found_missing def download_data_main(config: Config) -> None: - timerange = TimeRange() - if 'days' in config: - time_since = (datetime.now() - timedelta(days=config['days'])).strftime("%Y%m%d") - timerange = TimeRange.parse_timerange(f'{time_since}-') + if "days" in config: + time_since = (datetime.now() - timedelta(days=config["days"])).strftime("%Y%m%d") + timerange = TimeRange.parse_timerange(f"{time_since}-") - if 'timerange' in config: - timerange = timerange.parse_timerange(config['timerange']) + if "timerange" in config: + timerange = timerange.parse_timerange(config["timerange"]) # Remove stake-currency to skip checks which are not relevant for datadownload - config['stake_currency'] = '' + config["stake_currency"] = "" pairs_not_available: List[str] = [] # Init exchange from freqtrade.resolvers.exchange_resolver import ExchangeResolver + exchange = ExchangeResolver.load_exchange(config, validate=False) available_pairs = [ - p for p in exchange.get_markets( - tradable_only=True, active_only=not config.get('include_inactive') - ).keys() + p + for p in exchange.get_markets( + tradable_only=True, active_only=not config.get("include_inactive") + ).keys() ] expanded_pairs = dynamic_expand_pairlist(config, available_pairs) - if 'timeframes' not in config: - config['timeframes'] = DL_DATA_TIMEFRAMES + if "timeframes" not in config: + config["timeframes"] = DL_DATA_TIMEFRAMES # Manual validations of relevant settings - if not config['exchange'].get('skip_pair_validation', False): + if not config["exchange"].get("skip_pair_validation", False): exchange.validate_pairs(expanded_pairs) - logger.info(f"About to download pairs: {expanded_pairs}, " - f"intervals: {config['timeframes']} to {config['datadir']}") + logger.info( + f"About to download pairs: {expanded_pairs}, " + f"intervals: {config['timeframes']} to {config['datadir']}" + ) if len(expanded_pairs) == 0: logger.warning( "No pairs available for download. " "Please make sure you're using the correct Pair naming for your selected trade mode. 
\n" - f"More info: {DOCS_LINK}/bot-basics/#pair-naming") + f"More info: {DOCS_LINK}/bot-basics/#pair-naming" + ) - for timeframe in config['timeframes']: + for timeframe in config["timeframes"]: exchange.validate_timeframes(timeframe) # Start downloading try: - if config.get('download_trades'): + if config.get("download_trades"): pairs_not_available = refresh_backtest_trades_data( - exchange, pairs=expanded_pairs, datadir=config['datadir'], - timerange=timerange, new_pairs_days=config['new_pairs_days'], - erase=bool(config.get('erase')), data_format=config['dataformat_trades'], - trading_mode=config.get('trading_mode', TradingMode.SPOT), - ) + exchange, + pairs=expanded_pairs, + datadir=config["datadir"], + timerange=timerange, + new_pairs_days=config["new_pairs_days"], + erase=bool(config.get("erase")), + data_format=config["dataformat_trades"], + trading_mode=config.get("trading_mode", TradingMode.SPOT), + ) # Convert downloaded trade data to different timeframes convert_trades_to_ohlcv( - pairs=expanded_pairs, timeframes=config['timeframes'], - datadir=config['datadir'], timerange=timerange, erase=bool(config.get('erase')), - data_format_ohlcv=config['dataformat_ohlcv'], - data_format_trades=config['dataformat_trades'], - candle_type=config.get('candle_type_def', CandleType.SPOT), + pairs=expanded_pairs, + timeframes=config["timeframes"], + datadir=config["datadir"], + timerange=timerange, + erase=bool(config.get("erase")), + data_format_ohlcv=config["dataformat_ohlcv"], + data_format_trades=config["dataformat_trades"], + candle_type=config.get("candle_type_def", CandleType.SPOT), ) else: - if not exchange.get_option('ohlcv_has_history', True): + if not exchange.get_option("ohlcv_has_history", True): raise OperationalException( f"Historic klines not available for {exchange.name}. " "Please use `--dl-trades` instead for this exchange " "(will unfortunately take a long time)." - ) + ) migrate_data(config, exchange) pairs_not_available = refresh_backtest_ohlcv_data( - exchange, pairs=expanded_pairs, timeframes=config['timeframes'], - datadir=config['datadir'], timerange=timerange, - new_pairs_days=config['new_pairs_days'], - erase=bool(config.get('erase')), data_format=config['dataformat_ohlcv'], - trading_mode=config.get('trading_mode', 'spot'), - prepend=config.get('prepend_data', False) + exchange, + pairs=expanded_pairs, + timeframes=config["timeframes"], + datadir=config["datadir"], + timerange=timerange, + new_pairs_days=config["new_pairs_days"], + erase=bool(config.get("erase")), + data_format=config["dataformat_ohlcv"], + trading_mode=config.get("trading_mode", "spot"), + prepend=config.get("prepend_data", False), ) finally: if pairs_not_available: - logger.info(f"Pairs [{','.join(pairs_not_available)}] not available " - f"on exchange {exchange.name}.") + logger.info( + f"Pairs [{','.join(pairs_not_available)}] not available " + f"on exchange {exchange.name}." + ) diff --git a/freqtrade/data/metrics.py b/freqtrade/data/metrics.py index 43a33fa0d..0bee68326 100644 --- a/freqtrade/data/metrics.py +++ b/freqtrade/data/metrics.py @@ -31,7 +31,8 @@ def calculate_market_change(data: Dict[str, pd.DataFrame], column: str = "close" def combine_dataframes_by_column( - data: Dict[str, pd.DataFrame], column: str = "close") -> pd.DataFrame: + data: Dict[str, pd.DataFrame], column: str = "close" +) -> pd.DataFrame: """ Combine multiple dataframes "column" :param data: Dict of Dataframes, dict key should be pair. 
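A note on the download_data_main hunks further above: the function derives its TimeRange from either the days or the timerange option, which is why the initial timerange = TimeRange() assignment has to survive the reformat as a context line; the object is handed to the download helpers even when neither option is set. A minimal sketch of the days-to-timerange conversion only, assuming freqtrade.configuration.TimeRange as used elsewhere in the codebase; the days value is hypothetical:

from datetime import datetime, timedelta

from freqtrade.configuration import TimeRange

timerange = TimeRange()  # no bounds set, mirroring the context line in the hunk above
days = 30  # hypothetical value of the --days CLI option
time_since = (datetime.now() - timedelta(days=days)).strftime("%Y%m%d")
timerange = TimeRange.parse_timerange(f"{time_since}-")
print(timerange.startts)  # start of the range as epoch seconds; the end stays open

In normal use this path is driven by the freqtrade download-data subcommand rather than called directly.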
@@ -41,14 +42,15 @@ def combine_dataframes_by_column( """ if not data: raise ValueError("No data provided.") - df_comb = pd.concat([data[pair].set_index('date').rename( - {column: pair}, axis=1)[pair] for pair in data], axis=1) + df_comb = pd.concat( + [data[pair].set_index("date").rename({column: pair}, axis=1)[pair] for pair in data], axis=1 + ) return df_comb def combined_dataframes_with_rel_mean( - data: Dict[str, pd.DataFrame], fromdt: datetime, todt: datetime, - column: str = "close") -> pd.DataFrame: + data: Dict[str, pd.DataFrame], fromdt: datetime, todt: datetime, column: str = "close" +) -> pd.DataFrame: """ Combine multiple dataframes "column" :param data: Dict of Dataframes, dict key should be pair. @@ -60,14 +62,15 @@ def combined_dataframes_with_rel_mean( df_comb = combine_dataframes_by_column(data, column) # Trim dataframes to the given timeframe df_comb = df_comb.iloc[(df_comb.index >= fromdt) & (df_comb.index < todt)] - df_comb['count'] = df_comb.count(axis=1) - df_comb['mean'] = df_comb.mean(axis=1) - df_comb['rel_mean'] = df_comb['mean'].pct_change().fillna(0).cumsum() - return df_comb[['mean', 'rel_mean', 'count']] + df_comb["count"] = df_comb.count(axis=1) + df_comb["mean"] = df_comb.mean(axis=1) + df_comb["rel_mean"] = df_comb["mean"].pct_change().fillna(0).cumsum() + return df_comb[["mean", "rel_mean", "count"]] def combine_dataframes_with_mean( - data: Dict[str, pd.DataFrame], column: str = "close") -> pd.DataFrame: + data: Dict[str, pd.DataFrame], column: str = "close" +) -> pd.DataFrame: """ Combine multiple dataframes "column" :param data: Dict of Dataframes, dict key should be pair. @@ -78,13 +81,14 @@ def combine_dataframes_with_mean( """ df_comb = combine_dataframes_by_column(data, column) - df_comb['mean'] = df_comb.mean(axis=1) + df_comb["mean"] = df_comb.mean(axis=1) return df_comb -def create_cum_profit(df: pd.DataFrame, trades: pd.DataFrame, col_name: str, - timeframe: str) -> pd.DataFrame: +def create_cum_profit( + df: pd.DataFrame, trades: pd.DataFrame, col_name: str, timeframe: str +) -> pd.DataFrame: """ Adds a column `col_name` with the cumulative profit for the given trades array. 
:param df: DataFrame with date index @@ -97,11 +101,11 @@ def create_cum_profit(df: pd.DataFrame, trades: pd.DataFrame, col_name: str, if len(trades) == 0: raise ValueError("Trade dataframe empty.") from freqtrade.exchange import timeframe_to_resample_freq + timeframe_freq = timeframe_to_resample_freq(timeframe) # Resample to timeframe to make sure trades match candles - _trades_sum = trades.resample(timeframe_freq, on='close_date' - )[['profit_abs']].sum() - df.loc[:, col_name] = _trades_sum['profit_abs'].cumsum() + _trades_sum = trades.resample(timeframe_freq, on="close_date")[["profit_abs"]].sum() + df.loc[:, col_name] = _trades_sum["profit_abs"].cumsum() # Set first value to 0 df.loc[df.iloc[0].name, col_name] = 0 # FFill to get continuous @@ -109,29 +113,34 @@ def create_cum_profit(df: pd.DataFrame, trades: pd.DataFrame, col_name: str, return df -def _calc_drawdown_series(profit_results: pd.DataFrame, *, date_col: str, value_col: str, - starting_balance: float) -> pd.DataFrame: +def _calc_drawdown_series( + profit_results: pd.DataFrame, *, date_col: str, value_col: str, starting_balance: float +) -> pd.DataFrame: max_drawdown_df = pd.DataFrame() - max_drawdown_df['cumulative'] = profit_results[value_col].cumsum() - max_drawdown_df['high_value'] = max_drawdown_df['cumulative'].cummax() - max_drawdown_df['drawdown'] = max_drawdown_df['cumulative'] - max_drawdown_df['high_value'] - max_drawdown_df['date'] = profit_results.loc[:, date_col] + max_drawdown_df["cumulative"] = profit_results[value_col].cumsum() + max_drawdown_df["high_value"] = max_drawdown_df["cumulative"].cummax() + max_drawdown_df["drawdown"] = max_drawdown_df["cumulative"] - max_drawdown_df["high_value"] + max_drawdown_df["date"] = profit_results.loc[:, date_col] if starting_balance: - cumulative_balance = starting_balance + max_drawdown_df['cumulative'] - max_balance = starting_balance + max_drawdown_df['high_value'] - max_drawdown_df['drawdown_relative'] = ((max_balance - cumulative_balance) / max_balance) + cumulative_balance = starting_balance + max_drawdown_df["cumulative"] + max_balance = starting_balance + max_drawdown_df["high_value"] + max_drawdown_df["drawdown_relative"] = (max_balance - cumulative_balance) / max_balance else: # NOTE: This is not completely accurate, # but might good enough if starting_balance is not available - max_drawdown_df['drawdown_relative'] = ( - (max_drawdown_df['high_value'] - max_drawdown_df['cumulative']) - / max_drawdown_df['high_value']) + max_drawdown_df["drawdown_relative"] = ( + max_drawdown_df["high_value"] - max_drawdown_df["cumulative"] + ) / max_drawdown_df["high_value"] return max_drawdown_df -def calculate_underwater(trades: pd.DataFrame, *, date_col: str = 'close_date', - value_col: str = 'profit_ratio', starting_balance: float = 0.0 - ): +def calculate_underwater( + trades: pd.DataFrame, + *, + date_col: str = "close_date", + value_col: str = "profit_ratio", + starting_balance: float = 0.0, +): """ Calculate max drawdown and the corresponding close dates :param trades: DataFrame containing trades (requires columns close_date and profit_ratio) @@ -145,18 +154,20 @@ def calculate_underwater(trades: pd.DataFrame, *, date_col: str = 'close_date', raise ValueError("Trade dataframe empty.") profit_results = trades.sort_values(date_col).reset_index(drop=True) max_drawdown_df = _calc_drawdown_series( - profit_results, - date_col=date_col, - value_col=value_col, - starting_balance=starting_balance) + profit_results, date_col=date_col, value_col=value_col, 
starting_balance=starting_balance + ) return max_drawdown_df -def calculate_max_drawdown(trades: pd.DataFrame, *, date_col: str = 'close_date', - value_col: str = 'profit_abs', starting_balance: float = 0, - relative: bool = False - ) -> Tuple[float, pd.Timestamp, pd.Timestamp, float, float, float]: +def calculate_max_drawdown( + trades: pd.DataFrame, + *, + date_col: str = "close_date", + value_col: str = "profit_abs", + starting_balance: float = 0, + relative: bool = False, +) -> Tuple[float, pd.Timestamp, pd.Timestamp, float, float, float]: """ Calculate max drawdown and the corresponding close dates :param trades: DataFrame containing trades (requires columns close_date and profit_ratio) @@ -172,32 +183,31 @@ def calculate_max_drawdown(trades: pd.DataFrame, *, date_col: str = 'close_date' raise ValueError("Trade dataframe empty.") profit_results = trades.sort_values(date_col).reset_index(drop=True) max_drawdown_df = _calc_drawdown_series( - profit_results, - date_col=date_col, - value_col=value_col, - starting_balance=starting_balance + profit_results, date_col=date_col, value_col=value_col, starting_balance=starting_balance ) idxmin = ( - max_drawdown_df['drawdown_relative'].idxmax() - if relative else max_drawdown_df['drawdown'].idxmin() + max_drawdown_df["drawdown_relative"].idxmax() + if relative + else max_drawdown_df["drawdown"].idxmin() ) if idxmin == 0: raise ValueError("No losing trade, therefore no drawdown.") - high_date = profit_results.loc[max_drawdown_df.iloc[:idxmin]['high_value'].idxmax(), date_col] + high_date = profit_results.loc[max_drawdown_df.iloc[:idxmin]["high_value"].idxmax(), date_col] low_date = profit_results.loc[idxmin, date_col] - high_val = max_drawdown_df.loc[max_drawdown_df.iloc[:idxmin] - ['high_value'].idxmax(), 'cumulative'] - low_val = max_drawdown_df.loc[idxmin, 'cumulative'] - max_drawdown_rel = max_drawdown_df.loc[idxmin, 'drawdown_relative'] + high_val = max_drawdown_df.loc[ + max_drawdown_df.iloc[:idxmin]["high_value"].idxmax(), "cumulative" + ] + low_val = max_drawdown_df.loc[idxmin, "cumulative"] + max_drawdown_rel = max_drawdown_df.loc[idxmin, "drawdown_relative"] return ( - abs(max_drawdown_df.loc[idxmin, 'drawdown']), + abs(max_drawdown_df.loc[idxmin, "drawdown"]), high_date, low_date, high_val, low_val, - max_drawdown_rel + max_drawdown_rel, ) @@ -213,9 +223,9 @@ def calculate_csum(trades: pd.DataFrame, starting_balance: float = 0) -> Tuple[f raise ValueError("Trade dataframe empty.") csum_df = pd.DataFrame() - csum_df['sum'] = trades['profit_abs'].cumsum() - csum_min = csum_df['sum'].min() + starting_balance - csum_max = csum_df['sum'].max() + starting_balance + csum_df["sum"] = trades["profit_abs"].cumsum() + csum_min = csum_df["sum"].min() + starting_balance + csum_max = csum_df["sum"].max() + starting_balance return csum_min, csum_max @@ -245,28 +255,29 @@ def calculate_expectancy(trades: pd.DataFrame) -> Tuple[float, float]: expectancy_ratio = 100 if len(trades) > 0: - winning_trades = trades.loc[trades['profit_abs'] > 0] - losing_trades = trades.loc[trades['profit_abs'] < 0] - profit_sum = winning_trades['profit_abs'].sum() - loss_sum = abs(losing_trades['profit_abs'].sum()) + winning_trades = trades.loc[trades["profit_abs"] > 0] + losing_trades = trades.loc[trades["profit_abs"] < 0] + profit_sum = winning_trades["profit_abs"].sum() + loss_sum = abs(losing_trades["profit_abs"].sum()) nb_win_trades = len(winning_trades) nb_loss_trades = len(losing_trades) average_win = (profit_sum / nb_win_trades) if nb_win_trades > 0 else 0 
average_loss = (loss_sum / nb_loss_trades) if nb_loss_trades > 0 else 0 - winrate = (nb_win_trades / len(trades)) - loserate = (nb_loss_trades / len(trades)) + winrate = nb_win_trades / len(trades) + loserate = nb_loss_trades / len(trades) expectancy = (winrate * average_win) - (loserate * average_loss) - if (average_loss > 0): + if average_loss > 0: risk_reward_ratio = average_win / average_loss expectancy_ratio = ((1 + risk_reward_ratio) * winrate) - 1 return expectancy, expectancy_ratio -def calculate_sortino(trades: pd.DataFrame, min_date: datetime, max_date: datetime, - starting_balance: float) -> float: +def calculate_sortino( + trades: pd.DataFrame, min_date: datetime, max_date: datetime, starting_balance: float +) -> float: """ Calculate sortino :param trades: DataFrame containing trades (requires columns profit_abs) @@ -275,12 +286,12 @@ def calculate_sortino(trades: pd.DataFrame, min_date: datetime, max_date: dateti if (len(trades) == 0) or (min_date is None) or (max_date is None) or (min_date == max_date): return 0 - total_profit = trades['profit_abs'] / starting_balance + total_profit = trades["profit_abs"] / starting_balance days_period = max(1, (max_date - min_date).days) expected_returns_mean = total_profit.sum() / days_period - down_stdev = np.std(trades.loc[trades['profit_abs'] < 0, 'profit_abs'] / starting_balance) + down_stdev = np.std(trades.loc[trades["profit_abs"] < 0, "profit_abs"] / starting_balance) if down_stdev != 0 and not np.isnan(down_stdev): sortino_ratio = expected_returns_mean / down_stdev * np.sqrt(365) @@ -292,8 +303,9 @@ def calculate_sortino(trades: pd.DataFrame, min_date: datetime, max_date: dateti return sortino_ratio -def calculate_sharpe(trades: pd.DataFrame, min_date: datetime, max_date: datetime, - starting_balance: float) -> float: +def calculate_sharpe( + trades: pd.DataFrame, min_date: datetime, max_date: datetime, starting_balance: float +) -> float: """ Calculate sharpe :param trades: DataFrame containing trades (requires column profit_abs) @@ -302,7 +314,7 @@ def calculate_sharpe(trades: pd.DataFrame, min_date: datetime, max_date: datetim if (len(trades) == 0) or (min_date is None) or (max_date is None) or (min_date == max_date): return 0 - total_profit = trades['profit_abs'] / starting_balance + total_profit = trades["profit_abs"] / starting_balance days_period = max(1, (max_date - min_date).days) expected_returns_mean = total_profit.sum() / days_period @@ -318,8 +330,9 @@ def calculate_sharpe(trades: pd.DataFrame, min_date: datetime, max_date: datetim return sharp_ratio -def calculate_calmar(trades: pd.DataFrame, min_date: datetime, max_date: datetime, - starting_balance: float) -> float: +def calculate_calmar( + trades: pd.DataFrame, min_date: datetime, max_date: datetime, starting_balance: float +) -> float: """ Calculate calmar :param trades: DataFrame containing trades (requires columns close_date and profit_abs) @@ -328,7 +341,7 @@ def calculate_calmar(trades: pd.DataFrame, min_date: datetime, max_date: datetim if (len(trades) == 0) or (min_date is None) or (max_date is None) or (min_date == max_date): return 0 - total_profit = trades['profit_abs'].sum() / starting_balance + total_profit = trades["profit_abs"].sum() / starting_balance days_period = max(1, (max_date - min_date).days) # adding slippage of 0.1% per trade
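To close out the metrics.py hunks, a small self-contained sketch of how the reformatted helpers are typically driven; it assumes the functions stay importable from freqtrade.data.metrics and uses a toy trades frame with the profit_abs / close_date columns the signatures above expect:

import pandas as pd

from freqtrade.data.metrics import calculate_expectancy, calculate_max_drawdown, calculate_sharpe

# Toy trade list; real usage feeds the trades DataFrame produced by backtesting.
trades = pd.DataFrame(
    {
        "close_date": pd.date_range("2024-01-01", periods=4, freq="1D", tz="UTC"),
        "profit_abs": [20.0, -40.0, 15.0, -10.0],
    }
)

expectancy, expectancy_ratio = calculate_expectancy(trades)
max_dd = calculate_max_drawdown(trades, starting_balance=1000)  # 6-tuple, abs drawdown first
sharpe = calculate_sharpe(
    trades,
    min_date=trades["close_date"].min(),
    max_date=trades["close_date"].max(),
    starting_balance=1000,
)
print(expectancy, expectancy_ratio, max_dd[0], sharpe)

Note that calculate_max_drawdown raises ValueError ("No losing trade, therefore no drawdown.") when the cumulative profit never draws down, hence the losing trades in the toy data.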