Merge branch 'feature/fetch-public-trades-cached' into feature/fetch-public-trades

This commit is contained in:
Joe Schr 2024-06-24 17:47:45 +02:00
commit 5379400ab3
4 changed files with 52 additions and 16 deletions

View File

@ -21,6 +21,7 @@ This guide walks you through utilizing public trade data for advanced orderflow
2. **Configure Orderflow Processing:** Define your desired settings for orderflow processing within the orderflow section of config.json. Here, you can adjust factors like:
- `cache_size`: How many previous orderflow candles are saved into cache instead of calculated every new candle
- `max_candles`: Filter how many candles get processed from the tail
- `scale`: This controls the price bin size for the footprint chart.
- `stacked_imbalance_range`: Defines the minimum consecutive imbalanced price levels required for consideration.
@ -29,6 +30,7 @@ This guide walks you through utilizing public trade data for advanced orderflow
```json
"orderflow": {
"cache_size": 1000,
"max_candles": 1500,
"scale": 0.5,
"stacked_imbalance_range": 3, // needs at least this amount of imbalance next to each other

View File

@ -537,6 +537,7 @@ CONF_SCHEMA = {
"orderflow": {
"type": "object",
"properties": {
"cache_size": {"type": "number", "minimum": 1, "default": 1000},
"max_candles": {"type": "number", "minimum": 1, "default": 1500},
"scale": {"type": "number", "minimum": 0.0},
"stacked_imbalance_range": {"type": "number", "minimum": 0},

View File

@ -4,16 +4,20 @@ Functions to convert orderflow data from public_trades
import logging
import time
from collections import OrderedDict
import numpy as np
import pandas as pd
from freqtrade.constants import DEFAULT_ORDERFLOW_COLUMNS, Config
from freqtrade.constants import DEFAULT_ORDERFLOW_COLUMNS
from freqtrade.exceptions import DependencyException
logger = logging.getLogger(__name__)
# Global cache dictionary
cached_grouped_trades: OrderedDict[pd.Timestamp, pd.DataFrame] = OrderedDict()
def _init_dataframe_with_trades_columns(dataframe: pd.DataFrame):
"""
@ -56,17 +60,16 @@ def _calculate_ohlcv_candle_start_and_end(df: pd.DataFrame, timeframe: str):
df.drop(columns=["datetime"], inplace=True)
def populate_dataframe_with_trades(
config: Config, dataframe: pd.DataFrame, trades: pd.DataFrame
) -> pd.DataFrame:
def populate_dataframe_with_trades(config, dataframe, trades):
"""
Populates a dataframe with trades
:param dataframe: Dataframe to populate
:param trades: Trades to populate with
:return: Dataframe with trades populated
"""
config_orderflow = config["orderflow"]
timeframe = config["timeframe"]
config_orderflow = config["orderflow"]
cache_size = config_orderflow["cache_size"]
# create columns for trades
_init_dataframe_with_trades_columns(dataframe)
@ -92,14 +95,13 @@ def populate_dataframe_with_trades(
stacked_imbalances_bid_series = pd.Series(index=dataframe.index, dtype=object)
stacked_imbalances_ask_series = pd.Series(index=dataframe.index, dtype=object)
trades_grouped_by_candle_start = trades.groupby("candle_start", group_keys=False)
for candle_start, trades_grouped_df in trades_grouped_by_candle_start:
is_between = candle_start == dataframe["date"]
if is_between.any():
from freqtrade.exchange import timeframe_to_next_date
candle_next = timeframe_to_next_date(timeframe, candle_start)
# skip if there are no trades at next candle
# because that this candle isn't finished yet
if candle_next not in trades_grouped_by_candle_start.groups:
logger.warning(
f"candle at {candle_start} with {len(trades_grouped_df)} trades "
@ -109,6 +111,17 @@ def populate_dataframe_with_trades(
indices = dataframe.index[is_between].tolist()
# Add trades to each candle
trades_series.loc[indices] = [trades_grouped_df] * len(indices)
# Use caching mechanism
if (candle_start, candle_next) in cached_grouped_trades:
cache_entry = cached_grouped_trades[(candle_start, candle_next)]
# dataframe.loc[is_between] = cache_entry # doesn't take, so we need workaround:
# Create a dictionary of the column values to be assigned
update_dict = {c: cache_entry[c].iat[0] for c in cache_entry.columns}
# Assign the values using the update_dict
dataframe.loc[is_between, update_dict.keys()] = pd.DataFrame(
[update_dict], index=dataframe.loc[is_between].index
)
continue
# Calculate orderflow for each candle
orderflow = trades_to_volumeprofile_with_total_delta_bid_ask(
@ -136,14 +149,11 @@ def populate_dataframe_with_trades(
] * len(indices)
bid = np.where(
trades_grouped_df["side"].str.contains("sell"),
trades_grouped_df["amount"],
0,
trades_grouped_df["side"].str.contains("sell"), trades_grouped_df["amount"], 0
)
ask = np.where(
trades_grouped_df["side"].str.contains("buy"),
trades_grouped_df["amount"],
0,
trades_grouped_df["side"].str.contains("buy"), trades_grouped_df["amount"], 0
)
deltas_per_trade = ask - bid
min_delta = deltas_per_trade.cumsum().min()
@ -157,6 +167,15 @@ def populate_dataframe_with_trades(
dataframe.loc[indices, "ask"] - dataframe.loc[indices, "bid"]
)
dataframe.loc[indices, "total_trades"] = len(trades_grouped_df)
# Cache the result
cached_grouped_trades[(candle_start, candle_next)] = dataframe.loc[
is_between
].copy()
# Maintain cache size
if len(cached_grouped_trades) > cache_size:
cached_grouped_trades.popitem(last=False)
else:
logger.debug(f"Found NO candles for trades starting with {candle_start}")
logger.debug(f"trades.groups_keys in {time.time() - start_time} seconds")

View File

@ -1,3 +1,5 @@
from collections import OrderedDict
import numpy as np
import pandas as pd
import pytest
@ -47,6 +49,15 @@ def public_trades_list_simple(testdatadir):
return read_csv(testdatadir / "orderflow/public_trades_list_simple_example.csv").copy()
@pytest.fixture
def reset_cache(request):
import freqtrade.data.converter.orderflow as orderflow
global orderflow # noqa F811
orderflow.cached_grouped_trades = OrderedDict()
yield
def test_public_trades_columns_before_change(
populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades
):
@ -71,7 +82,7 @@ def test_public_trades_columns_before_change(
def test_public_trades_mock_populate_dataframe_with_trades__check_orderflow(
populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades
reset_cache, populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades
):
"""
Tests the `populate_dataframe_with_trades` function's order flow calculation.
@ -90,6 +101,7 @@ def test_public_trades_mock_populate_dataframe_with_trades__check_orderflow(
config = {
"timeframe": "5m",
"orderflow": {
"cache_size": 1000,
"max_candles": 1500,
"scale": 0.005,
"imbalance_volume": 0,
@ -169,7 +181,7 @@ def test_public_trades_mock_populate_dataframe_with_trades__check_orderflow(
def test_public_trades_trades_mock_populate_dataframe_with_trades__check_trades(
populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades
reset_cache, populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades
):
"""
Tests the `populate_dataframe_with_trades` function's handling of trades,
@ -201,6 +213,7 @@ def test_public_trades_trades_mock_populate_dataframe_with_trades__check_trades(
config = {
"timeframe": "5m",
"orderflow": {
"cache_size": 1000,
"max_candles": 1500,
"scale": 0.5,
"imbalance_volume": 0,
@ -243,7 +256,7 @@ def test_public_trades_trades_mock_populate_dataframe_with_trades__check_trades(
assert 169.442 == row["ask"]
# Assert the number of trades
assert 151 == len(row.trades)
assert 151 == len(row["trades"])
# Assert specific details of the first trade
t = row["trades"].iloc[0]
@ -367,6 +380,7 @@ def test_public_trades_config_max_trades(
orderflow_config = {
"timeframe": "5m",
"orderflow": {
"cache_size": 1000,
"max_candles": 1,
"scale": 0.005,
"imbalance_volume": 0,