freqtrade_origin/freqtrade/data/history/datahandlers/parquetdatahandler.py

129 lines
4.9 KiB
Python
Raw Permalink Normal View History

2022-09-20 13:42:15 +00:00
import logging
from pandas import DataFrame, read_parquet, to_datetime
from freqtrade.configuration import TimeRange
from freqtrade.constants import DEFAULT_DATAFRAME_COLUMNS, DEFAULT_TRADES_COLUMNS
from freqtrade.enums import CandleType, TradingMode
2022-09-20 13:42:15 +00:00
from .idatahandler import IDataHandler
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class ParquetDataHandler(IDataHandler):
    """
    Data handler that stores OHLCV candles and trades in parquet files.
    """

    # Column names (and their order) written by ohlcv_store and assigned to
    # the dataframe read back in _ohlcv_load.
    _columns = DEFAULT_DATAFRAME_COLUMNS

    def ohlcv_store(
        self, pair: str, timeframe: str, data: DataFrame, candle_type: CandleType
    ) -> None:
        """
        Store OHLCV data for one pair in a parquet file.
        Only the columns listed in `_columns` are written, and the dataframe index
        is dropped before writing.
        :param pair: Pair - used to generate filename
        :param timeframe: Timeframe - used to generate filename
        :param data: Dataframe containing OHLCV data
        :param candle_type: Any of the enum CandleType (must match trading mode!)
        :return: None
        """
        filename = self._pair_data_filename(self._datadir, pair, timeframe, candle_type)
        self.create_dir_if_needed(filename)

        data.reset_index(drop=True).loc[:, self._columns].to_parquet(filename)

    def _ohlcv_load(
        self, pair: str, timeframe: str, timerange: TimeRange | None, candle_type: CandleType
    ) -> DataFrame:
        """
        Internal method used to load data for one pair from disk.
        Implements the loading and conversion to a Pandas dataframe.
        Timerange trimming and dataframe validation happens outside of this method.
        :param pair: Pair to load data
        :param timeframe: Timeframe (e.g. "5m")
        :param timerange: Limit data to be loaded to this timerange.
                        Optionally implemented by subclasses to avoid loading
                        all data where possible.
                        Not used by this implementation - the whole file is read.
        :param candle_type: Any of the enum CandleType (must match trading mode!)
        :return: DataFrame with ohlcv data, or empty DataFrame
        """
        filename = self._pair_data_filename(self._datadir, pair, timeframe, candle_type=candle_type)
        if not filename.exists():
            # Fallback mode for 1M files
            filename = self._pair_data_filename(
                self._datadir, pair, timeframe, candle_type=candle_type, no_timeframe_modify=True
            )
            if not filename.exists():
                # Neither filename variant exists - return an empty frame with the expected columns.
                return DataFrame(columns=self._columns)

        pairdata = read_parquet(filename)
        # Column names are assigned by position - assumes the file contains exactly
        # the columns written by ohlcv_store, in the same order.
        pairdata.columns = self._columns
        pairdata = pairdata.astype(
            dtype={
                "open": "float",
                "high": "float",
                "low": "float",
                "close": "float",
                "volume": "float",
            }
        )
        # Normalise the date column to timezone-aware UTC timestamps
        # (numeric values are interpreted as epoch milliseconds).
        pairdata["date"] = to_datetime(pairdata["date"], unit="ms", utc=True)
        return pairdata

    def ohlcv_append(
        self, pair: str, timeframe: str, data: DataFrame, candle_type: CandleType
    ) -> None:
        """
        Append data to existing data structures.
        Not implemented for this handler - always raises NotImplementedError.
        :param pair: Pair
        :param timeframe: Timeframe this ohlcv data is for
        :param data: Data to append.
        :param candle_type: Any of the enum CandleType (must match trading mode!)
        :raises NotImplementedError: always
        """
        raise NotImplementedError()

    def _trades_store(self, pair: str, data: DataFrame, trading_mode: TradingMode) -> None:
        """
        Store trades data (DataFrame) to a parquet file.
        The dataframe index is dropped before writing.
        :param pair: Pair - used for filename
        :param data: Dataframe containing trades
                     column sequence as in DEFAULT_TRADES_COLUMNS
        :param trading_mode: Trading mode to use (used to determine the filename)
        """
        filename = self._pair_trades_filename(self._datadir, pair, trading_mode)
        self.create_dir_if_needed(filename)
        data.reset_index(drop=True).to_parquet(filename)

    def trades_append(self, pair: str, data: DataFrame) -> None:
        """
        Append data to existing files.
        Not implemented for this handler - always raises NotImplementedError.
        :param pair: Pair - used for filename
        :param data: Dataframe containing trades
                     column sequence as in DEFAULT_TRADES_COLUMNS
        :raises NotImplementedError: always
        """
        raise NotImplementedError()

    def _trades_load(
        self, pair: str, trading_mode: TradingMode, timerange: TimeRange | None = None
    ) -> DataFrame:
        """
        Load trades for a pair from its parquet file.
        # TODO: respect timerange ...
        :param pair: Load trades for this pair
        :param trading_mode: Trading mode to use (used to determine the filename)
        :param timerange: Timerange to load trades for - currently not implemented
        :return: DataFrame with the stored trades, or an empty DataFrame with
                 DEFAULT_TRADES_COLUMNS as columns if the file does not exist
        """
        filename = self._pair_trades_filename(self._datadir, pair, trading_mode)
        if not filename.exists():
            return DataFrame(columns=DEFAULT_TRADES_COLUMNS)

        tradesdata = read_parquet(filename)

        return tradesdata

    @classmethod
    def _get_file_extension(cls) -> str:
        """
        Return the file extension used for files of this handler.
        :return: The string "parquet"
        """
        return "parquet"