Mirror of https://github.com/freqtrade/freqtrade.git (synced 2024-11-10 10:21:59 +00:00)

Commit 2b416d3b62:
- optimized pairs for entry_varholder and exit_varholder to only check a single pair instead of all pairs
- bias-check of FreqAI strategies is now possible
- added a condition to not crash when compared_df is empty (meaning no differences have been found)

192 lines · 7.5 KiB · Python · Executable File
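"""
Helper commands for strategy maintenance:

* start_strategy_update drives StrategyUpdater to convert the selected
  strategy files (see start_conversion).
* start_backtest_lookahead_bias_checker runs BacktestLookaheadBiasChecker
  against the selected strategies, prints a summary table and exports the
  results to CSV.
"""
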
import logging
import sys
import time
from pathlib import Path
from typing import Any, Dict

import pandas as pd
from tabulate import tabulate

from freqtrade.configuration import setup_utils_configuration
from freqtrade.enums import RunMode
from freqtrade.resolvers import StrategyResolver
from freqtrade.strategy.backtest_lookahead_bias_checker import BacktestLookaheadBiasChecker
from freqtrade.strategy.strategyupdater import StrategyUpdater


logger = logging.getLogger(__name__)

def start_strategy_update(args: Dict[str, Any]) -> None:
    """
    Start the strategy updating script
    :param args: Cli args from Arguments()
    :return: None
    """

    if sys.version_info < (3, 9):  # pragma: no cover
        sys.exit("Freqtrade strategy updater requires Python version >= 3.9")

    config = setup_utils_configuration(args, RunMode.UTIL_NO_EXCHANGE)

    strategy_objs = StrategyResolver.search_all_objects(
        config, enum_failed=False, recursive=config.get('recursive_strategy_search', False))

    filtered_strategy_objs = []
    if args['strategy_list']:
        filtered_strategy_objs = [
            strategy_obj for strategy_obj in strategy_objs
            if strategy_obj['name'] in args['strategy_list']
        ]
    else:
        # Use all available entries.
        filtered_strategy_objs = strategy_objs

    processed_locations = set()
    for strategy_obj in filtered_strategy_objs:
        # Convert each strategy file only once, even if it contains several strategies.
        if strategy_obj['location'] not in processed_locations:
            processed_locations.add(strategy_obj['location'])
            start_conversion(strategy_obj, config)

def start_conversion(strategy_obj, config):
    """
    Convert a single strategy file with StrategyUpdater and report the elapsed time.
    """
    print(f"Conversion of {Path(strategy_obj['location']).name} started.")
    instance_strategy_updater = StrategyUpdater()
    start = time.perf_counter()
    instance_strategy_updater.start(config, strategy_obj)
    elapsed = time.perf_counter() - start
    print(f"Conversion of {Path(strategy_obj['location']).name} took {elapsed:.1f} seconds.")

def start_backtest_lookahead_bias_checker(args: Dict[str, Any]) -> None:
    """
    Start the backtest bias tester script
    :param args: Cli args from Arguments()
    :return: None
    """
    config = setup_utils_configuration(args, RunMode.UTIL_NO_EXCHANGE)

    if args['targeted_trade_amount'] < args['minimum_trade_amount']:
        # A targeted trade amount below the minimum trade amount can never be reached,
        # so ask the user to check the configuration instead of running a pointless check.
        sys.exit("Targeted trade amount can't be smaller than minimum trade amount. "
                 "Please check your configuration.")

    strategy_objs = StrategyResolver.search_all_objects(
        config, enum_failed=False, recursive=config.get('recursive_strategy_search', False))

    bias_checker_instances = []
    filtered_strategy_objs = []
    if 'strategy_list' in args and args['strategy_list'] is not None:
        for args_strategy in args['strategy_list']:
            for strategy_obj in strategy_objs:
                if (strategy_obj['name'] == args_strategy
                        and strategy_obj not in filtered_strategy_objs):
                    filtered_strategy_objs.append(strategy_obj)
                    break

        for filtered_strategy_obj in filtered_strategy_objs:
            bias_checker_instances.append(
                initialize_single_lookahead_bias_checker(filtered_strategy_obj, config, args))
    elif 'strategy' in args and args['strategy'] is not None:
        for strategy_obj in strategy_objs:
            if strategy_obj['name'] == args['strategy']:
                bias_checker_instances.append(
                    initialize_single_lookahead_bias_checker(strategy_obj, config, args))
                break
    else:
        # No explicit selection was made, so check every strategy file exactly once.
        processed_locations = set()
        for strategy_obj in strategy_objs:
            if strategy_obj['location'] not in processed_locations:
                processed_locations.add(strategy_obj['location'])
                bias_checker_instances.append(
                    initialize_single_lookahead_bias_checker(strategy_obj, config, args))

    text_table_bias_checker_instances(bias_checker_instances)
    export_to_csv(args, bias_checker_instances)

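# The reporting helpers below expect each BacktestLookaheadBiasChecker instance
# to expose strategy_obj ('location' and 'name'), failed_bias_check, and a
# current_analysis with has_bias, total_signals, false_entry_signals,
# false_exit_signals and false_indicators.
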
def text_table_bias_checker_instances(bias_checker_instances):
    headers = ['filename', 'strategy', 'has_bias', 'total_signals',
               'biased_entry_signals', 'biased_exit_signals', 'biased_indicators']
    data = []
    for current_instance in bias_checker_instances:
        if current_instance.failed_bias_check:
            data.append(
                [
                    current_instance.strategy_obj['location'].parts[-1],
                    current_instance.strategy_obj['name'],
                    'error while checking'
                ]
            )
        else:
            data.append(
                [
                    current_instance.strategy_obj['location'].parts[-1],
                    current_instance.strategy_obj['name'],
                    current_instance.current_analysis.has_bias,
                    current_instance.current_analysis.total_signals,
                    current_instance.current_analysis.false_entry_signals,
                    current_instance.current_analysis.false_exit_signals,
                    ", ".join(current_instance.current_analysis.false_indicators)
                ]
            )
    table = tabulate(data, headers=headers, tablefmt="orgtbl")
    print(table)

def export_to_csv(args, bias_checker_instances):
    """
    Write (or update) one CSV row per checked strategy to args['exportfilename'].
    """
    def add_or_update_row(df, row_data):
        mask = (
            (df['filename'] == row_data['filename']) &
            (df['strategy'] == row_data['strategy'])
        )
        if mask.any():
            # Update the existing row in place. Assign plain values instead of a
            # one-row DataFrame so pandas does not try to align on the new frame's index.
            df.loc[mask, list(row_data.keys())] = list(row_data.values())
        else:
            # Add a new row
            df = pd.concat([df, pd.DataFrame([row_data], columns=df.columns)])

        return df

    if Path(args['exportfilename']).exists():
        # Read the existing CSV file into a pandas dataframe
        csv_df = pd.read_csv(args['exportfilename'])
    else:
        # Create a new empty DataFrame with the desired column names
        csv_df = pd.DataFrame(columns=[
            'filename', 'strategy', 'has_bias', 'total_signals',
            'biased_entry_signals', 'biased_exit_signals', 'biased_indicators'
        ])

    for inst in bias_checker_instances:
        new_row_data = {'filename': inst.strategy_obj['location'].parts[-1],
                        'strategy': inst.strategy_obj['name'],
                        'has_bias': inst.current_analysis.has_bias,
                        'total_signals': inst.current_analysis.total_signals,
                        'biased_entry_signals': inst.current_analysis.false_entry_signals,
                        'biased_exit_signals': inst.current_analysis.false_exit_signals,
                        'biased_indicators': ",".join(inst.current_analysis.false_indicators)}
        csv_df = add_or_update_row(csv_df, new_row_data)

    print(f"saving {args['exportfilename']}")
    csv_df.to_csv(args['exportfilename'], index=False)

def initialize_single_lookahead_bias_checker(strategy_obj, config, args):
    """
    Run the lookahead bias check for a single strategy and return the checker instance.
    """
    print(f"Bias test of {Path(strategy_obj['location']).name} started.")
    start = time.perf_counter()
    current_instance = BacktestLookaheadBiasChecker()
    current_instance.start(config, strategy_obj, args)
    elapsed = time.perf_counter() - start
    print(f"Checking look-ahead bias via backtests of {Path(strategy_obj['location']).name} "
          f"took {elapsed:.1f} seconds.")
    return current_instance
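
# Usage sketch: both entry points expect the dict produced by freqtrade's CLI
# argument parsing. The exact set of keys depends on Arguments()/configuration
# handling, so treat the call below as an illustration rather than a definitive
# invocation; this module itself only reads 'strategy'/'strategy_list',
# 'targeted_trade_amount', 'minimum_trade_amount' and 'exportfilename'.
#
#   start_backtest_lookahead_bias_checker({
#       'config': ['user_data/config.json'],      # hypothetical config path
#       'strategy_list': ['SampleStrategy'],      # restrict the check to one strategy
#       'targeted_trade_amount': 20,
#       'minimum_trade_amount': 10,
#       'exportfilename': 'lookahead_results.csv',
#   })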