freqtrade_origin/freqtrade/persistence/custom_data.py

182 lines
6.5 KiB
Python
Raw Normal View History

import json
import logging
2022-05-31 09:26:07 +00:00
from datetime import datetime
from typing import Any, ClassVar, List, Optional, Sequence
2022-05-31 09:26:07 +00:00
2024-02-07 06:21:16 +00:00
from sqlalchemy import DateTime, ForeignKey, Integer, String, Text, UniqueConstraint, select
2024-02-07 18:28:06 +00:00
from sqlalchemy.orm import Mapped, mapped_column, relationship
2022-05-31 09:26:07 +00:00
from freqtrade.constants import DATETIME_PRINT_FORMAT
2024-02-07 06:21:16 +00:00
from freqtrade.persistence.base import ModelBase, SessionType
from freqtrade.util import dt_now
2022-05-31 09:26:07 +00:00
logger = logging.getLogger(__name__)
2024-02-12 19:25:26 +00:00
class _CustomData(ModelBase):
2022-05-31 09:26:07 +00:00
"""
CustomData database model
2022-05-31 09:26:07 +00:00
Keeps records of metadata as key/value store
2024-04-18 20:51:25 +00:00
for trades or global persistent values
2022-05-31 09:26:07 +00:00
One to many relationship with Trades:
- One trade can have many metadata entries
- One metadata entry can only be associated with one Trade
"""
2024-05-12 14:48:11 +00:00
__tablename__ = "trade_custom_data"
2024-02-12 19:31:44 +00:00
__allow_unmapped__ = True
2024-02-07 06:21:16 +00:00
session: ClassVar[SessionType]
2022-05-31 09:26:07 +00:00
# Uniqueness should be ensured over pair, order_id
# its likely that order_id is unique per Pair on some exchanges.
2024-05-12 14:48:11 +00:00
__table_args__ = (UniqueConstraint("ft_trade_id", "cd_key", name="_trade_id_cd_key"),)
2022-05-31 09:26:07 +00:00
2024-02-07 06:21:16 +00:00
id = mapped_column(Integer, primary_key=True)
2024-05-12 14:48:11 +00:00
ft_trade_id = mapped_column(Integer, ForeignKey("trades.id"), index=True)
2022-05-31 09:26:07 +00:00
trade = relationship("Trade", back_populates="custom_data")
2022-05-31 09:26:07 +00:00
2024-02-07 06:21:16 +00:00
cd_key: Mapped[str] = mapped_column(String(255), nullable=False)
cd_type: Mapped[str] = mapped_column(String(25), nullable=False)
cd_value: Mapped[str] = mapped_column(Text, nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, default=dt_now)
updated_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
2022-05-31 09:26:07 +00:00
2024-02-12 19:25:26 +00:00
# Empty container value - not persisted, but filled with cd_value on query
value: Any = None
2022-05-31 09:26:07 +00:00
def __repr__(self):
2024-05-12 14:48:11 +00:00
create_time = (
self.created_at.strftime(DATETIME_PRINT_FORMAT) if self.created_at is not None else None
)
update_time = (
self.updated_at.strftime(DATETIME_PRINT_FORMAT) if self.updated_at is not None else None
)
return (
f"CustomData(id={self.id}, key={self.cd_key}, type={self.cd_type}, "
+ f"value={self.cd_value}, trade_id={self.ft_trade_id}, created={create_time}, "
+ f"updated={update_time})"
)
2022-05-31 09:26:07 +00:00
2024-02-07 18:28:06 +00:00
@classmethod
2024-05-12 14:48:11 +00:00
def query_cd(
cls, key: Optional[str] = None, trade_id: Optional[int] = None
) -> Sequence["_CustomData"]:
2022-05-31 09:26:07 +00:00
"""
Get all CustomData, if trade_id is not specified
2022-05-31 09:26:07 +00:00
return will be for generic values not tied to a trade
:param trade_id: id of the Trade
"""
filters = []
2024-02-07 18:28:06 +00:00
if trade_id is not None:
2024-02-12 19:25:26 +00:00
filters.append(_CustomData.ft_trade_id == trade_id)
if key is not None:
2024-02-12 19:25:26 +00:00
filters.append(_CustomData.cd_key.ilike(key))
2022-05-31 09:26:07 +00:00
2024-02-12 19:25:26 +00:00
return _CustomData.session.scalars(select(_CustomData).filter(*filters)).all()
class CustomDataWrapper:
"""
CustomData middleware class
Abstracts the database layer away so it becomes optional - which will be necessary to support
backtesting and hyperopt in the future.
"""
use_db = True
2024-02-12 19:25:26 +00:00
custom_data: List[_CustomData] = []
2024-05-12 14:48:11 +00:00
unserialized_types = ["bool", "float", "int", "str"]
2024-02-12 19:25:26 +00:00
@staticmethod
def _convert_custom_data(data: _CustomData) -> _CustomData:
2024-02-13 06:06:32 +00:00
if data.cd_type in CustomDataWrapper.unserialized_types:
data.value = data.cd_value
2024-05-12 14:48:11 +00:00
if data.cd_type == "bool":
data.value = data.cd_value.lower() == "true"
elif data.cd_type == "int":
2024-02-13 06:06:32 +00:00
data.value = int(data.cd_value)
2024-05-12 14:48:11 +00:00
elif data.cd_type == "float":
2024-02-13 06:06:32 +00:00
data.value = float(data.cd_value)
else:
2024-02-12 19:25:26 +00:00
data.value = json.loads(data.cd_value)
return data
@staticmethod
def reset_custom_data() -> None:
"""
Resets all key-value pairs. Only active for backtesting mode.
"""
if not CustomDataWrapper.use_db:
CustomDataWrapper.custom_data = []
2024-02-12 19:51:44 +00:00
@staticmethod
def delete_custom_data(trade_id: int) -> None:
_CustomData.session.query(_CustomData).filter(_CustomData.ft_trade_id == trade_id).delete()
2024-03-03 11:46:46 +00:00
_CustomData.session.commit()
2024-02-12 19:51:44 +00:00
@staticmethod
2024-02-13 06:06:32 +00:00
def get_custom_data(*, trade_id: int, key: Optional[str] = None) -> List[_CustomData]:
if CustomDataWrapper.use_db:
2024-02-13 06:06:32 +00:00
filters = [
2024-02-12 19:25:26 +00:00
_CustomData.ft_trade_id == trade_id,
2024-02-13 06:06:32 +00:00
]
if key is not None:
filters.append(_CustomData.cd_key.ilike(key))
2024-05-12 14:48:11 +00:00
filtered_custom_data = _CustomData.session.scalars(
select(_CustomData).filter(*filters)
).all()
2024-02-12 19:25:26 +00:00
else:
filtered_custom_data = [
2024-05-12 14:48:11 +00:00
data_entry
for data_entry in CustomDataWrapper.custom_data
if (data_entry.ft_trade_id == trade_id)
]
if key is not None:
filtered_custom_data = [
2024-05-12 14:48:11 +00:00
data_entry
for data_entry in filtered_custom_data
if (data_entry.cd_key.casefold() == key.casefold())
]
2024-02-12 19:25:26 +00:00
return [CustomDataWrapper._convert_custom_data(d) for d in filtered_custom_data]
@staticmethod
2024-02-13 06:06:32 +00:00
def set_custom_data(trade_id: int, key: str, value: Any) -> None:
value_type = type(value).__name__
if value_type not in CustomDataWrapper.unserialized_types:
try:
value_db = json.dumps(value)
except TypeError as e:
logger.warning(f"could not serialize {key} value due to {e}")
2024-02-12 19:25:26 +00:00
return
else:
value_db = str(value)
if trade_id is None:
trade_id = 0
2024-02-13 06:06:32 +00:00
custom_data = CustomDataWrapper.get_custom_data(trade_id=trade_id, key=key)
if custom_data:
data_entry = custom_data[0]
data_entry.cd_value = value_db
data_entry.updated_at = dt_now()
else:
2024-02-12 19:25:26 +00:00
data_entry = _CustomData(
ft_trade_id=trade_id,
cd_key=key,
cd_type=value_type,
cd_value=value_db,
2024-02-12 19:25:26 +00:00
created_at=dt_now(),
)
2024-02-12 19:25:26 +00:00
data_entry.value = value
if CustomDataWrapper.use_db and value_db is not None:
2024-02-12 19:25:26 +00:00
_CustomData.session.add(data_entry)
_CustomData.session.commit()
else:
2024-02-13 06:16:09 +00:00
if not custom_data:
CustomDataWrapper.custom_data.append(data_entry)
2024-02-13 06:16:09 +00:00
# Existing data will have updated interactively.