freqtrade_origin/freqtrade/strategy/backtest_lookahead_bias_checker.py

242 lines
10 KiB
Python

# pragma pylint: disable=missing-docstring, W0212, line-too-long, C0103, unused-argument
from copy import deepcopy
from datetime import datetime, timedelta, timezone
import pandas
from freqtrade.configuration import TimeRange
from freqtrade.data.history import get_timerange
from freqtrade.exchange import timeframe_to_minutes
from freqtrade.optimize.backtesting import Backtesting
class backtest_lookahead_bias_checker:
    """
    Check a strategy for look-ahead bias by comparing backtests.

    A backtest is first run over a "full" timerange. For each trade it produced,
    two further backtests are run on data that ends shortly after the trade's
    entry and exit respectively. If the trade does not show up again, or if an
    indicator value on the compared candle differs between the full and the
    truncated run, this is reported as bias.
    """

    class varHolder:
        """
        Plain container for the state of one backtest run.

        No attribute has a default; each one exists only after it has been
        assigned (see ``prepare_data`` and ``start``).
        """
        # Quoted so the annotation does not have to resolve when the class body runs.
        timerange: "TimeRange"
        data: pandas.DataFrame
        # Indexed by pair in analyze_indicators().
        indicators: pandas.DataFrame
        # Indexed with the key 'results' in report_signal() and start().
        result: pandas.DataFrame
        compared: pandas.DataFrame
        from_dt: datetime
        to_dt: datetime
        # Candle date at which the full and the truncated run are compared.
        compared_dt: datetime

    class analysis:
        """Counters and findings collected while checking one strategy."""
        total_signals: int
        false_entry_signals: int
        false_exit_signals: int
        false_indicators: list
        has_bias: bool

        def __init__(self):
            self.total_signals = 0
            self.false_entry_signals = 0
            self.false_exit_signals = 0
            self.false_indicators = []
            self.has_bias = False

    def __init__(self):
        """
        Create a checker with every attribute initialised.

        ``start`` overwrites all of them except ``signals_to_check``.
        """
        checker_cls = backtest_lookahead_bias_checker
        self.strategy_obj = None
        self.config = None
        self.backtesting = None
        # Maximum number of trades of the full run that are re-checked.
        self.signals_to_check: int = 20
        self.current_analysis = checker_cls.analysis()
        self.full_varHolder = checker_cls.varHolder()
        self.entry_varholder = checker_cls.varHolder()
        self.exit_varholder = checker_cls.varHolder()

    @staticmethod
    def dt_to_timestamp(dt):
        """
        Convert a datetime to whole epoch seconds.

        :param dt: datetime-like object; a naive value is interpreted as UTC,
            an aware value keeps its own offset.
        :return: int, seconds since the Unix epoch.
        """
        if dt.tzinfo is None:
            # Only naive values get stamped as UTC; overwriting the tzinfo of
            # an aware value would shift the instant it represents.
            dt = dt.replace(tzinfo=timezone.utc)
        return int(dt.timestamp())

    def get_result(self, backtesting, processed):
        """
        Run ``backtesting.backtest`` over the complete date range of ``processed``.

        :param backtesting: object providing ``backtest(processed, start_date, end_date)``.
        :param processed: data passed to ``get_timerange``; a deep copy is
            handed to the backtest so the caller's data is not shared with it.
        :return: whatever ``backtesting.backtest`` returns.
        """
        min_date, max_date = get_timerange(processed)
        return backtesting.backtest(
            processed=deepcopy(processed),
            start_date=min_date,
            end_date=max_date
        )

    def analyze_indicators(self, full_vars, cut_vars, current_pair):
        """
        Compare the indicator values of two runs on the candle ``cut_vars.compared_dt``.

        Every column whose value differs between the full and the truncated
        run is printed and appended (once) to
        ``self.current_analysis.false_indicators``.

        :param full_vars: varHolder of the run over the full timerange.
        :param cut_vars: varHolder of the truncated run; supplies ``compared_dt``.
        :param current_pair: key used to pick the dataframe from ``indicators``.
        """
        compared_dt = cut_vars.compared_dt
        cut_df = cut_vars.indicators[current_pair]
        full_df = full_vars.indicators[current_pair]

        # Reduce both frames to the row(s) of the compared candle.
        full_df_cut = full_df[full_df.date == compared_dt].reset_index(drop=True)
        cut_df_cut = cut_df[cut_df.date == compared_dt].reset_index(drop=True)
        if full_df_cut.shape[0] == 0 or cut_df_cut.shape[0] == 0:
            # The candle is missing in one of the runs - nothing to compare.
            return

        # compare() keeps only differing columns, labelled (column, 'self') for
        # the full run and (column, 'other') for the truncated run.
        compare_df = full_df_cut.compare(cut_df_cut)
        if compare_df.empty:
            return

        first_row = compare_df.iloc[0]
        false_indicators = self.current_analysis.false_indicators
        for indicator in compare_df.columns.get_level_values(0).unique():
            # Label-based lookup instead of positional integer indexing.
            self_value = first_row[(indicator, 'self')]
            other_value = first_row[(indicator, 'other')]
            if self_value != other_value and indicator not in false_indicators:
                false_indicators.append(indicator)
                print(f"=> found look ahead bias in indicator {indicator}. " +
                      f"{str(self_value)} != {str(other_value)}")

    def report_signal(self, result, column_name, checked_timestamp):
        """
        Tell whether ``checked_timestamp`` occurs in a column of the backtest results.

        :param result: mapping whose 'results' entry is a dataframe of trades.
        :param column_name: column of that dataframe to search.
        :param checked_timestamp: value to look for.
        :return: True if at least one row matches, False otherwise (including
            when the dataframe has no rows).
        """
        signal_column = result['results'][column_name]
        if signal_column.shape[0] == 0:
            return False
        return bool((signal_column == checked_timestamp).any())

    def prepare_data(self, varholder, var_pairs):
        """
        Backtest the window ``varholder.from_dt`` .. ``varholder.to_dt``.

        Writes the window into ``self.config['timerange']``, replaces
        ``self.backtesting`` with a new ``Backtesting`` instance, and stores
        data, timerange, indicators and result on ``varholder``.

        :param varholder: varHolder with ``from_dt`` and ``to_dt`` set.
        :param var_pairs: NOTE(review): accepted but never used here; the pairs
            that get loaded are whatever ``Backtesting`` takes from the config.
        """
        self.config['timerange'] = \
            str(int(self.dt_to_timestamp(varholder.from_dt))) + "-" + \
            str(int(self.dt_to_timestamp(varholder.to_dt)))
        self.backtesting = Backtesting(self.config)
        self.backtesting._set_strategy(self.backtesting.strategylist[0])
        varholder.data, varholder.timerange = self.backtesting.load_bt_data()
        varholder.indicators = self.backtesting.strategy.advise_all_indicators(varholder.data)
        varholder.result = self.get_result(self.backtesting, varholder.indicators)

    def start(self, config, strategy_obj: dict) -> None:
        """
        Run the bias check for one strategy and print the verdict.

        The outcome is kept in ``self.current_analysis``.

        :param config: configuration; the keys 'pairs' and 'timeframe' are read
            here and 'timerange' is overwritten by ``prepare_data``.
        :param strategy_obj: dict whose 'name' entry is used in the printed messages.
        """
        checker_cls = backtest_lookahead_bias_checker
        self.strategy_obj = strategy_obj
        self.config = config
        self.current_analysis = checker_cls.analysis()

        max_try_signals: int = 3
        found_signals: int = 0
        continue_with_strategy = True

        # First we need to get the necessary entry/exit signals:
        # start with 14 days and widen the range in 30-day steps
        # until the desired amount of trades is available.
        for try_buysignals in range(max_try_signals):  # range(3) = 0..2
            # re-initialize the holder of the full run
            self.full_varHolder = checker_cls.varHolder()
            self.full_varHolder.from_dt = datetime(2022, 9, 1)
            self.full_varHolder.to_dt = datetime(2022, 9, 15) + timedelta(days=30 * try_buysignals)
            self.prepare_data(self.full_varHolder, self.config['pairs'])

            # NOTE(review): this is the number of result rows plus one, and the
            # last-try branch below halves it again - kept as found, confirm intent.
            found_signals = self.full_varHolder.result['results'].shape[0] + 1

            if try_buysignals == max_try_signals - 1:
                if found_signals < self.signals_to_check / 2:
                    print(f"... only found {str(int(found_signals / 2))} "
                          f"buy signals for {self.strategy_obj['name']}. "
                          f"Cancelling...")
                    continue_with_strategy = False
                else:
                    print(
                        f"Found {str(found_signals)} buy signals. "
                        f"Going with max {str(self.signals_to_check)} "
                        f" buy signals in the full timerange from "
                        f"{str(self.full_varHolder.from_dt)} to {str(self.full_varHolder.to_dt)}")
                break
            elif found_signals < self.signals_to_check:
                print(
                    f"Only found {str(found_signals)} buy signals in the full timerange from "
                    f"{str(self.full_varHolder.from_dt)} to "
                    f"{str(self.full_varHolder.to_dt)}. "
                    f"will increase timerange trying to get at least "
                    f"{str(self.signals_to_check)} signals.")
            else:
                print(
                    f"Found {str(found_signals)} buy signals, more than necessary. "
                    f"Reducing to {str(self.signals_to_check)} "
                    f"checked buy signals in the full timerange from "
                    f"{str(self.full_varHolder.from_dt)} to {str(self.full_varHolder.to_dt)}")
                break

        if not continue_with_strategy:
            return

        full_end_timestamp = self.dt_to_timestamp(self.full_varHolder.to_dt)
        for _, result_row in self.full_varHolder.result['results'].iterrows():
            if self.current_analysis.total_signals == self.signals_to_check:
                break

            # If force-sold, ignore this signal since here it will unconditionally exit.
            # close_date is datetime-like (a timedelta is added to it below), so it
            # has to be converted before it can equal an epoch value.
            if self.dt_to_timestamp(result_row['close_date']) == full_end_timestamp:
                continue

            self.current_analysis.total_signals += 1
            pair = result_row['pair']
            candle = timedelta(minutes=timeframe_to_minutes(self.config['timeframe']))

            self.entry_varholder = checker_cls.varHolder()
            self.exit_varholder = checker_cls.varHolder()

            # Entry run: to_dt needs +1 candle since it won't buy on the last candle.
            self.entry_varholder.from_dt = self.full_varHolder.from_dt
            self.entry_varholder.compared_dt = result_row['open_date']
            self.entry_varholder.to_dt = result_row['open_date'] + candle * 2
            self.prepare_data(self.entry_varholder, [pair])

            # Exit run: to_dt needs +1 candle since it will always sell
            # all trades on the last candle.
            self.exit_varholder.from_dt = self.full_varHolder.from_dt
            self.exit_varholder.to_dt = result_row['close_date'] + candle
            self.exit_varholder.compared_dt = result_row['close_date']
            self.prepare_data(self.exit_varholder, [pair])

            entry_found = self.report_signal(
                self.entry_varholder.result,
                "open_date", self.entry_varholder.compared_dt)
            exit_found = self.report_signal(
                self.exit_varholder.result,
                "close_date", self.exit_varholder.compared_dt)

            # register if buy signal is broken
            if not entry_found:
                self.current_analysis.false_entry_signals += 1
            # register if buy or sell signal is broken
            if not (entry_found and exit_found):
                self.current_analysis.false_exit_signals += 1

            self.analyze_indicators(self.full_varHolder, self.entry_varholder, pair)
            self.analyze_indicators(self.full_varHolder, self.exit_varholder, pair)

        if self.current_analysis.false_entry_signals > 0 or \
                self.current_analysis.false_exit_signals > 0 or \
                len(self.current_analysis.false_indicators) > 0:
            print(" => " + self.strategy_obj['name'] + ": bias detected!")
            self.current_analysis.has_bias = True
        else:
            print(self.strategy_obj['name'] + ": no bias detected")