Moves orderflow logic to it's own file

This commit is contained in:
Joe Schr 2024-02-12 13:20:52 +01:00
parent 64e9784d1f
commit bff93e31c8
4 changed files with 285 additions and 271 deletions

View File

@ -1,9 +1,8 @@
from freqtrade.data.converter.converter import (clean_ohlcv_dataframe, convert_ohlcv_format,
ohlcv_fill_up_missing_data, ohlcv_to_dataframe,
order_book_to_dataframe,
populate_dataframe_with_trades,
reduce_dataframe_footprint, trim_dataframe,
trim_dataframes)
order_book_to_dataframe, reduce_dataframe_footprint,
trim_dataframe, trim_dataframes)
from freqtrade.data.converter.orderflow import populate_dataframe_with_trades
from freqtrade.data.converter.trade_converter import (convert_trades_format,
convert_trades_to_ohlcv, trades_convert_types,
trades_df_remove_duplicates,

View File

@ -2,16 +2,14 @@
Functions to convert data from one format to another
"""
import logging
import time
from typing import Dict
import numpy as np
import pandas as pd
from pandas import DataFrame, to_datetime
from freqtrade.constants import DEFAULT_DATAFRAME_COLUMNS, DEFAULT_ORDERFLOW_COLUMNS, Config
from freqtrade.constants import DEFAULT_DATAFRAME_COLUMNS, Config
from freqtrade.enums import CandleType, TradingMode
from freqtrade.exchange.exchange_utils import timeframe_to_resample_freq
logger = logging.getLogger(__name__)
@ -46,269 +44,6 @@ def ohlcv_to_dataframe(ohlcv: list, timeframe: str, pair: str, *,
drop_incomplete=drop_incomplete)
def _init_dataframe_with_trades_columns(dataframe: DataFrame):
"""
Populates a dataframe with trades columns
:param dataframe: Dataframe to populate
"""
dataframe['trades'] = dataframe.apply(lambda _: [], axis=1)
dataframe['orderflow'] = dataframe.apply(lambda _: {}, axis=1)
dataframe['bid'] = np.nan
dataframe['ask'] = np.nan
dataframe['delta'] = np.nan
dataframe['min_delta'] = np.nan
dataframe['max_delta'] = np.nan
dataframe['total_trades'] = np.nan
dataframe['stacked_imbalances_bid'] = np.nan
dataframe['stacked_imbalances_ask'] = np.nan
def _convert_timeframe_to_pandas_frequency(timeframe: str):
# convert timeframe to format usable by pandas
from freqtrade.exchange import timeframe_to_minutes
timeframe_minutes = timeframe_to_minutes(timeframe)
timeframe_frequency = f'{timeframe_minutes}min'
return (timeframe_frequency, timeframe_minutes)
def _calculate_ohlcv_candle_start_and_end(df: DataFrame, timeframe: str):
_, timeframe_minutes = _convert_timeframe_to_pandas_frequency(
timeframe)
timeframe_frequency = timeframe_to_resample_freq(timeframe)
# calculate ohlcv candle start and end
if df is not None and not df.empty:
df['datetime'] = pd.to_datetime(df['date'], unit='ms')
df['candle_start'] = df['datetime'].dt.floor(timeframe_frequency)
df['candle_end'] = df['candle_start'] + \
pd.Timedelta(minutes=timeframe_minutes)
df.drop(columns=['datetime'], inplace=True)
def populate_dataframe_with_trades(config: Config,
dataframe: DataFrame,
trades: DataFrame,
*,
pair: str) -> DataFrame:
"""
Populates a dataframe with trades
:param dataframe: Dataframe to populate
:param trades: Trades to populate with
:return: Dataframe with trades populated
"""
config_orderflow = config['orderflow']
timeframe = config['timeframe']
# create columns for trades
_init_dataframe_with_trades_columns(dataframe)
df = dataframe.copy()
try:
start_time = time.time()
# calculate ohlcv candle start and end
# TODO: check if call is necessary for df.
_calculate_ohlcv_candle_start_and_end(df, timeframe)
_calculate_ohlcv_candle_start_and_end(trades, timeframe)
# slice of trades that are before current ohlcv candles to make groupby faster
# TODO: maybe use df.date instead of df.candle_start at comparision below
trades = trades.loc[trades.candle_start >= df.candle_start[0]]
trades.reset_index(inplace=True, drop=True)
# group trades by candle start
trades_grouped_by_candle_start = trades.groupby(
'candle_start', group_keys=False)
for candle_start in trades_grouped_by_candle_start.groups:
trades_grouped_df = trades[candle_start == trades['candle_start']]
is_between = (candle_start == df['candle_start'])
if np.any(is_between == True): # noqa: E712
(_, timeframe_minutes) = _convert_timeframe_to_pandas_frequency(timeframe)
candle_next = candle_start + \
pd.Timedelta(minutes=timeframe_minutes)
# skip if there are no trades at next candle
# because that this candle isn't finished yet
if candle_next not in trades_grouped_by_candle_start.groups:
logger.warning(
f"candle at {candle_start} with {len(trades_grouped_df)} trades "
f"might be unfinished, because no finished trades at {candle_next}")
# add trades to each candle
df.loc[is_between, 'trades'] = df.loc[is_between,
'trades'].apply(lambda _: trades_grouped_df)
# calculate orderflow for each candle
df.loc[is_between, 'orderflow'] = df.loc[is_between, 'orderflow'].apply(
lambda _: trades_to_volumeprofile_with_total_delta_bid_ask(
pd.DataFrame(trades_grouped_df),
scale=config_orderflow['scale']))
# calculate imbalances for each candle's orderflow
df.loc[is_between, 'imbalances'] = df.loc[is_between, 'orderflow'].apply(
lambda x: trades_orderflow_to_imbalances(
x,
imbalance_ratio=config_orderflow['imbalance_ratio'],
imbalance_volume=config_orderflow['imbalance_volume']))
_stacked_imb = config_orderflow['stacked_imbalance_range']
df.loc[is_between, 'stacked_imbalances_bid'] = df.loc[
is_between, 'imbalances'].apply(lambda x: stacked_imbalance_bid(
x,
stacked_imbalance_range=_stacked_imb))
df.loc[is_between, 'stacked_imbalances_ask'] = df.loc[
is_between, 'imbalances'].apply(
lambda x: stacked_imbalance_ask(x, stacked_imbalance_range=_stacked_imb))
# TODO: maybe use simple np.where instead
buy = df.loc[is_between, 'bid'].apply(lambda _: np.where(
trades_grouped_df['side'].str.contains('buy'), 0, trades_grouped_df['amount']))
sell = df.loc[is_between, 'ask'].apply(lambda _: np.where(
trades_grouped_df['side'].str.contains('sell'), 0, trades_grouped_df['amount']))
deltas_per_trade = sell - buy
min_delta = 0
max_delta = 0
delta = 0
for deltas in deltas_per_trade:
for d in deltas:
delta += d
if delta > max_delta:
max_delta = delta
if delta < min_delta:
min_delta = delta
df.loc[is_between, 'max_delta'] = max_delta
df.loc[is_between, 'min_delta'] = min_delta
df.loc[is_between, 'bid'] = np.where(trades_grouped_df['side'].str.contains(
'buy'), 0, trades_grouped_df['amount']).sum()
df.loc[is_between, 'ask'] = np.where(trades_grouped_df['side'].str.contains(
'sell'), 0, trades_grouped_df['amount']).sum()
df.loc[is_between, 'delta'] = df.loc[is_between,
'ask'] - df.loc[is_between, 'bid']
min_delta = np.min(deltas_per_trade)
max_delta = np.max(deltas_per_trade)
df.loc[is_between, 'total_trades'] = len(trades_grouped_df)
# copy to avoid memory leaks
dataframe.loc[is_between] = df.loc[is_between].copy()
else:
logger.debug(
f"Found NO candles for trades starting with {candle_start}")
logger.debug(
f"trades.groups_keys in {time.time() - start_time} seconds")
logger.debug(
f"trades.singleton_iterate in {time.time() - start_time} seconds")
except Exception as e:
logger.exception("Error populating dataframe with trades:", e)
return dataframe
def trades_to_volumeprofile_with_total_delta_bid_ask(trades: DataFrame, scale: float):
"""
:param trades: dataframe
:param scale: scale aka bin size e.g. 0.5
:return: trades binned to levels according to scale aka orderflow
"""
df = pd.DataFrame([], columns=DEFAULT_ORDERFLOW_COLUMNS)
# create bid, ask where side is sell or buy
df['bid_amount'] = np.where(
trades['side'].str.contains('buy'), 0, trades['amount'])
df['ask_amount'] = np.where(
trades['side'].str.contains('sell'), 0, trades['amount'])
df['bid'] = np.where(trades['side'].str.contains('buy'), 0, 1)
df['ask'] = np.where(trades['side'].str.contains('sell'), 0, 1)
# round the prices to the nearest multiple of the scale
df['price'] = ((trades['price'] / scale).round()
* scale).astype('float64').values
if df.empty:
df['total'] = np.nan
df['delta'] = np.nan
return df
df['delta'] = df['ask_amount'] - df['bid_amount']
df['total_volume'] = df['ask_amount'] + df['bid_amount']
df['total_trades'] = df['ask'] + df['bid']
# group to bins aka apply scale
df = df.groupby('price').sum(numeric_only=True)
return df
def trades_orderflow_to_imbalances(df: DataFrame, imbalance_ratio: int, imbalance_volume: int):
"""
:param df: dataframes with bid and ask
:param imbalance_ratio: imbalance_ratio e.g. 300
:param imbalance_volume: imbalance volume e.g. 3)
:return: dataframe with bid and ask imbalance
"""
bid = df.bid
ask = df.ask.shift(-1)
bid_imbalance = (bid / ask) > (imbalance_ratio / 100)
# overwrite bid_imbalance with False if volume is not big enough
bid_imbalance_filtered = np.where(
df.total_volume < imbalance_volume, False, bid_imbalance)
ask_imbalance = (ask / bid) > (imbalance_ratio / 100)
# overwrite ask_imbalance with False if volume is not big enough
ask_imbalance_filtered = np.where(
df.total_volume < imbalance_volume, False, ask_imbalance)
dataframe = DataFrame({
"bid_imbalance": bid_imbalance_filtered,
"ask_imbalance": ask_imbalance_filtered
}, index=df.index,
)
return dataframe
def stacked_imbalance(df: DataFrame,
label: str,
stacked_imbalance_range: int,
should_reverse: bool):
"""
y * (y.groupby((y != y.shift()).cumsum()).cumcount() + 1)
https://stackoverflow.com/questions/27626542/counting-consecutive-positive-values-in-python-pandas-array
"""
imbalance = df[f'{label}_imbalance']
int_series = pd.Series(np.where(imbalance, 1, 0))
stacked = (
int_series * (
int_series.groupby(
(int_series != int_series.shift()).cumsum()).cumcount() + 1
)
)
max_stacked_imbalance_idx = stacked.index[stacked >=
stacked_imbalance_range]
stacked_imbalance_price = np.nan
if not max_stacked_imbalance_idx.empty:
idx = max_stacked_imbalance_idx[0] if not should_reverse else np.flipud(
max_stacked_imbalance_idx)[0]
stacked_imbalance_price = imbalance.index[idx]
return stacked_imbalance_price
def stacked_imbalance_bid(df: DataFrame, stacked_imbalance_range: int):
return stacked_imbalance(df, 'bid', stacked_imbalance_range, should_reverse=False)
def stacked_imbalance_ask(df: DataFrame, stacked_imbalance_range: int):
return stacked_imbalance(df, 'ask', stacked_imbalance_range, should_reverse=True)
def orderflow_to_volume_profile(df: DataFrame):
"""
:param orderflow: dataframe
:return: volume profile dataframe
"""
bid = df.groupby('level').bid.sum()
ask = df.groupby('level').ask.sum()
df.groupby('level')['level'].sum()
delta = df.groupby('level').ask.sum() - df.groupby('level').bid.sum()
df = pd.DataFrame({'bid': bid, 'ask': ask, 'delta': delta})
return df
def clean_ohlcv_dataframe(data: DataFrame, timeframe: str, pair: str, *,
fill_missing: bool, drop_incomplete: bool) -> DataFrame:
"""

View File

@ -0,0 +1,280 @@
"""
Functions to convert orderflow data from public_trades
"""
import logging
import time
import numpy as np
import pandas as pd
from pandas import DataFrame
from freqtrade.constants import DEFAULT_ORDERFLOW_COLUMNS, Config
from freqtrade.exchange.exchange_utils import timeframe_to_resample_freq
logger = logging.getLogger(__name__)
def _init_dataframe_with_trades_columns(dataframe: DataFrame):
"""
Populates a dataframe with trades columns
:param dataframe: Dataframe to populate
"""
dataframe['trades'] = dataframe.apply(lambda _: [], axis=1)
dataframe['orderflow'] = dataframe.apply(lambda _: {}, axis=1)
dataframe['bid'] = np.nan
dataframe['ask'] = np.nan
dataframe['delta'] = np.nan
dataframe['min_delta'] = np.nan
dataframe['max_delta'] = np.nan
dataframe['total_trades'] = np.nan
dataframe['stacked_imbalances_bid'] = np.nan
dataframe['stacked_imbalances_ask'] = np.nan
def _convert_timeframe_to_pandas_frequency(timeframe: str):
# convert timeframe to format usable by pandas
from freqtrade.exchange import timeframe_to_minutes
timeframe_minutes = timeframe_to_minutes(timeframe)
timeframe_frequency = f'{timeframe_minutes}min'
return (timeframe_frequency, timeframe_minutes)
def _calculate_ohlcv_candle_start_and_end(df: DataFrame, timeframe: str):
_, timeframe_minutes = _convert_timeframe_to_pandas_frequency(
timeframe)
timeframe_frequency = timeframe_to_resample_freq(timeframe)
# calculate ohlcv candle start and end
if df is not None and not df.empty:
df['datetime'] = pd.to_datetime(df['date'], unit='ms')
df['candle_start'] = df['datetime'].dt.floor(timeframe_frequency)
# used in _now_is_time_to_refresh_trades
df['candle_end'] = df['candle_start'] + \
pd.Timedelta(minutes=timeframe_minutes)
df.drop(columns=['datetime'], inplace=True)
def populate_dataframe_with_trades(config: Config,
dataframe: DataFrame,
trades: DataFrame,
*,
pair: str) -> DataFrame:
"""
Populates a dataframe with trades
:param dataframe: Dataframe to populate
:param trades: Trades to populate with
:return: Dataframe with trades populated
"""
config_orderflow = config['orderflow']
timeframe = config['timeframe']
# create columns for trades
_init_dataframe_with_trades_columns(dataframe)
df = dataframe.copy()
try:
start_time = time.time()
# calculate ohlcv candle start and end
# TODO: check if call is necessary for df.
_calculate_ohlcv_candle_start_and_end(df, timeframe)
_calculate_ohlcv_candle_start_and_end(trades, timeframe)
# slice of trades that are before current ohlcv candles to make groupby faster
# TODO: maybe use df.date instead of df.candle_start at comparision below
trades = trades.loc[trades.candle_start >= df.candle_start[0]]
trades.reset_index(inplace=True, drop=True)
# group trades by candle start
trades_grouped_by_candle_start = trades.groupby(
'candle_start', group_keys=False)
for candle_start in trades_grouped_by_candle_start.groups:
trades_grouped_df = trades[candle_start == trades['candle_start']]
is_between = (candle_start == df['candle_start'])
if np.any(is_between == True): # noqa: E712
(_, timeframe_minutes) = _convert_timeframe_to_pandas_frequency(timeframe)
candle_next = candle_start + \
pd.Timedelta(minutes=timeframe_minutes)
# skip if there are no trades at next candle
# because that this candle isn't finished yet
if candle_next not in trades_grouped_by_candle_start.groups:
logger.warning(
f"candle at {candle_start} with {len(trades_grouped_df)} trades "
f"might be unfinished, because no finished trades at {candle_next}")
# add trades to each candle
df.loc[is_between, 'trades'] = df.loc[is_between,
'trades'].apply(lambda _: trades_grouped_df)
# calculate orderflow for each candle
df.loc[is_between, 'orderflow'] = df.loc[is_between, 'orderflow'].apply(
lambda _: trades_to_volumeprofile_with_total_delta_bid_ask(
pd.DataFrame(trades_grouped_df),
scale=config_orderflow['scale']))
# calculate imbalances for each candle's orderflow
df.loc[is_between, 'imbalances'] = df.loc[is_between, 'orderflow'].apply(
lambda x: trades_orderflow_to_imbalances(
x,
imbalance_ratio=config_orderflow['imbalance_ratio'],
imbalance_volume=config_orderflow['imbalance_volume']))
_stacked_imb = config_orderflow['stacked_imbalance_range']
df.loc[is_between, 'stacked_imbalances_bid'] = df.loc[
is_between, 'imbalances'].apply(lambda x: stacked_imbalance_bid(
x,
stacked_imbalance_range=_stacked_imb))
df.loc[is_between, 'stacked_imbalances_ask'] = df.loc[
is_between, 'imbalances'].apply(
lambda x: stacked_imbalance_ask(x, stacked_imbalance_range=_stacked_imb))
# TODO: maybe use simple np.where instead
buy = df.loc[is_between, 'bid'].apply(lambda _: np.where(
trades_grouped_df['side'].str.contains('buy'), 0, trades_grouped_df['amount']))
sell = df.loc[is_between, 'ask'].apply(lambda _: np.where(
trades_grouped_df['side'].str.contains('sell'), 0, trades_grouped_df['amount']))
deltas_per_trade = sell - buy
min_delta = 0
max_delta = 0
delta = 0
for deltas in deltas_per_trade:
for d in deltas:
delta += d
if delta > max_delta:
max_delta = delta
if delta < min_delta:
min_delta = delta
df.loc[is_between, 'max_delta'] = max_delta
df.loc[is_between, 'min_delta'] = min_delta
df.loc[is_between, 'bid'] = np.where(trades_grouped_df['side'].str.contains(
'buy'), 0, trades_grouped_df['amount']).sum()
df.loc[is_between, 'ask'] = np.where(trades_grouped_df['side'].str.contains(
'sell'), 0, trades_grouped_df['amount']).sum()
df.loc[is_between, 'delta'] = df.loc[is_between,
'ask'] - df.loc[is_between, 'bid']
min_delta = np.min(deltas_per_trade)
max_delta = np.max(deltas_per_trade)
df.loc[is_between, 'total_trades'] = len(trades_grouped_df)
# copy to avoid memory leaks
dataframe.loc[is_between] = df.loc[is_between].copy()
else:
logger.debug(
f"Found NO candles for trades starting with {candle_start}")
logger.debug(
f"trades.groups_keys in {time.time() - start_time} seconds")
logger.debug(
f"trades.singleton_iterate in {time.time() - start_time} seconds")
except Exception as e:
logger.exception("Error populating dataframe with trades:", e)
return dataframe
def trades_to_volumeprofile_with_total_delta_bid_ask(trades: DataFrame, scale: float):
"""
:param trades: dataframe
:param scale: scale aka bin size e.g. 0.5
:return: trades binned to levels according to scale aka orderflow
"""
df = pd.DataFrame([], columns=DEFAULT_ORDERFLOW_COLUMNS)
# create bid, ask where side is sell or buy
df['bid_amount'] = np.where(
trades['side'].str.contains('buy'), 0, trades['amount'])
df['ask_amount'] = np.where(
trades['side'].str.contains('sell'), 0, trades['amount'])
df['bid'] = np.where(trades['side'].str.contains('buy'), 0, 1)
df['ask'] = np.where(trades['side'].str.contains('sell'), 0, 1)
# round the prices to the nearest multiple of the scale
df['price'] = ((trades['price'] / scale).round()
* scale).astype('float64').values
if df.empty:
df['total'] = np.nan
df['delta'] = np.nan
return df
df['delta'] = df['ask_amount'] - df['bid_amount']
df['total_volume'] = df['ask_amount'] + df['bid_amount']
df['total_trades'] = df['ask'] + df['bid']
# group to bins aka apply scale
df = df.groupby('price').sum(numeric_only=True)
return df
def trades_orderflow_to_imbalances(df: DataFrame, imbalance_ratio: int, imbalance_volume: int):
"""
:param df: dataframes with bid and ask
:param imbalance_ratio: imbalance_ratio e.g. 300
:param imbalance_volume: imbalance volume e.g. 3)
:return: dataframe with bid and ask imbalance
"""
bid = df.bid
ask = df.ask.shift(-1)
bid_imbalance = (bid / ask) > (imbalance_ratio / 100)
# overwrite bid_imbalance with False if volume is not big enough
bid_imbalance_filtered = np.where(
df.total_volume < imbalance_volume, False, bid_imbalance)
ask_imbalance = (ask / bid) > (imbalance_ratio / 100)
# overwrite ask_imbalance with False if volume is not big enough
ask_imbalance_filtered = np.where(
df.total_volume < imbalance_volume, False, ask_imbalance)
dataframe = DataFrame({
"bid_imbalance": bid_imbalance_filtered,
"ask_imbalance": ask_imbalance_filtered
}, index=df.index,
)
return dataframe
def stacked_imbalance(df: DataFrame,
label: str,
stacked_imbalance_range: int,
should_reverse: bool):
"""
y * (y.groupby((y != y.shift()).cumsum()).cumcount() + 1)
https://stackoverflow.com/questions/27626542/counting-consecutive-positive-values-in-python-pandas-array
"""
imbalance = df[f'{label}_imbalance']
int_series = pd.Series(np.where(imbalance, 1, 0))
stacked = (
int_series * (
int_series.groupby(
(int_series != int_series.shift()).cumsum()).cumcount() + 1
)
)
max_stacked_imbalance_idx = stacked.index[stacked >=
stacked_imbalance_range]
stacked_imbalance_price = np.nan
if not max_stacked_imbalance_idx.empty:
idx = max_stacked_imbalance_idx[0] if not should_reverse else np.flipud(
max_stacked_imbalance_idx)[0]
stacked_imbalance_price = imbalance.index[idx]
return stacked_imbalance_price
def stacked_imbalance_bid(df: DataFrame, stacked_imbalance_range: int):
return stacked_imbalance(df, 'bid', stacked_imbalance_range, should_reverse=False)
def stacked_imbalance_ask(df: DataFrame, stacked_imbalance_range: int):
return stacked_imbalance(df, 'ask', stacked_imbalance_range, should_reverse=True)
def orderflow_to_volume_profile(df: DataFrame):
"""
:param orderflow: dataframe
:return: volume profile dataframe
"""
bid = df.groupby('level').bid.sum()
ask = df.groupby('level').ask.sum()
df.groupby('level')['level'].sum()
delta = df.groupby('level').ask.sum() - df.groupby('level').bid.sum()
df = pd.DataFrame({'bid': bid, 'ask': ask, 'delta': delta})
return df

View File

@ -25,7 +25,7 @@ from freqtrade.constants import (DEFAULT_AMOUNT_RESERVE_PERCENT, DEFAULT_TRADES_
ExchangeConfig, ListPairsWithTimeframes, MakerTaker, OBLiteral,
PairWithTimeframe)
from freqtrade.data.converter import clean_ohlcv_dataframe, ohlcv_to_dataframe, trades_dict_to_list
from freqtrade.data.converter.converter import _calculate_ohlcv_candle_start_and_end
from freqtrade.data.converter.orderflow import _calculate_ohlcv_candle_start_and_end
from freqtrade.data.converter.trade_converter import trades_df_remove_duplicates, trades_list_to_df
from freqtrade.enums import OPTIMIZE_MODES, CandleType, MarginMode, PriceType, RunMode, TradingMode
from freqtrade.exceptions import (DDosProtection, ExchangeError, InsufficientFundsError,