freqtrade_origin/freqtrade/data/btanalysis.py

517 lines
18 KiB
Python
Raw Normal View History

2019-03-07 20:20:32 +00:00
"""
Helpers when analyzing backtest data
"""
2024-05-12 15:41:55 +00:00
2019-06-16 08:41:05 +00:00
import logging
from copy import copy
from datetime import datetime, timezone
2019-03-07 20:20:32 +00:00
from pathlib import Path
from typing import Any, Literal, Optional, Union
2019-03-07 20:20:32 +00:00
import numpy as np
import pandas as pd
from freqtrade.constants import LAST_BT_RESULT_FN, IntOrInf
2024-03-19 06:13:17 +00:00
from freqtrade.exceptions import ConfigurationError, OperationalException
2024-09-04 04:44:48 +00:00
from freqtrade.ft_types import BacktestHistoryEntryType, BacktestResultType
2023-07-31 19:16:25 +00:00
from freqtrade.misc import file_dump_json, json_load
from freqtrade.optimize.backtest_caching import get_backtest_metadata_filename
from freqtrade.persistence import LocalTrade, Trade, init_db
2019-06-16 08:41:05 +00:00
2020-09-28 17:39:41 +00:00
2019-06-16 08:41:05 +00:00
logger = logging.getLogger(__name__)
2019-03-07 20:20:32 +00:00
# Newest format
2024-05-12 15:41:55 +00:00
BT_DATA_COLUMNS = [
"pair",
"stake_amount",
"max_stake_amount",
"amount",
"open_date",
"close_date",
"open_rate",
"close_rate",
"fee_open",
"fee_close",
"trade_duration",
"profit_ratio",
"profit_abs",
"exit_reason",
"initial_stop_loss_abs",
"initial_stop_loss_ratio",
"stop_loss_abs",
"stop_loss_ratio",
"min_rate",
"max_rate",
"is_open",
"enter_tag",
"leverage",
"is_short",
"open_timestamp",
"close_timestamp",
"orders",
]
2019-03-07 20:20:32 +00:00
2020-09-27 14:50:22 +00:00
def get_latest_optimize_filename(directory: Union[Path, str], variant: str) -> str:
2020-06-26 07:34:18 +00:00
"""
Get latest backtest export based on '.last_result.json'.
:param directory: Directory to search for last result
2020-09-27 14:50:22 +00:00
:param variant: 'backtest' or 'hyperopt' - the method to return
2020-06-26 07:34:18 +00:00
:return: string containing the filename of the latest backtest result
:raises: ValueError in the following cases:
* Directory does not exist
* `directory/.last_result.json` does not exist
* `directory/.last_result.json` has the wrong content
"""
if isinstance(directory, str):
directory = Path(directory)
if not directory.is_dir():
2020-06-27 04:46:54 +00:00
raise ValueError(f"Directory '{directory}' does not exist.")
2020-06-28 07:27:19 +00:00
filename = directory / LAST_BT_RESULT_FN
2020-06-26 07:34:18 +00:00
if not filename.is_file():
raise ValueError(
2024-05-12 15:41:55 +00:00
f"Directory '{directory}' does not seem to contain backtest statistics yet."
)
2020-06-26 07:34:18 +00:00
with filename.open() as file:
data = json_load(file)
2024-05-12 15:41:55 +00:00
if f"latest_{variant}" not in data:
2020-06-28 07:27:19 +00:00
raise ValueError(f"Invalid '{LAST_BT_RESULT_FN}' format.")
2020-06-26 07:34:18 +00:00
2024-05-12 15:41:55 +00:00
return data[f"latest_{variant}"]
2020-09-27 14:50:22 +00:00
def get_latest_backtest_filename(directory: Union[Path, str]) -> str:
"""
Get latest backtest export based on '.last_result.json'.
:param directory: Directory to search for last result
:return: string containing the filename of the latest backtest result
:raises: ValueError in the following cases:
* Directory does not exist
* `directory/.last_result.json` does not exist
* `directory/.last_result.json` has the wrong content
"""
2024-05-12 15:41:55 +00:00
return get_latest_optimize_filename(directory, "backtest")
2020-09-27 14:50:22 +00:00
def get_latest_hyperopt_filename(directory: Union[Path, str]) -> str:
"""
Get latest hyperopt export based on '.last_result.json'.
:param directory: Directory to search for last result
:return: string containing the filename of the latest hyperopt result
:raises: ValueError in the following cases:
* Directory does not exist
* `directory/.last_result.json` does not exist
* `directory/.last_result.json` has the wrong content
"""
try:
2024-05-12 15:41:55 +00:00
return get_latest_optimize_filename(directory, "hyperopt")
2020-09-27 14:50:22 +00:00
except ValueError:
# Return default (legacy) pickle filename
2024-05-12 15:41:55 +00:00
return "hyperopt_results.pickle"
2020-09-27 14:50:22 +00:00
2023-01-21 14:01:56 +00:00
def get_latest_hyperopt_file(
2024-05-12 15:41:55 +00:00
directory: Union[Path, str], predef_filename: Optional[str] = None
) -> Path:
2020-09-27 14:50:22 +00:00
"""
Get latest hyperopt export based on '.last_result.json'.
:param directory: Directory to search for last result
:return: string containing the filename of the latest hyperopt result
:raises: ValueError in the following cases:
* Directory does not exist
* `directory/.last_result.json` does not exist
* `directory/.last_result.json` has the wrong content
"""
2020-09-27 17:40:55 +00:00
if isinstance(directory, str):
directory = Path(directory)
if predef_filename:
if Path(predef_filename).is_absolute():
2024-03-19 06:13:17 +00:00
raise ConfigurationError(
2024-05-12 15:41:55 +00:00
"--hyperopt-filename expects only the filename, not an absolute path."
)
return directory / predef_filename
2020-09-27 14:50:22 +00:00
return directory / get_latest_hyperopt_filename(directory)
2020-06-26 07:34:18 +00:00
def load_backtest_metadata(filename: Union[Path, str]) -> dict[str, Any]:
"""
Read metadata dictionary from backtest results file without reading and deserializing entire
file.
:param filename: path to backtest results file.
:return: metadata dict or None if metadata is not present.
"""
filename = get_backtest_metadata_filename(filename)
try:
with filename.open() as fp:
return json_load(fp)
except FileNotFoundError:
return {}
except Exception as e:
2024-05-12 15:41:55 +00:00
raise OperationalException("Unexpected error while loading backtest metadata.") from e
2023-07-30 08:52:23 +00:00
def load_backtest_stats(filename: Union[Path, str]) -> BacktestResultType:
2020-06-26 05:46:59 +00:00
"""
Load backtest statistics file.
:param filename: pathlib.Path object, or string pointing to the file.
:return: a dictionary containing the resulting file.
"""
if isinstance(filename, str):
filename = Path(filename)
if filename.is_dir():
2020-06-28 07:51:49 +00:00
filename = filename / get_latest_backtest_filename(filename)
2020-06-26 05:46:59 +00:00
if not filename.is_file():
raise ValueError(f"File {filename} does not exist.")
logger.info(f"Loading backtest result from {filename}")
2020-06-26 05:46:59 +00:00
with filename.open() as file:
data = json_load(file)
# Legacy list format does not contain metadata.
if isinstance(data, dict):
2024-05-12 15:41:55 +00:00
data["metadata"] = load_backtest_metadata(filename)
2020-06-26 05:46:59 +00:00
return data
def load_and_merge_backtest_result(strategy_name: str, filename: Path, results: dict[str, Any]):
2022-04-13 04:47:39 +00:00
"""
2023-07-30 08:52:23 +00:00
Load one strategy from multi-strategy result and merge it with results
2022-04-13 04:47:39 +00:00
:param strategy_name: Name of the strategy contained in the result
:param filename: Backtest-result-filename to load
:param results: dict to merge the result to.
"""
bt_data = load_backtest_stats(filename)
2024-05-12 15:41:55 +00:00
k: Literal["metadata", "strategy"]
for k in ("metadata", "strategy"):
results[k][strategy_name] = bt_data[k][strategy_name]
2024-05-12 15:41:55 +00:00
results["metadata"][strategy_name]["filename"] = filename.stem
comparison = bt_data["strategy_comparison"]
for i in range(len(comparison)):
2024-05-12 15:41:55 +00:00
if comparison[i]["key"] == strategy_name:
results["strategy_comparison"].append(comparison[i])
break
def _get_backtest_files(dirname: Path) -> list[Path]:
2023-07-26 05:07:11 +00:00
# Weird glob expression here avoids including .meta.json files.
2024-05-12 15:41:55 +00:00
return list(reversed(sorted(dirname.glob("backtest-result-*-[0-9][0-9].json"))))
def _extract_backtest_result(filename: Path) -> list[BacktestHistoryEntryType]:
metadata = load_backtest_metadata(filename)
2023-07-31 19:15:33 +00:00
return [
{
2024-05-12 15:41:55 +00:00
"filename": filename.stem,
"strategy": s,
"run_id": v["run_id"],
"notes": v.get("notes", ""),
# Backtest "run" time
2024-05-12 15:41:55 +00:00
"backtest_start_time": v["backtest_start_time"],
# Backtest timerange
2024-05-12 15:41:55 +00:00
"backtest_start_ts": v.get("backtest_start_ts", None),
"backtest_end_ts": v.get("backtest_end_ts", None),
"timeframe": v.get("timeframe", None),
"timeframe_detail": v.get("timeframe_detail", None),
}
for s, v in metadata.items()
2023-07-31 19:15:33 +00:00
]
def get_backtest_result(filename: Path) -> list[BacktestHistoryEntryType]:
"""
Get backtest result read from metadata file
"""
return _extract_backtest_result(filename)
def get_backtest_resultlist(dirname: Path) -> list[BacktestHistoryEntryType]:
"""
Get list of backtest results read from metadata files
"""
return [
result
for filename in _get_backtest_files(dirname)
for result in _extract_backtest_result(filename)
]
def delete_backtest_result(file_abs: Path):
"""
Delete backtest result file and corresponding metadata file.
"""
# *.meta.json
logger.info(f"Deleting backtest result file: {file_abs.name}")
2024-05-12 15:41:55 +00:00
file_abs_meta = file_abs.with_suffix(".meta.json")
file_abs.unlink()
file_abs_meta.unlink()
def update_backtest_metadata(filename: Path, strategy: str, content: dict[str, Any]):
2023-07-31 19:16:25 +00:00
"""
Updates backtest metadata file with new content.
:raises: ValueError if metadata file does not exist, or strategy is not in this file.
"""
metadata = load_backtest_metadata(filename)
if not metadata:
raise ValueError("File does not exist.")
if strategy not in metadata:
raise ValueError("Strategy not in metadata.")
metadata[strategy].update(content)
# Write data again.
file_dump_json(get_backtest_metadata_filename(filename), metadata)
def get_backtest_market_change(filename: Path, include_ts: bool = True) -> pd.DataFrame:
"""
Read backtest market change file.
"""
df = pd.read_feather(filename)
if include_ts:
2024-05-12 15:41:55 +00:00
df.loc[:, "__date_ts"] = df.loc[:, "date"].astype(np.int64) // 1000 // 1000
return df
2024-05-12 15:41:55 +00:00
def find_existing_backtest_stats(
dirname: Union[Path, str], run_ids: dict[str, str], min_backtest_date: Optional[datetime] = None
) -> dict[str, Any]:
"""
Find existing backtest stats that match specified run IDs and load them.
:param dirname: pathlib.Path object, or string pointing to the file.
:param run_ids: {strategy_name: id_string} dictionary.
:param min_backtest_date: do not load a backtest older than specified date.
:return: results dict.
"""
# Copy so we can modify this dict without affecting parent scope.
run_ids = copy(run_ids)
dirname = Path(dirname)
results: dict[str, Any] = {
2024-05-12 15:41:55 +00:00
"metadata": {},
"strategy": {},
"strategy_comparison": [],
}
for filename in _get_backtest_files(dirname):
metadata = load_backtest_metadata(filename)
if not metadata:
# Files are sorted from newest to oldest. When file without metadata is encountered it
# is safe to assume older files will also not have any metadata.
break
for strategy_name, run_id in list(run_ids.items()):
strategy_metadata = metadata.get(strategy_name, None)
if not strategy_metadata:
# This strategy is not present in analyzed backtest.
continue
if min_backtest_date is not None:
2024-05-12 15:41:55 +00:00
backtest_date = strategy_metadata["backtest_start_time"]
backtest_date = datetime.fromtimestamp(backtest_date, tz=timezone.utc)
if backtest_date < min_backtest_date:
# Do not use a cached result for this strategy as first result is too old.
del run_ids[strategy_name]
continue
2024-05-12 15:41:55 +00:00
if strategy_metadata["run_id"] == run_id:
del run_ids[strategy_name]
2022-04-13 04:47:39 +00:00
load_and_merge_backtest_result(strategy_name, filename, results)
if len(run_ids) == 0:
break
return results
def _load_backtest_data_df_compatibility(df: pd.DataFrame) -> pd.DataFrame:
"""
Compatibility support for older backtest data.
"""
2024-05-12 15:41:55 +00:00
df["open_date"] = pd.to_datetime(df["open_date"], utc=True)
df["close_date"] = pd.to_datetime(df["close_date"], utc=True)
# Compatibility support for pre short Columns
2024-05-12 15:41:55 +00:00
if "is_short" not in df.columns:
df["is_short"] = False
if "leverage" not in df.columns:
df["leverage"] = 1.0
if "enter_tag" not in df.columns:
df["enter_tag"] = df["buy_tag"]
df = df.drop(["buy_tag"], axis=1)
if "max_stake_amount" not in df.columns:
df["max_stake_amount"] = df["stake_amount"]
if "orders" not in df.columns:
df["orders"] = None
return df
2020-06-27 07:56:37 +00:00
def load_backtest_data(filename: Union[Path, str], strategy: Optional[str] = None) -> pd.DataFrame:
2019-03-07 20:20:32 +00:00
"""
Load backtest data file.
:param filename: pathlib.Path object, or string pointing to a file or directory
2020-06-27 07:56:37 +00:00
:param strategy: Strategy to load - mainly relevant for multi-strategy backtests
Can also serve as protection to load the correct result.
2019-06-23 20:10:37 +00:00
:return: a dataframe with the analysis results
:raise: ValueError if loading goes wrong.
2019-03-07 20:20:32 +00:00
"""
data = load_backtest_stats(filename)
if not isinstance(data, list):
2020-06-27 07:56:37 +00:00
# new, nested format
2024-05-12 15:41:55 +00:00
if "strategy" not in data:
2020-06-27 07:56:37 +00:00
raise ValueError("Unknown dataformat.")
if not strategy:
2024-05-12 15:41:55 +00:00
if len(data["strategy"]) == 1:
strategy = list(data["strategy"].keys())[0]
2020-06-27 07:56:37 +00:00
else:
2024-05-12 15:41:55 +00:00
raise ValueError(
"Detected backtest result with more than one strategy. "
"Please specify a strategy."
)
2020-06-27 07:56:37 +00:00
2024-05-12 15:41:55 +00:00
if strategy not in data["strategy"]:
raise ValueError(
f"Strategy {strategy} not available in the backtest result. "
f"Available strategies are '{','.join(data['strategy'].keys())}'"
2024-05-12 15:41:55 +00:00
)
2020-06-27 07:56:37 +00:00
2024-05-12 15:41:55 +00:00
data = data["strategy"][strategy]["trades"]
df = pd.DataFrame(data)
if not df.empty:
df = _load_backtest_data_df_compatibility(df)
else:
# old format - only with lists.
2022-01-06 18:49:25 +00:00
raise OperationalException(
2024-05-12 15:41:55 +00:00
"Backtest-results with only trades data are no longer supported."
)
if not df.empty:
df = df.sort_values("open_date").reset_index(drop=True)
2019-03-07 20:20:32 +00:00
return df
def analyze_trade_parallelism(results: pd.DataFrame, timeframe: str) -> pd.DataFrame:
2019-03-07 20:20:32 +00:00
"""
Find overlapping trades by expanding each trade once per period it was open
and then counting overlaps.
2019-03-07 20:20:32 +00:00
:param results: Results Dataframe - can be loaded
:param timeframe: Timeframe used for backtest
:return: dataframe with open-counts per time-period in timeframe
2019-03-07 20:20:32 +00:00
"""
from freqtrade.exchange import timeframe_to_resample_freq
2024-05-12 15:41:55 +00:00
timeframe_freq = timeframe_to_resample_freq(timeframe)
2024-05-12 15:41:55 +00:00
dates = [
pd.Series(
pd.date_range(
row[1]["open_date"],
row[1]["close_date"],
freq=timeframe_freq,
# Exclude right boundary - the date is the candle open date.
inclusive="left",
)
)
2024-05-12 15:41:55 +00:00
for row in results[["open_date", "close_date"]].iterrows()
]
2019-03-07 20:20:32 +00:00
deltas = [len(x) for x in dates]
2024-05-12 15:41:55 +00:00
dates = pd.Series(pd.concat(dates).values, name="date")
2019-03-07 20:20:32 +00:00
df2 = pd.DataFrame(np.repeat(results.values, deltas, axis=0), columns=results.columns)
df2 = pd.concat([dates, df2], axis=1)
2024-05-12 15:41:55 +00:00
df2 = df2.set_index("date")
df_final = df2.resample(timeframe_freq)[["pair"]].count()
df_final = df_final.rename({"pair": "open_trades"}, axis=1)
return df_final
2024-05-12 15:41:55 +00:00
def evaluate_result_multi(
results: pd.DataFrame, timeframe: str, max_open_trades: IntOrInf
) -> pd.DataFrame:
"""
Find overlapping trades by expanding each trade once per period it was open
and then counting overlaps
:param results: Results Dataframe - can be loaded
:param timeframe: Frequency used for the backtest
:param max_open_trades: parameter max_open_trades used during backtest run
:return: dataframe with open-counts per time-period in freq
"""
df_final = analyze_trade_parallelism(results, timeframe)
2024-05-12 15:41:55 +00:00
return df_final[df_final["open_trades"] > max_open_trades]
2019-06-16 08:41:05 +00:00
def trade_list_to_dataframe(trades: Union[list[Trade], list[LocalTrade]]) -> pd.DataFrame:
"""
Convert list of Trade objects to pandas Dataframe
:param trades: List of trade objects
:return: Dataframe with BT_DATA_COLUMNS
"""
2022-05-26 05:11:43 +00:00
df = pd.DataFrame.from_records([t.to_json(True) for t in trades], columns=BT_DATA_COLUMNS)
if len(df) > 0:
2024-05-12 15:41:55 +00:00
df["close_date"] = pd.to_datetime(df["close_date"], utc=True)
df["open_date"] = pd.to_datetime(df["open_date"], utc=True)
df["close_rate"] = df["close_rate"].astype("float64")
return df
2020-06-27 07:56:37 +00:00
def load_trades_from_db(db_url: str, strategy: Optional[str] = None) -> pd.DataFrame:
2019-06-16 08:41:05 +00:00
"""
2019-06-22 13:45:20 +00:00
Load trades from a DB (using dburl)
2019-06-16 08:41:05 +00:00
:param db_url: Sqlite url (default format sqlite:///tradesv3.dry-run.sqlite)
2020-06-27 07:56:37 +00:00
:param strategy: Strategy to load - mainly relevant for multi-strategy backtests
Can also serve as protection to load the correct result.
2019-06-23 20:10:37 +00:00
:return: Dataframe containing Trades
2019-06-16 08:41:05 +00:00
"""
2022-05-19 04:45:20 +00:00
init_db(db_url)
2020-06-27 07:56:37 +00:00
filters = []
if strategy:
2020-08-18 13:20:37 +00:00
filters.append(Trade.strategy == strategy)
2023-03-15 20:09:25 +00:00
trades = trade_list_to_dataframe(list(Trade.get_trades(filters).all()))
2019-06-16 08:41:05 +00:00
return trades
2024-05-12 15:41:55 +00:00
def load_trades(
source: str,
db_url: str,
exportfilename: Path,
no_trades: bool = False,
strategy: Optional[str] = None,
) -> pd.DataFrame:
"""
2020-08-26 18:52:09 +00:00
Based on configuration option 'trade_source':
* loads data from DB (using `db_url`)
2019-07-03 04:26:39 +00:00
* loads data from backtestfile (using `exportfilename`)
:param source: "DB" or "file" - specify source to load from
:param db_url: sqlalchemy formatted url to a database
:param exportfilename: Json file generated by backtesting
2020-03-15 20:20:32 +00:00
:param no_trades: Skip using trades, only return backtesting data columns
:return: DataFrame containing trades
"""
2020-03-15 20:20:32 +00:00
if no_trades:
2020-03-14 21:15:03 +00:00
df = pd.DataFrame(columns=BT_DATA_COLUMNS)
return df
2019-08-22 18:17:36 +00:00
if source == "DB":
return load_trades_from_db(db_url)
elif source == "file":
2020-06-27 07:56:37 +00:00
return load_backtest_data(exportfilename, strategy)
2024-05-12 15:41:55 +00:00
def extract_trades_of_period(
dataframe: pd.DataFrame, trades: pd.DataFrame, date_index=False
) -> pd.DataFrame:
"""
Compare trades and backtested pair DataFrames to get trades performed on backtested period
:return: the DataFrame of a trades of period
"""
if date_index:
trades_start = dataframe.index[0]
trades_stop = dataframe.index[-1]
else:
2024-05-12 15:41:55 +00:00
trades_start = dataframe.iloc[0]["date"]
trades_stop = dataframe.iloc[-1]["date"]
trades = trades.loc[
(trades["open_date"] >= trades_start) & (trades["close_date"] <= trades_stop)
]
return trades