mirror of
https://github.com/freqtrade/freqtrade.git
synced 2024-11-13 03:33:55 +00:00
284 lines
12 KiB
Python
Executable File
284 lines
12 KiB
Python
Executable File
import logging
|
|
import shutil
|
|
from copy import deepcopy
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List
|
|
|
|
from pandas import DataFrame
|
|
|
|
from freqtrade.data.history import get_timerange
|
|
from freqtrade.exchange import timeframe_to_minutes
|
|
from freqtrade.loggers.set_log_levels import (
|
|
reduce_verbosity_for_bias_tester,
|
|
restore_verbosity_for_bias_tester,
|
|
)
|
|
from freqtrade.optimize.backtesting import Backtesting
|
|
from freqtrade.optimize.base_analysis import BaseAnalysis, VarHolder
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Analysis:
    """
    Mutable container for the outcome of one lookahead-bias analysis run.

    Attributes are plain counters/flags that the analysis updates in place:
    the number of signals processed, how many entry/exit signals could not be
    reproduced, the names of indicators whose values differed, and an overall
    bias flag.
    """

    def __init__(self) -> None:
        # All counters start at zero (ints are immutable, so sharing the
        # initial value between the three names is safe).
        self.total_signals = self.false_entry_signals = self.false_exit_signals = 0
        # Overall verdict; stays False until something flags a bias.
        self.has_bias = False
        # Names of indicators found to differ; a fresh list per instance.
        self.false_indicators: List[str] = []
|
|
|
|
|
|
class LookaheadAnalysis(BaseAnalysis):
    """
    Check a strategy for lookahead bias.

    For every trade of a reference ("full") backtest, the backtest is re-run on
    data that is cut right after the trade's entry candle and right after its
    exit candle. A trade whose entry/exit cannot be reproduced on the cut data,
    or an indicator whose value differs between the full and the cut run, is
    counted as biased.

    NOTE(review): ``self.local_config``, ``self.full_varHolder``,
    ``self.exchange``, ``self._fee``, ``self.failed_bias_check`` and
    ``self.dt_to_timestamp`` are not defined in this class; they are presumably
    provided by ``BaseAnalysis`` - confirm there.
    """

    def __init__(self, config: Dict[str, Any], strategy_obj: Dict):
        """
        :param config: Configuration; must contain ``minimum_trade_amount`` and
            ``targeted_trade_amount``.
        :param strategy_obj: Strategy description, forwarded to ``BaseAnalysis``.
        :raises KeyError: if one of the two trade-amount keys is missing.
        """
        super().__init__(config, strategy_obj)

        # One entry/exit holder is appended per processed (or skipped) trade.
        self.entry_varHolders: List[VarHolder] = []
        self.exit_varHolders: List[VarHolder] = []

        self.current_analysis = Analysis()
        self.minimum_trade_amount = config["minimum_trade_amount"]
        self.targeted_trade_amount = config["targeted_trade_amount"]

    @staticmethod
    def get_result(backtesting: Backtesting, processed: Dict[str, DataFrame]):
        """
        Run a backtest over the full time range covered by ``processed``.

        :param backtesting: Backtesting instance to run.
        :param processed: Per-pair dataframes with populated indicators.
        :return: Whatever ``backtesting.backtest`` returns.
        """
        min_date, max_date = get_timerange(processed)

        # Hand over a deep copy so the caller's dataframes are not touched.
        return backtesting.backtest(
            processed=deepcopy(processed), start_date=min_date, end_date=max_date
        )

    @staticmethod
    def report_signal(result: dict, column_name: str, checked_timestamp: datetime) -> bool:
        """
        Tell whether a backtest result contains a trade at the given timestamp.

        :param result: Backtest result; ``result["results"]`` is the trades dataframe.
        :param column_name: Column to look at (``"open_date"`` or ``"close_date"``).
        :param checked_timestamp: Timestamp that must appear in that column.
        :return: True if at least one row matches, False otherwise
            (including when there are no trades at all).
        """
        column = result["results"][column_name]
        if column.shape[0] == 0:
            return False
        return bool((column == checked_timestamp).any())

    def analyze_indicators(self, full_vars: VarHolder, cut_vars: VarHolder, current_pair: str):
        """
        Compare the indicator values of the full and the cut run on one candle.

        Only the candle at ``cut_vars.compared_dt`` is compared. Every indicator
        whose value differs is recorded once in
        ``self.current_analysis.false_indicators`` and logged.

        :param full_vars: Holder of the reference run.
        :param cut_vars: Holder of the run on truncated data.
        :param current_pair: Pair whose indicator dataframes are compared.
        """
        # extract dataframes
        cut_df: DataFrame = cut_vars.indicators[current_pair]
        full_df: DataFrame = full_vars.indicators[current_pair]

        # reduce both dataframes to the candle under inspection
        full_df_cut = full_df[(full_df.date == cut_vars.compared_dt)].reset_index(drop=True)
        cut_df_cut = cut_df[(cut_df.date == cut_vars.compared_dt)].reset_index(drop=True)

        # nothing to compare if the candle is missing on either side
        if full_df_cut.shape[0] == 0 or cut_df_cut.shape[0] == 0:
            return

        compare_df = full_df_cut.compare(cut_df_cut)
        if compare_df.shape[0] == 0:
            return

        # compare_df columns are tuples (indicator, 'self' | 'other'); each
        # 'self' column is directly followed by its 'other' counterpart.
        first_row = compare_df.iloc[0]
        false_indicators = self.current_analysis.false_indicators
        for col_idx, col_name in enumerate(compare_df.columns):
            if "other" in col_name[1]:
                continue
            self_value = first_row.iloc[col_idx]
            other_value = first_row.iloc[col_idx + 1]

            # output differences (each indicator is reported only once)
            if self_value != other_value and col_name[0] not in false_indicators:
                false_indicators.append(col_name[0])
                logger.info(
                    f"=> found look ahead bias in indicator "
                    f"{col_name[0]}. "
                    f"{str(self_value)} != {str(other_value)}"
                )

    def prepare_data(self, varholder: VarHolder, pairs_to_load: List[str]):
        """
        Load data, populate indicators and run a backtest for one holder.

        The time range is taken from ``varholder.from_dt`` / ``varholder.to_dt``;
        the results are stored back on ``varholder`` (``data``, ``timerange``,
        ``timeframe``, ``indicators``, ``result``).

        Side effect: if a freqai identifier is configured, its model directory
        below ``user_data_dir/models`` is deleted first.

        :param varholder: Holder to fill.
        :param pairs_to_load: Pair names used as the exchange pair whitelist.
        """
        if "freqai" in self.local_config and "identifier" in self.local_config["freqai"]:
            # purge previous data if the freqai model is defined
            # (to be sure nothing is carried over from older backtests)
            path_to_current_identifier = Path(
                f"{self.local_config['user_data_dir']}/models/"
                f"{self.local_config['freqai']['identifier']}"
            ).resolve()
            # remove folder and its contents
            if path_to_current_identifier.exists():
                shutil.rmtree(path_to_current_identifier)

        prepare_data_config = deepcopy(self.local_config)
        prepare_data_config["timerange"] = (
            str(self.dt_to_timestamp(varholder.from_dt))
            + "-"
            + str(self.dt_to_timestamp(varholder.to_dt))
        )
        prepare_data_config["exchange"]["pair_whitelist"] = pairs_to_load

        if self._fee is not None:
            # Don't re-calculate fee per pair, as fee might differ per pair.
            prepare_data_config["fee"] = self._fee

        backtesting = Backtesting(prepare_data_config, self.exchange)
        # keep exchange and fee around so later runs can reuse them
        self.exchange = backtesting.exchange
        self._fee = backtesting.fee
        backtesting._set_strategy(backtesting.strategylist[0])

        varholder.data, varholder.timerange = backtesting.load_bt_data()
        backtesting.load_bt_data_detail()
        varholder.timeframe = backtesting.timeframe

        varholder.indicators = backtesting.strategy.advise_all_indicators(varholder.data)
        varholder.result = self.get_result(backtesting, varholder.indicators)

    def fill_entry_and_exit_varHolders(self, result_row):
        """
        Create, register and fill the entry and exit holder for one trade.

        Both holders start at the start of the full run. The entry holder ends
        one candle after the trade's open date, the exit holder one candle after
        its close date. Each holder is appended to its list and then filled via
        ``prepare_data``.

        :param result_row: Trade row with ``open_date``, ``close_date`` and ``pair``.
        """
        one_candle = timedelta(minutes=timeframe_to_minutes(self.full_varHolder.timeframe))

        # entry_varHolder
        entry_varHolder = VarHolder()
        self.entry_varHolders.append(entry_varHolder)
        entry_varHolder.from_dt = self.full_varHolder.from_dt
        entry_varHolder.compared_dt = result_row["open_date"]
        # to_dt needs +1 candle since it won't buy on the last candle
        entry_varHolder.to_dt = result_row["open_date"] + one_candle
        self.prepare_data(entry_varHolder, [result_row["pair"]])

        # exit_varHolder
        exit_varHolder = VarHolder()
        self.exit_varHolders.append(exit_varHolder)
        # to_dt needs +1 candle since it will always exit/force-exit trades on the last candle
        exit_varHolder.from_dt = self.full_varHolder.from_dt
        exit_varHolder.to_dt = result_row["close_date"] + one_candle
        exit_varHolder.compared_dt = result_row["close_date"]
        self.prepare_data(exit_varHolder, [result_row["pair"]])

    def analyze_row(self, idx: int, result_row):
        """
        Analyze one trade of the full run for lookahead bias.

        Re-runs the backtest on data cut after the entry and after the exit,
        updates the counters in ``self.current_analysis`` and logs findings.

        :param idx: Index of the trade in the full result (used for logging).
        :param result_row: Trade row of the full backtest result.
        """
        # if force-sold, ignore this signal since here it will unconditionally exit.
        if result_row.close_date == self.dt_to_timestamp(self.full_varHolder.to_dt):
            return

        # keep track of how many signals are processed at total
        self.current_analysis.total_signals += 1

        # fill entry_varHolder and exit_varHolder
        self.fill_entry_and_exit_varHolders(result_row)
        # Use the holders that were just appended instead of looking them up by
        # idx: idx is the dataframe index and is not guaranteed to line up with
        # the list position (e.g. after a row was skipped by the return above).
        entry_holder = self.entry_varHolders[-1]
        exit_holder = self.exit_varHolders[-1]

        # this will trigger a logger-message
        buy_or_sell_biased: bool = False

        # register if entry signal is broken
        if not self.report_signal(entry_holder.result, "open_date", entry_holder.compared_dt):
            self.current_analysis.false_entry_signals += 1
            buy_or_sell_biased = True

        # register if exit signal is broken
        if not self.report_signal(exit_holder.result, "close_date", exit_holder.compared_dt):
            self.current_analysis.false_exit_signals += 1
            buy_or_sell_biased = True

        if buy_or_sell_biased:
            logger.info(
                f"found lookahead-bias in trade "
                f"pair: {result_row['pair']}, "
                f"timerange:{result_row['open_date']} - {result_row['close_date']}, "
                f"idx: {idx}"
            )

        # check if the indicators themselves contain biased data
        self.analyze_indicators(self.full_varHolder, entry_holder, result_row["pair"])
        self.analyze_indicators(self.full_varHolder, exit_holder, result_row["pair"])

    def start(self) -> None:
        """
        Run the lookahead analysis and store the verdict.

        Walks over the trades of the full run (up to ``targeted_trade_amount``
        analyzed trades), analyzes each one and finally sets
        ``self.failed_bias_check`` and ``self.current_analysis.has_bias``.
        Returns early, without a verdict, when too few trades are available.
        Log verbosity is reduced while trades are analyzed and is always
        restored afterwards, also on early return or error.
        """
        super().start()

        reduce_verbosity_for_bias_tester()
        try:
            # check if requirements have been met of full_varholder
            # NOTE(review): the "+ 1" makes this one larger than the number of
            # result rows - kept as-is, confirm whether it is intended.
            found_signals: int = self.full_varHolder.result["results"].shape[0] + 1
            if found_signals >= self.targeted_trade_amount:
                logger.info(
                    f"Found {found_signals} trades, "
                    f"calculating {self.targeted_trade_amount} trades."
                )
            elif self.targeted_trade_amount >= found_signals >= self.minimum_trade_amount:
                logger.info(f"Only found {found_signals} trades. Calculating all available trades.")
            else:
                logger.info(
                    f"found {found_signals} trades "
                    f"which is less than minimum_trade_amount {self.minimum_trade_amount}. "
                    f"Cancelling this backtest lookahead bias test."
                )
                return

            # now we loop through all signals
            # starting from the same datetime to avoid miss-reports of bias
            for idx, result_row in self.full_varHolder.result["results"].iterrows():
                if self.current_analysis.total_signals == self.targeted_trade_amount:
                    logger.info(f"Found targeted trade amount = {self.targeted_trade_amount} signals.")
                    break
                if found_signals < self.minimum_trade_amount:
                    logger.info(
                        f"only found {found_signals} "
                        f"which is smaller than "
                        f"minimum trade amount = {self.minimum_trade_amount}. "
                        f"Exiting this lookahead-analysis"
                    )
                    return None
                if "force_exit" in result_row["exit_reason"]:
                    logger.info(
                        f"found force-exit in pair: {result_row['pair']}, "
                        f"timerange:{result_row['open_date']}-{result_row['close_date']}, "
                        f"idx: {idx}, skipping this one to avoid a false-positive."
                    )

                    # just to keep the IDs of both full, entry and exit varholders the same
                    # to achieve a better debugging experience
                    self.entry_varHolders.append(VarHolder())
                    self.exit_varHolders.append(VarHolder())
                    continue

                self.analyze_row(idx, result_row)

            # Informational only: the analysis continues to the report below,
            # which performs its own too-few-trades check.
            if len(self.entry_varHolders) < self.minimum_trade_amount:
                logger.info(
                    f"only found {found_signals} after skipping forced exits "
                    f"which is smaller than "
                    f"minimum trade amount = {self.minimum_trade_amount}. "
                    f"Exiting this lookahead-analysis"
                )
        finally:
            # Restore verbosity, so it's not too quiet for the next strategy
            restore_verbosity_for_bias_tester()

        # check and report signals
        if self.current_analysis.total_signals < self.local_config["minimum_trade_amount"]:
            logger.info(
                f" -> {self.local_config['strategy']} : too few trades. "
                f"We only found {self.current_analysis.total_signals} trades. "
                f"Hint: Extend the timerange "
                f"to get at least {self.local_config['minimum_trade_amount']} "
                f"or lower the value of minimum_trade_amount."
            )
            self.failed_bias_check = True
        elif (
            self.current_analysis.false_entry_signals > 0
            or self.current_analysis.false_exit_signals > 0
            or len(self.current_analysis.false_indicators) > 0
        ):
            logger.info(f" => {self.local_config['strategy']} : bias detected!")
            self.current_analysis.has_bias = True
            # the check itself ran successfully - it just found a bias
            self.failed_bias_check = False
        else:
            logger.info(self.local_config["strategy"] + ": no bias detected")
            self.failed_bias_check = False
|