orderflow: use cache per pair

This commit is contained in:
Joe Schr 2024-06-26 19:54:36 +02:00
parent dad2cad525
commit 7de102320c
3 changed files with 29 additions and 12 deletions

View File

@ -6,6 +6,8 @@ import logging
import time
import typing
from collections import OrderedDict
from datetime import datetime
from typing import Dict, Tuple
import numpy as np
import pandas as pd
@ -17,7 +19,7 @@ from freqtrade.exceptions import DependencyException
logger = logging.getLogger(__name__)
# Global cache dictionary
cached_grouped_trades: OrderedDict[pd.Timestamp, pd.DataFrame] = OrderedDict()
cached_grouped_trades_per_pair: Dict[str, OrderedDict[Tuple[datetime, datetime], pd.DataFrame]] = {}
def _init_dataframe_with_trades_columns(dataframe: pd.DataFrame):
@ -61,7 +63,9 @@ def _calculate_ohlcv_candle_start_and_end(df: pd.DataFrame, timeframe: str):
df.drop(columns=["datetime"], inplace=True)
def populate_dataframe_with_trades(config, dataframe, trades):
def populate_dataframe_with_trades(
pair: str, config, dataframe: pd.DataFrame, trades: pd.DataFrame
):
"""
Populates a dataframe with trades
:param pair: Pair name, used as the key into the per-pair cache of grouped trades
:param dataframe: Dataframe to populate
@ -76,6 +80,9 @@ def populate_dataframe_with_trades(config, dataframe, trades):
_init_dataframe_with_trades_columns(dataframe)
try:
cached_grouped_trades: OrderedDict[Tuple[datetime, datetime], pd.DataFrame] = (
cached_grouped_trades_per_pair.get(pair, OrderedDict())
)
start_time = time.time()
# calculate ohlcv candle start and end
_calculate_ohlcv_candle_start_and_end(trades, timeframe)
@ -114,7 +121,9 @@ def populate_dataframe_with_trades(config, dataframe, trades):
trades_series.loc[indices] = [trades_grouped_df]
# Use caching mechanism
if (candle_start, candle_next) in cached_grouped_trades:
cache_entry = cached_grouped_trades[(typing.cast(datetime, candle_start), candle_next)]
cache_entry = cached_grouped_trades[
(typing.cast(datetime, candle_start), candle_next)
]
# Assigning directly (dataframe.loc[is_between] = cache_entry) doesn't take effect, so we need a workaround:
# Create a dictionary of the column values to be assigned
update_dict = {c: cache_entry[c].iat[0] for c in cache_entry.columns}
@ -170,9 +179,9 @@ def populate_dataframe_with_trades(config, dataframe, trades):
dataframe.loc[indices, "total_trades"] = len(trades_grouped_df)
# Cache the result
cached_grouped_trades[(typing.cast(datetime, candle_start), candle_next)] = dataframe.loc[
is_between
].copy()
cached_grouped_trades[(typing.cast(datetime, candle_start), candle_next)] = (
dataframe.loc[is_between].copy()
)
# Maintain cache size
if len(cached_grouped_trades) > cache_size:
@ -187,6 +196,10 @@ def populate_dataframe_with_trades(config, dataframe, trades):
dataframe["imbalances"] = imbalances_series
dataframe["stacked_imbalances_bid"] = stacked_imbalances_bid_series
dataframe["stacked_imbalances_ask"] = stacked_imbalances_ask_series
# Drop this pair's previous cache entry (if any), then store the updated cache for the pair
if pair in cached_grouped_trades_per_pair:
del cached_grouped_trades_per_pair[pair]
cached_grouped_trades_per_pair[pair] = cached_grouped_trades
except Exception as e:
logger.exception("Error populating dataframe with trades")
@ -195,7 +208,9 @@ def populate_dataframe_with_trades(config, dataframe, trades):
return dataframe
def trades_to_volumeprofile_with_total_delta_bid_ask(trades: pd.DataFrame, scale: float) -> pd.DataFrame:
def trades_to_volumeprofile_with_total_delta_bid_ask(
trades: pd.DataFrame, scale: float
) -> pd.DataFrame:
"""
:param trades: dataframe
:param scale: scale aka bin size e.g. 0.5

View File

@ -1601,7 +1601,7 @@ class IStrategy(ABC, HyperStrategyMixin):
config = self.config
config["timeframe"] = self.timeframe
# TODO: slice trades to size of dataframe for faster backtesting
dataframe = populate_dataframe_with_trades(config, dataframe, trades)
dataframe = populate_dataframe_with_trades(metadata["pair"], config, dataframe, trades)
logger.debug("Populated dataframe with trades.")

View File

@ -54,7 +54,7 @@ def reset_cache(request):
import freqtrade.data.converter.orderflow as orderflow
global orderflow # noqa F811
orderflow.cached_grouped_trades = OrderedDict()
orderflow.cached_grouped_trades_per_pair = {}
yield
@ -110,7 +110,7 @@ def test_public_trades_mock_populate_dataframe_with_trades__check_orderflow(
},
}
# Apply the function to populate the data frame with order flow data
df = populate_dataframe_with_trades(config, dataframe, trades)
df = populate_dataframe_with_trades("BTC/UDST", config, dataframe, trades)
# Extract results from the first row of the DataFrame
results = df.iloc[0]
t = results["trades"]
@ -223,7 +223,7 @@ def test_public_trades_trades_mock_populate_dataframe_with_trades__check_trades(
}
# Populate the DataFrame with trades and order flow data
df = populate_dataframe_with_trades(config, dataframe, trades)
df = populate_dataframe_with_trades("BTC/UDST", config, dataframe, trades)
# --- DataFrame and Trade Data Validation ---
@ -389,7 +389,9 @@ def test_public_trades_config_max_trades(
},
}
df = populate_dataframe_with_trades(default_conf | orderflow_config, dataframe, trades)
df = populate_dataframe_with_trades(
"BTC/UDST", default_conf | orderflow_config, dataframe, trades
)
assert df.delta.count() == 1