mirror of
https://github.com/freqtrade/freqtrade.git
synced 2024-11-15 12:43:56 +00:00
118 lines
4.5 KiB
Python
118 lines
4.5 KiB
Python
"""
|
|
Protection manager class
|
|
"""
|
|
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from typing import Any, Optional
|
|
|
|
from freqtrade.constants import Config, LongShort
|
|
from freqtrade.exceptions import ConfigurationError
|
|
from freqtrade.persistence import PairLocks
|
|
from freqtrade.persistence.models import PairLock
|
|
from freqtrade.plugins.protections import IProtection
|
|
from freqtrade.resolvers import ProtectionResolver
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ProtectionManager:
|
|
def __init__(self, config: Config, protections: list) -> None:
|
|
self._config = config
|
|
|
|
self._protection_handlers: list[IProtection] = []
|
|
self.validate_protections(protections)
|
|
for protection_handler_config in protections:
|
|
protection_handler = ProtectionResolver.load_protection(
|
|
protection_handler_config["method"],
|
|
config=config,
|
|
protection_config=protection_handler_config,
|
|
)
|
|
self._protection_handlers.append(protection_handler)
|
|
|
|
if not self._protection_handlers:
|
|
logger.info("No protection Handlers defined.")
|
|
|
|
@property
|
|
def name_list(self) -> list[str]:
|
|
"""
|
|
Get list of loaded Protection Handler names
|
|
"""
|
|
return [p.name for p in self._protection_handlers]
|
|
|
|
def short_desc(self) -> list[dict]:
|
|
"""
|
|
List of short_desc for each Pairlist Handler
|
|
"""
|
|
return [{p.name: p.short_desc()} for p in self._protection_handlers]
|
|
|
|
def global_stop(
|
|
self, now: Optional[datetime] = None, side: LongShort = "long"
|
|
) -> Optional[PairLock]:
|
|
if not now:
|
|
now = datetime.now(timezone.utc)
|
|
result = None
|
|
for protection_handler in self._protection_handlers:
|
|
if protection_handler.has_global_stop:
|
|
lock = protection_handler.global_stop(date_now=now, side=side)
|
|
if lock and lock.until:
|
|
if not PairLocks.is_global_lock(lock.until, side=lock.lock_side):
|
|
result = PairLocks.lock_pair(
|
|
"*", lock.until, lock.reason, now=now, side=lock.lock_side
|
|
)
|
|
return result
|
|
|
|
def stop_per_pair(
|
|
self, pair, now: Optional[datetime] = None, side: LongShort = "long"
|
|
) -> Optional[PairLock]:
|
|
if not now:
|
|
now = datetime.now(timezone.utc)
|
|
result = None
|
|
for protection_handler in self._protection_handlers:
|
|
if protection_handler.has_local_stop:
|
|
lock = protection_handler.stop_per_pair(pair=pair, date_now=now, side=side)
|
|
if lock and lock.until:
|
|
if not PairLocks.is_pair_locked(pair, lock.until, lock.lock_side):
|
|
result = PairLocks.lock_pair(
|
|
pair, lock.until, lock.reason, now=now, side=lock.lock_side
|
|
)
|
|
return result
|
|
|
|
@staticmethod
|
|
def validate_protections(protections: list[dict[str, Any]]) -> None:
|
|
"""
|
|
Validate protection setup validity
|
|
"""
|
|
|
|
for prot in protections:
|
|
parsed_unlock_at = None
|
|
if (config_unlock_at := prot.get("unlock_at")) is not None:
|
|
try:
|
|
parsed_unlock_at = datetime.strptime(config_unlock_at, "%H:%M")
|
|
except ValueError:
|
|
raise ConfigurationError(
|
|
f"Invalid date format for unlock_at: {config_unlock_at}."
|
|
)
|
|
|
|
if "stop_duration" in prot and "stop_duration_candles" in prot:
|
|
raise ConfigurationError(
|
|
"Protections must specify either `stop_duration` or `stop_duration_candles`.\n"
|
|
f"Please fix the protection {prot.get('method')}."
|
|
)
|
|
|
|
if "lookback_period" in prot and "lookback_period_candles" in prot:
|
|
raise ConfigurationError(
|
|
"Protections must specify either `lookback_period` or "
|
|
f"`lookback_period_candles`.\n Please fix the protection {prot.get('method')}."
|
|
)
|
|
|
|
if parsed_unlock_at is not None and (
|
|
"stop_duration" in prot or "stop_duration_candles" in prot
|
|
):
|
|
raise ConfigurationError(
|
|
"Protections must specify either `unlock_at`, `stop_duration` or "
|
|
"`stop_duration_candles`.\n"
|
|
f"Please fix the protection {prot.get('method')}."
|
|
)
|