""" Functions to convert orderflow data from public_trades """ import logging import time import numpy as np import pandas as pd from pandas import DataFrame from freqtrade.constants import DEFAULT_ORDERFLOW_COLUMNS, Config logger = logging.getLogger(__name__) def _init_dataframe_with_trades_columns(dataframe: DataFrame): """ Populates a dataframe with trades columns :param dataframe: Dataframe to populate """ dataframe['trades'] = dataframe.apply(lambda _: [], axis=1) dataframe['orderflow'] = dataframe.apply(lambda _: {}, axis=1) dataframe['bid'] = np.nan dataframe['ask'] = np.nan dataframe['delta'] = np.nan dataframe['min_delta'] = np.nan dataframe['max_delta'] = np.nan dataframe['total_trades'] = np.nan dataframe['stacked_imbalances_bid'] = np.nan dataframe['stacked_imbalances_ask'] = np.nan def _convert_timeframe_to_pandas_frequency(timeframe: str): # convert timeframe to format usable by pandas from freqtrade.exchange import timeframe_to_minutes timeframe_minutes = timeframe_to_minutes(timeframe) timeframe_frequency = f'{timeframe_minutes}min' return (timeframe_frequency, timeframe_minutes) def _calculate_ohlcv_candle_start_and_end(df: DataFrame, timeframe: str): from freqtrade.exchange.exchange_utils import timeframe_to_resample_freq _, timeframe_minutes = _convert_timeframe_to_pandas_frequency( timeframe) timeframe_frequency = timeframe_to_resample_freq(timeframe) # calculate ohlcv candle start and end if df is not None and not df.empty: df['datetime'] = pd.to_datetime(df['date'], unit='ms') df['candle_start'] = df['datetime'].dt.floor(timeframe_frequency) # used in _now_is_time_to_refresh_trades df['candle_end'] = df['candle_start'] + \ pd.Timedelta(minutes=timeframe_minutes) df.drop(columns=['datetime'], inplace=True) def populate_dataframe_with_trades(config: Config, dataframe: DataFrame, trades: DataFrame) -> DataFrame: """ Populates a dataframe with trades :param dataframe: Dataframe to populate :param trades: Trades to populate with :return: Dataframe with trades populated """ config_orderflow = config['orderflow'] timeframe = config['timeframe'] # create columns for trades _init_dataframe_with_trades_columns(dataframe) df = dataframe.copy() try: start_time = time.time() # calculate ohlcv candle start and end _calculate_ohlcv_candle_start_and_end(df, timeframe) _calculate_ohlcv_candle_start_and_end(trades, timeframe) # slice of trades that are before current ohlcv candles to make groupby faster trades = trades.loc[trades.candle_start >= df.candle_start[0]] trades.reset_index(inplace=True, drop=True) # group trades by candle start trades_grouped_by_candle_start = trades.groupby( 'candle_start', group_keys=False) for candle_start in trades_grouped_by_candle_start.groups: trades_grouped_df = trades[candle_start == trades['candle_start']] is_between = (candle_start == df['candle_start']) if np.any(is_between == True): # noqa: E712 (_, timeframe_minutes) = _convert_timeframe_to_pandas_frequency(timeframe) candle_next = candle_start + \ pd.Timedelta(minutes=timeframe_minutes) # skip if there are no trades at next candle # because that this candle isn't finished yet if candle_next not in trades_grouped_by_candle_start.groups: logger.warning( f"candle at {candle_start} with {len(trades_grouped_df)} trades " f"might be unfinished, because no finished trades at {candle_next}") # add trades to each candle df.loc[is_between, 'trades'] = df.loc[is_between, 'trades'].apply(lambda _: trades_grouped_df) # calculate orderflow for each candle df.loc[is_between, 'orderflow'] = df.loc[is_between, 'orderflow'].apply( lambda _: trades_to_volumeprofile_with_total_delta_bid_ask( pd.DataFrame(trades_grouped_df), scale=config_orderflow['scale'])) # calculate imbalances for each candle's orderflow df.loc[is_between, 'imbalances'] = df.loc[is_between, 'orderflow'].apply( lambda x: trades_orderflow_to_imbalances( x, imbalance_ratio=config_orderflow['imbalance_ratio'], imbalance_volume=config_orderflow['imbalance_volume'])) _stacked_imb = config_orderflow['stacked_imbalance_range'] df.loc[is_between, 'stacked_imbalances_bid'] = df.loc[ is_between, 'imbalances'].apply(lambda x: stacked_imbalance_bid( x, stacked_imbalance_range=_stacked_imb)) df.loc[is_between, 'stacked_imbalances_ask'] = df.loc[ is_between, 'imbalances'].apply( lambda x: stacked_imbalance_ask(x, stacked_imbalance_range=_stacked_imb)) buy = df.loc[is_between, 'bid'].apply(lambda _: np.where( trades_grouped_df['side'].str.contains('buy'), 0, trades_grouped_df['amount'])) sell = df.loc[is_between, 'ask'].apply(lambda _: np.where( trades_grouped_df['side'].str.contains('sell'), 0, trades_grouped_df['amount'])) deltas_per_trade = sell - buy min_delta = 0 max_delta = 0 delta = 0 for deltas in deltas_per_trade: for d in deltas: delta += d if delta > max_delta: max_delta = delta if delta < min_delta: min_delta = delta df.loc[is_between, 'max_delta'] = max_delta df.loc[is_between, 'min_delta'] = min_delta df.loc[is_between, 'bid'] = np.where(trades_grouped_df['side'].str.contains( 'buy'), 0, trades_grouped_df['amount']).sum() df.loc[is_between, 'ask'] = np.where(trades_grouped_df['side'].str.contains( 'sell'), 0, trades_grouped_df['amount']).sum() df.loc[is_between, 'delta'] = df.loc[is_between, 'ask'] - df.loc[is_between, 'bid'] min_delta = np.min(deltas_per_trade) max_delta = np.max(deltas_per_trade) df.loc[is_between, 'total_trades'] = len(trades_grouped_df) # copy to avoid memory leaks dataframe.loc[is_between] = df.loc[is_between].copy() else: logger.debug( f"Found NO candles for trades starting with {candle_start}") logger.debug( f"trades.groups_keys in {time.time() - start_time} seconds") logger.debug( f"trades.singleton_iterate in {time.time() - start_time} seconds") except Exception as e: logger.exception("Error populating dataframe with trades:", e) return dataframe def trades_to_volumeprofile_with_total_delta_bid_ask(trades: DataFrame, scale: float): """ :param trades: dataframe :param scale: scale aka bin size e.g. 0.5 :return: trades binned to levels according to scale aka orderflow """ df = pd.DataFrame([], columns=DEFAULT_ORDERFLOW_COLUMNS) # create bid, ask where side is sell or buy df['bid_amount'] = np.where( trades['side'].str.contains('buy'), 0, trades['amount']) df['ask_amount'] = np.where( trades['side'].str.contains('sell'), 0, trades['amount']) df['bid'] = np.where(trades['side'].str.contains('buy'), 0, 1) df['ask'] = np.where(trades['side'].str.contains('sell'), 0, 1) # round the prices to the nearest multiple of the scale df['price'] = ((trades['price'] / scale).round() * scale).astype('float64').values if df.empty: df['total'] = np.nan df['delta'] = np.nan return df df['delta'] = df['ask_amount'] - df['bid_amount'] df['total_volume'] = df['ask_amount'] + df['bid_amount'] df['total_trades'] = df['ask'] + df['bid'] # group to bins aka apply scale df = df.groupby('price').sum(numeric_only=True) return df def trades_orderflow_to_imbalances(df: DataFrame, imbalance_ratio: int, imbalance_volume: int): """ :param df: dataframes with bid and ask :param imbalance_ratio: imbalance_ratio e.g. 300 :param imbalance_volume: imbalance volume e.g. 3) :return: dataframe with bid and ask imbalance """ bid = df.bid ask = df.ask.shift(-1) bid_imbalance = (bid / ask) > (imbalance_ratio / 100) # overwrite bid_imbalance with False if volume is not big enough bid_imbalance_filtered = np.where( df.total_volume < imbalance_volume, False, bid_imbalance) ask_imbalance = (ask / bid) > (imbalance_ratio / 100) # overwrite ask_imbalance with False if volume is not big enough ask_imbalance_filtered = np.where( df.total_volume < imbalance_volume, False, ask_imbalance) dataframe = DataFrame({ "bid_imbalance": bid_imbalance_filtered, "ask_imbalance": ask_imbalance_filtered }, index=df.index, ) return dataframe def stacked_imbalance(df: DataFrame, label: str, stacked_imbalance_range: int, should_reverse: bool): """ y * (y.groupby((y != y.shift()).cumsum()).cumcount() + 1) https://stackoverflow.com/questions/27626542/counting-consecutive-positive-values-in-python-pandas-array """ imbalance = df[f'{label}_imbalance'] int_series = pd.Series(np.where(imbalance, 1, 0)) stacked = ( int_series * ( int_series.groupby( (int_series != int_series.shift()).cumsum()).cumcount() + 1 ) ) max_stacked_imbalance_idx = stacked.index[stacked >= stacked_imbalance_range] stacked_imbalance_price = np.nan if not max_stacked_imbalance_idx.empty: idx = max_stacked_imbalance_idx[0] if not should_reverse else np.flipud( max_stacked_imbalance_idx)[0] stacked_imbalance_price = imbalance.index[idx] return stacked_imbalance_price def stacked_imbalance_bid(df: DataFrame, stacked_imbalance_range: int): return stacked_imbalance(df, 'bid', stacked_imbalance_range, should_reverse=False) def stacked_imbalance_ask(df: DataFrame, stacked_imbalance_range: int): return stacked_imbalance(df, 'ask', stacked_imbalance_range, should_reverse=True) def orderflow_to_volume_profile(df: DataFrame): """ :param orderflow: dataframe :return: volume profile dataframe """ bid = df.groupby('level').bid.sum() ask = df.groupby('level').ask.sum() df.groupby('level')['level'].sum() delta = df.groupby('level').ask.sum() - df.groupby('level').bid.sum() df = pd.DataFrame({'bid': bid, 'ask': ask, 'delta': delta}) return df