mirror of
https://github.com/freqtrade/freqtrade.git
synced 2024-11-10 10:21:59 +00:00
462 lines
15 KiB
Python
462 lines
15 KiB
Python
import logging
|
|
from pathlib import Path
|
|
from typing import Dict, List
|
|
|
|
import joblib
|
|
import pandas as pd
|
|
|
|
from freqtrade.configuration import TimeRange
|
|
from freqtrade.constants import Config
|
|
from freqtrade.data.btanalysis import (
|
|
BT_DATA_COLUMNS,
|
|
get_latest_backtest_filename,
|
|
load_backtest_data,
|
|
load_backtest_stats,
|
|
)
|
|
from freqtrade.exceptions import OperationalException
|
|
from freqtrade.util import print_df_rich_table
|
|
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _load_backtest_analysis_data(backtest_dir: Path, name: str):
    """
    Load one of the pickled analysis exports that accompany a backtest result.

    :param backtest_dir: Either a directory (the file name returned by
        ``get_latest_backtest_filename`` is used as base name) or the path of a
        specific backtest result file (its stem is used as base name).
    :param name: Suffix of the export to load; the file read is
        ``<base name>_<name>.pkl``.
    :return: The unpickled object, or None if the file could not be loaded
        (the failure is logged).
    """
    if backtest_dir.is_dir():
        scpf = Path(
            backtest_dir,
            Path(get_latest_backtest_filename(backtest_dir)).stem + "_" + name + ".pkl",
        )
    else:
        scpf = Path(backtest_dir.parent / f"{backtest_dir.stem}_{name}.pkl")

    try:
        with scpf.open("rb") as scp:
            # NOTE: joblib.load unpickles the file - only load exports from trusted sources.
            loaded_data = joblib.load(scp)
            logger.info(f"Loaded {name} candles: {str(scpf)}")
    except Exception as e:
        # The exception must be part of the message itself: passing it as an extra
        # positional argument without a %-placeholder makes logging fail to format
        # the record, so the error would never be reported.
        logger.error(f"Cannot load {name} data from pickled results: {e}")
        return None

    return loaded_data
|
|
|
|
|
|
def _load_rejected_signals(backtest_dir: Path):
    """Return the pickled "rejected" export belonging to ``backtest_dir`` (None on failure)."""
    rejected = _load_backtest_analysis_data(backtest_dir, name="rejected")
    return rejected
|
|
|
|
|
|
def _load_signal_candles(backtest_dir: Path):
    """Return the pickled "signals" export belonging to ``backtest_dir`` (None on failure)."""
    signals = _load_backtest_analysis_data(backtest_dir, name="signals")
    return signals
|
|
|
|
|
|
def _load_exit_signal_candles(backtest_dir: Path) -> Dict[str, Dict[str, pd.DataFrame]]:
    """
    Return the pickled "exited" export belonging to ``backtest_dir``.

    NOTE(review): the underlying loader returns None when the file cannot be
    read, which the return annotation does not reflect.
    """
    exited = _load_backtest_analysis_data(backtest_dir, name="exited")
    return exited
|
|
|
|
|
|
def _process_candles_and_indicators(
    pairlist, strategy_name, trades, signal_candles, date_col: str = "open_date"
):
    """
    Build ``{strategy_name: {pair: analysed trades}}`` for all pairs with signal candles.

    Pairs without an entry in ``signal_candles[strategy_name]`` are skipped.
    Any exception stops the processing and is printed; the pairs analysed up to
    that point are still returned.

    :param pairlist: Pairs to process.
    :param strategy_name: Key into ``signal_candles`` and of the returned dict.
    :param trades: Trades DataFrame handed to ``_analyze_candles_and_indicators``.
    :param signal_candles: Mapping strategy -> pair -> signal candles.
    :param date_col: Trade date column to compare the candle dates against.
    :return: Dict with a single key ``strategy_name`` mapping pair -> DataFrame.
    """
    per_pair: Dict = {}
    analysed_trades_dict: Dict[str, Dict] = {strategy_name: per_pair}

    try:
        logger.info(f"Processing {strategy_name} : {len(pairlist)} pairs")

        for pair in pairlist:
            if pair not in signal_candles[strategy_name]:
                continue
            pair_candles = signal_candles[strategy_name][pair]
            per_pair[pair] = _analyze_candles_and_indicators(
                pair, trades, pair_candles, date_col
            )
    except Exception as e:
        print(f"Cannot process entry/exit reasons for {strategy_name}: ", e)

    return analysed_trades_dict
|
|
|
|
|
|
def _analyze_candles_and_indicators(
    pair: str, trades: pd.DataFrame, signal_candles: pd.DataFrame, date_col: str = "open_date"
) -> pd.DataFrame:
    """
    Attach the signal candle preceding each trade of ``pair`` to that trade.

    For every trade of ``pair`` the last row of ``signal_candles`` whose "date"
    is strictly before the trade's ``date_col`` is looked up (presumably the
    candles are sorted by date - verify against the producer). Matching trades
    get a "signal_date" and an "enter_reason" (copied from "enter_tag"), and the
    candle columns are merged onto the trades via "signal_date".

    :param pair: Pair to select from ``trades``.
    :param trades: Trades with at least "pair", "enter_tag" and ``date_col``.
    :param signal_candles: Candles with at least a "date" column.
    :param date_col: Trade column the candle dates are compared against.
    :return: Empty DataFrame if there are no signal candles; otherwise the trades
        of ``pair`` (merged with their signal candles when any candle matched).
    """
    if len(signal_candles) == 0:
        return pd.DataFrame()

    buyf = signal_candles.set_index("date", drop=False)
    trades_red = trades.loc[trades["pair"] == pair].copy()

    if trades_red.shape[0] == 0:
        return trades_red

    # Collect the matched candles and concatenate once after the loop instead of
    # growing a DataFrame row by row, which copies all previous rows each time.
    matched_candles = []
    for t, v in trades_red.iterrows():
        allinds = buyf.loc[(buyf["date"] < v[date_col])]
        if allinds.shape[0] > 0:
            tmp_inds = allinds.iloc[[-1]]

            trades_red.loc[t, "signal_date"] = tmp_inds["date"].values[0]
            trades_red.loc[t, "enter_reason"] = trades_red.loc[t, "enter_tag"]
            tmp_inds.index.rename("signal_date", inplace=True)
            matched_candles.append(tmp_inds)

    if "signal_date" in trades_red:
        trades_inds = pd.concat(matched_candles) if matched_candles else pd.DataFrame()
        # ".values" dropped the timezone above, so restore UTC before merging.
        trades_red["signal_date"] = pd.to_datetime(trades_red["signal_date"], utc=True)
        trades_red.set_index("signal_date", inplace=True)
        trades_red = pd.merge(trades_red, trades_inds, on="signal_date", how="outer")

    return trades_red
|
|
|
|
|
|
def _do_group_table_output(
    bigdf,
    glist,
    csv_path: Path,
    to_csv=False,
):
    """
    Output one summary table per requested analysis group.

    Supported group ids (strings):
        "0": wins/losses summary grouped by enter reason
        "1": profit summaries grouped by enter reason
        "2": profit summaries grouped by enter and exit reason
        "3": profit summaries grouped by pair and enter reason
        "4": profit summaries grouped by pair, enter and exit reason
        "5": profit summaries grouped by exit reason
    Any other id is skipped with a warning.

    :param bigdf: Trades DataFrame providing "profit_abs", "profit_ratio" and the
        grouping columns used by the requested groups.
    :param glist: Iterable of group ids to output.
    :param csv_path: Directory handed to ``_print_table`` for CSV export.
    :param to_csv: Passed through to ``_print_table``.
    """
    # Grouping columns of the generic profit summaries (groups 1-5).
    group_masks = {
        "1": ["enter_reason"],
        "2": ["enter_reason", "exit_reason"],
        "3": ["pair", "enter_reason"],
        # this can get quite large
        "4": ["pair", "enter_reason", "exit_reason"],
        "5": ["exit_reason"],
    }
    agg_mask = {
        "profit_abs": ["count", "sum", "median", "mean"],
        "profit_ratio": ["median", "mean", "sum"],
    }
    agg_cols = [
        "num_buys",
        "profit_abs_sum",
        "profit_abs_median",
        "profit_abs_mean",
        "median_profit_pct",
        "mean_profit_pct",
        "total_profit_pct",
    ]

    for g in glist:
        # 0: summary wins/losses grouped by enter tag
        if g == "0":
            group_mask = ["enter_reason"]
            wins = (
                bigdf.loc[bigdf["profit_abs"] >= 0].groupby(group_mask).agg({"profit_abs": ["sum"]})
            )
            wins.columns = ["profit_abs_wins"]
            loss = (
                bigdf.loc[bigdf["profit_abs"] < 0].groupby(group_mask).agg({"profit_abs": ["sum"]})
            )
            loss.columns = ["profit_abs_loss"]

            # Positional columns 0, 1, 2 below are: trade count, wins, losses.
            new = bigdf.groupby(group_mask).agg(
                {"profit_abs": ["count", lambda x: sum(x > 0), lambda x: sum(x <= 0)]}
            )
            new = pd.concat([new, wins, loss], axis=1).fillna(0)

            new["profit_tot"] = new["profit_abs_wins"] - abs(new["profit_abs_loss"])
            new["wl_ratio_pct"] = (new.iloc[:, 1] / new.iloc[:, 0] * 100).fillna(0)
            new["avg_win"] = (new["profit_abs_wins"] / new.iloc[:, 1]).fillna(0)
            new["avg_loss"] = (new["profit_abs_loss"] / new.iloc[:, 2]).fillna(0)

            new["exp_ratio"] = (
                ((1 + (new["avg_win"] / abs(new["avg_loss"]))) * (new["wl_ratio_pct"] / 100)) - 1
            ).fillna(0)

            new.columns = [
                "total_num_buys",
                "wins",
                "losses",
                "profit_abs_wins",
                "profit_abs_loss",
                "profit_tot",
                "wl_ratio_pct",
                "avg_win",
                "avg_loss",
                "exp_ratio",
            ]

            sortcols = ["total_num_buys"]

            _print_table(
                new, sortcols, show_index=True, name="Group 0:", to_csv=to_csv, csv_path=csv_path
            )
            continue

        # Look the mask up per iteration so that an unknown group id can neither
        # hit an unbound variable nor silently reuse the previous group's mask.
        group_mask = group_masks.get(g)
        if not group_mask:
            logger.warning("Invalid group mask specified.")
            continue

        if g == "5":
            sortcols = ["exit_reason"]
        else:
            sortcols = ["profit_abs_sum", "enter_reason"]

        new = bigdf.groupby(group_mask).agg(agg_mask).reset_index()
        new.columns = group_mask + agg_cols
        # Ratios are reported as percentages.
        new["median_profit_pct"] = new["median_profit_pct"] * 100
        new["mean_profit_pct"] = new["mean_profit_pct"] * 100
        new["total_profit_pct"] = new["total_profit_pct"] * 100

        _print_table(new, sortcols, name=f"Group {g}:", to_csv=to_csv, csv_path=csv_path)
|
|
|
|
|
|
def _do_rejected_signals_output(
    rejected_signals_df: pd.DataFrame, to_csv: bool = False, csv_path=None
) -> None:
    """
    Output the rejected signals table, sorted by date, pair and enter tag.

    :param rejected_signals_df: DataFrame with at least "pair", "date" and "enter_tag".
    :param to_csv: Passed through to ``_print_table``.
    :param csv_path: Passed through to ``_print_table``.
    """
    shown_columns = ["pair", "date", "enter_tag"]
    sort_order = ["date", "pair", "enter_tag"]
    table = rejected_signals_df[shown_columns]
    _print_table(
        table,
        sort_order,
        show_index=False,
        name="Rejected Signals:",
        to_csv=to_csv,
        csv_path=csv_path,
    )
|
|
|
|
|
|
def _select_rows_within_dates(df, timerange=None, df_date_col: str = "date"):
    """
    Restrict ``df`` to the rows whose date lies within ``timerange``.

    The start bound is inclusive and the stop bound exclusive; each bound is
    only applied when the corresponding ``starttype`` / ``stoptype`` is "date".

    :param df: DataFrame to filter.
    :param timerange: Object exposing ``starttype``, ``stoptype``, ``startdt`` and
        ``stopdt``; a falsy value disables filtering.
    :param df_date_col: Name of the date column in ``df``.
    :return: The filtered DataFrame (``df`` itself if nothing was filtered).
    """
    # An empty frame has nothing to filter and may not even contain the date
    # column (e.g. a bare pd.DataFrame()), which would raise a KeyError below.
    if not timerange or df.empty:
        return df

    if timerange.starttype == "date":
        df = df.loc[(df[df_date_col] >= timerange.startdt)]
    if timerange.stoptype == "date":
        df = df.loc[(df[df_date_col] < timerange.stopdt)]
    return df
|
|
|
|
|
|
def _select_rows_by_tags(df, enter_reason_list, exit_reason_list):
    """
    Keep only the rows matching the requested enter and exit reasons.

    A filter is skipped when its list is empty/None or contains "all".

    :param df: DataFrame with "enter_reason" and "exit_reason" columns.
    :param enter_reason_list: Enter reasons to keep.
    :param exit_reason_list: Exit reasons to keep.
    :return: The filtered DataFrame.
    """
    tag_filters = (
        ("enter_reason", enter_reason_list),
        ("exit_reason", exit_reason_list),
    )
    for column, wanted in tag_filters:
        if wanted and "all" not in wanted:
            df = df.loc[(df[column].isin(wanted))]
    return df
|
|
|
|
|
|
def prepare_results(
    analysed_trades, stratname, enter_reason_list, exit_reason_list, timerange=None
) -> pd.DataFrame:
    """
    Combine the per-pair frames of one strategy into a single filtered DataFrame.

    Rows without a "close_date" are dropped, the remaining rows are restricted
    to ``timerange`` and - if an "enter_reason" column is present - to the
    requested enter/exit reasons.

    :param analysed_trades: Mapping strategy -> pair -> DataFrame.
    :param stratname: Strategy whose frames are combined.
    :param enter_reason_list: Enter reasons to keep.
    :param exit_reason_list: Exit reasons to keep.
    :param timerange: Optional time range passed to ``_select_rows_within_dates``.
    :return: The combined DataFrame (empty if there was nothing to combine).
    """
    # dropna() returns a new frame, so the caller's DataFrames are left untouched.
    closed_trades = [
        trades.dropna(subset=["close_date"])
        for trades in analysed_trades[stratname].values()
        if trades.shape[0] > 0
    ]
    # Concatenate once instead of growing the result frame pair by pair.
    if closed_trades:
        res_df = pd.concat(closed_trades, ignore_index=True)
    else:
        res_df = pd.DataFrame()

    res_df = _select_rows_within_dates(res_df, timerange)

    if res_df is not None and res_df.shape[0] > 0 and ("enter_reason" in res_df.columns):
        res_df = _select_rows_by_tags(res_df, enter_reason_list, exit_reason_list)

    return res_df
|
|
|
|
|
|
def print_results(
    res_df: pd.DataFrame,
    exit_df: pd.DataFrame,
    analysis_groups: List[str],
    indicator_list: List[str],
    entry_only: bool,
    exit_only: bool,
    csv_path: Path,
    rejected_signals=None,
    to_csv=False,
):
    """
    Output the requested analysis tables for one strategy.

    :param res_df: Analysed trades with entry signal data; if empty, only a
        "No trades to show" notice is printed.
    :param exit_df: Analysed trades with exit signal data (used for the
        indicator table).
    :param analysis_groups: Group ids handed to ``_do_group_table_output``.
    :param indicator_list: Indicator columns to show; "all" outputs ``res_df``
        completely, an empty list or None outputs no indicator table.
    :param entry_only: Passed through to ``_merge_dfs``.
    :param exit_only: Passed through to ``_merge_dfs``.
    :param csv_path: Directory used for CSV export.
    :param rejected_signals: Optional DataFrame of rejected signals.
    :param to_csv: Write the tables to CSV instead of printing them.
    """
    if res_df.shape[0] == 0:
        # Leading newline separates the notice from preceding output.
        print("\nNo trades to show")
        return

    if analysis_groups:
        _do_group_table_output(res_df, analysis_groups, to_csv=to_csv, csv_path=csv_path)

    if rejected_signals is not None:
        if rejected_signals.empty:
            print("There were no rejected signals.")
        else:
            _do_rejected_signals_output(rejected_signals, to_csv=to_csv, csv_path=csv_path)

    # Check for None/empty first: a membership test on None would raise a TypeError.
    if not indicator_list:
        return

    # NB this can be large for big dataframes!
    if "all" in indicator_list:
        _print_table(
            res_df, show_index=False, name="Indicators:", to_csv=to_csv, csv_path=csv_path
        )
    else:
        # Only indicators that exist as columns of res_df can be shown.
        available_inds = [ind for ind in indicator_list if ind in res_df]

        merged_df = _merge_dfs(res_df, exit_df, available_inds, entry_only, exit_only)

        _print_table(
            merged_df,
            sortcols=["exit_reason"],
            show_index=False,
            name="Indicators:",
            to_csv=to_csv,
            csv_path=csv_path,
        )
|
|
|
|
|
|
def _merge_dfs(
    entry_df: pd.DataFrame,
    exit_df: pd.DataFrame,
    available_inds: List[str],
    entry_only: bool,
    exit_only: bool,
):
    """
    Combine entry and exit indicator values per trade.

    :param entry_df: Trades with the indicator values of the entry signal.
    :param exit_df: Trades with the indicator values of the exit signal.
    :param available_inds: Indicator columns to include.
    :param entry_only: Return only the entry values.
    :param exit_only: Return only the exit values of the indicators.
    :return: ``entry_df`` reduced to the key/reason/indicator columns if there is
        no exit data or ``entry_only`` is set; otherwise entry and exit frames
        merged on "pair" and "open_date", with " (entry)" / " (exit)" suffixes
        on overlapping column names.
    """
    merge_on = ["pair", "open_date"]
    columns_to_keep = merge_on + ["enter_reason", "exit_reason"]

    if exit_df is None or exit_df.empty or entry_only is True:
        return entry_df[columns_to_keep + available_inds]

    # Columns listed in BT_DATA_COLUMNS are excluded from the exit side.
    # dict.fromkeys de-duplicates while keeping the requested order, so the
    # resulting column order is the same on every run (a set would not be).
    signal_wide_indicators = [
        ind for ind in dict.fromkeys(available_inds) if ind not in BT_DATA_COLUMNS
    ]

    if exit_only is True:
        entry_columns = columns_to_keep
    else:
        entry_columns = columns_to_keep + available_inds

    return pd.merge(
        entry_df[entry_columns],
        exit_df[merge_on + signal_wide_indicators],
        on=merge_on,
        suffixes=(" (entry)", " (exit)"),
    )
|
|
|
|
|
|
def _print_table(
    df: pd.DataFrame, sortcols=None, *, show_index=False, name=None, to_csv=False, csv_path: Path
):
    """
    Print ``df`` as a table or write it to a CSV file.

    :param df: Data to output.
    :param sortcols: Optional column(s) to sort by before output.
    :param show_index: Passed through to ``print_df_rich_table``.
    :param name: Table title; also used to derive the CSV file name
        (lower-cased, spaces replaced by "_", colons removed).
    :param to_csv: Write ``<csv_path>/<derived name>.csv`` instead of printing.
    :param csv_path: Directory for the CSV file.
    """
    data = df if sortcols is None else df.sort_values(sortcols)

    if to_csv:
        file_stem = name.lower().replace(" ", "_").replace(":", "")
        safe_name = Path(csv_path, file_stem + ".csv")
        data.to_csv(safe_name)
        print(f"Saved {name} to {safe_name}")
        return

    if name is not None:
        print(name)

    print_df_rich_table(data, data.keys(), show_index=show_index)
|
|
|
|
|
|
def process_entry_exit_reasons(config: Config):
    """
    Run the entry/exit reason analysis for every strategy of a backtest result.

    Reads the following configuration keys: "analysis_groups",
    "enter_reason_list", "exit_reason_list", "indicator_list", "entry_only",
    "exit_only", "analysis_rejected", "analysis_to_csv", "analysis_csv_path",
    "timerange", "exportfilename" and ``config["exchange"]["pair_whitelist"]``.

    :param config: Configuration dictionary.
    :raises OperationalException: If entry-only and exit-only are both requested,
        if CSV export is requested into a non-existing directory, or if a
        ValueError occurs during processing.
    """
    try:
        analysis_groups = config.get("analysis_groups", [])
        enter_reason_list = config.get("enter_reason_list", ["all"])
        exit_reason_list = config.get("exit_reason_list", ["all"])
        indicator_list = config.get("indicator_list", [])
        entry_only = config.get("entry_only", False)
        exit_only = config.get("exit_only", False)
        do_rejected = config.get("analysis_rejected", False)
        to_csv = config.get("analysis_to_csv", False)
        csv_path = Path(config.get("analysis_csv_path", config["exportfilename"]))

        if entry_only is True and exit_only is True:
            raise OperationalException(
                "Cannot use --entry-only and --exit-only at the same time. Please choose one."
            )
        if to_csv and not csv_path.is_dir():
            raise OperationalException(f"Specified directory {csv_path} does not exist.")

        timerange = TimeRange.parse_timerange(
            None if config.get("timerange") is None else str(config.get("timerange"))
        )

        backtest_stats = load_backtest_stats(config["exportfilename"])

        for strategy_name in backtest_stats["strategy"]:
            trades = load_backtest_data(config["exportfilename"], strategy_name)

            if trades is None or trades.empty:
                continue

            signal_candles = _load_signal_candles(config["exportfilename"])
            exit_signals = _load_exit_signal_candles(config["exportfilename"])

            rej_df = None
            if do_rejected:
                rejected_signals_dict = _load_rejected_signals(config["exportfilename"])
                # The loader returns None (and logs the reason) when the export
                # cannot be read; skip the rejected output instead of crashing.
                if rejected_signals_dict is not None:
                    rej_df = prepare_results(
                        rejected_signals_dict,
                        strategy_name,
                        enter_reason_list,
                        exit_reason_list,
                        timerange=timerange,
                    )

            # Entry signals are matched against the trade open date ...
            entry_df = _generate_dfs(
                config["exchange"]["pair_whitelist"],
                enter_reason_list,
                exit_reason_list,
                signal_candles,
                strategy_name,
                timerange,
                trades,
                "open_date",
            )

            # ... exit signals against the trade close date.
            exit_df = _generate_dfs(
                config["exchange"]["pair_whitelist"],
                enter_reason_list,
                exit_reason_list,
                exit_signals,
                strategy_name,
                timerange,
                trades,
                "close_date",
            )

            print_results(
                entry_df,
                exit_df,
                analysis_groups,
                indicator_list,
                entry_only,
                exit_only,
                rejected_signals=rej_df,
                to_csv=to_csv,
                csv_path=csv_path,
            )

    except ValueError as e:
        raise OperationalException(e) from e
|
|
|
|
|
|
def _generate_dfs(
    pairlist: list,
    enter_reason_list: list,
    exit_reason_list: list,
    signal_candles: Dict,
    strategy_name: str,
    timerange: TimeRange,
    trades: pd.DataFrame,
    date_col: str,
) -> pd.DataFrame:
    """
    Analyse the trades of one strategy and return them as one filtered DataFrame.

    The trades are first combined with their signal candles per pair
    (``_process_candles_and_indicators``) and then merged and filtered by
    reasons and time range (``prepare_results``).

    :param pairlist: Pairs to analyse.
    :param enter_reason_list: Enter reasons to keep.
    :param exit_reason_list: Exit reasons to keep.
    :param signal_candles: Mapping strategy -> pair -> signal candles.
    :param strategy_name: Strategy to analyse.
    :param timerange: Time range to restrict the result to.
    :param trades: Trades DataFrame.
    :param date_col: Trade date column the signal candles are matched against.
    :return: The resulting DataFrame.
    """
    per_pair_analysis = _process_candles_and_indicators(
        pairlist, strategy_name, trades, signal_candles, date_col
    )
    return prepare_results(
        per_pair_analysis,
        strategy_name,
        enter_reason_list,
        exit_reason_list,
        timerange=timerange,
    )
|