freqtrade_origin/freqtrade/persistence/pairlock_middleware.py

191 lines
6.3 KiB
Python

import logging
from collections.abc import Sequence
from datetime import datetime, timezone
from typing import Optional
from sqlalchemy import select
from freqtrade.exchange import timeframe_to_next_date
from freqtrade.persistence.models import PairLock
logger = logging.getLogger(__name__)
class PairLocks:
"""
Pairlocks middleware class
Abstracts the database layer away so it becomes optional - which will be necessary to support
backtesting and hyperopt in the future.
"""
use_db = True
locks: list[PairLock] = []
timeframe: str = ""
@staticmethod
def reset_locks() -> None:
"""
Resets all locks. Only active for backtesting mode.
"""
if not PairLocks.use_db:
PairLocks.locks = []
@staticmethod
def lock_pair(
pair: str,
until: datetime,
reason: Optional[str] = None,
*,
now: Optional[datetime] = None,
side: str = "*",
) -> PairLock:
"""
Create PairLock from now to "until".
Uses database by default, unless PairLocks.use_db is set to False,
in which case a list is maintained.
:param pair: pair to lock. use '*' to lock all pairs
:param until: End time of the lock. Will be rounded up to the next candle.
:param reason: Reason string that will be shown as reason for the lock
:param now: Current timestamp. Used to determine lock start time.
:param side: Side to lock pair, can be 'long', 'short' or '*'
"""
lock = PairLock(
pair=pair,
lock_time=now or datetime.now(timezone.utc),
lock_end_time=timeframe_to_next_date(PairLocks.timeframe, until),
reason=reason,
side=side,
active=True,
)
if PairLocks.use_db:
PairLock.session.add(lock)
PairLock.session.commit()
else:
PairLocks.locks.append(lock)
return lock
@staticmethod
def get_pair_locks(
pair: Optional[str], now: Optional[datetime] = None, side: str = "*"
) -> Sequence[PairLock]:
"""
Get all currently active locks for this pair
:param pair: Pair to check for. Returns all current locks if pair is empty
:param now: Datetime object (generated via datetime.now(timezone.utc)).
defaults to datetime.now(timezone.utc)
"""
if not now:
now = datetime.now(timezone.utc)
if PairLocks.use_db:
return PairLock.query_pair_locks(pair, now, side).all()
else:
locks = [
lock
for lock in PairLocks.locks
if (
lock.lock_end_time >= now
and lock.active is True
and (pair is None or lock.pair == pair)
and (lock.side == "*" or lock.side == side)
)
]
return locks
@staticmethod
def get_pair_longest_lock(
pair: str, now: Optional[datetime] = None, side: str = "*"
) -> Optional[PairLock]:
"""
Get the lock that expires the latest for the pair given.
"""
locks = PairLocks.get_pair_locks(pair, now, side=side)
locks = sorted(locks, key=lambda lock: lock.lock_end_time, reverse=True)
return locks[0] if locks else None
@staticmethod
def unlock_pair(pair: str, now: Optional[datetime] = None, side: str = "*") -> None:
"""
Release all locks for this pair.
:param pair: Pair to unlock
:param now: Datetime object (generated via datetime.now(timezone.utc)).
defaults to datetime.now(timezone.utc)
"""
if not now:
now = datetime.now(timezone.utc)
logger.info(f"Releasing all locks for {pair}.")
locks = PairLocks.get_pair_locks(pair, now, side=side)
for lock in locks:
lock.active = False
if PairLocks.use_db:
PairLock.session.commit()
@staticmethod
def unlock_reason(reason: str, now: Optional[datetime] = None) -> None:
"""
Release all locks for this reason.
:param reason: Which reason to unlock
:param now: Datetime object (generated via datetime.now(timezone.utc)).
defaults to datetime.now(timezone.utc)
"""
if not now:
now = datetime.now(timezone.utc)
if PairLocks.use_db:
# used in live modes
logger.info(f"Releasing all locks with reason '{reason}':")
filters = [
PairLock.lock_end_time > now,
PairLock.active.is_(True),
PairLock.reason == reason,
]
locks = PairLock.session.scalars(select(PairLock).filter(*filters)).all()
for lock in locks:
logger.info(f"Releasing lock for {lock.pair} with reason '{reason}'.")
lock.active = False
PairLock.session.commit()
else:
# used in backtesting mode; don't show log messages for speed
locksb = PairLocks.get_pair_locks(None)
for lock in locksb:
if lock.reason == reason:
lock.active = False
@staticmethod
def is_global_lock(now: Optional[datetime] = None, side: str = "*") -> bool:
"""
:param now: Datetime object (generated via datetime.now(timezone.utc)).
defaults to datetime.now(timezone.utc)
"""
if not now:
now = datetime.now(timezone.utc)
return len(PairLocks.get_pair_locks("*", now, side)) > 0
@staticmethod
def is_pair_locked(pair: str, now: Optional[datetime] = None, side: str = "*") -> bool:
"""
:param pair: Pair to check for
:param now: Datetime object (generated via datetime.now(timezone.utc)).
defaults to datetime.now(timezone.utc)
"""
if not now:
now = datetime.now(timezone.utc)
return len(PairLocks.get_pair_locks(pair, now, side)) > 0 or PairLocks.is_global_lock(
now, side
)
@staticmethod
def get_all_locks() -> Sequence[PairLock]:
"""
Return all locks, also locks with expired end date
"""
if PairLocks.use_db:
return PairLock.get_all_locks().all()
else:
return PairLocks.locks