mirror of
https://github.com/freqtrade/freqtrade.git
synced 2024-11-10 10:21:59 +00:00
509 lines
18 KiB
Python
509 lines
18 KiB
Python
"""
|
|
Helpers when analyzing backtest data
|
|
"""
|
|
|
|
import logging
|
|
from copy import copy
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Literal, Optional, Union
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
from freqtrade.constants import LAST_BT_RESULT_FN, IntOrInf
|
|
from freqtrade.exceptions import ConfigurationError, OperationalException
|
|
from freqtrade.misc import file_dump_json, json_load
|
|
from freqtrade.optimize.backtest_caching import get_backtest_metadata_filename
|
|
from freqtrade.persistence import LocalTrade, Trade, init_db
|
|
from freqtrade.types import BacktestHistoryEntryType, BacktestResultType
|
|
|
|
|
|
# Module-level logger used by the helpers in this file.
logger = logging.getLogger(__name__)

# Newest format
# Ordered column names of the trades DataFrame produced by this module
# (see `trade_list_to_dataframe` and `load_trades`).
BT_DATA_COLUMNS = [
    "pair", "stake_amount", "max_stake_amount", "amount",
    "open_date", "close_date", "open_rate", "close_rate",
    "fee_open", "fee_close", "trade_duration",
    "profit_ratio", "profit_abs", "exit_reason",
    "initial_stop_loss_abs", "initial_stop_loss_ratio",
    "stop_loss_abs", "stop_loss_ratio",
    "min_rate", "max_rate", "is_open", "enter_tag",
    "leverage", "is_short",
    "open_timestamp", "close_timestamp", "orders",
]
|
|
|
|
|
|
def get_latest_optimize_filename(directory: Union[Path, str], variant: str) -> str:
    """
    Look up the most recent result filename recorded in '.last_result.json'.

    The pointer file is parsed as JSON and the entry stored under
    ``latest_<variant>`` is returned unchanged.

    :param directory: Directory to search for last result
    :param variant: 'backtest' or 'hyperopt' - selects which entry to return
    :return: string containing the filename of the latest result of that variant
    :raises: ValueError in the following cases:
        * Directory does not exist
        * `directory/.last_result.json` does not exist
        * `directory/.last_result.json` has the wrong content
    """
    base_dir = Path(directory) if isinstance(directory, str) else directory
    if not base_dir.is_dir():
        raise ValueError(f"Directory '{base_dir}' does not exist.")

    pointer_file = base_dir / LAST_BT_RESULT_FN
    if not pointer_file.is_file():
        raise ValueError(
            f"Directory '{base_dir}' does not seem to contain backtest statistics yet."
        )

    with pointer_file.open() as handle:
        pointer = json_load(handle)

    entry_key = f"latest_{variant}"
    if entry_key not in pointer:
        raise ValueError(f"Invalid '{LAST_BT_RESULT_FN}' format.")

    return pointer[entry_key]
|
|
|
|
|
|
def get_latest_backtest_filename(directory: Union[Path, str]) -> str:
    """
    Return the filename of the newest backtest export listed in '.last_result.json'.

    Thin wrapper around `get_latest_optimize_filename` for the 'backtest' variant.

    :param directory: Directory to search for last result
    :return: string containing the filename of the latest backtest result
    :raises: ValueError in the following cases:
        * Directory does not exist
        * `directory/.last_result.json` does not exist
        * `directory/.last_result.json` has the wrong content
    """
    return get_latest_optimize_filename(directory, variant="backtest")
|
|
|
|
|
|
def get_latest_hyperopt_filename(directory: Union[Path, str]) -> str:
    """
    Return the filename of the newest hyperopt export listed in '.last_result.json'.

    Any ValueError raised by the lookup (missing directory, missing or invalid
    '.last_result.json') is swallowed and the legacy default pickle filename is
    returned instead.

    :param directory: Directory to search for last result
    :return: string containing the filename of the latest hyperopt result,
        or "hyperopt_results.pickle" if the lookup fails.
    """
    try:
        latest = get_latest_optimize_filename(directory, "hyperopt")
    except ValueError:
        # Return default (legacy) pickle filename
        latest = "hyperopt_results.pickle"
    return latest
|
|
|
|
|
|
def get_latest_hyperopt_file(
    directory: Union[Path, str], predef_filename: Optional[str] = None
) -> Path:
    """
    Build the path of the hyperopt result file to use.

    When `predef_filename` is given it is joined onto `directory`; otherwise the
    filename is resolved through `get_latest_hyperopt_filename`.

    :param directory: Directory to search for last result
    :param predef_filename: Optional filename (not a path) to use instead of the latest one
    :return: Path pointing to the hyperopt result file inside `directory`
    :raises: ConfigurationError if `predef_filename` is an absolute path
    """
    base_dir = Path(directory) if isinstance(directory, str) else directory

    if not predef_filename:
        return base_dir / get_latest_hyperopt_filename(base_dir)

    if Path(predef_filename).is_absolute():
        raise ConfigurationError(
            "--hyperopt-filename expects only the filename, not an absolute path."
        )
    return base_dir / predef_filename
|
|
|
|
|
|
def load_backtest_metadata(filename: Union[Path, str]) -> Dict[str, Any]:
    """
    Read the metadata dictionary belonging to a backtest results file without reading
    and deserializing the entire results file.

    :param filename: path to backtest results file.
    :return: metadata dict, or an empty dict if the metadata file is not present.
    :raises: OperationalException on any other error while reading the metadata file.
    """
    meta_path = get_backtest_metadata_filename(filename)
    try:
        with meta_path.open() as handle:
            return json_load(handle)
    except FileNotFoundError:
        # A missing metadata file is not an error - report "no metadata".
        return {}
    except Exception as err:
        raise OperationalException("Unexpected error while loading backtest metadata.") from err
|
|
|
|
|
|
def load_backtest_stats(filename: Union[Path, str]) -> BacktestResultType:
    """
    Load a backtest statistics file.

    If `filename` is a directory, the latest backtest result recorded in that
    directory is loaded instead.

    :param filename: pathlib.Path object, or string pointing to the file (or directory).
    :return: the parsed content of the file; dict results additionally get a
        "metadata" entry loaded from the accompanying metadata file.
    :raises: ValueError if the resolved file does not exist.
    """
    result_path = Path(filename) if isinstance(filename, str) else filename
    if result_path.is_dir():
        result_path = result_path / get_latest_backtest_filename(result_path)
    if not result_path.is_file():
        raise ValueError(f"File {result_path} does not exist.")

    logger.info(f"Loading backtest result from {result_path}")
    with result_path.open() as handle:
        stats = json_load(handle)

    # Legacy list format does not contain metadata.
    if isinstance(stats, dict):
        stats["metadata"] = load_backtest_metadata(result_path)
    return stats
|
|
|
|
|
|
def load_and_merge_backtest_result(strategy_name: str, filename: Path, results: Dict[str, Any]):
    """
    Load one strategy from a (multi-strategy) result file and merge it into `results`.

    `results` is modified in place: the strategy's "metadata" and "strategy" entries
    are copied over, the file stem is stored as the metadata "filename", and the first
    matching "strategy_comparison" row is appended.

    :param strategy_name: Name of the strategy contained in the result
    :param filename: Backtest-result-filename to load
    :param results: dict to merge the result to.
    """
    bt_data = load_backtest_stats(filename)

    section: Literal["metadata", "strategy"]
    for section in ("metadata", "strategy"):  # type: ignore
        results[section][strategy_name] = bt_data[section][strategy_name]
    results["metadata"][strategy_name]["filename"] = filename.stem

    # Only the first comparison row for this strategy is taken over.
    for row in bt_data["strategy_comparison"]:
        if row["key"] == strategy_name:
            results["strategy_comparison"].append(row)
            break
|
|
|
|
|
|
def _get_backtest_files(dirname: Path) -> List[Path]:
|
|
# Weird glob expression here avoids including .meta.json files.
|
|
return list(reversed(sorted(dirname.glob("backtest-result-*-[0-9][0-9].json"))))
|
|
|
|
|
|
def _extract_backtest_result(filename: Path) -> List[BacktestHistoryEntryType]:
    """
    Build one history entry per strategy found in the metadata of `filename`.

    "run_id" and "backtest_start_time" are required in each strategy's metadata;
    the remaining fields fall back to "" (notes) or None when absent.

    :param filename: Backtest result file whose metadata should be read.
    :return: List of history entries (empty if there is no metadata).
    """
    entries = []
    for strategy, meta in load_backtest_metadata(filename).items():
        entries.append(
            {
                "filename": filename.stem,
                "strategy": strategy,
                "run_id": meta["run_id"],
                "notes": meta.get("notes", ""),
                # Backtest "run" time
                "backtest_start_time": meta["backtest_start_time"],
                # Backtest timerange
                "backtest_start_ts": meta.get("backtest_start_ts", None),
                "backtest_end_ts": meta.get("backtest_end_ts", None),
                "timeframe": meta.get("timeframe", None),
                "timeframe_detail": meta.get("timeframe_detail", None),
            }
        )
    return entries
|
|
|
|
|
|
def get_backtest_result(filename: Path) -> List[BacktestHistoryEntryType]:
    """
    Return the history entries for a single backtest result, read from its metadata file.

    :param filename: Backtest result file.
    :return: One entry per strategy contained in the metadata.
    """
    entries = _extract_backtest_result(filename)
    return entries
|
|
|
|
|
|
def get_backtest_resultlist(dirname: Path) -> List[BacktestHistoryEntryType]:
    """
    Collect the history entries of all backtest results in a directory,
    read from the metadata files.

    :param dirname: Directory containing backtest result files.
    :return: Flat list of entries, in the file order given by `_get_backtest_files`.
    """
    history = []
    for result_file in _get_backtest_files(dirname):
        history.extend(_extract_backtest_result(result_file))
    return history
|
|
|
|
|
|
def delete_backtest_result(file_abs: Path):
    """
    Delete backtest result file and corresponding metadata file.

    The metadata file is optional: results without a `.meta.json` companion
    are deleted without raising.

    :param file_abs: Path to the backtest result file.
    :raises: FileNotFoundError if the result file itself does not exist.
    """
    logger.info(f"Deleting backtest result file: {file_abs.name}")
    # *.meta.json
    file_abs_meta = file_abs.with_suffix(".meta.json")
    file_abs.unlink()
    # The result file is already gone at this point; a missing metadata file
    # must not turn a successful delete into an error.
    file_abs_meta.unlink(missing_ok=True)
|
|
|
|
|
|
def update_backtest_metadata(filename: Path, strategy: str, content: Dict[str, Any]):
    """
    Merge `content` into the metadata of one strategy and write the metadata file back.

    :param filename: Backtest result file whose metadata should be updated.
    :param strategy: Strategy entry to update.
    :param content: Key/value pairs to merge into the strategy's metadata.
    :raises: ValueError if no metadata could be loaded, or strategy is not in this file.
    """
    all_metadata = load_backtest_metadata(filename)
    if not all_metadata:
        # Empty metadata is what the loader returns for a missing file.
        raise ValueError("File does not exist.")
    if strategy not in all_metadata:
        raise ValueError("Strategy not in metadata.")

    all_metadata[strategy].update(content)

    # Write data again.
    target = get_backtest_metadata_filename(filename)
    file_dump_json(target, all_metadata)
|
|
|
|
|
|
def get_backtest_market_change(filename: Path, include_ts: bool = True) -> pd.DataFrame:
    """
    Read backtest market change file (feather format).

    :param filename: Path to the feather file.
    :param include_ts: When True, add an integer "__date_ts" column derived from "date".
    :return: DataFrame with the file content (plus "__date_ts" if requested).
    """
    market_change = pd.read_feather(filename)
    if not include_ts:
        return market_change

    # int64 view of the date column, floor-divided by 1000 twice
    # (presumably nanoseconds -> milliseconds; confirm the stored resolution).
    raw_ts = market_change.loc[:, "date"].astype(np.int64)
    market_change.loc[:, "__date_ts"] = raw_ts // 1000 // 1000
    return market_change
|
|
|
|
|
|
def find_existing_backtest_stats(
    dirname: Union[Path, str], run_ids: Dict[str, str], min_backtest_date: Optional[datetime] = None
) -> Dict[str, Any]:
    """
    Find existing backtest stats that match specified run IDs and load them.

    Result files are scanned in the order returned by `_get_backtest_files`; scanning
    stops at the first file without metadata, or once every requested strategy has
    been resolved.

    :param dirname: pathlib.Path object, or string pointing to the results directory.
    :param run_ids: {strategy_name: id_string} dictionary. Not modified.
    :param min_backtest_date: do not load a backtest older than specified date.
    :return: results dict with "metadata", "strategy" and "strategy_comparison" keys.
    """
    # Copy so we can modify this dict without affecting parent scope.
    pending = copy(run_ids)
    results: Dict[str, Any] = {"metadata": {}, "strategy": {}, "strategy_comparison": []}

    for result_file in _get_backtest_files(Path(dirname)):
        metadata = load_backtest_metadata(result_file)
        if not metadata:
            # Files are sorted from newest to oldest. When file without metadata is
            # encountered it is safe to assume older files will also not have any metadata.
            break

        # Iterate over a snapshot, since entries are removed from `pending` inside the loop.
        for strategy_name, run_id in list(pending.items()):
            strategy_metadata = metadata.get(strategy_name, None)
            if not strategy_metadata:
                # This strategy is not present in analyzed backtest.
                continue

            if min_backtest_date is not None:
                started_at = datetime.fromtimestamp(
                    strategy_metadata["backtest_start_time"], tz=timezone.utc
                )
                if started_at < min_backtest_date:
                    # Do not use a cached result for this strategy as first result is too old.
                    del pending[strategy_name]
                    continue

            if strategy_metadata["run_id"] == run_id:
                del pending[strategy_name]
                load_and_merge_backtest_result(strategy_name, result_file, results)

        if not pending:
            break

    return results
|
|
|
|
|
|
def _load_backtest_data_df_compatibility(df: pd.DataFrame) -> pd.DataFrame:
|
|
"""
|
|
Compatibility support for older backtest data.
|
|
"""
|
|
df["open_date"] = pd.to_datetime(df["open_date"], utc=True)
|
|
df["close_date"] = pd.to_datetime(df["close_date"], utc=True)
|
|
# Compatibility support for pre short Columns
|
|
if "is_short" not in df.columns:
|
|
df["is_short"] = False
|
|
if "leverage" not in df.columns:
|
|
df["leverage"] = 1.0
|
|
if "enter_tag" not in df.columns:
|
|
df["enter_tag"] = df["buy_tag"]
|
|
df = df.drop(["buy_tag"], axis=1)
|
|
if "max_stake_amount" not in df.columns:
|
|
df["max_stake_amount"] = df["stake_amount"]
|
|
if "orders" not in df.columns:
|
|
df["orders"] = None
|
|
return df
|
|
|
|
|
|
def load_backtest_data(filename: Union[Path, str], strategy: Optional[str] = None) -> pd.DataFrame:
    """
    Load the trades of one strategy from a backtest data file.

    :param filename: pathlib.Path object, or string pointing to a file or directory
    :param strategy: Strategy to load - mainly relevant for multi-strategy backtests
                     Can also serve as protection to load the correct result.
                     May be omitted when the result contains exactly one strategy.
    :return: a dataframe with the trades, sorted by open_date (empty if there were no trades)
    :raise: ValueError if loading goes wrong.
    :raise: OperationalException for the legacy list-only result format.
    """
    data = load_backtest_stats(filename)

    if isinstance(data, list):
        # old format - only with lists.
        raise OperationalException(
            "Backtest-results with only trades data are no longer supported."
        )

    # new, nested format
    if "strategy" not in data:
        raise ValueError("Unknown dataformat.")
    strategies = data["strategy"]

    if not strategy:
        if len(strategies) != 1:
            raise ValueError(
                "Detected backtest result with more than one strategy. "
                "Please specify a strategy."
            )
        strategy = next(iter(strategies))

    if strategy not in strategies:
        raise ValueError(
            f"Strategy {strategy} not available in the backtest result. "
            f"Available strategies are '{','.join(strategies)}'"
        )

    df = pd.DataFrame(strategies[strategy]["trades"])
    if df.empty:
        return df

    df = _load_backtest_data_df_compatibility(df)
    return df.sort_values("open_date").reset_index(drop=True)
|
|
|
|
|
|
def analyze_trade_parallelism(results: pd.DataFrame, timeframe: str) -> pd.DataFrame:
    """
    Find overlapping trades by expanding each trade once per period it was open
    and then counting overlaps.

    :param results: Results Dataframe - can be loaded
    :param timeframe: Timeframe used for backtest
    :return: dataframe with open-counts per time-period in timeframe
             (single column "open_trades", indexed by date)
    """
    from freqtrade.exchange import timeframe_to_resample_freq

    timeframe_freq = timeframe_to_resample_freq(timeframe)

    # One series of period timestamps per trade, spanning open_date..close_date.
    periods_per_trade = [
        pd.Series(pd.date_range(opened, closed, freq=timeframe_freq))
        for opened, closed in zip(results["open_date"], results["close_date"])
    ]
    repeats = [len(periods) for periods in periods_per_trade]
    all_periods = pd.Series(pd.concat(periods_per_trade).values, name="date")

    # Repeat every trade row once per period it was open, aligned with `all_periods`.
    expanded = pd.DataFrame(
        np.repeat(results.values, repeats, axis=0), columns=results.columns
    )
    expanded = pd.concat([all_periods, expanded], axis=1).set_index("date")

    open_counts = expanded.resample(timeframe_freq)[["pair"]].count()
    return open_counts.rename({"pair": "open_trades"}, axis=1)
|
|
|
|
|
|
def evaluate_result_multi(
    results: pd.DataFrame, timeframe: str, max_open_trades: IntOrInf
) -> pd.DataFrame:
    """
    Find the time-periods in which more trades were open than `max_open_trades`.

    Trades are expanded once per period they were open (see
    `analyze_trade_parallelism`), and only periods exceeding the limit are kept.

    :param results: Results Dataframe - can be loaded
    :param timeframe: Frequency used for the backtest
    :param max_open_trades: parameter max_open_trades used during backtest run
    :return: dataframe with open-counts per time-period in freq,
             restricted to periods where the count is above max_open_trades
    """
    parallelism = analyze_trade_parallelism(results, timeframe)
    over_limit = parallelism["open_trades"] > max_open_trades
    return parallelism[over_limit]
|
|
|
|
|
|
def trade_list_to_dataframe(trades: Union[List[Trade], List[LocalTrade]]) -> pd.DataFrame:
    """
    Convert list of Trade objects to pandas Dataframe

    :param trades: List of trade objects
    :return: Dataframe with BT_DATA_COLUMNS; for non-empty input the date columns
             are UTC datetimes and close_rate is float64.
    """
    records = [trade.to_json(True) for trade in trades]
    df = pd.DataFrame.from_records(records, columns=BT_DATA_COLUMNS)
    if len(df) == 0:
        # Nothing to convert - return the empty frame with the expected columns.
        return df

    for date_column in ("close_date", "open_date"):
        df[date_column] = pd.to_datetime(df[date_column], utc=True)
    df["close_rate"] = df["close_rate"].astype("float64")
    return df
|
|
|
|
|
|
def load_trades_from_db(db_url: str, strategy: Optional[str] = None) -> pd.DataFrame:
    """
    Load trades from a DB (using dburl)

    :param db_url: Sqlite url (default format sqlite:///tradesv3.dry-run.sqlite)
    :param strategy: Strategy to load - when given, only trades recorded for this
                     strategy are returned.
                     Can also serve as protection to load the correct result.
    :return: Dataframe containing Trades
    """
    init_db(db_url)

    filters = [Trade.strategy == strategy] if strategy else []
    db_trades = list(Trade.get_trades(filters).all())
    return trade_list_to_dataframe(db_trades)
|
|
|
|
|
|
def load_trades(
    source: str,
    db_url: str,
    exportfilename: Path,
    no_trades: bool = False,
    strategy: Optional[str] = None,
) -> pd.DataFrame:
    """
    Based on configuration option 'trade_source':
    * loads data from DB (using `db_url`)
    * loads data from backtestfile (using `exportfilename`)

    :param source: "DB" or "file" - specify source to load from
    :param db_url: sqlalchemy formatted url to a database
    :param exportfilename: Json file generated by backtesting
    :param no_trades: Skip using trades, only return backtesting data columns
    :param strategy: Strategy to load; only forwarded when loading from "file"
    :return: DataFrame containing trades. Any other `source` value falls through
             and yields None.
    """
    if no_trades:
        # Empty frame that still carries the expected columns.
        return pd.DataFrame(columns=BT_DATA_COLUMNS)

    if source == "DB":
        return load_trades_from_db(db_url)
    if source == "file":
        return load_backtest_data(exportfilename, strategy)
|
|
|
|
|
|
def extract_trades_of_period(
    dataframe: pd.DataFrame, trades: pd.DataFrame, date_index=False
) -> pd.DataFrame:
    """
    Compare trades and backtested pair DataFrames to get trades performed on backtested period

    A trade is kept when it opened at or after the first candle and closed at or
    before the last candle of `dataframe`.

    :param dataframe: Candle DataFrame defining the period.
    :param trades: Trades DataFrame with "open_date" and "close_date" columns.
    :param date_index: Take the period from the index of `dataframe` instead of
                       its "date" column.
    :return: the DataFrame of a trades of period
    """
    if date_index:
        period = dataframe.index
        trades_start, trades_stop = period[0], period[-1]
    else:
        first_candle, last_candle = dataframe.iloc[0], dataframe.iloc[-1]
        trades_start, trades_stop = first_candle["date"], last_candle["date"]

    opened_in_period = trades["open_date"] >= trades_start
    closed_in_period = trades["close_date"] <= trades_stop
    return trades.loc[opened_in_period & closed_in_period]
|