""" Functions to convert orderflow data from public_trades """ import logging import time import typing from collections import OrderedDict from datetime import datetime from typing import Dict, Tuple import numpy as np import pandas as pd from freqtrade.constants import DEFAULT_ORDERFLOW_COLUMNS from freqtrade.exceptions import DependencyException logger = logging.getLogger(__name__) # Global cache dictionary cached_grouped_trades_per_pair: Dict[str, OrderedDict[Tuple[datetime, datetime], pd.DataFrame]] = {} def _init_dataframe_with_trades_columns(dataframe: pd.DataFrame): """ Populates a dataframe with trades columns :param dataframe: Dataframe to populate """ # Initialize columns with appropriate dtypes dataframe["trades"] = np.nan dataframe["orderflow"] = np.nan dataframe["imbalances"] = np.nan dataframe["stacked_imbalances_bid"] = np.nan dataframe["stacked_imbalances_ask"] = np.nan dataframe["max_delta"] = np.nan dataframe["min_delta"] = np.nan dataframe["bid"] = np.nan dataframe["ask"] = np.nan dataframe["delta"] = np.nan dataframe["total_trades"] = np.nan # Ensure the 'trades' column is of object type dataframe["trades"] = dataframe["trades"].astype(object) dataframe["orderflow"] = dataframe["orderflow"].astype(object) dataframe["imbalances"] = dataframe["imbalances"].astype(object) dataframe["stacked_imbalances_bid"] = dataframe["stacked_imbalances_bid"].astype(object) dataframe["stacked_imbalances_ask"] = dataframe["stacked_imbalances_ask"].astype(object) def _calculate_ohlcv_candle_start_and_end(df: pd.DataFrame, timeframe: str): from freqtrade.exchange import timeframe_to_next_date, timeframe_to_resample_freq timeframe_frequency = timeframe_to_resample_freq(timeframe) # calculate ohlcv candle start and end if df is not None and not df.empty: df["datetime"] = pd.to_datetime(df["date"], unit="ms") df["candle_start"] = df["datetime"].dt.floor(timeframe_frequency) # used in _now_is_time_to_refresh_trades df["candle_end"] = df["candle_start"].apply( lambda candle_start: timeframe_to_next_date(timeframe, candle_start) ) df.drop(columns=["datetime"], inplace=True) def populate_dataframe_with_trades( pair: str, config, dataframe: pd.DataFrame, trades: pd.DataFrame ): """ Populates a dataframe with trades :param dataframe: Dataframe to populate :param trades: Trades to populate with :return: Dataframe with trades populated """ timeframe = config["timeframe"] config_orderflow = config["orderflow"] cache_size = config_orderflow["cache_size"] # create columns for trades _init_dataframe_with_trades_columns(dataframe) try: cached_grouped_trades: OrderedDict[Tuple[datetime, datetime], pd.DataFrame] = ( cached_grouped_trades_per_pair.get(pair, OrderedDict()) ) start_time = time.time() # calculate ohlcv candle start and end _calculate_ohlcv_candle_start_and_end(trades, timeframe) # get date of earliest max_candles candle max_candles = config_orderflow["max_candles"] start_date = dataframe.tail(max_candles).date.iat[0] # slice of trades that are before current ohlcv candles to make groupby faster trades = trades.loc[trades.candle_start >= start_date] trades.reset_index(inplace=True, drop=True) # group trades by candle start trades_grouped_by_candle_start = trades.groupby("candle_start", group_keys=False) # Create Series to hold complex data trades_series = pd.Series(index=dataframe.index, dtype=object) orderflow_series = pd.Series(index=dataframe.index, dtype=object) imbalances_series = pd.Series(index=dataframe.index, dtype=object) stacked_imbalances_bid_series = pd.Series(index=dataframe.index, dtype=object) stacked_imbalances_ask_series = pd.Series(index=dataframe.index, dtype=object) trades_grouped_by_candle_start = trades.groupby("candle_start", group_keys=False) for candle_start, trades_grouped_df in trades_grouped_by_candle_start: is_between = candle_start == dataframe["date"] if is_between.any(): from freqtrade.exchange import timeframe_to_next_date candle_next = timeframe_to_next_date(timeframe, typing.cast(datetime, candle_start)) if candle_next not in trades_grouped_by_candle_start.groups: logger.warning( f"candle at {candle_start} with {len(trades_grouped_df)} trades " f"might be unfinished, because no finished trades at {candle_next}" ) indices = dataframe.index[is_between].tolist() # Add trades to each candle trades_series.loc[indices] = [trades_grouped_df] # Use caching mechanism if (candle_start, candle_next) in cached_grouped_trades: cache_entry = cached_grouped_trades[ (typing.cast(datetime, candle_start), candle_next) ] # dataframe.loc[is_between] = cache_entry # doesn't take, so we need workaround: # Create a dictionary of the column values to be assigned update_dict = {c: cache_entry[c].iat[0] for c in cache_entry.columns} # Assign the values using the update_dict dataframe.loc[is_between, update_dict.keys()] = pd.DataFrame( [update_dict], index=dataframe.loc[is_between].index ) continue # Calculate orderflow for each candle orderflow = trades_to_volumeprofile_with_total_delta_bid_ask( trades_grouped_df, scale=config_orderflow["scale"] ) orderflow_series.loc[indices] = [orderflow] # Calculate imbalances for each candle's orderflow imbalances = trades_orderflow_to_imbalances( orderflow, imbalance_ratio=config_orderflow["imbalance_ratio"], imbalance_volume=config_orderflow["imbalance_volume"], ) imbalances_series.loc[indices] = [imbalances] stacked_imbalance_range = config_orderflow["stacked_imbalance_range"] stacked_imbalances_bid_series.loc[indices] = [ stacked_imbalance_bid( imbalances, stacked_imbalance_range=stacked_imbalance_range ) ] stacked_imbalances_ask_series.loc[indices] = [ stacked_imbalance_ask( imbalances, stacked_imbalance_range=stacked_imbalance_range ) ] bid = np.where( trades_grouped_df["side"].str.contains("sell"), trades_grouped_df["amount"], 0 ) ask = np.where( trades_grouped_df["side"].str.contains("buy"), trades_grouped_df["amount"], 0 ) deltas_per_trade = ask - bid min_delta = deltas_per_trade.cumsum().min() max_delta = deltas_per_trade.cumsum().max() dataframe.loc[indices, "max_delta"] = max_delta dataframe.loc[indices, "min_delta"] = min_delta dataframe.loc[indices, "bid"] = bid.sum() dataframe.loc[indices, "ask"] = ask.sum() dataframe.loc[indices, "delta"] = ( dataframe.loc[indices, "ask"] - dataframe.loc[indices, "bid"] ) dataframe.loc[indices, "total_trades"] = len(trades_grouped_df) # Cache the result cached_grouped_trades[(typing.cast(datetime, candle_start), candle_next)] = ( dataframe.loc[is_between].copy() ) # Maintain cache size if len(cached_grouped_trades) > cache_size: cached_grouped_trades.popitem(last=False) else: logger.debug(f"Found NO candles for trades starting with {candle_start}") logger.debug(f"trades.groups_keys in {time.time() - start_time} seconds") # Merge the complex data Series back into the DataFrame dataframe["trades"] = trades_series dataframe["orderflow"] = orderflow_series dataframe["imbalances"] = imbalances_series dataframe["stacked_imbalances_bid"] = stacked_imbalances_bid_series dataframe["stacked_imbalances_ask"] = stacked_imbalances_ask_series # dereference old cache if pair in cached_grouped_trades_per_pair: del cached_grouped_trades_per_pair[pair] cached_grouped_trades_per_pair[pair] = cached_grouped_trades except Exception as e: logger.exception("Error populating dataframe with trades") raise DependencyException(e) return dataframe def trades_to_volumeprofile_with_total_delta_bid_ask( trades: pd.DataFrame, scale: float ) -> pd.DataFrame: """ :param trades: dataframe :param scale: scale aka bin size e.g. 0.5 :return: trades binned to levels according to scale aka orderflow """ df = pd.DataFrame([], columns=DEFAULT_ORDERFLOW_COLUMNS) # create bid, ask where side is sell or buy df["bid_amount"] = np.where(trades["side"].str.contains("sell"), trades["amount"], 0) df["ask_amount"] = np.where(trades["side"].str.contains("buy"), trades["amount"], 0) df["bid"] = np.where(trades["side"].str.contains("sell"), 1, 0) df["ask"] = np.where(trades["side"].str.contains("buy"), 1, 0) # round the prices to the nearest multiple of the scale df["price"] = ((trades["price"] / scale).round() * scale).astype("float64").values if df.empty: df["total"] = np.nan df["delta"] = np.nan return df df["delta"] = df["ask_amount"] - df["bid_amount"] df["total_volume"] = df["ask_amount"] + df["bid_amount"] df["total_trades"] = df["ask"] + df["bid"] # group to bins aka apply scale df = df.groupby("price").sum(numeric_only=True) return df def trades_orderflow_to_imbalances(df: pd.DataFrame, imbalance_ratio: int, imbalance_volume: int): """ :param df: dataframes with bid and ask :param imbalance_ratio: imbalance_ratio e.g. 3 :param imbalance_volume: imbalance volume e.g. 10 :return: dataframe with bid and ask imbalance """ bid = df.bid # compares bid and ask diagonally ask = df.ask.shift(-1) bid_imbalance = (bid / ask) > (imbalance_ratio) # overwrite bid_imbalance with False if volume is not big enough bid_imbalance_filtered = np.where(df.total_volume < imbalance_volume, False, bid_imbalance) ask_imbalance = (ask / bid) > (imbalance_ratio) # overwrite ask_imbalance with False if volume is not big enough ask_imbalance_filtered = np.where(df.total_volume < imbalance_volume, False, ask_imbalance) dataframe = pd.DataFrame( {"bid_imbalance": bid_imbalance_filtered, "ask_imbalance": ask_imbalance_filtered}, index=df.index, ) return dataframe def stacked_imbalance( df: pd.DataFrame, label: str, stacked_imbalance_range: int, should_reverse: bool ): """ y * (y.groupby((y != y.shift()).cumsum()).cumcount() + 1) https://stackoverflow.com/questions/27626542/counting-consecutive-positive-values-in-python-pandas-array """ imbalance = df[f"{label}_imbalance"] int_series = pd.Series(np.where(imbalance, 1, 0)) stacked = int_series * ( int_series.groupby((int_series != int_series.shift()).cumsum()).cumcount() + 1 ) max_stacked_imbalance_idx = stacked.index[stacked >= stacked_imbalance_range] stacked_imbalance_price = np.nan if not max_stacked_imbalance_idx.empty: idx = ( max_stacked_imbalance_idx[0] if not should_reverse else np.flipud(max_stacked_imbalance_idx)[0] ) stacked_imbalance_price = imbalance.index[idx] return stacked_imbalance_price def stacked_imbalance_ask(df: pd.DataFrame, stacked_imbalance_range: int): return stacked_imbalance(df, "ask", stacked_imbalance_range, should_reverse=True) def stacked_imbalance_bid(df: pd.DataFrame, stacked_imbalance_range: int): return stacked_imbalance(df, "bid", stacked_imbalance_range, should_reverse=False)