freqtrade_origin/freqtrade/data/history/datahandlers/idatahandler.py

532 lines
20 KiB
Python
Raw Normal View History

"""
Abstract datahandler interface.
It's subclasses handle and storing data from disk.
"""
2024-05-12 15:41:55 +00:00
2019-12-25 10:09:29 +00:00
import logging
2021-11-28 14:03:55 +00:00
import re
from abc import ABC, abstractmethod
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
2022-08-19 11:44:31 +00:00
from typing import List, Optional, Tuple, Type
2023-08-18 07:12:40 +00:00
from pandas import DataFrame
from freqtrade import misc
from freqtrade.configuration import TimeRange
2023-08-18 07:31:57 +00:00
from freqtrade.constants import DEFAULT_TRADES_COLUMNS, ListPairsWithTimeframes
from freqtrade.data.converter import (
clean_ohlcv_dataframe,
trades_convert_types,
trades_df_remove_duplicates,
trim_dataframe,
)
2022-03-03 06:06:13 +00:00
from freqtrade.enums import CandleType, TradingMode
2019-12-25 10:09:29 +00:00
from freqtrade.exchange import timeframe_to_seconds
2020-09-28 17:39:41 +00:00
2019-12-25 10:09:29 +00:00
logger = logging.getLogger(__name__)
class IDataHandler(ABC):
2024-05-12 15:41:55 +00:00
_OHLCV_REGEX = r"^([a-zA-Z_\d-]+)\-(\d+[a-zA-Z]{1,2})\-?([a-zA-Z_]*)?(?=\.)"
2021-11-28 13:33:46 +00:00
2019-12-25 10:09:29 +00:00
def __init__(self, datadir: Path) -> None:
self._datadir = datadir
@classmethod
def _get_file_extension(cls) -> str:
"""
Get file extension for this particular datahandler
"""
raise NotImplementedError()
@classmethod
2022-03-03 06:06:13 +00:00
def ohlcv_get_available_data(
2024-05-12 15:41:55 +00:00
cls, datadir: Path, trading_mode: TradingMode
) -> ListPairsWithTimeframes:
"""
Returns a list of all pairs with ohlcv data available in this datadir
:param datadir: Directory to search for ohlcv files
:param trading_mode: trading-mode to be used
:return: List of Tuples of (pair, timeframe, CandleType)
"""
if trading_mode == TradingMode.FUTURES:
2024-05-12 15:41:55 +00:00
datadir = datadir.joinpath("futures")
_tmp = [
2024-05-12 15:41:55 +00:00
re.search(cls._OHLCV_REGEX, p.name)
for p in datadir.glob(f"*.{cls._get_file_extension()}")
]
return [
(
cls.rebuild_pair_from_filename(match[1]),
cls.rebuild_timeframe_from_filename(match[2]),
2024-05-12 15:41:55 +00:00
CandleType.from_string(match[3]),
)
for match in _tmp
if match and len(match.groups()) > 1
]
@classmethod
2021-12-07 19:30:58 +00:00
def ohlcv_get_pairs(cls, datadir: Path, timeframe: str, candle_type: CandleType) -> List[str]:
2019-12-28 10:10:31 +00:00
"""
Returns a list of all pairs with ohlcv data available in this datadir
for the specified timeframe
:param datadir: Directory to search for ohlcv files
:param timeframe: Timeframe to search pairs for
2021-12-03 11:23:35 +00:00
:param candle_type: Any of the enum CandleType (must match trading mode!)
2019-12-28 10:10:31 +00:00
:return: List of Pairs
"""
2022-09-18 14:18:27 +00:00
candle = ""
if candle_type != CandleType.SPOT:
2024-05-12 15:41:55 +00:00
datadir = datadir.joinpath("futures")
2022-09-18 14:18:27 +00:00
candle = f"-{candle_type}"
ext = cls._get_file_extension()
2024-05-12 15:41:55 +00:00
_tmp = [
re.search(r"^(\S+)(?=\-" + timeframe + candle + f".{ext})", p.name)
for p in datadir.glob(f"*{timeframe}{candle}.{ext}")
]
2022-09-18 14:18:27 +00:00
# Check if regex found something and only return these results
return [cls.rebuild_pair_from_filename(match[0]) for match in _tmp if match]
2019-12-28 10:10:31 +00:00
@abstractmethod
def ohlcv_store(
2024-05-12 15:41:55 +00:00
self, pair: str, timeframe: str, data: DataFrame, candle_type: CandleType
) -> None:
2019-12-28 10:10:31 +00:00
"""
Store ohlcv data.
2019-12-28 10:10:31 +00:00
:param pair: Pair - used to generate filename
2021-06-25 17:13:31 +00:00
:param timeframe: Timeframe - used to generate filename
:param data: Dataframe containing OHLCV data
2021-12-03 11:23:35 +00:00
:param candle_type: Any of the enum CandleType (must match trading mode!)
2019-12-28 10:10:31 +00:00
:return: None
"""
2024-05-12 15:41:55 +00:00
def ohlcv_data_min_max(
self, pair: str, timeframe: str, candle_type: CandleType
) -> Tuple[datetime, datetime, int]:
2022-08-19 11:44:31 +00:00
"""
Returns the min and max timestamp for the given pair and timeframe.
:param pair: Pair to get min/max for
:param timeframe: Timeframe to get min/max for
:param candle_type: Any of the enum CandleType (must match trading mode!)
:return: (min, max, len)
2022-08-19 11:44:31 +00:00
"""
df = self._ohlcv_load(pair, timeframe, None, candle_type)
if df.empty:
return (
datetime.fromtimestamp(0, tz=timezone.utc),
datetime.fromtimestamp(0, tz=timezone.utc),
0,
)
2024-05-12 15:41:55 +00:00
return df.iloc[0]["date"].to_pydatetime(), df.iloc[-1]["date"].to_pydatetime(), len(df)
2022-08-19 11:44:31 +00:00
2019-12-28 10:10:31 +00:00
@abstractmethod
2024-05-12 15:41:55 +00:00
def _ohlcv_load(
self, pair: str, timeframe: str, timerange: Optional[TimeRange], candle_type: CandleType
) -> DataFrame:
2019-12-28 10:10:31 +00:00
"""
Internal method used to load data for one pair from disk.
Implements the loading and conversion to a Pandas dataframe.
2019-12-28 10:10:31 +00:00
Timerange trimming and dataframe validation happens outside of this method.
:param pair: Pair to load data
:param timeframe: Timeframe (e.g. "5m")
2019-12-28 10:10:31 +00:00
:param timerange: Limit data to be loaded to this timerange.
Optionally implemented by subclasses to avoid loading
all data where possible.
2021-12-03 11:23:35 +00:00
:param candle_type: Any of the enum CandleType (must match trading mode!)
2019-12-28 10:10:31 +00:00
:return: DataFrame with ohlcv data, or empty DataFrame
"""
2021-12-08 12:00:11 +00:00
def ohlcv_purge(self, pair: str, timeframe: str, candle_type: CandleType) -> bool:
2019-12-28 10:10:31 +00:00
"""
Remove data for this pair
:param pair: Delete data for this pair.
:param timeframe: Timeframe (e.g. "5m")
2021-12-03 11:23:35 +00:00
:param candle_type: Any of the enum CandleType (must match trading mode!)
2019-12-28 10:10:31 +00:00
:return: True when deleted, false if file did not exist.
"""
2022-05-16 17:53:01 +00:00
filename = self._pair_data_filename(self._datadir, pair, timeframe, candle_type)
if filename.exists():
filename.unlink()
return True
return False
2019-12-28 10:10:31 +00:00
@abstractmethod
def ohlcv_append(
2024-05-12 15:41:55 +00:00
self, pair: str, timeframe: str, data: DataFrame, candle_type: CandleType
) -> None:
2019-12-28 10:10:31 +00:00
"""
Append data to existing data structures
:param pair: Pair
:param timeframe: Timeframe this ohlcv data is for
:param data: Data to append.
2021-12-03 11:23:35 +00:00
:param candle_type: Any of the enum CandleType (must match trading mode!)
2019-12-28 10:10:31 +00:00
"""
@classmethod
2019-12-28 10:10:31 +00:00
def trades_get_pairs(cls, datadir: Path) -> List[str]:
"""
Returns a list of all pairs for which trade data is available in this
:param datadir: Directory to search for ohlcv files
:return: List of Pairs
"""
2022-09-18 14:57:03 +00:00
_ext = cls._get_file_extension()
2024-05-12 15:41:55 +00:00
_tmp = [
re.search(r"^(\S+)(?=\-trades." + _ext + ")", p.name)
for p in datadir.glob(f"*trades.{_ext}")
]
2022-09-18 14:57:03 +00:00
# Check if regex found something and only return these results to avoid exceptions.
return [cls.rebuild_pair_from_filename(match[0]) for match in _tmp if match]
2019-12-28 10:10:31 +00:00
@abstractmethod
def _trades_store(self, pair: str, data: DataFrame, trading_mode: TradingMode) -> None:
2019-12-28 10:10:31 +00:00
"""
Store trades data (list of Dicts) to file
:param pair: Pair - used for filename
:param data: Dataframe containing trades
column sequence as in DEFAULT_TRADES_COLUMNS
:param trading_mode: Trading mode to use (used to determine the filename)
2019-12-28 10:10:31 +00:00
"""
@abstractmethod
def trades_append(self, pair: str, data: DataFrame):
2019-12-28 10:10:31 +00:00
"""
Append data to existing files
:param pair: Pair - used for filename
:param data: Dataframe containing trades
column sequence as in DEFAULT_TRADES_COLUMNS
2019-12-28 10:10:31 +00:00
"""
@abstractmethod
def _trades_load(
self, pair: str, trading_mode: TradingMode, timerange: Optional[TimeRange] = None
) -> DataFrame:
2019-12-28 10:10:31 +00:00
"""
Load a pair from file, either .json.gz or .json
:param pair: Load trades for this pair
:param trading_mode: Trading mode to use (used to determine the filename)
2019-12-28 10:10:31 +00:00
:param timerange: Timerange to load trades for - currently not implemented
:return: Dataframe containing trades
2019-12-28 10:10:31 +00:00
"""
def trades_store(self, pair: str, data: DataFrame, trading_mode: TradingMode) -> None:
2023-08-18 07:31:57 +00:00
"""
Store trades data (list of Dicts) to file
:param pair: Pair - used for filename
:param data: Dataframe containing trades
column sequence as in DEFAULT_TRADES_COLUMNS
:param trading_mode: Trading mode to use (used to determine the filename)
2023-08-18 07:31:57 +00:00
"""
# Filter on expected columns (will remove the actual date column).
self._trades_store(pair, data[DEFAULT_TRADES_COLUMNS], trading_mode)
2023-08-18 07:31:57 +00:00
def trades_purge(self, pair: str, trading_mode: TradingMode) -> bool:
2019-12-28 10:10:31 +00:00
"""
Remove data for this pair
:param pair: Delete data for this pair.
:param trading_mode: Trading mode to use (used to determine the filename)
2019-12-28 10:10:31 +00:00
:return: True when deleted, false if file did not exist.
"""
filename = self._pair_trades_filename(self._datadir, pair, trading_mode)
if filename.exists():
filename.unlink()
return True
return False
2019-12-25 10:09:29 +00:00
def trades_load(
2024-05-12 15:41:55 +00:00
self, pair: str, trading_mode: TradingMode, timerange: Optional[TimeRange] = None
) -> DataFrame:
2020-04-01 05:58:39 +00:00
"""
Load a pair from file, either .json.gz or .json
Removes duplicates in the process.
:param pair: Load trades for this pair
:param trading_mode: Trading mode to use (used to determine the filename)
2020-04-01 05:58:39 +00:00
:param timerange: Timerange to load trades for - currently not implemented
:return: List of trades
"""
trades = trades_df_remove_duplicates(
self._trades_load(pair, trading_mode, timerange=timerange)
)
2023-08-18 05:43:29 +00:00
trades = trades_convert_types(trades)
return trades
@classmethod
def create_dir_if_needed(cls, datadir: Path):
"""
Creates datadir if necessary
should only create directories for "futures" mode at the moment.
"""
if not datadir.parent.is_dir():
datadir.parent.mkdir()
@classmethod
def _pair_data_filename(
cls,
datadir: Path,
pair: str,
timeframe: str,
2022-05-16 17:53:01 +00:00
candle_type: CandleType,
2024-05-12 15:41:55 +00:00
no_timeframe_modify: bool = False,
) -> Path:
pair_s = misc.pair_to_filename(pair)
2021-12-03 11:23:35 +00:00
candle = ""
2022-05-16 17:53:01 +00:00
if not no_timeframe_modify:
timeframe = cls.timeframe_to_file(timeframe)
2021-12-08 13:35:15 +00:00
if candle_type != CandleType.SPOT:
2024-05-12 15:41:55 +00:00
datadir = datadir.joinpath("futures")
2021-12-03 11:23:35 +00:00
candle = f"-{candle_type}"
2024-05-12 15:41:55 +00:00
filename = datadir.joinpath(f"{pair_s}-{timeframe}{candle}.{cls._get_file_extension()}")
return filename
@classmethod
def _pair_trades_filename(cls, datadir: Path, pair: str, trading_mode: TradingMode) -> Path:
pair_s = misc.pair_to_filename(pair)
if trading_mode == TradingMode.FUTURES:
# Futures pair ...
2024-05-12 15:41:55 +00:00
datadir = datadir.joinpath("futures")
2024-05-12 15:41:55 +00:00
filename = datadir.joinpath(f"{pair_s}-trades.{cls._get_file_extension()}")
return filename
2022-05-01 15:00:00 +00:00
@staticmethod
def timeframe_to_file(timeframe: str):
2024-05-12 15:41:55 +00:00
return timeframe.replace("M", "Mo")
2022-05-01 15:00:00 +00:00
@staticmethod
def rebuild_timeframe_from_filename(timeframe: str) -> str:
"""
converts timeframe from disk to file
Replaces mo with M (to avoid problems on case-insensitive filesystems)
"""
2024-05-12 15:41:55 +00:00
return re.sub("1mo", "1M", timeframe, flags=re.IGNORECASE)
2022-05-01 15:00:00 +00:00
2021-11-28 14:03:55 +00:00
@staticmethod
def rebuild_pair_from_filename(pair: str) -> str:
"""
Rebuild pair name from filename
Assumes a asset name of max. 7 length to also support BTC-PERP and BTC-PERP:USD names.
"""
2024-05-12 15:41:55 +00:00
res = re.sub(r"^(([A-Za-z\d]{1,10})|^([A-Za-z\-]{1,6}))(_)", r"\g<1>/", pair, count=1)
res = re.sub("_", ":", res, count=1)
2021-11-28 14:03:55 +00:00
return res
2024-05-12 15:41:55 +00:00
def ohlcv_load(
self,
pair,
timeframe: str,
candle_type: CandleType,
*,
timerange: Optional[TimeRange] = None,
fill_missing: bool = True,
drop_incomplete: bool = False,
startup_candles: int = 0,
warn_no_data: bool = True,
) -> DataFrame:
2019-12-25 10:09:29 +00:00
"""
Load cached candle (OHLCV) data for the given pair.
2019-12-25 10:09:29 +00:00
:param pair: Pair to load data for
:param timeframe: Timeframe (e.g. "5m")
2019-12-25 10:09:29 +00:00
:param timerange: Limit data to be loaded to this timerange
2019-12-25 14:07:49 +00:00
:param fill_missing: Fill missing values with "No action"-candles
2019-12-25 10:09:29 +00:00
:param drop_incomplete: Drop last candle assuming it may be incomplete.
:param startup_candles: Additional candles to load at the start of the period
2019-12-27 05:58:29 +00:00
:param warn_no_data: Log a warning message when no data is found
2021-12-03 11:23:35 +00:00
:param candle_type: Any of the enum CandleType (must match trading mode!)
2019-12-25 10:09:29 +00:00
:return: DataFrame with ohlcv data, or empty DataFrame
"""
# Fix startup period
timerange_startup = deepcopy(timerange)
if startup_candles > 0 and timerange_startup:
timerange_startup.subtract_start(timeframe_to_seconds(timeframe) * startup_candles)
pairdf = self._ohlcv_load(
2024-05-12 15:41:55 +00:00
pair, timeframe, timerange=timerange_startup, candle_type=candle_type
)
if self._check_empty_df(pairdf, pair, timeframe, candle_type, warn_no_data):
return pairdf
else:
2024-05-12 15:41:55 +00:00
enddate = pairdf.iloc[-1]["date"]
if timerange_startup:
2022-01-08 13:38:46 +00:00
self._validate_pairdata(pair, pairdf, timeframe, candle_type, timerange_startup)
pairdf = trim_dataframe(pairdf, timerange_startup)
if self._check_empty_df(pairdf, pair, timeframe, candle_type, warn_no_data, True):
return pairdf
# incomplete candles should only be dropped if we didn't trim the end beforehand.
2024-05-12 15:41:55 +00:00
pairdf = clean_ohlcv_dataframe(
pairdf,
timeframe,
pair=pair,
fill_missing=fill_missing,
drop_incomplete=(drop_incomplete and enddate == pairdf.iloc[-1]["date"]),
)
self._check_empty_df(pairdf, pair, timeframe, candle_type, warn_no_data)
2020-03-12 18:50:46 +00:00
return pairdf
2019-12-25 10:09:29 +00:00
def _check_empty_df(
2024-05-12 15:41:55 +00:00
self,
pairdf: DataFrame,
pair: str,
timeframe: str,
candle_type: CandleType,
warn_no_data: bool,
warn_price: bool = False,
) -> bool:
"""
Warn on empty dataframe
"""
if pairdf.empty:
if warn_no_data:
logger.warning(
f"No history for {pair}, {candle_type}, {timeframe} found. "
"Use `freqtrade download-data` to download the data"
)
return True
elif warn_price:
candle_price_gap = 0
2024-05-12 15:41:55 +00:00
if (
candle_type in (CandleType.SPOT, CandleType.FUTURES)
and not pairdf.empty
and "close" in pairdf.columns
and "open" in pairdf.columns
):
# Detect gaps between prior close and open
2024-05-12 15:41:55 +00:00
gaps = (pairdf["open"] - pairdf["close"].shift(1)) / pairdf["close"].shift(1)
gaps = gaps.dropna()
if len(gaps):
candle_price_gap = max(abs(gaps))
if candle_price_gap > 0.1:
2024-05-12 15:41:55 +00:00
logger.info(
f"Price jump in {pair}, {timeframe}, {candle_type} between two candles "
f"of {candle_price_gap:.2%} detected."
)
return False
2019-12-25 10:09:29 +00:00
2024-05-12 15:41:55 +00:00
def _validate_pairdata(
self,
pair,
pairdata: DataFrame,
timeframe: str,
candle_type: CandleType,
timerange: TimeRange,
):
2019-12-25 10:09:29 +00:00
"""
Validates pairdata for missing data at start end end and logs warnings.
:param pairdata: Dataframe to validate
:param timerange: Timerange specified for start and end dates
"""
2024-05-12 15:41:55 +00:00
if timerange.starttype == "date":
if pairdata.iloc[0]["date"] > timerange.startdt:
logger.warning(
f"{pair}, {candle_type}, {timeframe}, "
f"data starts at {pairdata.iloc[0]['date']:%Y-%m-%d %H:%M:%S}"
)
if timerange.stoptype == "date":
if pairdata.iloc[-1]["date"] < timerange.stopdt:
logger.warning(
f"{pair}, {candle_type}, {timeframe}, "
f"data ends at {pairdata.iloc[-1]['date']:%Y-%m-%d %H:%M:%S}"
)
2023-01-12 19:59:43 +00:00
def rename_futures_data(
2024-05-12 15:41:55 +00:00
self, pair: str, new_pair: str, timeframe: str, candle_type: CandleType
):
2023-01-12 19:59:43 +00:00
"""
Temporary method to migrate data from old naming to new naming (BTC/USDT -> BTC/USDT:USDT)
Only used for binance to support the binance futures naming unification.
"""
file_old = self._pair_data_filename(self._datadir, pair, timeframe, candle_type)
file_new = self._pair_data_filename(self._datadir, new_pair, timeframe, candle_type)
# print(file_old, file_new)
if file_new.exists():
logger.warning(f"{file_new} exists already, can't migrate {pair}.")
return
file_old.rename(file_new)
def fix_funding_fee_timeframe(self, ff_timeframe: str):
"""
Temporary method to migrate data from old funding fee timeframe to the correct timeframe
Applies to bybit and okx, where funding-fee and mark candles have different timeframes.
"""
paircombs = self.ohlcv_get_available_data(self._datadir, TradingMode.FUTURES)
funding_rate_combs = [
f for f in paircombs if f[2] == CandleType.FUNDING_RATE and f[1] != ff_timeframe
]
2024-01-04 16:06:15 +00:00
if funding_rate_combs:
logger.warning(
2024-05-12 15:41:55 +00:00
f"Migrating {len(funding_rate_combs)} funding fees to correct timeframe."
)
2024-01-04 16:06:15 +00:00
for pair, timeframe, candletype in funding_rate_combs:
old_name = self._pair_data_filename(self._datadir, pair, timeframe, candletype)
new_name = self._pair_data_filename(self._datadir, pair, ff_timeframe, candletype)
if not Path(old_name).exists():
2024-05-12 15:41:55 +00:00
logger.warning(f"{old_name} does not exist, skipping.")
continue
if Path(new_name).exists():
2024-05-12 15:41:55 +00:00
logger.warning(f"{new_name} already exists, Removing.")
2024-01-04 16:06:15 +00:00
Path(new_name).unlink()
Path(old_name).rename(new_name)
def get_datahandlerclass(datatype: str) -> Type[IDataHandler]:
"""
Get datahandler class.
Could be done using Resolvers, but since this may be called often and resolvers
are rather expensive, doing this directly should improve performance.
:param datatype: datatype to use.
:return: Datahandler class
"""
2024-05-12 15:41:55 +00:00
if datatype == "json":
from .jsondatahandler import JsonDataHandler
2024-05-12 15:41:55 +00:00
return JsonDataHandler
2024-05-12 15:41:55 +00:00
elif datatype == "jsongz":
from .jsondatahandler import JsonGzDataHandler
2024-05-12 15:41:55 +00:00
return JsonGzDataHandler
2024-05-12 15:41:55 +00:00
elif datatype == "hdf5":
2020-07-24 17:23:37 +00:00
from .hdf5datahandler import HDF5DataHandler
2024-05-12 15:41:55 +00:00
2020-07-24 17:23:37 +00:00
return HDF5DataHandler
2024-05-12 15:41:55 +00:00
elif datatype == "feather":
from .featherdatahandler import FeatherDataHandler
2024-05-12 15:41:55 +00:00
return FeatherDataHandler
2024-05-12 15:41:55 +00:00
elif datatype == "parquet":
2022-09-20 13:42:15 +00:00
from .parquetdatahandler import ParquetDataHandler
2024-05-12 15:41:55 +00:00
2022-09-20 13:42:15 +00:00
return ParquetDataHandler
else:
raise ValueError(f"No datahandler for datatype {datatype} available.")
2024-05-12 15:41:55 +00:00
def get_datahandler(
datadir: Path, data_format: Optional[str] = None, data_handler: Optional[IDataHandler] = None
) -> IDataHandler:
"""
:param datadir: Folder to save data
2021-06-25 17:13:31 +00:00
:param data_format: dataformat to use
:param data_handler: returns this datahandler if it exists or initializes a new one
"""
if not data_handler:
2024-05-12 15:41:55 +00:00
HandlerClass = get_datahandlerclass(data_format or "feather")
data_handler = HandlerClass(datadir)
return data_handler