ruff format: freqtrade/configuration

Matthias 2024-05-12 16:29:24 +02:00
parent 8ffc48e4f0
commit 9303ae29d3
10 changed files with 614 additions and 480 deletions

View File

@@ -24,13 +24,13 @@ def sanitize_config(config: Config, *, show_sensitive: bool = False) -> Config:
]
config = deepcopy(config)
for key in keys_to_remove:
if '.' in key:
nested_keys = key.split('.')
if "." in key:
nested_keys = key.split(".")
nested_config = config
for nested_key in nested_keys[:-1]:
nested_config = nested_config.get(nested_key, {})
nested_config[nested_keys[-1]] = 'REDACTED'
nested_config[nested_keys[-1]] = "REDACTED"
else:
config[key] = 'REDACTED'
config[key] = "REDACTED"
return config
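For readers skimming the diff, here is a self-contained sketch of the redaction logic above. The key list and config are made up; in freqtrade the list comes from the surrounding file:

from copy import deepcopy

def redact(config: dict, keys_to_remove: list) -> dict:
    # Mirrors the loop above: dotted keys walk into nested dicts,
    # plain keys are redacted at the top level.
    config = deepcopy(config)
    for key in keys_to_remove:
        if "." in key:
            nested_keys = key.split(".")
            nested_config = config
            for nested_key in nested_keys[:-1]:
                nested_config = nested_config.get(nested_key, {})
            nested_config[nested_keys[-1]] = "REDACTED"
        else:
            config[key] = "REDACTED"
    return config

print(redact({"exchange": {"key": "abc", "name": "binance"}, "password": "x"},
             ["exchange.key", "password"]))
# {'exchange': {'key': 'REDACTED', 'name': 'binance'}, 'password': 'REDACTED'}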

View File

@@ -11,7 +11,8 @@ logger = logging.getLogger(__name__)
def setup_utils_configuration(
args: Dict[str, Any], method: RunMode, *, set_dry: bool = True) -> Dict[str, Any]:
args: Dict[str, Any], method: RunMode, *, set_dry: bool = True
) -> Dict[str, Any]:
"""
Prepare the configuration for utils subcommands
:param args: Cli args from Arguments()
@@ -23,7 +24,7 @@ def setup_utils_configuration(
# Ensure these modes are using Dry-run
if set_dry:
config['dry_run'] = True
config["dry_run"] = True
validate_config_consistency(config, preliminary=True)
return config
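A quick sketch of how a utility subcommand would call this; the args dict is a hypothetical minimal stand-in for what Arguments() produces:

from freqtrade.enums import RunMode

config = setup_utils_configuration({"config": ["config.json"]}, RunMode.UTIL_EXCHANGE)
assert config["dry_run"] is True  # set_dry defaults to True, so utility modes never trade live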

View File

@@ -20,18 +20,16 @@ def _extend_validator(validator_class):
Extended validator for the Freqtrade configuration JSON Schema.
Currently it only handles defaults for subschemas.
"""
validate_properties = validator_class.VALIDATORS['properties']
validate_properties = validator_class.VALIDATORS["properties"]
def set_defaults(validator, properties, instance, schema):
for prop, subschema in properties.items():
if 'default' in subschema:
instance.setdefault(prop, subschema['default'])
if "default" in subschema:
instance.setdefault(prop, subschema["default"])
yield from validate_properties(validator, properties, instance, schema)
return validators.extend(
validator_class, {'properties': set_defaults}
)
return validators.extend(validator_class, {"properties": set_defaults})
FreqtradeValidator = _extend_validator(Draft4Validator)
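The pattern being reformatted here is the standard jsonschema recipe for injecting schema defaults during validation. A runnable illustration with a made-up schema:

from jsonschema import Draft4Validator, validators

def extend_with_defaults(validator_class):
    validate_properties = validator_class.VALIDATORS["properties"]

    def set_defaults(validator, properties, instance, schema):
        # Fill in defaults before delegating to the stock "properties" check
        for prop, subschema in properties.items():
            if "default" in subschema:
                instance.setdefault(prop, subschema["default"])
        yield from validate_properties(validator, properties, instance, schema)

    return validators.extend(validator_class, {"properties": set_defaults})

DefaultFillingValidator = extend_with_defaults(Draft4Validator)
schema = {"type": "object", "properties": {"dry_run": {"type": "boolean", "default": True}}}
conf = {}
DefaultFillingValidator(schema).validate(conf)
print(conf)  # {'dry_run': True} - the default was injected while validating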
@@ -44,27 +42,23 @@ def validate_config_schema(conf: Dict[str, Any], preliminary: bool = False) -> D
:return: Returns the config if valid, otherwise throw an exception
"""
conf_schema = deepcopy(constants.CONF_SCHEMA)
if conf.get('runmode', RunMode.OTHER) in (RunMode.DRY_RUN, RunMode.LIVE):
conf_schema['required'] = constants.SCHEMA_TRADE_REQUIRED
elif conf.get('runmode', RunMode.OTHER) in (RunMode.BACKTEST, RunMode.HYPEROPT):
if conf.get("runmode", RunMode.OTHER) in (RunMode.DRY_RUN, RunMode.LIVE):
conf_schema["required"] = constants.SCHEMA_TRADE_REQUIRED
elif conf.get("runmode", RunMode.OTHER) in (RunMode.BACKTEST, RunMode.HYPEROPT):
if preliminary:
conf_schema['required'] = constants.SCHEMA_BACKTEST_REQUIRED
conf_schema["required"] = constants.SCHEMA_BACKTEST_REQUIRED
else:
conf_schema['required'] = constants.SCHEMA_BACKTEST_REQUIRED_FINAL
elif conf.get('runmode', RunMode.OTHER) == RunMode.WEBSERVER:
conf_schema['required'] = constants.SCHEMA_MINIMAL_WEBSERVER
conf_schema["required"] = constants.SCHEMA_BACKTEST_REQUIRED_FINAL
elif conf.get("runmode", RunMode.OTHER) == RunMode.WEBSERVER:
conf_schema["required"] = constants.SCHEMA_MINIMAL_WEBSERVER
else:
conf_schema['required'] = constants.SCHEMA_MINIMAL_REQUIRED
conf_schema["required"] = constants.SCHEMA_MINIMAL_REQUIRED
try:
FreqtradeValidator(conf_schema).validate(conf)
return conf
except ValidationError as e:
logger.critical(
f"Invalid configuration. Reason: {e}"
)
raise ValidationError(
best_match(Draft4Validator(conf_schema).iter_errors(conf)).message
)
logger.critical(f"Invalid configuration. Reason: {e}")
raise ValidationError(best_match(Draft4Validator(conf_schema).iter_errors(conf)).message)
def validate_config_consistency(conf: Dict[str, Any], *, preliminary: bool = False) -> None:
@@ -91,7 +85,7 @@ def validate_config_consistency(conf: Dict[str, Any], *, preliminary: bool = Fal
validate_migrated_strategy_settings(conf)
# validate configuration before returning
logger.info('Validating configuration ...')
logger.info("Validating configuration ...")
validate_config_schema(conf, preliminary=preliminary)
@@ -100,9 +94,11 @@ def _validate_unlimited_amount(conf: Dict[str, Any]) -> None:
If edge is disabled, either max_open_trades or stake_amount need to be set.
:raise: ConfigurationError if config validation failed
"""
if (not conf.get('edge', {}).get('enabled')
and conf.get('max_open_trades') == float('inf')
and conf.get('stake_amount') == constants.UNLIMITED_STAKE_AMOUNT):
if (
not conf.get("edge", {}).get("enabled")
and conf.get("max_open_trades") == float("inf")
and conf.get("stake_amount") == constants.UNLIMITED_STAKE_AMOUNT
):
raise ConfigurationError("`max_open_trades` and `stake_amount` cannot both be unlimited.")
@@ -111,45 +107,47 @@ def _validate_price_config(conf: Dict[str, Any]) -> None:
When using market orders, price sides must be using the "other" side of the price
"""
# TODO: The below could be an enforced setting when using market orders
if (conf.get('order_types', {}).get('entry') == 'market'
and conf.get('entry_pricing', {}).get('price_side') not in ('ask', 'other')):
raise ConfigurationError(
'Market entry orders require entry_pricing.price_side = "other".')
if conf.get("order_types", {}).get("entry") == "market" and conf.get("entry_pricing", {}).get(
"price_side"
) not in ("ask", "other"):
raise ConfigurationError('Market entry orders require entry_pricing.price_side = "other".')
if (conf.get('order_types', {}).get('exit') == 'market'
and conf.get('exit_pricing', {}).get('price_side') not in ('bid', 'other')):
if conf.get("order_types", {}).get("exit") == "market" and conf.get("exit_pricing", {}).get(
"price_side"
) not in ("bid", "other"):
raise ConfigurationError('Market exit orders require exit_pricing.price_side = "other".')
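Illustrative config excerpts (not from the commit) showing what these two checks enforce:

conf = {
    "order_types": {"entry": "market"},
    "entry_pricing": {"price_side": "same"},  # neither "ask" nor "other" -> rejected
}
conf["entry_pricing"]["price_side"] = "other"  # market entries now pass the check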
def _validate_trailing_stoploss(conf: Dict[str, Any]) -> None:
if conf.get('stoploss') == 0.0:
if conf.get("stoploss") == 0.0:
raise ConfigurationError(
'The config stoploss needs to be different from 0 to avoid problems with sell orders.'
"The config stoploss needs to be different from 0 to avoid problems with sell orders."
)
# Skip if trailing stoploss is not activated
if not conf.get('trailing_stop', False):
if not conf.get("trailing_stop", False):
return
tsl_positive = float(conf.get('trailing_stop_positive', 0))
tsl_offset = float(conf.get('trailing_stop_positive_offset', 0))
tsl_only_offset = conf.get('trailing_only_offset_is_reached', False)
tsl_positive = float(conf.get("trailing_stop_positive", 0))
tsl_offset = float(conf.get("trailing_stop_positive_offset", 0))
tsl_only_offset = conf.get("trailing_only_offset_is_reached", False)
if tsl_only_offset:
if tsl_positive == 0.0:
raise ConfigurationError(
'The config trailing_only_offset_is_reached needs '
'trailing_stop_positive_offset to be more than 0 in your config.')
"The config trailing_only_offset_is_reached needs "
"trailing_stop_positive_offset to be more than 0 in your config."
)
if tsl_positive > 0 and 0 < tsl_offset <= tsl_positive:
raise ConfigurationError(
'The config trailing_stop_positive_offset needs '
'to be greater than trailing_stop_positive in your config.')
"The config trailing_stop_positive_offset needs "
"to be greater than trailing_stop_positive in your config."
)
# Fetch again without default
if 'trailing_stop_positive' in conf and float(conf['trailing_stop_positive']) == 0.0:
if "trailing_stop_positive" in conf and float(conf["trailing_stop_positive"]) == 0.0:
raise ConfigurationError(
'The config trailing_stop_positive needs to be different from 0 '
'to avoid problems with sell orders.'
"The config trailing_stop_positive needs to be different from 0 "
"to avoid problems with sell orders."
)
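Hypothetical config excerpts showing what _validate_trailing_stoploss accepts and rejects:

ok = {
    "stoploss": -0.10,                        # must be non-zero
    "trailing_stop": True,
    "trailing_stop_positive": 0.01,
    "trailing_stop_positive_offset": 0.02,    # must be > trailing_stop_positive
    "trailing_only_offset_is_reached": True,  # only valid with a non-zero trailing_stop_positive
}
bad = {
    "stoploss": -0.10,
    "trailing_stop": True,
    "trailing_stop_positive": 0.02,
    "trailing_stop_positive_offset": 0.01,    # <= trailing_stop_positive -> ConfigurationError
}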
@@ -158,10 +156,10 @@ def _validate_edge(conf: Dict[str, Any]) -> None:
Edge and Dynamic whitelist should not both be enabled, since edge overrides dynamic whitelists.
"""
if not conf.get('edge', {}).get('enabled'):
if not conf.get("edge", {}).get("enabled"):
return
if not conf.get('use_exit_signal', True):
if not conf.get("use_exit_signal", True):
raise ConfigurationError(
"Edge requires `use_exit_signal` to be True, otherwise no sells will happen."
)
@@ -171,13 +169,20 @@ def _validate_whitelist(conf: Dict[str, Any]) -> None:
"""
Dynamic whitelist does not require pair_whitelist to be set - however StaticWhitelist does.
"""
if conf.get('runmode', RunMode.OTHER) in [RunMode.OTHER, RunMode.PLOT,
RunMode.UTIL_NO_EXCHANGE, RunMode.UTIL_EXCHANGE]:
if conf.get("runmode", RunMode.OTHER) in [
RunMode.OTHER,
RunMode.PLOT,
RunMode.UTIL_NO_EXCHANGE,
RunMode.UTIL_EXCHANGE,
]:
return
for pl in conf.get('pairlists', [{'method': 'StaticPairList'}]):
if (isinstance(pl, dict) and pl.get('method') == 'StaticPairList'
and not conf.get('exchange', {}).get('pair_whitelist')):
for pl in conf.get("pairlists", [{"method": "StaticPairList"}]):
if (
isinstance(pl, dict)
and pl.get("method") == "StaticPairList"
and not conf.get("exchange", {}).get("pair_whitelist")
):
raise ConfigurationError("StaticPairList requires pair_whitelist to be set.")
@@ -186,14 +191,14 @@ def _validate_protections(conf: Dict[str, Any]) -> None:
Validate protection configuration validity
"""
for prot in conf.get('protections', []):
if ('stop_duration' in prot and 'stop_duration_candles' in prot):
for prot in conf.get("protections", []):
if "stop_duration" in prot and "stop_duration_candles" in prot:
raise ConfigurationError(
"Protections must specify either `stop_duration` or `stop_duration_candles`.\n"
f"Please fix the protection {prot.get('method')}"
)
if ('lookback_period' in prot and 'lookback_period_candles' in prot):
if "lookback_period" in prot and "lookback_period_candles" in prot:
raise ConfigurationError(
"Protections must specify either `lookback_period` or `lookback_period_candles`.\n"
f"Please fix the protection {prot.get('method')}"
@@ -201,10 +206,10 @@ def _validate_protections(conf: Dict[str, Any]) -> None:
def _validate_ask_orderbook(conf: Dict[str, Any]) -> None:
ask_strategy = conf.get('exit_pricing', {})
ob_min = ask_strategy.get('order_book_min')
ob_max = ask_strategy.get('order_book_max')
if ob_min is not None and ob_max is not None and ask_strategy.get('use_order_book'):
ask_strategy = conf.get("exit_pricing", {})
ob_min = ask_strategy.get("order_book_min")
ob_max = ask_strategy.get("order_book_max")
if ob_min is not None and ob_max is not None and ask_strategy.get("use_order_book"):
if ob_min != ob_max:
raise ConfigurationError(
"Using order_book_max != order_book_min in exit_pricing is no longer supported."
@@ -212,7 +217,7 @@ def _validate_ask_orderbook(conf: Dict[str, Any]) -> None:
)
else:
# Move value to order_book_top
ask_strategy['order_book_top'] = ob_min
ask_strategy["order_book_top"] = ob_min
logger.warning(
"DEPRECATED: "
"Please use `order_book_top` instead of `order_book_min` and `order_book_max` "
@@ -221,7 +226,6 @@ def _validate_ask_orderbook(conf: Dict[str, Any]) -> None:
def validate_migrated_strategy_settings(conf: Dict[str, Any]) -> None:
_validate_time_in_force(conf)
_validate_order_types(conf)
_validate_unfilledtimeout(conf)
@@ -230,119 +234,129 @@ def validate_migrated_strategy_settings(conf: Dict[str, Any]) -> None:
def _validate_time_in_force(conf: Dict[str, Any]) -> None:
time_in_force = conf.get('order_time_in_force', {})
if 'buy' in time_in_force or 'sell' in time_in_force:
if conf.get('trading_mode', TradingMode.SPOT) != TradingMode.SPOT:
time_in_force = conf.get("order_time_in_force", {})
if "buy" in time_in_force or "sell" in time_in_force:
if conf.get("trading_mode", TradingMode.SPOT) != TradingMode.SPOT:
raise ConfigurationError(
"Please migrate your time_in_force settings to use 'entry' and 'exit'.")
"Please migrate your time_in_force settings to use 'entry' and 'exit'."
)
else:
logger.warning(
"DEPRECATED: Using 'buy' and 'sell' for time_in_force is deprecated."
"Please migrate your time_in_force settings to use 'entry' and 'exit'."
)
process_deprecated_setting(
conf, 'order_time_in_force', 'buy', 'order_time_in_force', 'entry')
conf, "order_time_in_force", "buy", "order_time_in_force", "entry"
)
process_deprecated_setting(
conf, 'order_time_in_force', 'sell', 'order_time_in_force', 'exit')
conf, "order_time_in_force", "sell", "order_time_in_force", "exit"
)
def _validate_order_types(conf: Dict[str, Any]) -> None:
order_types = conf.get('order_types', {})
old_order_types = ['buy', 'sell', 'emergencysell', 'forcebuy',
'forcesell', 'emergencyexit', 'forceexit', 'forceentry']
order_types = conf.get("order_types", {})
old_order_types = [
"buy",
"sell",
"emergencysell",
"forcebuy",
"forcesell",
"emergencyexit",
"forceexit",
"forceentry",
]
if any(x in order_types for x in old_order_types):
if conf.get('trading_mode', TradingMode.SPOT) != TradingMode.SPOT:
if conf.get("trading_mode", TradingMode.SPOT) != TradingMode.SPOT:
raise ConfigurationError(
"Please migrate your order_types settings to use the new wording.")
"Please migrate your order_types settings to use the new wording."
)
else:
logger.warning(
"DEPRECATED: Using 'buy' and 'sell' for order_types is deprecated."
"Please migrate your order_types settings to use 'entry' and 'exit' wording."
)
for o, n in [
('buy', 'entry'),
('sell', 'exit'),
('emergencysell', 'emergency_exit'),
('forcesell', 'force_exit'),
('forcebuy', 'force_entry'),
('emergencyexit', 'emergency_exit'),
('forceexit', 'force_exit'),
('forceentry', 'force_entry'),
("buy", "entry"),
("sell", "exit"),
("emergencysell", "emergency_exit"),
("forcesell", "force_exit"),
("forcebuy", "force_entry"),
("emergencyexit", "emergency_exit"),
("forceexit", "force_exit"),
("forceentry", "force_entry"),
]:
process_deprecated_setting(conf, 'order_types', o, 'order_types', n)
process_deprecated_setting(conf, "order_types", o, "order_types", n)
def _validate_unfilledtimeout(conf: Dict[str, Any]) -> None:
unfilledtimeout = conf.get('unfilledtimeout', {})
if any(x in unfilledtimeout for x in ['buy', 'sell']):
if conf.get('trading_mode', TradingMode.SPOT) != TradingMode.SPOT:
unfilledtimeout = conf.get("unfilledtimeout", {})
if any(x in unfilledtimeout for x in ["buy", "sell"]):
if conf.get("trading_mode", TradingMode.SPOT) != TradingMode.SPOT:
raise ConfigurationError(
"Please migrate your unfilledtimeout settings to use the new wording.")
"Please migrate your unfilledtimeout settings to use the new wording."
)
else:
logger.warning(
"DEPRECATED: Using 'buy' and 'sell' for unfilledtimeout is deprecated."
"Please migrate your unfilledtimeout settings to use 'entry' and 'exit' wording."
)
for o, n in [
('buy', 'entry'),
('sell', 'exit'),
("buy", "entry"),
("sell", "exit"),
]:
process_deprecated_setting(conf, 'unfilledtimeout', o, 'unfilledtimeout', n)
process_deprecated_setting(conf, "unfilledtimeout", o, "unfilledtimeout", n)
def _validate_pricing_rules(conf: Dict[str, Any]) -> None:
if conf.get('ask_strategy') or conf.get('bid_strategy'):
if conf.get('trading_mode', TradingMode.SPOT) != TradingMode.SPOT:
raise ConfigurationError(
"Please migrate your pricing settings to use the new wording.")
if conf.get("ask_strategy") or conf.get("bid_strategy"):
if conf.get("trading_mode", TradingMode.SPOT) != TradingMode.SPOT:
raise ConfigurationError("Please migrate your pricing settings to use the new wording.")
else:
logger.warning(
"DEPRECATED: Using 'ask_strategy' and 'bid_strategy' is deprecated."
"Please migrate your settings to use 'entry_pricing' and 'exit_pricing'."
)
conf['entry_pricing'] = {}
for obj in list(conf.get('bid_strategy', {}).keys()):
if obj == 'ask_last_balance':
process_deprecated_setting(conf, 'bid_strategy', obj,
'entry_pricing', 'price_last_balance')
conf["entry_pricing"] = {}
for obj in list(conf.get("bid_strategy", {}).keys()):
if obj == "ask_last_balance":
process_deprecated_setting(
conf, "bid_strategy", obj, "entry_pricing", "price_last_balance"
)
else:
process_deprecated_setting(conf, 'bid_strategy', obj, 'entry_pricing', obj)
del conf['bid_strategy']
process_deprecated_setting(conf, "bid_strategy", obj, "entry_pricing", obj)
del conf["bid_strategy"]
conf['exit_pricing'] = {}
for obj in list(conf.get('ask_strategy', {}).keys()):
if obj == 'bid_last_balance':
process_deprecated_setting(conf, 'ask_strategy', obj,
'exit_pricing', 'price_last_balance')
conf["exit_pricing"] = {}
for obj in list(conf.get("ask_strategy", {}).keys()):
if obj == "bid_last_balance":
process_deprecated_setting(
conf, "ask_strategy", obj, "exit_pricing", "price_last_balance"
)
else:
process_deprecated_setting(conf, 'ask_strategy', obj, 'exit_pricing', obj)
del conf['ask_strategy']
process_deprecated_setting(conf, "ask_strategy", obj, "exit_pricing", obj)
del conf["ask_strategy"]
def _validate_freqai_hyperopt(conf: Dict[str, Any]) -> None:
freqai_enabled = conf.get('freqai', {}).get('enabled', False)
analyze_per_epoch = conf.get('analyze_per_epoch', False)
freqai_enabled = conf.get("freqai", {}).get("enabled", False)
analyze_per_epoch = conf.get("analyze_per_epoch", False)
if analyze_per_epoch and freqai_enabled:
raise ConfigurationError(
'Using analyze-per-epoch parameter is not supported with a FreqAI strategy.')
"Using analyze-per-epoch parameter is not supported with a FreqAI strategy."
)
def _validate_freqai_include_timeframes(conf: Dict[str, Any], preliminary: bool) -> None:
freqai_enabled = conf.get('freqai', {}).get('enabled', False)
freqai_enabled = conf.get("freqai", {}).get("enabled", False)
if freqai_enabled:
main_tf = conf.get('timeframe', '5m')
freqai_include_timeframes = conf.get('freqai', {}).get('feature_parameters', {}
).get('include_timeframes', [])
main_tf = conf.get("timeframe", "5m")
freqai_include_timeframes = (
conf.get("freqai", {}).get("feature_parameters", {}).get("include_timeframes", [])
)
from freqtrade.exchange import timeframe_to_seconds
main_tf_s = timeframe_to_seconds(main_tf)
offending_lines = []
for tf in freqai_include_timeframes:
@@ -352,57 +366,65 @@ def _validate_freqai_include_timeframes(conf: Dict[str, Any], preliminary: bool)
if offending_lines:
raise ConfigurationError(
f"Main timeframe of {main_tf} must be smaller or equal to FreqAI "
f"`include_timeframes`.Offending include-timeframes: {', '.join(offending_lines)}")
f"`include_timeframes`.Offending include-timeframes: {', '.join(offending_lines)}"
)
# Ensure that the base timeframe is included in the include_timeframes list
if not preliminary and main_tf not in freqai_include_timeframes:
feature_parameters = conf.get('freqai', {}).get('feature_parameters', {})
feature_parameters = conf.get("freqai", {}).get("feature_parameters", {})
include_timeframes = [main_tf] + freqai_include_timeframes
conf.get('freqai', {}).get('feature_parameters', {}) \
.update({**feature_parameters, 'include_timeframes': include_timeframes})
conf.get("freqai", {}).get("feature_parameters", {}).update(
{**feature_parameters, "include_timeframes": include_timeframes}
)
def _validate_freqai_backtest(conf: Dict[str, Any]) -> None:
if conf.get('runmode', RunMode.OTHER) == RunMode.BACKTEST:
freqai_enabled = conf.get('freqai', {}).get('enabled', False)
timerange = conf.get('timerange')
freqai_backtest_live_models = conf.get('freqai_backtest_live_models', False)
if conf.get("runmode", RunMode.OTHER) == RunMode.BACKTEST:
freqai_enabled = conf.get("freqai", {}).get("enabled", False)
timerange = conf.get("timerange")
freqai_backtest_live_models = conf.get("freqai_backtest_live_models", False)
if freqai_backtest_live_models and freqai_enabled and timerange:
raise ConfigurationError(
'Using timerange parameter is not supported with '
'--freqai-backtest-live-models parameter.')
"Using timerange parameter is not supported with "
"--freqai-backtest-live-models parameter."
)
if freqai_backtest_live_models and not freqai_enabled:
raise ConfigurationError(
'Using --freqai-backtest-live-models parameter is only '
'supported with a FreqAI strategy.')
"Using --freqai-backtest-live-models parameter is only "
"supported with a FreqAI strategy."
)
if freqai_enabled and not freqai_backtest_live_models and not timerange:
raise ConfigurationError(
'Please pass --timerange if you intend to use FreqAI for backtesting.')
"Please pass --timerange if you intend to use FreqAI for backtesting."
)
def _validate_consumers(conf: Dict[str, Any]) -> None:
emc_conf = conf.get('external_message_consumer', {})
if emc_conf.get('enabled', False):
if len(emc_conf.get('producers', [])) < 1:
emc_conf = conf.get("external_message_consumer", {})
if emc_conf.get("enabled", False):
if len(emc_conf.get("producers", [])) < 1:
raise ConfigurationError("You must specify at least 1 Producer to connect to.")
producer_names = [p['name'] for p in emc_conf.get('producers', [])]
producer_names = [p["name"] for p in emc_conf.get("producers", [])]
duplicates = [item for item, count in Counter(producer_names).items() if count > 1]
if duplicates:
raise ConfigurationError(
f"Producer names must be unique. Duplicate: {', '.join(duplicates)}")
if conf.get('process_only_new_candles', True):
f"Producer names must be unique. Duplicate: {', '.join(duplicates)}"
)
if conf.get("process_only_new_candles", True):
# Warning here or require it?
logger.warning("To receive best performance with external data, "
"please set `process_only_new_candles` to False")
logger.warning(
"To receive best performance with external data, "
"please set `process_only_new_candles` to False"
)
def _strategy_settings(conf: Dict[str, Any]) -> None:
process_deprecated_setting(conf, None, 'use_sell_signal', None, 'use_exit_signal')
process_deprecated_setting(conf, None, 'sell_profit_only', None, 'exit_profit_only')
process_deprecated_setting(conf, None, 'sell_profit_offset', None, 'exit_profit_offset')
process_deprecated_setting(conf, None, 'ignore_roi_if_buy_signal',
None, 'ignore_roi_if_entry_signal')
process_deprecated_setting(conf, None, "use_sell_signal", None, "use_exit_signal")
process_deprecated_setting(conf, None, "sell_profit_only", None, "exit_profit_only")
process_deprecated_setting(conf, None, "sell_profit_offset", None, "exit_profit_offset")
process_deprecated_setting(
conf, None, "ignore_roi_if_buy_signal", None, "ignore_roi_if_entry_signal"
)

View File

@@ -1,6 +1,7 @@
"""
This module contains the configuration class
"""
import logging
import warnings
from copy import deepcopy
@@ -56,7 +57,7 @@ class Configuration:
:return: configuration dictionary
"""
# Keep this method as staticmethod, so it can be used from interactive environments
c = Configuration({'config': files}, RunMode.OTHER)
c = Configuration({"config": files}, RunMode.OTHER)
return c.get_config()
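Typical interactive use of that staticmethod, e.g. from a notebook (the path is hypothetical):

from freqtrade.configuration import Configuration

config = Configuration.from_files(["user_data/config.json"])
# Returns the fully processed config dict; runmode stays RunMode.OTHER
# since nothing here forces dry-run or live trading.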
def load_config(self) -> Dict[str, Any]:
@@ -69,19 +70,20 @@ class Configuration:
# Load environment variables
from freqtrade.commands.arguments import NO_CONF_ALLOWED
if self.args.get('command') not in NO_CONF_ALLOWED:
if self.args.get("command") not in NO_CONF_ALLOWED:
env_data = enironment_vars_to_dict()
config = deep_merge_dicts(env_data, config)
# Normalize config
if 'internals' not in config:
config['internals'] = {}
if "internals" not in config:
config["internals"] = {}
if 'pairlists' not in config:
config['pairlists'] = []
if "pairlists" not in config:
config["pairlists"] = []
# Keep a copy of the original configuration file
config['original_config'] = deepcopy(config)
config["original_config"] = deepcopy(config)
self._process_logging_options(config)
@@ -105,7 +107,7 @@ class Configuration:
from freqtrade.exchange.check_exchange import check_exchange
# Check if the exchange set by the user is supported
check_exchange(config, config.get('experimental', {}).get('block_bad_exchanges', True))
check_exchange(config, config.get("experimental", {}).get("block_bad_exchanges", True))
self._resolve_pairs_list(config)
@@ -119,52 +121,56 @@
the -v/--verbose, --logfile options
"""
# Log level
config.update({'verbosity': self.args.get('verbosity', 0)})
config.update({"verbosity": self.args.get("verbosity", 0)})
if 'logfile' in self.args and self.args['logfile']:
config.update({'logfile': self.args['logfile']})
if "logfile" in self.args and self.args["logfile"]:
config.update({"logfile": self.args["logfile"]})
setup_logging(config)
def _process_trading_options(self, config: Config) -> None:
if config['runmode'] not in TRADE_MODES:
if config["runmode"] not in TRADE_MODES:
return
if config.get('dry_run', False):
logger.info('Dry run is enabled')
if config.get('db_url') in [None, constants.DEFAULT_DB_PROD_URL]:
if config.get("dry_run", False):
logger.info("Dry run is enabled")
if config.get("db_url") in [None, constants.DEFAULT_DB_PROD_URL]:
# Default to in-memory db for dry_run if not specified
config['db_url'] = constants.DEFAULT_DB_DRYRUN_URL
config["db_url"] = constants.DEFAULT_DB_DRYRUN_URL
else:
if not config.get('db_url'):
config['db_url'] = constants.DEFAULT_DB_PROD_URL
logger.info('Dry run is disabled')
if not config.get("db_url"):
config["db_url"] = constants.DEFAULT_DB_PROD_URL
logger.info("Dry run is disabled")
logger.info(f'Using DB: "{parse_db_uri_for_logging(config["db_url"])}"')
def _process_common_options(self, config: Config) -> None:
# Set strategy if not specified in config and or if it's non default
if self.args.get('strategy') or not config.get('strategy'):
config.update({'strategy': self.args.get('strategy')})
if self.args.get("strategy") or not config.get("strategy"):
config.update({"strategy": self.args.get("strategy")})
self._args_to_config(config, argname='strategy_path',
logstring='Using additional Strategy lookup path: {}')
self._args_to_config(
config, argname="strategy_path", logstring="Using additional Strategy lookup path: {}"
)
if ('db_url' in self.args and self.args['db_url'] and
self.args['db_url'] != constants.DEFAULT_DB_PROD_URL):
config.update({'db_url': self.args['db_url']})
logger.info('Parameter --db-url detected ...')
if (
"db_url" in self.args
and self.args["db_url"]
and self.args["db_url"] != constants.DEFAULT_DB_PROD_URL
):
config.update({"db_url": self.args["db_url"]})
logger.info("Parameter --db-url detected ...")
self._args_to_config(config, argname='db_url_from',
logstring='Parameter --db-url-from detected ...')
self._args_to_config(
config, argname="db_url_from", logstring="Parameter --db-url-from detected ..."
)
if config.get('force_entry_enable', False):
logger.warning('`force_entry_enable` RPC message enabled.')
if config.get("force_entry_enable", False):
logger.warning("`force_entry_enable` RPC message enabled.")
# Support for sd_notify
if 'sd_notify' in self.args and self.args['sd_notify']:
config['internals'].update({'sd_notify': True})
if "sd_notify" in self.args and self.args["sd_notify"]:
config["internals"].update({"sd_notify": True})
def _process_datadir_options(self, config: Config) -> None:
"""
@@ -172,245 +178,274 @@ class Configuration:
--user-data, --datadir
"""
# Check exchange parameter here - otherwise `datadir` might be wrong.
if 'exchange' in self.args and self.args['exchange']:
config['exchange']['name'] = self.args['exchange']
if "exchange" in self.args and self.args["exchange"]:
config["exchange"]["name"] = self.args["exchange"]
logger.info(f"Using exchange {config['exchange']['name']}")
if 'pair_whitelist' not in config['exchange']:
config['exchange']['pair_whitelist'] = []
if "pair_whitelist" not in config["exchange"]:
config["exchange"]["pair_whitelist"] = []
if 'user_data_dir' in self.args and self.args['user_data_dir']:
config.update({'user_data_dir': self.args['user_data_dir']})
elif 'user_data_dir' not in config:
if "user_data_dir" in self.args and self.args["user_data_dir"]:
config.update({"user_data_dir": self.args["user_data_dir"]})
elif "user_data_dir" not in config:
# Default to cwd/user_data (legacy option ...)
config.update({'user_data_dir': str(Path.cwd() / 'user_data')})
config.update({"user_data_dir": str(Path.cwd() / "user_data")})
# reset to user_data_dir so this contains the absolute path.
config['user_data_dir'] = create_userdata_dir(config['user_data_dir'], create_dir=False)
logger.info('Using user-data directory: %s ...', config['user_data_dir'])
config["user_data_dir"] = create_userdata_dir(config["user_data_dir"], create_dir=False)
logger.info("Using user-data directory: %s ...", config["user_data_dir"])
config.update({'datadir': create_datadir(config, self.args.get('datadir'))})
logger.info('Using data directory: %s ...', config.get('datadir'))
config.update({"datadir": create_datadir(config, self.args.get("datadir"))})
logger.info("Using data directory: %s ...", config.get("datadir"))
if self.args.get('exportfilename'):
self._args_to_config(config, argname='exportfilename',
logstring='Storing backtest results to {} ...')
config['exportfilename'] = Path(config['exportfilename'])
if self.args.get("exportfilename"):
self._args_to_config(
config, argname="exportfilename", logstring="Storing backtest results to {} ..."
)
config["exportfilename"] = Path(config["exportfilename"])
else:
config['exportfilename'] = (config['user_data_dir']
/ 'backtest_results')
config["exportfilename"] = config["user_data_dir"] / "backtest_results"
if self.args.get('show_sensitive'):
if self.args.get("show_sensitive"):
logger.warning(
"Sensitive information will be shown in the upcoming output. "
"Please make sure to never share this output without redacting "
"the information yourself.")
"the information yourself."
)
def _process_optimize_options(self, config: Config) -> None:
# This will override the strategy configuration
self._args_to_config(config, argname='timeframe',
logstring='Parameter -i/--timeframe detected ... '
'Using timeframe: {} ...')
self._args_to_config(config, argname='position_stacking',
logstring='Parameter --enable-position-stacking detected ...')
self._args_to_config(
config,
argname="timeframe",
logstring="Parameter -i/--timeframe detected ... " "Using timeframe: {} ...",
)
self._args_to_config(
config, argname='enable_protections',
logstring='Parameter --enable-protections detected, enabling Protections. ...')
config,
argname="position_stacking",
logstring="Parameter --enable-position-stacking detected ...",
)
if 'use_max_market_positions' in self.args and not self.args["use_max_market_positions"]:
config.update({'use_max_market_positions': False})
logger.info('Parameter --disable-max-market-positions detected ...')
logger.info('max_open_trades set to unlimited ...')
elif 'max_open_trades' in self.args and self.args['max_open_trades']:
config.update({'max_open_trades': self.args['max_open_trades']})
logger.info('Parameter --max-open-trades detected, '
'overriding max_open_trades to: %s ...', config.get('max_open_trades'))
elif config['runmode'] in NON_UTIL_MODES:
logger.info('Using max_open_trades: %s ...', config.get('max_open_trades'))
self._args_to_config(
config,
argname="enable_protections",
logstring="Parameter --enable-protections detected, enabling Protections. ...",
)
if "use_max_market_positions" in self.args and not self.args["use_max_market_positions"]:
config.update({"use_max_market_positions": False})
logger.info("Parameter --disable-max-market-positions detected ...")
logger.info("max_open_trades set to unlimited ...")
elif "max_open_trades" in self.args and self.args["max_open_trades"]:
config.update({"max_open_trades": self.args["max_open_trades"]})
logger.info(
"Parameter --max-open-trades detected, " "overriding max_open_trades to: %s ...",
config.get("max_open_trades"),
)
elif config["runmode"] in NON_UTIL_MODES:
logger.info("Using max_open_trades: %s ...", config.get("max_open_trades"))
# Setting max_open_trades to infinite if -1
if config.get('max_open_trades') == -1:
config['max_open_trades'] = float('inf')
if config.get("max_open_trades") == -1:
config["max_open_trades"] = float("inf")
if self.args.get('stake_amount'):
if self.args.get("stake_amount"):
# Convert explicitly to float to support CLI argument for both unlimited and value
try:
self.args['stake_amount'] = float(self.args['stake_amount'])
self.args["stake_amount"] = float(self.args["stake_amount"])
except ValueError:
pass
configurations = [
('timeframe_detail',
'Parameter --timeframe-detail detected, using {} for intra-candle backtesting ...'),
('backtest_show_pair_list', 'Parameter --show-pair-list detected.'),
('stake_amount',
'Parameter --stake-amount detected, overriding stake_amount to: {} ...'),
('dry_run_wallet',
'Parameter --dry-run-wallet detected, overriding dry_run_wallet to: {} ...'),
('fee', 'Parameter --fee detected, setting fee to: {} ...'),
('timerange', 'Parameter --timerange detected: {} ...'),
]
(
"timeframe_detail",
"Parameter --timeframe-detail detected, using {} for intra-candle backtesting ...",
),
("backtest_show_pair_list", "Parameter --show-pair-list detected."),
(
"stake_amount",
"Parameter --stake-amount detected, overriding stake_amount to: {} ...",
),
(
"dry_run_wallet",
"Parameter --dry-run-wallet detected, overriding dry_run_wallet to: {} ...",
),
("fee", "Parameter --fee detected, setting fee to: {} ..."),
("timerange", "Parameter --timerange detected: {} ..."),
]
self._args_to_config_loop(config, configurations)
self._process_datadir_options(config)
self._args_to_config(config, argname='strategy_list',
logstring='Using strategy list of {} strategies', logfun=len)
self._args_to_config(
config,
argname="strategy_list",
logstring="Using strategy list of {} strategies",
logfun=len,
)
configurations = [
('recursive_strategy_search',
'Recursively searching for a strategy in the strategies folder.'),
('timeframe', 'Overriding timeframe with Command line argument'),
('export', 'Parameter --export detected: {} ...'),
('backtest_breakdown', 'Parameter --breakdown detected ...'),
('backtest_cache', 'Parameter --cache={} detected ...'),
('disableparamexport', 'Parameter --disableparamexport detected: {} ...'),
('freqai_backtest_live_models',
'Parameter --freqai-backtest-live-models detected ...'),
(
"recursive_strategy_search",
"Recursively searching for a strategy in the strategies folder.",
),
("timeframe", "Overriding timeframe with Command line argument"),
("export", "Parameter --export detected: {} ..."),
("backtest_breakdown", "Parameter --breakdown detected ..."),
("backtest_cache", "Parameter --cache={} detected ..."),
("disableparamexport", "Parameter --disableparamexport detected: {} ..."),
("freqai_backtest_live_models", "Parameter --freqai-backtest-live-models detected ..."),
]
self._args_to_config_loop(config, configurations)
# Edge section:
if 'stoploss_range' in self.args and self.args["stoploss_range"]:
if "stoploss_range" in self.args and self.args["stoploss_range"]:
txt_range = eval(self.args["stoploss_range"])
config['edge'].update({'stoploss_range_min': txt_range[0]})
config['edge'].update({'stoploss_range_max': txt_range[1]})
config['edge'].update({'stoploss_range_step': txt_range[2]})
logger.info('Parameter --stoplosses detected: %s ...', self.args["stoploss_range"])
config["edge"].update({"stoploss_range_min": txt_range[0]})
config["edge"].update({"stoploss_range_max": txt_range[1]})
config["edge"].update({"stoploss_range_step": txt_range[2]})
logger.info("Parameter --stoplosses detected: %s ...", self.args["stoploss_range"])
# Hyperopt section
configurations = [
('hyperopt', 'Using Hyperopt class name: {}'),
('hyperopt_path', 'Using additional Hyperopt lookup path: {}'),
('hyperoptexportfilename', 'Using hyperopt file: {}'),
('lookahead_analysis_exportfilename', 'Saving lookahead analysis results into {} ...'),
('epochs', 'Parameter --epochs detected ... Will run Hyperopt with for {} epochs ...'),
('spaces', 'Parameter -s/--spaces detected: {}'),
('analyze_per_epoch', 'Parameter --analyze-per-epoch detected.'),
('print_all', 'Parameter --print-all detected ...'),
("hyperopt", "Using Hyperopt class name: {}"),
("hyperopt_path", "Using additional Hyperopt lookup path: {}"),
("hyperoptexportfilename", "Using hyperopt file: {}"),
("lookahead_analysis_exportfilename", "Saving lookahead analysis results into {} ..."),
("epochs", "Parameter --epochs detected ... Will run Hyperopt with for {} epochs ..."),
("spaces", "Parameter -s/--spaces detected: {}"),
("analyze_per_epoch", "Parameter --analyze-per-epoch detected."),
("print_all", "Parameter --print-all detected ..."),
]
self._args_to_config_loop(config, configurations)
if 'print_colorized' in self.args and not self.args["print_colorized"]:
logger.info('Parameter --no-color detected ...')
config.update({'print_colorized': False})
if "print_colorized" in self.args and not self.args["print_colorized"]:
logger.info("Parameter --no-color detected ...")
config.update({"print_colorized": False})
else:
config.update({'print_colorized': True})
config.update({"print_colorized": True})
configurations = [
('print_json', 'Parameter --print-json detected ...'),
('export_csv', 'Parameter --export-csv detected: {}'),
('hyperopt_jobs', 'Parameter -j/--job-workers detected: {}'),
('hyperopt_random_state', 'Parameter --random-state detected: {}'),
('hyperopt_min_trades', 'Parameter --min-trades detected: {}'),
('hyperopt_loss', 'Using Hyperopt loss class name: {}'),
('hyperopt_show_index', 'Parameter -n/--index detected: {}'),
('hyperopt_list_best', 'Parameter --best detected: {}'),
('hyperopt_list_profitable', 'Parameter --profitable detected: {}'),
('hyperopt_list_min_trades', 'Parameter --min-trades detected: {}'),
('hyperopt_list_max_trades', 'Parameter --max-trades detected: {}'),
('hyperopt_list_min_avg_time', 'Parameter --min-avg-time detected: {}'),
('hyperopt_list_max_avg_time', 'Parameter --max-avg-time detected: {}'),
('hyperopt_list_min_avg_profit', 'Parameter --min-avg-profit detected: {}'),
('hyperopt_list_max_avg_profit', 'Parameter --max-avg-profit detected: {}'),
('hyperopt_list_min_total_profit', 'Parameter --min-total-profit detected: {}'),
('hyperopt_list_max_total_profit', 'Parameter --max-total-profit detected: {}'),
('hyperopt_list_min_objective', 'Parameter --min-objective detected: {}'),
('hyperopt_list_max_objective', 'Parameter --max-objective detected: {}'),
('hyperopt_list_no_details', 'Parameter --no-details detected: {}'),
('hyperopt_show_no_header', 'Parameter --no-header detected: {}'),
('hyperopt_ignore_missing_space', 'Paramter --ignore-missing-space detected: {}'),
("print_json", "Parameter --print-json detected ..."),
("export_csv", "Parameter --export-csv detected: {}"),
("hyperopt_jobs", "Parameter -j/--job-workers detected: {}"),
("hyperopt_random_state", "Parameter --random-state detected: {}"),
("hyperopt_min_trades", "Parameter --min-trades detected: {}"),
("hyperopt_loss", "Using Hyperopt loss class name: {}"),
("hyperopt_show_index", "Parameter -n/--index detected: {}"),
("hyperopt_list_best", "Parameter --best detected: {}"),
("hyperopt_list_profitable", "Parameter --profitable detected: {}"),
("hyperopt_list_min_trades", "Parameter --min-trades detected: {}"),
("hyperopt_list_max_trades", "Parameter --max-trades detected: {}"),
("hyperopt_list_min_avg_time", "Parameter --min-avg-time detected: {}"),
("hyperopt_list_max_avg_time", "Parameter --max-avg-time detected: {}"),
("hyperopt_list_min_avg_profit", "Parameter --min-avg-profit detected: {}"),
("hyperopt_list_max_avg_profit", "Parameter --max-avg-profit detected: {}"),
("hyperopt_list_min_total_profit", "Parameter --min-total-profit detected: {}"),
("hyperopt_list_max_total_profit", "Parameter --max-total-profit detected: {}"),
("hyperopt_list_min_objective", "Parameter --min-objective detected: {}"),
("hyperopt_list_max_objective", "Parameter --max-objective detected: {}"),
("hyperopt_list_no_details", "Parameter --no-details detected: {}"),
("hyperopt_show_no_header", "Parameter --no-header detected: {}"),
("hyperopt_ignore_missing_space", "Paramter --ignore-missing-space detected: {}"),
]
self._args_to_config_loop(config, configurations)
def _process_plot_options(self, config: Config) -> None:
configurations = [
('pairs', 'Using pairs {}'),
('indicators1', 'Using indicators1: {}'),
('indicators2', 'Using indicators2: {}'),
('trade_ids', 'Filtering on trade_ids: {}'),
('plot_limit', 'Limiting plot to: {}'),
('plot_auto_open', 'Parameter --auto-open detected.'),
('trade_source', 'Using trades from: {}'),
('prepend_data', 'Prepend detected. Allowing data prepending.'),
('erase', 'Erase detected. Deleting existing data.'),
('no_trades', 'Parameter --no-trades detected.'),
('timeframes', 'timeframes --timeframes: {}'),
('days', 'Detected --days: {}'),
('include_inactive', 'Detected --include-inactive-pairs: {}'),
('download_trades', 'Detected --dl-trades: {}'),
('dataformat_ohlcv', 'Using "{}" to store OHLCV data.'),
('dataformat_trades', 'Using "{}" to store trades data.'),
('show_timerange', 'Detected --show-timerange'),
("pairs", "Using pairs {}"),
("indicators1", "Using indicators1: {}"),
("indicators2", "Using indicators2: {}"),
("trade_ids", "Filtering on trade_ids: {}"),
("plot_limit", "Limiting plot to: {}"),
("plot_auto_open", "Parameter --auto-open detected."),
("trade_source", "Using trades from: {}"),
("prepend_data", "Prepend detected. Allowing data prepending."),
("erase", "Erase detected. Deleting existing data."),
("no_trades", "Parameter --no-trades detected."),
("timeframes", "timeframes --timeframes: {}"),
("days", "Detected --days: {}"),
("include_inactive", "Detected --include-inactive-pairs: {}"),
("download_trades", "Detected --dl-trades: {}"),
("dataformat_ohlcv", 'Using "{}" to store OHLCV data.'),
("dataformat_trades", 'Using "{}" to store trades data.'),
("show_timerange", "Detected --show-timerange"),
]
self._args_to_config_loop(config, configurations)
def _process_data_options(self, config: Config) -> None:
self._args_to_config(config, argname='new_pairs_days',
logstring='Detected --new-pairs-days: {}')
self._args_to_config(config, argname='trading_mode',
logstring='Detected --trading-mode: {}')
config['candle_type_def'] = CandleType.get_default(
config.get('trading_mode', 'spot') or 'spot')
config['trading_mode'] = TradingMode(config.get('trading_mode', 'spot') or 'spot')
self._args_to_config(config, argname='candle_types',
logstring='Detected --candle-types: {}')
self._args_to_config(
config, argname="new_pairs_days", logstring="Detected --new-pairs-days: {}"
)
self._args_to_config(
config, argname="trading_mode", logstring="Detected --trading-mode: {}"
)
config["candle_type_def"] = CandleType.get_default(
config.get("trading_mode", "spot") or "spot"
)
config["trading_mode"] = TradingMode(config.get("trading_mode", "spot") or "spot")
self._args_to_config(
config, argname="candle_types", logstring="Detected --candle-types: {}"
)
def _process_analyze_options(self, config: Config) -> None:
configurations = [
('analysis_groups', 'Analysis reason groups: {}'),
('enter_reason_list', 'Analysis enter tag list: {}'),
('exit_reason_list', 'Analysis exit tag list: {}'),
('indicator_list', 'Analysis indicator list: {}'),
('timerange', 'Filter trades by timerange: {}'),
('analysis_rejected', 'Analyse rejected signals: {}'),
('analysis_to_csv', 'Store analysis tables to CSV: {}'),
('analysis_csv_path', 'Path to store analysis CSVs: {}'),
("analysis_groups", "Analysis reason groups: {}"),
("enter_reason_list", "Analysis enter tag list: {}"),
("exit_reason_list", "Analysis exit tag list: {}"),
("indicator_list", "Analysis indicator list: {}"),
("timerange", "Filter trades by timerange: {}"),
("analysis_rejected", "Analyse rejected signals: {}"),
("analysis_to_csv", "Store analysis tables to CSV: {}"),
("analysis_csv_path", "Path to store analysis CSVs: {}"),
# Lookahead analysis results
('targeted_trade_amount', 'Targeted Trade amount: {}'),
('minimum_trade_amount', 'Minimum Trade amount: {}'),
('lookahead_analysis_exportfilename', 'Path to store lookahead-analysis-results: {}'),
('startup_candle', 'Startup candle to be used on recursive analysis: {}'),
("targeted_trade_amount", "Targeted Trade amount: {}"),
("minimum_trade_amount", "Minimum Trade amount: {}"),
("lookahead_analysis_exportfilename", "Path to store lookahead-analysis-results: {}"),
("startup_candle", "Startup candle to be used on recursive analysis: {}"),
]
self._args_to_config_loop(config, configurations)
def _args_to_config_loop(self, config, configurations: List[Tuple[str, str]]) -> None:
for argname, logstring in configurations:
self._args_to_config(config, argname=argname, logstring=logstring)
def _process_runmode(self, config: Config) -> None:
self._args_to_config(config, argname='dry_run',
logstring='Parameter --dry-run detected, '
'overriding dry_run to: {} ...')
self._args_to_config(
config,
argname="dry_run",
logstring="Parameter --dry-run detected, " "overriding dry_run to: {} ...",
)
if not self.runmode:
# Handle real mode, infer dry/live from config
self.runmode = RunMode.DRY_RUN if config.get('dry_run', True) else RunMode.LIVE
self.runmode = RunMode.DRY_RUN if config.get("dry_run", True) else RunMode.LIVE
logger.info(f"Runmode set to {self.runmode.value}.")
config.update({'runmode': self.runmode})
config.update({"runmode": self.runmode})
def _process_freqai_options(self, config: Config) -> None:
self._args_to_config(
config, argname="freqaimodel", logstring="Using freqaimodel class name: {}"
)
self._args_to_config(config, argname='freqaimodel',
logstring='Using freqaimodel class name: {}')
self._args_to_config(config, argname='freqaimodel_path',
logstring='Using freqaimodel path: {}')
self._args_to_config(
config, argname="freqaimodel_path", logstring="Using freqaimodel path: {}"
)
return
def _args_to_config(self, config: Config, argname: str,
logstring: str, logfun: Optional[Callable] = None,
deprecated_msg: Optional[str] = None) -> None:
def _args_to_config(
self,
config: Config,
argname: str,
logstring: str,
logfun: Optional[Callable] = None,
deprecated_msg: Optional[str] = None,
) -> None:
"""
:param config: Configuration dictionary
:param argname: Argumentname in self.args - will be copied to config dict.
@@ -420,9 +455,11 @@ class Configuration:
sample: logfun=len (prints the length of the found
configuration instead of the content)
"""
if (argname in self.args and self.args[argname] is not None
and self.args[argname] is not False):
if (
argname in self.args
and self.args[argname] is not None
and self.args[argname] is not False
):
config.update({argname: self.args[argname]})
if logfun:
logger.info(logstring.format(logfun(config[argname])))
@@ -441,7 +478,7 @@
"""
if "pairs" in config:
config['exchange']['pair_whitelist'] = config['pairs']
config["exchange"]["pair_whitelist"] = config["pairs"]
return
if "pairs_file" in self.args and self.args["pairs_file"]:
@@ -451,19 +488,19 @@
# or if pairs file is specified explicitly
if not pairs_file.exists():
raise OperationalException(f'No pairs file found with path "{pairs_file}".')
config['pairs'] = load_file(pairs_file)
if isinstance(config['pairs'], list):
config['pairs'].sort()
config["pairs"] = load_file(pairs_file)
if isinstance(config["pairs"], list):
config["pairs"].sort()
return
if 'config' in self.args and self.args['config']:
if "config" in self.args and self.args["config"]:
logger.info("Using pairlist from configuration.")
config['pairs'] = config.get('exchange', {}).get('pair_whitelist')
config["pairs"] = config.get("exchange", {}).get("pair_whitelist")
else:
# Fall back to /dl_path/pairs.json
pairs_file = config['datadir'] / 'pairs.json'
pairs_file = config["datadir"] / "pairs.json"
if pairs_file.exists():
logger.info(f'Reading pairs file "{pairs_file}".')
config['pairs'] = load_file(pairs_file)
if 'pairs' in config and isinstance(config['pairs'], list):
config['pairs'].sort()
config["pairs"] = load_file(pairs_file)
if "pairs" in config and isinstance(config["pairs"], list):
config["pairs"].sort()

View File

@@ -12,9 +12,13 @@ from freqtrade.exceptions import ConfigurationError, OperationalException
logger = logging.getLogger(__name__)
def check_conflicting_settings(config: Config,
section_old: Optional[str], name_old: str,
section_new: Optional[str], name_new: str) -> None:
def check_conflicting_settings(
config: Config,
section_old: Optional[str],
name_old: str,
section_new: Optional[str],
name_new: str,
) -> None:
section_new_config = config.get(section_new, {}) if section_new else config
section_old_config = config.get(section_old, {}) if section_old else config
if name_new in section_new_config and name_old in section_old_config:
@@ -29,9 +33,9 @@ def check_conflicting_settings(config: Config,
)
def process_removed_setting(config: Config,
section1: str, name1: str,
section2: Optional[str], name2: str) -> None:
def process_removed_setting(
config: Config, section1: str, name1: str, section2: Optional[str], name2: str
) -> None:
"""
:param section1: Removed section
:param name1: Removed setting name
@@ -48,10 +52,13 @@ def process_removed_setting,
)
def process_deprecated_setting(config: Config,
section_old: Optional[str], name_old: str,
section_new: Optional[str], name_new: str
) -> None:
def process_deprecated_setting(
config: Config,
section_old: Optional[str],
name_old: str,
section_new: Optional[str],
name_new: str,
) -> None:
check_conflicting_settings(config, section_old, name_old, section_new, name_new)
section_old_config = config.get(section_old, {}) if section_old else config
@@ -71,57 +78,91 @@ def process_deprecated_setting(config: Config,
def process_temporary_deprecated_settings(config: Config) -> None:
# Kept for future deprecated / moved settings
# check_conflicting_settings(config, 'ask_strategy', 'use_sell_signal',
# 'experimental', 'use_sell_signal')
process_deprecated_setting(config, 'ask_strategy', 'ignore_buying_expired_candle_after',
None, 'ignore_buying_expired_candle_after')
process_deprecated_setting(
config,
"ask_strategy",
"ignore_buying_expired_candle_after",
None,
"ignore_buying_expired_candle_after",
)
process_deprecated_setting(config, None, 'forcebuy_enable', None, 'force_entry_enable')
process_deprecated_setting(config, None, "forcebuy_enable", None, "force_entry_enable")
# New settings
if config.get('telegram'):
process_deprecated_setting(config['telegram'], 'notification_settings', 'sell',
'notification_settings', 'exit')
process_deprecated_setting(config['telegram'], 'notification_settings', 'sell_fill',
'notification_settings', 'exit_fill')
process_deprecated_setting(config['telegram'], 'notification_settings', 'sell_cancel',
'notification_settings', 'exit_cancel')
process_deprecated_setting(config['telegram'], 'notification_settings', 'buy',
'notification_settings', 'entry')
process_deprecated_setting(config['telegram'], 'notification_settings', 'buy_fill',
'notification_settings', 'entry_fill')
process_deprecated_setting(config['telegram'], 'notification_settings', 'buy_cancel',
'notification_settings', 'entry_cancel')
if config.get('webhook'):
process_deprecated_setting(config, 'webhook', 'webhookbuy', 'webhook', 'webhookentry')
process_deprecated_setting(config, 'webhook', 'webhookbuycancel',
'webhook', 'webhookentrycancel')
process_deprecated_setting(config, 'webhook', 'webhookbuyfill',
'webhook', 'webhookentryfill')
process_deprecated_setting(config, 'webhook', 'webhooksell', 'webhook', 'webhookexit')
process_deprecated_setting(config, 'webhook', 'webhooksellcancel',
'webhook', 'webhookexitcancel')
process_deprecated_setting(config, 'webhook', 'webhooksellfill',
'webhook', 'webhookexitfill')
if config.get("telegram"):
process_deprecated_setting(
config["telegram"], "notification_settings", "sell", "notification_settings", "exit"
)
process_deprecated_setting(
config["telegram"],
"notification_settings",
"sell_fill",
"notification_settings",
"exit_fill",
)
process_deprecated_setting(
config["telegram"],
"notification_settings",
"sell_cancel",
"notification_settings",
"exit_cancel",
)
process_deprecated_setting(
config["telegram"], "notification_settings", "buy", "notification_settings", "entry"
)
process_deprecated_setting(
config["telegram"],
"notification_settings",
"buy_fill",
"notification_settings",
"entry_fill",
)
process_deprecated_setting(
config["telegram"],
"notification_settings",
"buy_cancel",
"notification_settings",
"entry_cancel",
)
if config.get("webhook"):
process_deprecated_setting(config, "webhook", "webhookbuy", "webhook", "webhookentry")
process_deprecated_setting(
config, "webhook", "webhookbuycancel", "webhook", "webhookentrycancel"
)
process_deprecated_setting(
config, "webhook", "webhookbuyfill", "webhook", "webhookentryfill"
)
process_deprecated_setting(config, "webhook", "webhooksell", "webhook", "webhookexit")
process_deprecated_setting(
config, "webhook", "webhooksellcancel", "webhook", "webhookexitcancel"
)
process_deprecated_setting(
config, "webhook", "webhooksellfill", "webhook", "webhookexitfill"
)
# Legacy way - having them in experimental ...
process_removed_setting(config, 'experimental', 'use_sell_signal', None, 'use_exit_signal')
process_removed_setting(config, 'experimental', 'sell_profit_only', None, 'exit_profit_only')
process_removed_setting(config, 'experimental', 'ignore_roi_if_buy_signal',
None, 'ignore_roi_if_entry_signal')
process_removed_setting(config, "experimental", "use_sell_signal", None, "use_exit_signal")
process_removed_setting(config, "experimental", "sell_profit_only", None, "exit_profit_only")
process_removed_setting(
config, "experimental", "ignore_roi_if_buy_signal", None, "ignore_roi_if_entry_signal"
)
process_removed_setting(config, 'ask_strategy', 'use_sell_signal', None, 'use_exit_signal')
process_removed_setting(config, 'ask_strategy', 'sell_profit_only', None, 'exit_profit_only')
process_removed_setting(config, 'ask_strategy', 'sell_profit_offset',
None, 'exit_profit_offset')
process_removed_setting(config, 'ask_strategy', 'ignore_roi_if_buy_signal',
None, 'ignore_roi_if_entry_signal')
if (config.get('edge', {}).get('enabled', False)
and 'capital_available_percentage' in config.get('edge', {})):
process_removed_setting(config, "ask_strategy", "use_sell_signal", None, "use_exit_signal")
process_removed_setting(config, "ask_strategy", "sell_profit_only", None, "exit_profit_only")
process_removed_setting(
config, "ask_strategy", "sell_profit_offset", None, "exit_profit_offset"
)
process_removed_setting(
config, "ask_strategy", "ignore_roi_if_buy_signal", None, "ignore_roi_if_entry_signal"
)
if config.get("edge", {}).get(
"enabled", False
) and "capital_available_percentage" in config.get("edge", {}):
raise ConfigurationError(
"DEPRECATED: "
"Using 'edge.capital_available_percentage' has been deprecated in favor of "
@@ -129,12 +170,11 @@ def process_temporary_deprecated_settings(config: Config) -> None:
"'tradable_balance_ratio' and remove 'capital_available_percentage' "
"from the edge configuration."
)
if 'ticker_interval' in config:
if "ticker_interval" in config:
raise ConfigurationError(
"DEPRECATED: 'ticker_interval' detected. "
"Please use 'timeframe' instead of 'ticker_interval."
)
if 'protections' in config:
if "protections" in config:
logger.warning("DEPRECATED: Setting 'protections' in the configuration is deprecated.")

View File

@@ -5,4 +5,4 @@ def running_in_docker() -> bool:
"""
Check if we are running in a docker container
"""
return os.environ.get('FT_APP_ENV') == 'docker'
return os.environ.get("FT_APP_ENV") == "docker"

View File

@@ -19,16 +19,15 @@ logger = logging.getLogger(__name__)
def create_datadir(config: Config, datadir: Optional[str] = None) -> Path:
folder = Path(datadir) if datadir else Path(f"{config['user_data_dir']}/data")
if not datadir:
# set datadir
exchange_name = config.get('exchange', {}).get('name', '').lower()
exchange_name = config.get("exchange", {}).get("name", "").lower()
folder = folder.joinpath(exchange_name)
if not folder.is_dir():
folder.mkdir(parents=True)
logger.info(f'Created data directory: {datadir}')
logger.info(f"Created data directory: {datadir}")
return folder
@@ -40,8 +39,8 @@ def chown_user_directory(directory: Path) -> None:
if running_in_docker():
try:
import subprocess
subprocess.check_output(
['sudo', 'chown', '-R', 'ftuser:', str(directory.resolve())])
subprocess.check_output(["sudo", "chown", "-R", "ftuser:", str(directory.resolve())])
except Exception:
logger.warning(f"Could not chown {directory}")
@@ -56,18 +55,28 @@ def create_userdata_dir(directory: str, create_dir: bool = False) -> Path:
:param create_dir: Create directory if it does not exist.
:return: Path object containing the directory
"""
sub_dirs = ["backtest_results", "data", USERPATH_HYPEROPTS, "hyperopt_results", "logs",
USERPATH_NOTEBOOKS, "plot", USERPATH_STRATEGIES, USERPATH_FREQAIMODELS]
sub_dirs = [
"backtest_results",
"data",
USERPATH_HYPEROPTS,
"hyperopt_results",
"logs",
USERPATH_NOTEBOOKS,
"plot",
USERPATH_STRATEGIES,
USERPATH_FREQAIMODELS,
]
folder = Path(directory)
chown_user_directory(folder)
if not folder.is_dir():
if create_dir:
folder.mkdir(parents=True)
logger.info(f'Created user-data directory: {folder}')
logger.info(f"Created user-data directory: {folder}")
else:
raise OperationalException(
f"Directory `{folder}` does not exist. "
"Please use `freqtrade create-userdir` to create a user directory")
"Please use `freqtrade create-userdir` to create a user directory"
)
# Create required subdirectories
for f in sub_dirs:
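A short usage sketch, assuming this function is importable from freqtrade.configuration and that USERPATH_STRATEGIES resolves to "strategies" (both assumptions, not shown in this diff):

# Hypothetical usage: create the user-data directory tree on first run.
user_dir = create_userdata_dir("./user_data", create_dir=True)
assert (user_dir / "strategies").is_dir()  # one of the sub_dirs above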

View File

@ -16,9 +16,9 @@ def _get_var_typed(val):
try:
return float(val)
except ValueError:
if val.lower() in ('t', 'true'):
if val.lower() in ("t", "true"):
return True
elif val.lower() in ('f', 'false'):
elif val.lower() in ("f", "false"):
return False
# keep as string
return val
@ -32,16 +32,19 @@ def _flat_vars_to_nested_dict(env_dict: Dict[str, Any], prefix: str) -> Dict[str
:param prefix: Prefix to consider (usually FREQTRADE__)
:return: Nested dict based on available and relevant variables.
"""
no_convert = ['CHAT_ID', 'PASSWORD']
no_convert = ["CHAT_ID", "PASSWORD"]
relevant_vars: Dict[str, Any] = {}
for env_var, val in sorted(env_dict.items()):
if env_var.startswith(prefix):
logger.info(f"Loading variable '{env_var}'")
key = env_var.replace(prefix, '')
for k in reversed(key.split('__')):
val = {k.lower(): _get_var_typed(val)
if not isinstance(val, dict) and k not in no_convert else val}
key = env_var.replace(prefix, "")
for k in reversed(key.split("__")):
val = {
k.lower(): _get_var_typed(val)
if not isinstance(val, dict) and k not in no_convert
else val
}
relevant_vars = deep_merge_dicts(val, relevant_vars)
return relevant_vars
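A small worked example of the flattening above (variable names hypothetical, prefix as documented):

env = {
    "FREQTRADE__EXCHANGE__NAME": "binance",
    "FREQTRADE__STAKE_AMOUNT": "0.05",
}
result = _flat_vars_to_nested_dict(env, "FREQTRADE__")
# _get_var_typed parses "0.05" as a float; "binance" stays a string:
# {"exchange": {"name": "binance"}, "stake_amount": 0.05}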

View File

@ -1,6 +1,7 @@
"""
This module contains functions to load the configuration file
"""
import logging
import re
import sys
@ -25,25 +26,25 @@ def log_config_error_range(path: str, errmsg: str) -> str:
"""
Parses the configuration file and returns the text range around the error
"""
if path != '-':
offsetlist = re.findall(r'(?<=Parse\serror\sat\soffset\s)\d+', errmsg)
if path != "-":
offsetlist = re.findall(r"(?<=Parse\serror\sat\soffset\s)\d+", errmsg)
if offsetlist:
offset = int(offsetlist[0])
text = Path(path).read_text()
# Fetch an offset of 80 characters around the error line
subtext = text[offset - min(80, offset):offset + 80]
segments = subtext.split('\n')
subtext = text[offset - min(80, offset) : offset + 80]
segments = subtext.split("\n")
if len(segments) > 3:
# Remove first and last lines, to avoid odd truncations
return '\n'.join(segments[1:-1])
return "\n".join(segments[1:-1])
else:
return subtext
return ''
return ""
def load_file(path: Path) -> Dict[str, Any]:
try:
with path.open('r') as file:
with path.open("r") as file:
config = rapidjson.load(file, parse_mode=CONFIG_PARSE_MODE)
except FileNotFoundError:
raise OperationalException(f'File "{path}" not found!') from None
@ -58,25 +59,27 @@ def load_config_file(path: str) -> Dict[str, Any]:
"""
try:
# Read config from stdin if requested in the options
with Path(path).open() if path != '-' else sys.stdin as file:
with Path(path).open() if path != "-" else sys.stdin as file:
config = rapidjson.load(file, parse_mode=CONFIG_PARSE_MODE)
except FileNotFoundError:
raise OperationalException(
f'Config file "{path}" not found!'
' Please create a config file or check whether it exists.') from None
" Please create a config file or check whether it exists."
) from None
except rapidjson.JSONDecodeError as e:
err_range = log_config_error_range(path, str(e))
raise ConfigurationError(
f'{e}\n'
f'Please verify the following segment of your configuration:\n{err_range}'
if err_range else 'Please verify your configuration file for syntax errors.'
f"{e}\n" f"Please verify the following segment of your configuration:\n{err_range}"
if err_range
else "Please verify your configuration file for syntax errors."
)
return config
def load_from_files(
files: List[str], base_path: Optional[Path] = None, level: int = 0) -> Dict[str, Any]:
files: List[str], base_path: Optional[Path] = None, level: int = 0
) -> Dict[str, Any]:
"""
Recursively load configuration files if specified.
Sub-files are assumed to be relative to the initial config.
@ -90,8 +93,8 @@ def load_from_files(
files_loaded = []
# We expect here a list of config filenames
for filename in files:
logger.info(f'Using config: {filename} ...')
if filename == '-':
logger.info(f"Using config: {filename} ...")
if filename == "-":
# Immediately load stdin and return
return load_config_file(filename)
file = Path(filename)
@ -100,10 +103,11 @@ def load_from_files(
file = base_path / file
config_tmp = load_config_file(str(file))
if 'add_config_files' in config_tmp:
if "add_config_files" in config_tmp:
config_sub = load_from_files(
config_tmp['add_config_files'], file.resolve().parent, level + 1)
files_loaded.extend(config_sub.get('config_files', []))
config_tmp["add_config_files"], file.resolve().parent, level + 1
)
files_loaded.extend(config_sub.get("config_files", []))
config_tmp = deep_merge_dicts(config_tmp, config_sub)
files_loaded.insert(0, str(file))
@ -111,6 +115,6 @@ def load_from_files(
# Merge config options, overwriting prior values
config = deep_merge_dicts(config_tmp, config)
config['config_files'] = files_loaded
config["config_files"] = files_loaded
return config
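A minimal sketch of the recursive loading this supports; file names and contents are hypothetical:

# config.json:          {"max_open_trades": 3, "add_config_files": ["config-private.json"]}
# config-private.json:  {"exchange": {"key": "...", "secret": "..."}}
config = load_from_files(["config.json"])
# The including file's keys take precedence over its add_config_files, and
# config["config_files"] lists every file that was loaded.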

View File

@ -1,6 +1,7 @@
"""
This module contains the argument manager class
"""
import logging
import re
from datetime import datetime, timezone
@ -22,9 +23,13 @@ class TimeRange:
If *type is None, don't use the corresponding start value.
"""
def __init__(self, starttype: Optional[str] = None, stoptype: Optional[str] = None,
startts: int = 0, stopts: int = 0):
def __init__(
self,
starttype: Optional[str] = None,
stoptype: Optional[str] = None,
startts: int = 0,
stopts: int = 0,
):
self.starttype: Optional[str] = starttype
self.stoptype: Optional[str] = stoptype
self.startts: int = startts
@ -48,12 +53,12 @@ class TimeRange:
Returns a string representation of the timerange as used by parse_timerange.
Follows the format yyyymmdd-yyyymmdd - leaving out the parts that are not set.
"""
start = ''
stop = ''
start = ""
stop = ""
if startdt := self.startdt:
start = startdt.strftime('%Y%m%d')
start = startdt.strftime("%Y%m%d")
if stopdt := self.stopdt:
stop = stopdt.strftime('%Y%m%d')
stop = stopdt.strftime("%Y%m%d")
return f"{start}-{stop}"
@property
@ -61,7 +66,7 @@ class TimeRange:
"""
Returns a string representation of the start date
"""
val = 'unbounded'
val = "unbounded"
if (startdt := self.startdt) is not None:
val = startdt.strftime(DATETIME_PRINT_FORMAT)
return val
@ -71,15 +76,19 @@ class TimeRange:
"""
Returns a string representation of the stop date
"""
val = 'unbounded'
val = "unbounded"
if (stopdt := self.stopdt) is not None:
val = stopdt.strftime(DATETIME_PRINT_FORMAT)
return val
def __eq__(self, other):
"""Override the default Equals behavior"""
return (self.starttype == other.starttype and self.stoptype == other.stoptype
and self.startts == other.startts and self.stopts == other.stopts)
return (
self.starttype == other.starttype
and self.stoptype == other.stoptype
and self.startts == other.startts
and self.stopts == other.stopts
)
def subtract_start(self, seconds: int) -> None:
"""
@ -90,8 +99,9 @@ class TimeRange:
if self.startts:
self.startts = self.startts - seconds
def adjust_start_if_necessary(self, timeframe_secs: int, startup_candles: int,
min_date: datetime) -> None:
def adjust_start_if_necessary(
self, timeframe_secs: int, startup_candles: int, min_date: datetime
) -> None:
"""
Adjust startts by <startup_candles> candles.
Applies only if no startup candles are available before the start date.
@ -101,13 +111,13 @@ class TimeRange:
has to be moved
:return: None (Modifies the object in place)
"""
if (not self.starttype or (startup_candles
and min_date.timestamp() >= self.startts)):
if not self.starttype or (startup_candles and min_date.timestamp() >= self.startts):
# If no startts was defined, or backtest-data starts at the defined backtest-date
logger.warning("Moving start-date by %s candles to account for startup time.",
startup_candles)
logger.warning(
"Moving start-date by %s candles to account for startup time.", startup_candles
)
self.startts = int(min_date.timestamp() + timeframe_secs * startup_candles)
self.starttype = 'date'
self.starttype = "date"
@classmethod
def parse_timerange(cls, text: Optional[str]) -> Self:
@ -118,16 +128,17 @@ class TimeRange:
"""
if not text:
return cls(None, None, 0, 0)
syntax = [(r'^-(\d{8})$', (None, 'date')),
(r'^(\d{8})-$', ('date', None)),
(r'^(\d{8})-(\d{8})$', ('date', 'date')),
(r'^-(\d{10})$', (None, 'date')),
(r'^(\d{10})-$', ('date', None)),
(r'^(\d{10})-(\d{10})$', ('date', 'date')),
(r'^-(\d{13})$', (None, 'date')),
(r'^(\d{13})-$', ('date', None)),
(r'^(\d{13})-(\d{13})$', ('date', 'date')),
]
syntax = [
(r"^-(\d{8})$", (None, "date")),
(r"^(\d{8})-$", ("date", None)),
(r"^(\d{8})-(\d{8})$", ("date", "date")),
(r"^-(\d{10})$", (None, "date")),
(r"^(\d{10})-$", ("date", None)),
(r"^(\d{10})-(\d{10})$", ("date", "date")),
(r"^-(\d{13})$", (None, "date")),
(r"^(\d{13})-$", ("date", None)),
(r"^(\d{13})-(\d{13})$", ("date", "date")),
]
for rex, stype in syntax:
# Apply the regular expression to text
match = re.match(rex, text)
@ -138,9 +149,12 @@ class TimeRange:
stop: int = 0
if stype[0]:
starts = rvals[index]
if stype[0] == 'date' and len(starts) == 8:
start = int(datetime.strptime(starts, '%Y%m%d').replace(
tzinfo=timezone.utc).timestamp())
if stype[0] == "date" and len(starts) == 8:
start = int(
datetime.strptime(starts, "%Y%m%d")
.replace(tzinfo=timezone.utc)
.timestamp()
)
elif len(starts) == 13:
start = int(starts) // 1000
else:
@ -148,15 +162,19 @@ class TimeRange:
index += 1
if stype[1]:
stops = rvals[index]
if stype[1] == 'date' and len(stops) == 8:
stop = int(datetime.strptime(stops, '%Y%m%d').replace(
tzinfo=timezone.utc).timestamp())
if stype[1] == "date" and len(stops) == 8:
stop = int(
datetime.strptime(stops, "%Y%m%d")
.replace(tzinfo=timezone.utc)
.timestamp()
)
elif len(stops) == 13:
stop = int(stops) // 1000
else:
stop = int(stops)
if start > stop > 0:
raise ConfigurationError(
f'Start date is after stop date for timerange "{text}"')
f'Start date is after stop date for timerange "{text}"'
)
return cls(stype[0], stype[1], start, stop)
raise ConfigurationError(f'Incorrect syntax for timerange "{text}"')
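Usage examples for the syntax table above (dates illustrative):

TimeRange.parse_timerange("20200101-20210101")  # bounded by two dates
TimeRange.parse_timerange("-20210101")          # open start, bounded stop
TimeRange.parse_timerange("1577836800-")        # 10-digit unix-timestamp start
# 13-digit (millisecond) timestamps are divided by 1000 before being stored.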