import logging from collections import Counter from copy import deepcopy from datetime import datetime from typing import Any, Dict from jsonschema import Draft4Validator, validators from jsonschema.exceptions import ValidationError, best_match from freqtrade.configuration.config_schema import ( CONF_SCHEMA, SCHEMA_BACKTEST_REQUIRED, SCHEMA_BACKTEST_REQUIRED_FINAL, SCHEMA_MINIMAL_REQUIRED, SCHEMA_MINIMAL_WEBSERVER, SCHEMA_TRADE_REQUIRED, ) from freqtrade.configuration.deprecated_settings import process_deprecated_setting from freqtrade.constants import UNLIMITED_STAKE_AMOUNT from freqtrade.enums import RunMode, TradingMode from freqtrade.exceptions import ConfigurationError logger = logging.getLogger(__name__) def _extend_validator(validator_class): """ Extended validator for the Freqtrade configuration JSON Schema. Currently it only handles defaults for subschemas. """ validate_properties = validator_class.VALIDATORS["properties"] def set_defaults(validator, properties, instance, schema): for prop, subschema in properties.items(): if "default" in subschema: instance.setdefault(prop, subschema["default"]) yield from validate_properties(validator, properties, instance, schema) return validators.extend(validator_class, {"properties": set_defaults}) FreqtradeValidator = _extend_validator(Draft4Validator) def validate_config_schema(conf: Dict[str, Any], preliminary: bool = False) -> Dict[str, Any]: """ Validate the configuration follow the Config Schema :param conf: Config in JSON format :return: Returns the config if valid, otherwise throw an exception """ conf_schema = deepcopy(CONF_SCHEMA) if conf.get("runmode", RunMode.OTHER) in (RunMode.DRY_RUN, RunMode.LIVE): conf_schema["required"] = SCHEMA_TRADE_REQUIRED elif conf.get("runmode", RunMode.OTHER) in (RunMode.BACKTEST, RunMode.HYPEROPT): if preliminary: conf_schema["required"] = SCHEMA_BACKTEST_REQUIRED else: conf_schema["required"] = SCHEMA_BACKTEST_REQUIRED_FINAL elif conf.get("runmode", RunMode.OTHER) == RunMode.WEBSERVER: conf_schema["required"] = SCHEMA_MINIMAL_WEBSERVER else: conf_schema["required"] = SCHEMA_MINIMAL_REQUIRED try: FreqtradeValidator(conf_schema).validate(conf) return conf except ValidationError as e: logger.critical(f"Invalid configuration. Reason: {e}") raise ValidationError(best_match(Draft4Validator(conf_schema).iter_errors(conf)).message) def validate_config_consistency(conf: Dict[str, Any], *, preliminary: bool = False) -> None: """ Validate the configuration consistency. Should be ran after loading both configuration and strategy, since strategies can set certain configuration settings too. :param conf: Config in JSON format :return: Returns None if everything is ok, otherwise throw an ConfigurationError """ # validating trailing stoploss _validate_trailing_stoploss(conf) _validate_price_config(conf) _validate_edge(conf) _validate_whitelist(conf) _validate_protections(conf) _validate_unlimited_amount(conf) _validate_ask_orderbook(conf) _validate_freqai_hyperopt(conf) _validate_freqai_backtest(conf) _validate_freqai_include_timeframes(conf, preliminary=preliminary) _validate_consumers(conf) validate_migrated_strategy_settings(conf) _validate_orderflow(conf) # validate configuration before returning logger.info("Validating configuration ...") validate_config_schema(conf, preliminary=preliminary) def _validate_unlimited_amount(conf: Dict[str, Any]) -> None: """ If edge is disabled, either max_open_trades or stake_amount need to be set. :raise: ConfigurationError if config validation failed """ if ( not conf.get("edge", {}).get("enabled") and conf.get("max_open_trades") == float("inf") and conf.get("stake_amount") == UNLIMITED_STAKE_AMOUNT ): raise ConfigurationError("`max_open_trades` and `stake_amount` cannot both be unlimited.") def _validate_price_config(conf: Dict[str, Any]) -> None: """ When using market orders, price sides must be using the "other" side of the price """ # TODO: The below could be an enforced setting when using market orders if conf.get("order_types", {}).get("entry") == "market" and conf.get("entry_pricing", {}).get( "price_side" ) not in ("ask", "other"): raise ConfigurationError('Market entry orders require entry_pricing.price_side = "other".') if conf.get("order_types", {}).get("exit") == "market" and conf.get("exit_pricing", {}).get( "price_side" ) not in ("bid", "other"): raise ConfigurationError('Market exit orders require exit_pricing.price_side = "other".') def _validate_trailing_stoploss(conf: Dict[str, Any]) -> None: if conf.get("stoploss") == 0.0: raise ConfigurationError( "The config stoploss needs to be different from 0 to avoid problems with sell orders." ) # Skip if trailing stoploss is not activated if not conf.get("trailing_stop", False): return tsl_positive = float(conf.get("trailing_stop_positive", 0)) tsl_offset = float(conf.get("trailing_stop_positive_offset", 0)) tsl_only_offset = conf.get("trailing_only_offset_is_reached", False) if tsl_only_offset: if tsl_positive == 0.0: raise ConfigurationError( "The config trailing_only_offset_is_reached needs " "trailing_stop_positive_offset to be more than 0 in your config." ) if tsl_positive > 0 and 0 < tsl_offset <= tsl_positive: raise ConfigurationError( "The config trailing_stop_positive_offset needs " "to be greater than trailing_stop_positive in your config." ) # Fetch again without default if "trailing_stop_positive" in conf and float(conf["trailing_stop_positive"]) == 0.0: raise ConfigurationError( "The config trailing_stop_positive needs to be different from 0 " "to avoid problems with sell orders." ) def _validate_edge(conf: Dict[str, Any]) -> None: """ Edge and Dynamic whitelist should not both be enabled, since edge overrides dynamic whitelists. """ if not conf.get("edge", {}).get("enabled"): return if not conf.get("use_exit_signal", True): raise ConfigurationError( "Edge requires `use_exit_signal` to be True, otherwise no sells will happen." ) def _validate_whitelist(conf: Dict[str, Any]) -> None: """ Dynamic whitelist does not require pair_whitelist to be set - however StaticWhitelist does. """ if conf.get("runmode", RunMode.OTHER) in [ RunMode.OTHER, RunMode.PLOT, RunMode.UTIL_NO_EXCHANGE, RunMode.UTIL_EXCHANGE, ]: return for pl in conf.get("pairlists", [{"method": "StaticPairList"}]): if ( isinstance(pl, dict) and pl.get("method") == "StaticPairList" and not conf.get("exchange", {}).get("pair_whitelist") ): raise ConfigurationError("StaticPairList requires pair_whitelist to be set.") def _validate_protections(conf: Dict[str, Any]) -> None: """ Validate protection configuration validity """ for prot in conf.get("protections", []): parsed_unlock_at = None if (config_unlock_at := prot.get("unlock_at")) is not None: try: parsed_unlock_at = datetime.strptime(config_unlock_at, "%H:%M") except ValueError: raise ConfigurationError(f"Invalid date format for unlock_at: {config_unlock_at}.") if "stop_duration" in prot and "stop_duration_candles" in prot: raise ConfigurationError( "Protections must specify either `stop_duration` or `stop_duration_candles`.\n" f"Please fix the protection {prot.get('method')}." ) if "lookback_period" in prot and "lookback_period_candles" in prot: raise ConfigurationError( "Protections must specify either `lookback_period` or `lookback_period_candles`.\n" f"Please fix the protection {prot.get('method')}." ) if parsed_unlock_at is not None and ( "stop_duration" in prot or "stop_duration_candles" in prot ): raise ConfigurationError( "Protections must specify either `unlock_at`, `stop_duration` or " "`stop_duration_candles`.\n" f"Please fix the protection {prot.get('method')}." ) def _validate_ask_orderbook(conf: Dict[str, Any]) -> None: ask_strategy = conf.get("exit_pricing", {}) ob_min = ask_strategy.get("order_book_min") ob_max = ask_strategy.get("order_book_max") if ob_min is not None and ob_max is not None and ask_strategy.get("use_order_book"): if ob_min != ob_max: raise ConfigurationError( "Using order_book_max != order_book_min in exit_pricing is no longer supported." "Please pick one value and use `order_book_top` in the future." ) else: # Move value to order_book_top ask_strategy["order_book_top"] = ob_min logger.warning( "DEPRECATED: " "Please use `order_book_top` instead of `order_book_min` and `order_book_max` " "for your `exit_pricing` configuration." ) def validate_migrated_strategy_settings(conf: Dict[str, Any]) -> None: _validate_time_in_force(conf) _validate_order_types(conf) _validate_unfilledtimeout(conf) _validate_pricing_rules(conf) _strategy_settings(conf) def _validate_time_in_force(conf: Dict[str, Any]) -> None: time_in_force = conf.get("order_time_in_force", {}) if "buy" in time_in_force or "sell" in time_in_force: if conf.get("trading_mode", TradingMode.SPOT) != TradingMode.SPOT: raise ConfigurationError( "Please migrate your time_in_force settings to use 'entry' and 'exit'." ) else: logger.warning( "DEPRECATED: Using 'buy' and 'sell' for time_in_force is deprecated." "Please migrate your time_in_force settings to use 'entry' and 'exit'." ) process_deprecated_setting( conf, "order_time_in_force", "buy", "order_time_in_force", "entry" ) process_deprecated_setting( conf, "order_time_in_force", "sell", "order_time_in_force", "exit" ) def _validate_order_types(conf: Dict[str, Any]) -> None: order_types = conf.get("order_types", {}) old_order_types = [ "buy", "sell", "emergencysell", "forcebuy", "forcesell", "emergencyexit", "forceexit", "forceentry", ] if any(x in order_types for x in old_order_types): if conf.get("trading_mode", TradingMode.SPOT) != TradingMode.SPOT: raise ConfigurationError( "Please migrate your order_types settings to use the new wording." ) else: logger.warning( "DEPRECATED: Using 'buy' and 'sell' for order_types is deprecated." "Please migrate your order_types settings to use 'entry' and 'exit' wording." ) for o, n in [ ("buy", "entry"), ("sell", "exit"), ("emergencysell", "emergency_exit"), ("forcesell", "force_exit"), ("forcebuy", "force_entry"), ("emergencyexit", "emergency_exit"), ("forceexit", "force_exit"), ("forceentry", "force_entry"), ]: process_deprecated_setting(conf, "order_types", o, "order_types", n) def _validate_unfilledtimeout(conf: Dict[str, Any]) -> None: unfilledtimeout = conf.get("unfilledtimeout", {}) if any(x in unfilledtimeout for x in ["buy", "sell"]): if conf.get("trading_mode", TradingMode.SPOT) != TradingMode.SPOT: raise ConfigurationError( "Please migrate your unfilledtimeout settings to use the new wording." ) else: logger.warning( "DEPRECATED: Using 'buy' and 'sell' for unfilledtimeout is deprecated." "Please migrate your unfilledtimeout settings to use 'entry' and 'exit' wording." ) for o, n in [ ("buy", "entry"), ("sell", "exit"), ]: process_deprecated_setting(conf, "unfilledtimeout", o, "unfilledtimeout", n) def _validate_pricing_rules(conf: Dict[str, Any]) -> None: if conf.get("ask_strategy") or conf.get("bid_strategy"): if conf.get("trading_mode", TradingMode.SPOT) != TradingMode.SPOT: raise ConfigurationError("Please migrate your pricing settings to use the new wording.") else: logger.warning( "DEPRECATED: Using 'ask_strategy' and 'bid_strategy' is deprecated." "Please migrate your settings to use 'entry_pricing' and 'exit_pricing'." ) conf["entry_pricing"] = {} for obj in list(conf.get("bid_strategy", {}).keys()): if obj == "ask_last_balance": process_deprecated_setting( conf, "bid_strategy", obj, "entry_pricing", "price_last_balance" ) else: process_deprecated_setting(conf, "bid_strategy", obj, "entry_pricing", obj) del conf["bid_strategy"] conf["exit_pricing"] = {} for obj in list(conf.get("ask_strategy", {}).keys()): if obj == "bid_last_balance": process_deprecated_setting( conf, "ask_strategy", obj, "exit_pricing", "price_last_balance" ) else: process_deprecated_setting(conf, "ask_strategy", obj, "exit_pricing", obj) del conf["ask_strategy"] def _validate_freqai_hyperopt(conf: Dict[str, Any]) -> None: freqai_enabled = conf.get("freqai", {}).get("enabled", False) analyze_per_epoch = conf.get("analyze_per_epoch", False) if analyze_per_epoch and freqai_enabled: raise ConfigurationError( "Using analyze-per-epoch parameter is not supported with a FreqAI strategy." ) def _validate_freqai_include_timeframes(conf: Dict[str, Any], preliminary: bool) -> None: freqai_enabled = conf.get("freqai", {}).get("enabled", False) if freqai_enabled: main_tf = conf.get("timeframe", "5m") freqai_include_timeframes = ( conf.get("freqai", {}).get("feature_parameters", {}).get("include_timeframes", []) ) from freqtrade.exchange import timeframe_to_seconds main_tf_s = timeframe_to_seconds(main_tf) offending_lines = [] for tf in freqai_include_timeframes: tf_s = timeframe_to_seconds(tf) if tf_s < main_tf_s: offending_lines.append(tf) if offending_lines: raise ConfigurationError( f"Main timeframe of {main_tf} must be smaller or equal to FreqAI " f"`include_timeframes`.Offending include-timeframes: {', '.join(offending_lines)}" ) # Ensure that the base timeframe is included in the include_timeframes list if not preliminary and main_tf not in freqai_include_timeframes: feature_parameters = conf.get("freqai", {}).get("feature_parameters", {}) include_timeframes = [main_tf] + freqai_include_timeframes conf.get("freqai", {}).get("feature_parameters", {}).update( {**feature_parameters, "include_timeframes": include_timeframes} ) def _validate_freqai_backtest(conf: Dict[str, Any]) -> None: if conf.get("runmode", RunMode.OTHER) == RunMode.BACKTEST: freqai_enabled = conf.get("freqai", {}).get("enabled", False) timerange = conf.get("timerange") freqai_backtest_live_models = conf.get("freqai_backtest_live_models", False) if freqai_backtest_live_models and freqai_enabled and timerange: raise ConfigurationError( "Using timerange parameter is not supported with " "--freqai-backtest-live-models parameter." ) if freqai_backtest_live_models and not freqai_enabled: raise ConfigurationError( "Using --freqai-backtest-live-models parameter is only " "supported with a FreqAI strategy." ) if freqai_enabled and not freqai_backtest_live_models and not timerange: raise ConfigurationError( "Please pass --timerange if you intend to use FreqAI for backtesting." ) def _validate_consumers(conf: Dict[str, Any]) -> None: emc_conf = conf.get("external_message_consumer", {}) if emc_conf.get("enabled", False): if len(emc_conf.get("producers", [])) < 1: raise ConfigurationError("You must specify at least 1 Producer to connect to.") producer_names = [p["name"] for p in emc_conf.get("producers", [])] duplicates = [item for item, count in Counter(producer_names).items() if count > 1] if duplicates: raise ConfigurationError( f"Producer names must be unique. Duplicate: {', '.join(duplicates)}" ) if conf.get("process_only_new_candles", True): # Warning here or require it? logger.warning( "To receive best performance with external data, " "please set `process_only_new_candles` to False" ) def _validate_orderflow(conf: Dict[str, Any]) -> None: if conf.get("exchange", {}).get("use_public_trades"): if "orderflow" not in conf: raise ConfigurationError( "Orderflow is a required configuration key when using public trades." ) def _strategy_settings(conf: Dict[str, Any]) -> None: process_deprecated_setting(conf, None, "use_sell_signal", None, "use_exit_signal") process_deprecated_setting(conf, None, "sell_profit_only", None, "exit_profit_only") process_deprecated_setting(conf, None, "sell_profit_offset", None, "exit_profit_offset") process_deprecated_setting( conf, None, "ignore_roi_if_buy_signal", None, "ignore_roi_if_entry_signal" )