ruff format freqtrade/resolvers

This commit is contained in:
Matthias 2024-05-12 16:21:12 +02:00
parent 9121d3af65
commit 1a4bff7fb8
8 changed files with 282 additions and 198 deletions

View File

@ -2,6 +2,7 @@
# isort: off
from freqtrade.resolvers.iresolver import IResolver
from freqtrade.resolvers.exchange_resolver import ExchangeResolver
# isort: on
# Don't import HyperoptResolver to avoid loading the whole Optimize tree
# from freqtrade.resolvers.hyperopt_resolver import HyperOptResolver

View File

@ -1,6 +1,7 @@
"""
This module loads custom exchanges
"""
import logging
from inspect import isclass
from typing import Any, Dict, List, Optional
@ -18,17 +19,23 @@ class ExchangeResolver(IResolver):
"""
This class contains all the logic to load a custom exchange class
"""
object_type = Exchange
@staticmethod
def load_exchange(config: Config, *, exchange_config: Optional[ExchangeConfig] = None,
validate: bool = True, load_leverage_tiers: bool = False) -> Exchange:
def load_exchange(
config: Config,
*,
exchange_config: Optional[ExchangeConfig] = None,
validate: bool = True,
load_leverage_tiers: bool = False,
) -> Exchange:
"""
Load the custom class from config parameter
:param exchange_name: name of the Exchange to load
:param config: configuration dictionary
"""
exchange_name: str = config['exchange']['name']
exchange_name: str = config["exchange"]["name"]
# Map exchange name to avoid duplicate classes for identical exchanges
exchange_name = MAP_EXCHANGE_CHILDCLASS.get(exchange_name, exchange_name)
exchange_name = exchange_name.title()
@ -37,16 +44,22 @@ class ExchangeResolver(IResolver):
exchange = ExchangeResolver._load_exchange(
exchange_name,
kwargs={
'config': config,
'validate': validate,
'exchange_config': exchange_config,
'load_leverage_tiers': load_leverage_tiers}
"config": config,
"validate": validate,
"exchange_config": exchange_config,
"load_leverage_tiers": load_leverage_tiers,
},
)
except ImportError:
logger.info(
f"No {exchange_name} specific subclass found. Using the generic class instead.")
f"No {exchange_name} specific subclass found. Using the generic class instead."
)
if not exchange:
exchange = Exchange(config, validate=validate, exchange_config=exchange_config,)
exchange = Exchange(
config,
validate=validate,
exchange_config=exchange_config,
)
return exchange
@staticmethod
@ -75,8 +88,9 @@ class ExchangeResolver(IResolver):
)
@classmethod
def search_all_objects(cls, config: Config, enum_failed: bool,
recursive: bool = False) -> List[Dict[str, Any]]:
def search_all_objects(
cls, config: Config, enum_failed: bool, recursive: bool = False
) -> List[Dict[str, Any]]:
"""
Searches for valid objects
:param config: Config object
@ -89,10 +103,12 @@ class ExchangeResolver(IResolver):
for exchange_name in dir(exchanges):
exchange = getattr(exchanges, exchange_name)
if isclass(exchange) and issubclass(exchange, Exchange):
result.append({
'name': exchange_name,
'class': exchange,
'location': exchange.__module__,
'location_rel: ': exchange.__module__.replace('freqtrade.', ''),
})
result.append(
{
"name": exchange_name,
"class": exchange,
"location": exchange.__module__,
"location_rel: ": exchange.__module__.replace("freqtrade.", ""),
}
)
return result

View File

@ -3,6 +3,7 @@
"""
This module load a custom model for freqai
"""
import logging
from pathlib import Path

View File

@ -3,6 +3,7 @@
"""
This module load custom hyperopt
"""
import logging
from pathlib import Path
@ -19,10 +20,11 @@ class HyperOptLossResolver(IResolver):
"""
This class contains all the logic to load custom hyperopt loss class
"""
object_type = IHyperOptLoss
object_type_str = "HyperoptLoss"
user_subdir = USERPATH_HYPEROPTS
initial_search_path = Path(__file__).parent.parent.joinpath('optimize/hyperopt_loss').resolve()
initial_search_path = Path(__file__).parent.parent.joinpath("optimize/hyperopt_loss").resolve()
@staticmethod
def load_hyperoptloss(config: Config) -> IHyperOptLoss:
@ -31,18 +33,18 @@ class HyperOptLossResolver(IResolver):
:param config: configuration dictionary
"""
hyperoptloss_name = config.get('hyperopt_loss')
hyperoptloss_name = config.get("hyperopt_loss")
if not hyperoptloss_name:
raise OperationalException(
"No Hyperopt loss set. Please use `--hyperopt-loss` to "
"specify the Hyperopt-Loss class to use.\n"
f"Built-in Hyperopt-loss-functions are: {', '.join(HYPEROPT_LOSS_BUILTIN)}"
)
hyperoptloss = HyperOptLossResolver.load_object(hyperoptloss_name,
config, kwargs={},
extra_dir=config.get('hyperopt_path'))
hyperoptloss = HyperOptLossResolver.load_object(
hyperoptloss_name, config, kwargs={}, extra_dir=config.get("hyperopt_path")
)
# Assign timeframe to be used in hyperopt
hyperoptloss.__class__.timeframe = str(config['timeframe'])
hyperoptloss.__class__.timeframe = str(config["timeframe"])
return hyperoptloss

View File

@ -3,6 +3,7 @@
"""
This module load custom objects
"""
import importlib.util
import inspect
import logging
@ -37,6 +38,7 @@ class IResolver:
"""
This class contains all the logic to load custom classes
"""
# Childclasses need to override this
object_type: Type[Any]
object_type_str: str
@ -46,15 +48,18 @@ class IResolver:
extra_path: Optional[str] = None
@classmethod
def build_search_paths(cls, config: Config, user_subdir: Optional[str] = None,
extra_dirs: Optional[List[str]] = None) -> List[Path]:
def build_search_paths(
cls,
config: Config,
user_subdir: Optional[str] = None,
extra_dirs: Optional[List[str]] = None,
) -> List[Path]:
abs_paths: List[Path] = []
if cls.initial_search_path:
abs_paths.append(cls.initial_search_path)
if user_subdir:
abs_paths.insert(0, config['user_data_dir'].joinpath(user_subdir))
abs_paths.insert(0, config["user_data_dir"].joinpath(user_subdir))
# Add extra directory to the top of the search paths
if extra_dirs:
@ -67,8 +72,9 @@ class IResolver:
return abs_paths
@classmethod
def _get_valid_object(cls, module_path: Path, object_name: Optional[str],
enum_failed: bool = False) -> Iterator[Any]:
def _get_valid_object(
cls, module_path: Path, object_name: Optional[str], enum_failed: bool = False
) -> Iterator[Any]:
"""
Generator returning objects with matching object_type and object_name in the path given.
:param module_path: absolute path to the module
@ -90,28 +96,35 @@ class IResolver:
module = importlib.util.module_from_spec(spec)
try:
spec.loader.exec_module(module) # type: ignore # importlib does not use typehints
except (AttributeError, ModuleNotFoundError, SyntaxError,
ImportError, NameError) as err:
except (
AttributeError,
ModuleNotFoundError,
SyntaxError,
ImportError,
NameError,
) as err:
# Catch errors in case a specific module is not installed
logger.warning(f"Could not import {module_path} due to '{err}'")
if enum_failed:
return iter([None])
valid_objects_gen = (
(obj, inspect.getsource(module)) for
name, obj in inspect.getmembers(
module, inspect.isclass) if ((object_name is None or object_name == name)
and issubclass(obj, cls.object_type)
and obj is not cls.object_type
and obj.__module__ == module_name
)
(obj, inspect.getsource(module))
for name, obj in inspect.getmembers(module, inspect.isclass)
if (
(object_name is None or object_name == name)
and issubclass(obj, cls.object_type)
and obj is not cls.object_type
and obj.__module__ == module_name
)
)
# The __module__ check ensures we only use strategies that are defined in this folder.
return valid_objects_gen
@classmethod
def _search_object(cls, directory: Path, *, object_name: str, add_source: bool = False
) -> Union[Tuple[Any, Path], Tuple[None, None]]:
def _search_object(
cls, directory: Path, *, object_name: str, add_source: bool = False
) -> Union[Tuple[Any, Path], Tuple[None, None]]:
"""
Search for the objectname in the given directory
:param directory: relative or absolute directory path
@ -121,11 +134,11 @@ class IResolver:
logger.debug(f"Searching for {cls.object_type.__name__} {object_name} in '{directory}'")
for entry in directory.iterdir():
# Only consider python files
if entry.suffix != '.py':
logger.debug('Ignoring %s', entry)
if entry.suffix != ".py":
logger.debug("Ignoring %s", entry)
continue
if entry.is_symlink() and not entry.is_file():
logger.debug('Ignoring broken symlink %s', entry)
logger.debug("Ignoring broken symlink %s", entry)
continue
module_path = entry.resolve()
@ -139,21 +152,23 @@ class IResolver:
return (None, None)
@classmethod
def _load_object(cls, paths: List[Path], *, object_name: str, add_source: bool = False,
kwargs: Dict) -> Optional[Any]:
def _load_object(
cls, paths: List[Path], *, object_name: str, add_source: bool = False, kwargs: Dict
) -> Optional[Any]:
"""
Try to load object from path list.
"""
for _path in paths:
try:
(module, module_path) = cls._search_object(directory=_path,
object_name=object_name,
add_source=add_source)
(module, module_path) = cls._search_object(
directory=_path, object_name=object_name, add_source=add_source
)
if module:
logger.info(
f"Using resolved {cls.object_type.__name__.lower()[1:]} {object_name} "
f"from '{module_path}'...")
f"from '{module_path}'..."
)
return module(**kwargs)
except FileNotFoundError:
logger.warning('Path "%s" does not exist.', _path.resolve())
@ -161,8 +176,9 @@ class IResolver:
return None
@classmethod
def load_object(cls, object_name: str, config: Config, *, kwargs: dict,
extra_dir: Optional[str] = None) -> Any:
def load_object(
cls, object_name: str, config: Config, *, kwargs: dict, extra_dir: Optional[str] = None
) -> Any:
"""
Search and loads the specified object as configured in the child class.
:param object_name: name of the module to import
@ -176,12 +192,11 @@ class IResolver:
if extra_dir:
extra_dirs.append(extra_dir)
abs_paths = cls.build_search_paths(config,
user_subdir=cls.user_subdir,
extra_dirs=extra_dirs)
abs_paths = cls.build_search_paths(
config, user_subdir=cls.user_subdir, extra_dirs=extra_dirs
)
found_object = cls._load_object(paths=abs_paths, object_name=object_name,
kwargs=kwargs)
found_object = cls._load_object(paths=abs_paths, object_name=object_name, kwargs=kwargs)
if found_object:
return found_object
raise OperationalException(
@ -190,8 +205,9 @@ class IResolver:
)
@classmethod
def search_all_objects(cls, config: Config, enum_failed: bool,
recursive: bool = False) -> List[Dict[str, Any]]:
def search_all_objects(
cls, config: Config, enum_failed: bool, recursive: bool = False
) -> List[Dict[str, Any]]:
"""
Searches for valid objects
:param config: Config object
@ -209,15 +225,21 @@ class IResolver:
@classmethod
def _build_rel_location(cls, directory: Path, entry: Path) -> str:
builtin = cls.initial_search_path == directory
return f"<builtin>/{entry.relative_to(directory)}" if builtin else str(
entry.relative_to(directory))
return (
f"<builtin>/{entry.relative_to(directory)}"
if builtin
else str(entry.relative_to(directory))
)
@classmethod
def _search_all_objects(
cls, directory: Path, enum_failed: bool, recursive: bool = False,
basedir: Optional[Path] = None) -> List[Dict[str, Any]]:
cls,
directory: Path,
enum_failed: bool,
recursive: bool = False,
basedir: Optional[Path] = None,
) -> List[Dict[str, Any]]:
"""
Searches a directory for valid objects
:param directory: Path to search
@ -233,24 +255,29 @@ class IResolver:
return objects
for entry in directory.iterdir():
if (
recursive and entry.is_dir()
and not entry.name.startswith('__')
and not entry.name.startswith('.')
recursive
and entry.is_dir()
and not entry.name.startswith("__")
and not entry.name.startswith(".")
):
objects.extend(cls._search_all_objects(
entry, enum_failed, recursive, basedir or directory))
objects.extend(
cls._search_all_objects(entry, enum_failed, recursive, basedir or directory)
)
# Only consider python files
if entry.suffix != '.py':
logger.debug('Ignoring %s', entry)
if entry.suffix != ".py":
logger.debug("Ignoring %s", entry)
continue
module_path = entry.resolve()
logger.debug(f"Path {module_path}")
for obj in cls._get_valid_object(module_path, object_name=None,
enum_failed=enum_failed):
for obj in cls._get_valid_object(
module_path, object_name=None, enum_failed=enum_failed
):
objects.append(
{'name': obj[0].__name__ if obj is not None else '',
'class': obj[0] if obj is not None else None,
'location': entry,
'location_rel': cls._build_rel_location(basedir or directory, entry),
})
{
"name": obj[0].__name__ if obj is not None else "",
"class": obj[0] if obj is not None else None,
"location": entry,
"location_rel": cls._build_rel_location(basedir or directory, entry),
}
)
return objects

View File

@ -3,6 +3,7 @@
"""
This module load custom pairlists
"""
import logging
from pathlib import Path
@ -18,14 +19,21 @@ class PairListResolver(IResolver):
"""
This class contains all the logic to load custom PairList class
"""
object_type = IPairList
object_type_str = "Pairlist"
user_subdir = None
initial_search_path = Path(__file__).parent.parent.joinpath('plugins/pairlist').resolve()
initial_search_path = Path(__file__).parent.parent.joinpath("plugins/pairlist").resolve()
@staticmethod
def load_pairlist(pairlist_name: str, exchange, pairlistmanager,
config: Config, pairlistconfig: dict, pairlist_pos: int) -> IPairList:
def load_pairlist(
pairlist_name: str,
exchange,
pairlistmanager,
config: Config,
pairlistconfig: dict,
pairlist_pos: int,
) -> IPairList:
"""
Load the pairlist with pairlist_name
:param pairlist_name: Classname of the pairlist
@ -36,10 +44,14 @@ class PairListResolver(IResolver):
:param pairlist_pos: Position of the pairlist in the list of pairlists
:return: initialized Pairlist class
"""
return PairListResolver.load_object(pairlist_name, config,
kwargs={'exchange': exchange,
'pairlistmanager': pairlistmanager,
'config': config,
'pairlistconfig': pairlistconfig,
'pairlist_pos': pairlist_pos},
)
return PairListResolver.load_object(
pairlist_name,
config,
kwargs={
"exchange": exchange,
"pairlistmanager": pairlistmanager,
"config": config,
"pairlistconfig": pairlistconfig,
"pairlist_pos": pairlist_pos,
},
)

View File

@ -1,6 +1,7 @@
"""
This module load custom pairlists
"""
import logging
from pathlib import Path
from typing import Dict
@ -17,14 +18,16 @@ class ProtectionResolver(IResolver):
"""
This class contains all the logic to load custom PairList class
"""
object_type = IProtection
object_type_str = "Protection"
user_subdir = None
initial_search_path = Path(__file__).parent.parent.joinpath('plugins/protections').resolve()
initial_search_path = Path(__file__).parent.parent.joinpath("plugins/protections").resolve()
@staticmethod
def load_protection(protection_name: str, config: Config,
protection_config: Dict) -> IProtection:
def load_protection(
protection_name: str, config: Config, protection_config: Dict
) -> IProtection:
"""
Load the protection with protection_name
:param protection_name: Classname of the pairlist
@ -32,8 +35,11 @@ class ProtectionResolver(IResolver):
:param protection_config: Configuration dedicated to this pairlist
:return: initialized Protection class
"""
return ProtectionResolver.load_object(protection_name, config,
kwargs={'config': config,
'protection_config': protection_config,
},
)
return ProtectionResolver.load_object(
protection_name,
config,
kwargs={
"config": config,
"protection_config": protection_config,
},
)

View File

@ -3,6 +3,7 @@
"""
This module load custom strategies
"""
import logging
import tempfile
from base64 import urlsafe_b64decode
@ -26,6 +27,7 @@ class StrategyResolver(IResolver):
"""
This class contains the logic to load custom strategy class
"""
object_type = IStrategy
object_type_str = "Strategy"
user_subdir = USERPATH_STRATEGIES
@ -40,47 +42,48 @@ class StrategyResolver(IResolver):
"""
config = config or {}
if not config.get('strategy'):
raise OperationalException("No strategy set. Please use `--strategy` to specify "
"the strategy class to use.")
if not config.get("strategy"):
raise OperationalException(
"No strategy set. Please use `--strategy` to specify " "the strategy class to use."
)
strategy_name = config['strategy']
strategy_name = config["strategy"]
strategy: IStrategy = StrategyResolver._load_strategy(
strategy_name, config=config,
extra_dir=config.get('strategy_path'))
strategy_name, config=config, extra_dir=config.get("strategy_path")
)
strategy.ft_load_params_from_file()
# Set attributes
# Check if we need to override configuration
# (Attribute name, default, subkey)
attributes = [("minimal_roi", {"0": 10.0}),
("timeframe", None),
("stoploss", None),
("trailing_stop", None),
("trailing_stop_positive", None),
("trailing_stop_positive_offset", 0.0),
("trailing_only_offset_is_reached", None),
("use_custom_stoploss", None),
("process_only_new_candles", None),
("order_types", None),
("order_time_in_force", None),
("stake_currency", None),
("stake_amount", None),
("protections", None),
("startup_candle_count", None),
("unfilledtimeout", None),
("use_exit_signal", True),
("exit_profit_only", False),
("ignore_roi_if_entry_signal", False),
("exit_profit_offset", 0.0),
("disable_dataframe_checks", False),
("ignore_buying_expired_candle_after", 0),
("position_adjustment_enable", False),
("max_entry_position_adjustment", -1),
("max_open_trades", -1)
]
attributes = [
("minimal_roi", {"0": 10.0}),
("timeframe", None),
("stoploss", None),
("trailing_stop", None),
("trailing_stop_positive", None),
("trailing_stop_positive_offset", 0.0),
("trailing_only_offset_is_reached", None),
("use_custom_stoploss", None),
("process_only_new_candles", None),
("order_types", None),
("order_time_in_force", None),
("stake_currency", None),
("stake_amount", None),
("protections", None),
("startup_candle_count", None),
("unfilledtimeout", None),
("use_exit_signal", True),
("exit_profit_only", False),
("ignore_roi_if_entry_signal", False),
("exit_profit_offset", 0.0),
("disable_dataframe_checks", False),
("ignore_buying_expired_candle_after", 0),
("position_adjustment_enable", False),
("max_entry_position_adjustment", -1),
("max_open_trades", -1),
]
for attribute, default in attributes:
StrategyResolver._override_attribute_helper(strategy, config,
attribute, default)
StrategyResolver._override_attribute_helper(strategy, config, attribute, default)
# Loop this list again to have output combined
for attribute, _ in attributes:
@ -101,19 +104,23 @@ class StrategyResolver(IResolver):
- Strategy
- default (if not None)
"""
if (attribute in config
and not isinstance(getattr(type(strategy), attribute, None), property)):
if attribute in config and not isinstance(
getattr(type(strategy), attribute, None), property
):
# Ensure Properties are not overwritten
setattr(strategy, attribute, config[attribute])
logger.info("Override strategy '%s' with value in config file: %s.",
attribute, config[attribute])
logger.info(
"Override strategy '%s' with value in config file: %s.",
attribute,
config[attribute],
)
elif hasattr(strategy, attribute):
val = getattr(strategy, attribute)
# None's cannot exist in the config, so do not copy them
if val is not None:
# max_open_trades set to -1 in the strategy will be copied as infinity in the config
if attribute == 'max_open_trades' and val == -1:
config[attribute] = float('inf')
if attribute == "max_open_trades" and val == -1:
config[attribute] = float("inf")
else:
config[attribute] = val
# Explicitly check for None here as other "falsy" values are possible
@ -127,14 +134,17 @@ class StrategyResolver(IResolver):
Normalize attributes to have the correct type.
"""
# Sort and apply type conversions
if hasattr(strategy, 'minimal_roi'):
strategy.minimal_roi = dict(sorted(
{int(key): value for (key, value) in strategy.minimal_roi.items()}.items(),
key=lambda t: t[0]))
if hasattr(strategy, 'stoploss'):
if hasattr(strategy, "minimal_roi"):
strategy.minimal_roi = dict(
sorted(
{int(key): value for (key, value) in strategy.minimal_roi.items()}.items(),
key=lambda t: t[0],
)
)
if hasattr(strategy, "stoploss"):
strategy.stoploss = float(strategy.stoploss)
if hasattr(strategy, 'max_open_trades') and strategy.max_open_trades < 0:
strategy.max_open_trades = float('inf')
if hasattr(strategy, "max_open_trades") and strategy.max_open_trades < 0:
strategy.max_open_trades = float("inf")
return strategy
@staticmethod
@ -143,92 +153,102 @@ class StrategyResolver(IResolver):
validate_migrated_strategy_settings(strategy.config)
if not all(k in strategy.order_types for k in REQUIRED_ORDERTYPES):
raise ImportError(f"Impossible to load Strategy '{strategy.__class__.__name__}'. "
f"Order-types mapping is incomplete.")
raise ImportError(
f"Impossible to load Strategy '{strategy.__class__.__name__}'. "
f"Order-types mapping is incomplete."
)
if not all(k in strategy.order_time_in_force for k in REQUIRED_ORDERTIF):
raise ImportError(f"Impossible to load Strategy '{strategy.__class__.__name__}'. "
f"Order-time-in-force mapping is incomplete.")
trading_mode = strategy.config.get('trading_mode', TradingMode.SPOT)
raise ImportError(
f"Impossible to load Strategy '{strategy.__class__.__name__}'. "
f"Order-time-in-force mapping is incomplete."
)
trading_mode = strategy.config.get("trading_mode", TradingMode.SPOT)
if (strategy.can_short and trading_mode == TradingMode.SPOT):
if strategy.can_short and trading_mode == TradingMode.SPOT:
raise ImportError(
"Short strategies cannot run in spot markets. Please make sure that this "
"is the correct strategy and that your trading mode configuration is correct. "
"You can run this strategy in spot markets by setting `can_short=False`"
" in your strategy. Please note that short signals will be ignored in that case."
)
)
@staticmethod
def validate_strategy(strategy: IStrategy) -> IStrategy:
if strategy.config.get('trading_mode', TradingMode.SPOT) != TradingMode.SPOT:
if strategy.config.get("trading_mode", TradingMode.SPOT) != TradingMode.SPOT:
# Require new method
warn_deprecated_setting(strategy, 'sell_profit_only', 'exit_profit_only', True)
warn_deprecated_setting(strategy, 'sell_profit_offset', 'exit_profit_offset', True)
warn_deprecated_setting(strategy, 'use_sell_signal', 'use_exit_signal', True)
warn_deprecated_setting(strategy, 'ignore_roi_if_buy_signal',
'ignore_roi_if_entry_signal', True)
warn_deprecated_setting(strategy, "sell_profit_only", "exit_profit_only", True)
warn_deprecated_setting(strategy, "sell_profit_offset", "exit_profit_offset", True)
warn_deprecated_setting(strategy, "use_sell_signal", "use_exit_signal", True)
warn_deprecated_setting(
strategy, "ignore_roi_if_buy_signal", "ignore_roi_if_entry_signal", True
)
if not check_override(strategy, IStrategy, 'populate_entry_trend'):
if not check_override(strategy, IStrategy, "populate_entry_trend"):
raise OperationalException("`populate_entry_trend` must be implemented.")
if not check_override(strategy, IStrategy, 'populate_exit_trend'):
if not check_override(strategy, IStrategy, "populate_exit_trend"):
raise OperationalException("`populate_exit_trend` must be implemented.")
if check_override(strategy, IStrategy, 'check_buy_timeout'):
raise OperationalException("Please migrate your implementation "
"of `check_buy_timeout` to `check_entry_timeout`.")
if check_override(strategy, IStrategy, 'check_sell_timeout'):
raise OperationalException("Please migrate your implementation "
"of `check_sell_timeout` to `check_exit_timeout`.")
if check_override(strategy, IStrategy, 'custom_sell'):
if check_override(strategy, IStrategy, "check_buy_timeout"):
raise OperationalException(
"Please migrate your implementation of `custom_sell` to `custom_exit`.")
"Please migrate your implementation "
"of `check_buy_timeout` to `check_entry_timeout`."
)
if check_override(strategy, IStrategy, "check_sell_timeout"):
raise OperationalException(
"Please migrate your implementation "
"of `check_sell_timeout` to `check_exit_timeout`."
)
if check_override(strategy, IStrategy, "custom_sell"):
raise OperationalException(
"Please migrate your implementation of `custom_sell` to `custom_exit`."
)
else:
# TODO: Implementing one of the following methods should show a deprecation warning
# buy_trend and sell_trend, custom_sell
warn_deprecated_setting(strategy, 'sell_profit_only', 'exit_profit_only')
warn_deprecated_setting(strategy, 'sell_profit_offset', 'exit_profit_offset')
warn_deprecated_setting(strategy, 'use_sell_signal', 'use_exit_signal')
warn_deprecated_setting(strategy, 'ignore_roi_if_buy_signal',
'ignore_roi_if_entry_signal')
warn_deprecated_setting(strategy, "sell_profit_only", "exit_profit_only")
warn_deprecated_setting(strategy, "sell_profit_offset", "exit_profit_offset")
warn_deprecated_setting(strategy, "use_sell_signal", "use_exit_signal")
warn_deprecated_setting(
strategy, "ignore_roi_if_buy_signal", "ignore_roi_if_entry_signal"
)
if (
not check_override(strategy, IStrategy, 'populate_buy_trend')
and not check_override(strategy, IStrategy, 'populate_entry_trend')
if not check_override(strategy, IStrategy, "populate_buy_trend") and not check_override(
strategy, IStrategy, "populate_entry_trend"
):
raise OperationalException(
"`populate_entry_trend` or `populate_buy_trend` must be implemented.")
if (
not check_override(strategy, IStrategy, 'populate_sell_trend')
and not check_override(strategy, IStrategy, 'populate_exit_trend')
):
"`populate_entry_trend` or `populate_buy_trend` must be implemented."
)
if not check_override(
strategy, IStrategy, "populate_sell_trend"
) and not check_override(strategy, IStrategy, "populate_exit_trend"):
raise OperationalException(
"`populate_exit_trend` or `populate_sell_trend` must be implemented.")
"`populate_exit_trend` or `populate_sell_trend` must be implemented."
)
_populate_fun_len = len(getfullargspec(strategy.populate_indicators).args)
_buy_fun_len = len(getfullargspec(strategy.populate_buy_trend).args)
_sell_fun_len = len(getfullargspec(strategy.populate_sell_trend).args)
if any(x == 2 for x in [
_populate_fun_len,
_buy_fun_len,
_sell_fun_len
]):
if any(x == 2 for x in [_populate_fun_len, _buy_fun_len, _sell_fun_len]):
raise OperationalException(
"Strategy Interface v1 is no longer supported. "
"Please update your strategy to implement "
"`populate_indicators`, `populate_entry_trend` and `populate_exit_trend` "
"with the metadata argument. ")
"with the metadata argument. "
)
has_after_fill = ('after_fill' in getfullargspec(strategy.custom_stoploss).args
and check_override(strategy, IStrategy, 'custom_stoploss'))
has_after_fill = "after_fill" in getfullargspec(
strategy.custom_stoploss
).args and check_override(strategy, IStrategy, "custom_stoploss")
if has_after_fill:
strategy._ft_stop_uses_after_fill = True
return strategy
@staticmethod
def _load_strategy(strategy_name: str,
config: Config, extra_dir: Optional[str] = None) -> IStrategy:
def _load_strategy(
strategy_name: str, config: Config, extra_dir: Optional[str] = None
) -> IStrategy:
"""
Search and loads the specified strategy.
:param strategy_name: name of the module to import
@ -236,7 +256,7 @@ class StrategyResolver(IResolver):
:param extra_dir: additional directory to search for the given strategy
:return: Strategy instance or None
"""
if config.get('recursive_strategy_search', False):
if config.get("recursive_strategy_search", False):
extra_dirs: List[str] = [
path[0] for path in walk(f"{config['user_data_dir']}/{USERPATH_STRATEGIES}")
] # sub-directories
@ -246,9 +266,9 @@ class StrategyResolver(IResolver):
if extra_dir:
extra_dirs.append(extra_dir)
abs_paths = StrategyResolver.build_search_paths(config,
user_subdir=USERPATH_STRATEGIES,
extra_dirs=extra_dirs)
abs_paths = StrategyResolver.build_search_paths(
config, user_subdir=USERPATH_STRATEGIES, extra_dirs=extra_dirs
)
if ":" in strategy_name:
logger.info("loading base64 encoded strategy")
@ -258,7 +278,7 @@ class StrategyResolver(IResolver):
temp = Path(tempfile.mkdtemp("freq", "strategy"))
name = strat[0] + ".py"
temp.joinpath(name).write_text(urlsafe_b64decode(strat[1]).decode('utf-8'))
temp.joinpath(name).write_text(urlsafe_b64decode(strat[1]).decode("utf-8"))
temp.joinpath("__init__.py").touch()
strategy_name = strat[0]
@ -270,11 +290,10 @@ class StrategyResolver(IResolver):
paths=abs_paths,
object_name=strategy_name,
add_source=True,
kwargs={'config': config},
kwargs={"config": config},
)
if strategy:
return StrategyResolver.validate_strategy(strategy)
raise OperationalException(
@ -289,7 +308,7 @@ def warn_deprecated_setting(strategy: IStrategy, old: str, new: str, error=False
if error:
raise OperationalException(errormsg)
logger.warning(errormsg)
setattr(strategy, new, getattr(strategy, f'{old}'))
setattr(strategy, new, getattr(strategy, f"{old}"))
def check_override(object, parentclass, attribute):