ruff format: orderflow / public trades

This commit is contained in:
Joe Schr 2024-05-15 17:09:32 +02:00
parent 6c696e14f0
commit a56faf503b
13 changed files with 420 additions and 314 deletions

View File

@ -423,8 +423,8 @@ def _validate_consumers(conf: Dict[str, Any]) -> None:
def _validate_orderflow(conf: Dict[str, Any]) -> None:
if conf.get('exchange', {}).get('use_public_trades'):
if 'orderflow' not in conf:
if conf.get("exchange", {}).get("use_public_trades"):
if "orderflow" not in conf:
raise ConfigurationError(
"Orderflow is a required configuration key when using public trades."
)

View File

@ -3,6 +3,7 @@
"""
bot constants
"""
from typing import Any, Dict, List, Literal, Optional, Tuple
from freqtrade.enums import CandleType, PriceType, RPCMessageType
@ -531,7 +532,7 @@ CONF_SCHEMA = {
"stacked_imbalance_range": {"type": "number"},
"imbalance_volume": {"type": "number"},
"imbalance_ratio": {"type": "number"},
}
},
},
},
"definitions": {

View File

@ -21,20 +21,20 @@ from freqtrade.data.converter.trade_converter import (
__all__ = [
'clean_ohlcv_dataframe',
'convert_ohlcv_format',
'ohlcv_fill_up_missing_data',
'ohlcv_to_dataframe',
'order_book_to_dataframe',
'reduce_dataframe_footprint',
'trim_dataframe',
'trim_dataframes',
'convert_trades_format',
'convert_trades_to_ohlcv',
'populate_dataframe_with_trades',
'trades_convert_types',
'trades_df_remove_duplicates',
'trades_dict_to_list',
'trades_list_to_df',
'trades_to_ohlcv',
"clean_ohlcv_dataframe",
"convert_ohlcv_format",
"ohlcv_fill_up_missing_data",
"ohlcv_to_dataframe",
"order_book_to_dataframe",
"reduce_dataframe_footprint",
"trim_dataframe",
"trim_dataframes",
"convert_trades_format",
"convert_trades_to_ohlcv",
"populate_dataframe_with_trades",
"trades_convert_types",
"trades_df_remove_duplicates",
"trades_dict_to_list",
"trades_list_to_df",
"trades_to_ohlcv",
]

View File

@ -1,6 +1,7 @@
"""
Functions to convert orderflow data from public_trades
"""
import logging
import time
@ -18,23 +19,24 @@ def _init_dataframe_with_trades_columns(dataframe: pd.DataFrame):
Populates a dataframe with trades columns
:param dataframe: Dataframe to populate
"""
dataframe['trades'] = dataframe.apply(lambda _: [], axis=1)
dataframe['orderflow'] = dataframe.apply(lambda _: {}, axis=1)
dataframe['bid'] = np.nan
dataframe['ask'] = np.nan
dataframe['delta'] = np.nan
dataframe['min_delta'] = np.nan
dataframe['max_delta'] = np.nan
dataframe['total_trades'] = np.nan
dataframe['stacked_imbalances_bid'] = np.nan
dataframe['stacked_imbalances_ask'] = np.nan
dataframe["trades"] = dataframe.apply(lambda _: [], axis=1)
dataframe["orderflow"] = dataframe.apply(lambda _: {}, axis=1)
dataframe["bid"] = np.nan
dataframe["ask"] = np.nan
dataframe["delta"] = np.nan
dataframe["min_delta"] = np.nan
dataframe["max_delta"] = np.nan
dataframe["total_trades"] = np.nan
dataframe["stacked_imbalances_bid"] = np.nan
dataframe["stacked_imbalances_ask"] = np.nan
def _convert_timeframe_to_pandas_frequency(timeframe: str):
# convert timeframe to format usable by pandas
from freqtrade.exchange import timeframe_to_minutes
timeframe_minutes = timeframe_to_minutes(timeframe)
timeframe_frequency = f'{timeframe_minutes}min'
timeframe_frequency = f"{timeframe_minutes}min"
return (timeframe_frequency, timeframe_minutes)
@ -44,26 +46,26 @@ def _calculate_ohlcv_candle_start_and_end(df: pd.DataFrame, timeframe: str):
timeframe_frequency = timeframe_to_resample_freq(timeframe)
# calculate ohlcv candle start and end
if df is not None and not df.empty:
df['datetime'] = pd.to_datetime(df['date'], unit='ms')
df['candle_start'] = df['datetime'].dt.floor(timeframe_frequency)
df["datetime"] = pd.to_datetime(df["date"], unit="ms")
df["candle_start"] = df["datetime"].dt.floor(timeframe_frequency)
# used in _now_is_time_to_refresh_trades
df['candle_end'] = df['candle_start'].apply(
df["candle_end"] = df["candle_start"].apply(
lambda candle_start: timeframe_to_next_date(timeframe, candle_start)
)
df.drop(columns=["datetime"], inplace=True)
def populate_dataframe_with_trades(config: Config,
dataframe: pd.DataFrame,
trades: pd.DataFrame) -> pd.DataFrame:
def populate_dataframe_with_trades(
config: Config, dataframe: pd.DataFrame, trades: pd.DataFrame
) -> pd.DataFrame:
"""
Populates a dataframe with trades
:param dataframe: Dataframe to populate
:param trades: Trades to populate with
:return: Dataframe with trades populated
"""
config_orderflow = config['orderflow']
timeframe = config['timeframe']
config_orderflow = config["orderflow"]
timeframe = config["timeframe"]
# create columns for trades
_init_dataframe_with_trades_columns(dataframe)
@ -79,8 +81,7 @@ def populate_dataframe_with_trades(config: Config,
trades.reset_index(inplace=True, drop=True)
# group trades by candle start
trades_grouped_by_candle_start = trades.groupby(
'candle_start', group_keys=False)
trades_grouped_by_candle_start = trades.groupby("candle_start", group_keys=False)
for candle_start in trades_grouped_by_candle_start.groups:
trades_grouped_df = trades[candle_start == trades["candle_start"]]
@ -94,36 +95,50 @@ def populate_dataframe_with_trades(config: Config,
if candle_next not in trades_grouped_by_candle_start.groups:
logger.warning(
f"candle at {candle_start} with {len(trades_grouped_df)} trades "
f"might be unfinished, because no finished trades at {candle_next}")
f"might be unfinished, because no finished trades at {candle_next}"
)
# add trades to each candle
df.loc[is_between, 'trades'] = df.loc[is_between,
'trades'].apply(lambda _: trades_grouped_df)
df.loc[is_between, "trades"] = df.loc[is_between, "trades"].apply(
lambda _: trades_grouped_df
)
# calculate orderflow for each candle
df.loc[is_between, 'orderflow'] = df.loc[is_between, 'orderflow'].apply(
df.loc[is_between, "orderflow"] = df.loc[is_between, "orderflow"].apply(
lambda _: trades_to_volumeprofile_with_total_delta_bid_ask(
pd.DataFrame(trades_grouped_df),
scale=config_orderflow['scale']))
pd.DataFrame(trades_grouped_df), scale=config_orderflow["scale"]
)
)
# calculate imbalances for each candle's orderflow
df.loc[is_between, 'imbalances'] = df.loc[is_between, 'orderflow'].apply(
df.loc[is_between, "imbalances"] = df.loc[is_between, "orderflow"].apply(
lambda x: trades_orderflow_to_imbalances(
x,
imbalance_ratio=config_orderflow['imbalance_ratio'],
imbalance_volume=config_orderflow['imbalance_volume']))
imbalance_ratio=config_orderflow["imbalance_ratio"],
imbalance_volume=config_orderflow["imbalance_volume"],
)
)
_stacked_imb = config_orderflow['stacked_imbalance_range']
df.loc[is_between, 'stacked_imbalances_bid'] = df.loc[
is_between, 'imbalances'].apply(lambda x: stacked_imbalance_bid(
x,
stacked_imbalance_range=_stacked_imb))
df.loc[is_between, 'stacked_imbalances_ask'] = df.loc[
is_between, 'imbalances'].apply(
lambda x: stacked_imbalance_ask(x, stacked_imbalance_range=_stacked_imb))
_stacked_imb = config_orderflow["stacked_imbalance_range"]
df.loc[is_between, "stacked_imbalances_bid"] = df.loc[
is_between, "imbalances"
].apply(lambda x: stacked_imbalance_bid(x, stacked_imbalance_range=_stacked_imb))
df.loc[is_between, "stacked_imbalances_ask"] = df.loc[
is_between, "imbalances"
].apply(lambda x: stacked_imbalance_ask(x, stacked_imbalance_range=_stacked_imb))
buy = df.loc[is_between, 'bid'].apply(lambda _: np.where(
trades_grouped_df['side'].str.contains('buy'), 0, trades_grouped_df['amount']))
sell = df.loc[is_between, 'ask'].apply(lambda _: np.where(
trades_grouped_df['side'].str.contains('sell'), 0, trades_grouped_df['amount']))
buy = df.loc[is_between, "bid"].apply(
lambda _: np.where(
trades_grouped_df["side"].str.contains("buy"),
0,
trades_grouped_df["amount"],
)
)
sell = df.loc[is_between, "ask"].apply(
lambda _: np.where(
trades_grouped_df["side"].str.contains("sell"),
0,
trades_grouped_df["amount"],
)
)
deltas_per_trade = sell - buy
min_delta = 0
max_delta = 0
@ -135,29 +150,27 @@ def populate_dataframe_with_trades(config: Config,
max_delta = delta
if delta < min_delta:
min_delta = delta
df.loc[is_between, 'max_delta'] = max_delta
df.loc[is_between, 'min_delta'] = min_delta
df.loc[is_between, "max_delta"] = max_delta
df.loc[is_between, "min_delta"] = min_delta
df.loc[is_between, 'bid'] = np.where(trades_grouped_df['side'].str.contains(
'buy'), 0, trades_grouped_df['amount']).sum()
df.loc[is_between, 'ask'] = np.where(trades_grouped_df['side'].str.contains(
'sell'), 0, trades_grouped_df['amount']).sum()
df.loc[is_between, 'delta'] = df.loc[is_between,
'ask'] - df.loc[is_between, 'bid']
df.loc[is_between, "bid"] = np.where(
trades_grouped_df["side"].str.contains("buy"), 0, trades_grouped_df["amount"]
).sum()
df.loc[is_between, "ask"] = np.where(
trades_grouped_df["side"].str.contains("sell"), 0, trades_grouped_df["amount"]
).sum()
df.loc[is_between, "delta"] = df.loc[is_between, "ask"] - df.loc[is_between, "bid"]
min_delta = np.min(deltas_per_trade)
max_delta = np.max(deltas_per_trade)
df.loc[is_between, 'total_trades'] = len(trades_grouped_df)
df.loc[is_between, "total_trades"] = len(trades_grouped_df)
# copy to avoid memory leaks
dataframe.loc[is_between] = df.loc[is_between].copy()
else:
logger.debug(
f"Found NO candles for trades starting with {candle_start}")
logger.debug(
f"trades.groups_keys in {time.time() - start_time} seconds")
logger.debug(f"Found NO candles for trades starting with {candle_start}")
logger.debug(f"trades.groups_keys in {time.time() - start_time} seconds")
logger.debug(
f"trades.singleton_iterate in {time.time() - start_time} seconds")
logger.debug(f"trades.singleton_iterate in {time.time() - start_time} seconds")
except Exception as e:
logger.exception("Error populating dataframe with trades:", e)
@ -174,27 +187,24 @@ def trades_to_volumeprofile_with_total_delta_bid_ask(trades: pd.DataFrame, scale
"""
df = pd.DataFrame([], columns=DEFAULT_ORDERFLOW_COLUMNS)
# create bid, ask where side is sell or buy
df['bid_amount'] = np.where(
trades['side'].str.contains('buy'), 0, trades['amount'])
df['ask_amount'] = np.where(
trades['side'].str.contains('sell'), 0, trades['amount'])
df['bid'] = np.where(trades['side'].str.contains('buy'), 0, 1)
df['ask'] = np.where(trades['side'].str.contains('sell'), 0, 1)
df["bid_amount"] = np.where(trades["side"].str.contains("buy"), 0, trades["amount"])
df["ask_amount"] = np.where(trades["side"].str.contains("sell"), 0, trades["amount"])
df["bid"] = np.where(trades["side"].str.contains("buy"), 0, 1)
df["ask"] = np.where(trades["side"].str.contains("sell"), 0, 1)
# round the prices to the nearest multiple of the scale
df['price'] = ((trades['price'] / scale).round()
* scale).astype('float64').values
df["price"] = ((trades["price"] / scale).round() * scale).astype("float64").values
if df.empty:
df['total'] = np.nan
df['delta'] = np.nan
df["total"] = np.nan
df["delta"] = np.nan
return df
df['delta'] = df['ask_amount'] - df['bid_amount']
df['total_volume'] = df['ask_amount'] + df['bid_amount']
df['total_trades'] = df['ask'] + df['bid']
df["delta"] = df["ask_amount"] - df["bid_amount"]
df["total_volume"] = df["ask_amount"] + df["bid_amount"]
df["total_trades"] = df["ask"] + df["bid"]
# group to bins aka apply scale
df = df.groupby('price').sum(numeric_only=True)
df = df.groupby("price").sum(numeric_only=True)
return df
@ -209,54 +219,49 @@ def trades_orderflow_to_imbalances(df: pd.DataFrame, imbalance_ratio: int, imbal
ask = df.ask.shift(-1)
bid_imbalance = (bid / ask) > (imbalance_ratio / 100)
# overwrite bid_imbalance with False if volume is not big enough
bid_imbalance_filtered = np.where(
df.total_volume < imbalance_volume, False, bid_imbalance)
bid_imbalance_filtered = np.where(df.total_volume < imbalance_volume, False, bid_imbalance)
ask_imbalance = (ask / bid) > (imbalance_ratio / 100)
# overwrite ask_imbalance with False if volume is not big enough
ask_imbalance_filtered = np.where(
df.total_volume < imbalance_volume, False, ask_imbalance)
dataframe = pd.DataFrame({
"bid_imbalance": bid_imbalance_filtered,
"ask_imbalance": ask_imbalance_filtered
}, index=df.index,
ask_imbalance_filtered = np.where(df.total_volume < imbalance_volume, False, ask_imbalance)
dataframe = pd.DataFrame(
{"bid_imbalance": bid_imbalance_filtered, "ask_imbalance": ask_imbalance_filtered},
index=df.index,
)
return dataframe
def stacked_imbalance(df: pd.DataFrame,
label: str,
stacked_imbalance_range: int,
should_reverse: bool):
def stacked_imbalance(
df: pd.DataFrame, label: str, stacked_imbalance_range: int, should_reverse: bool
):
"""
y * (y.groupby((y != y.shift()).cumsum()).cumcount() + 1)
https://stackoverflow.com/questions/27626542/counting-consecutive-positive-values-in-python-pandas-array
"""
imbalance = df[f'{label}_imbalance']
imbalance = df[f"{label}_imbalance"]
int_series = pd.Series(np.where(imbalance, 1, 0))
stacked = (
int_series * (
int_series.groupby(
(int_series != int_series.shift()).cumsum()).cumcount() + 1
)
stacked = int_series * (
int_series.groupby((int_series != int_series.shift()).cumsum()).cumcount() + 1
)
max_stacked_imbalance_idx = stacked.index[stacked >=
stacked_imbalance_range]
max_stacked_imbalance_idx = stacked.index[stacked >= stacked_imbalance_range]
stacked_imbalance_price = np.nan
if not max_stacked_imbalance_idx.empty:
idx = max_stacked_imbalance_idx[0] if not should_reverse else np.flipud(
max_stacked_imbalance_idx)[0]
idx = (
max_stacked_imbalance_idx[0]
if not should_reverse
else np.flipud(max_stacked_imbalance_idx)[0]
)
stacked_imbalance_price = imbalance.index[idx]
return stacked_imbalance_price
def stacked_imbalance_bid(df: pd.DataFrame, stacked_imbalance_range: int):
return stacked_imbalance(df, 'bid', stacked_imbalance_range, should_reverse=False)
return stacked_imbalance(df, "bid", stacked_imbalance_range, should_reverse=False)
def stacked_imbalance_ask(df: pd.DataFrame, stacked_imbalance_range: int):
return stacked_imbalance(df, 'ask', stacked_imbalance_range, should_reverse=True)
return stacked_imbalance(df, "ask", stacked_imbalance_range, should_reverse=True)
def orderflow_to_volume_profile(df: pd.DataFrame):
@ -264,9 +269,9 @@ def orderflow_to_volume_profile(df: pd.DataFrame):
:param orderflow: dataframe
:return: volume profile dataframe
"""
bid = df.groupby('level').bid.sum()
ask = df.groupby('level').ask.sum()
df.groupby('level')['level'].sum()
delta = df.groupby('level').ask.sum() - df.groupby('level').bid.sum()
df = pd.DataFrame({'bid': bid, 'ask': ask, 'delta': delta})
bid = df.groupby("level").bid.sum()
ask = df.groupby("level").ask.sum()
df.groupby("level")["level"].sum()
delta = df.groupby("level").ask.sum() - df.groupby("level").bid.sum()
df = pd.DataFrame({"bid": bid, "ask": ask, "delta": delta})
return df

View File

@ -455,7 +455,7 @@ class DataProvider:
Refresh latest trades data (if enabled in config)
"""
use_public_trades = self._config.get('exchange', {}).get('use_public_trades', False)
use_public_trades = self._config.get("exchange", {}).get("use_public_trades", False)
if use_public_trades:
if self._exchange:
self._exchange.refresh_latest_trades(pairlist)
@ -497,11 +497,7 @@ class DataProvider:
return DataFrame()
def trades(
self,
pair: str,
timeframe: Optional[str] = None,
copy: bool = True,
candle_type: str = ''
self, pair: str, timeframe: Optional[str] = None, copy: bool = True, candle_type: str = ""
) -> DataFrame:
"""
Get candle (TRADES) data for the given pair as DataFrame
@ -515,17 +511,23 @@ class DataProvider:
if self.runmode in (RunMode.DRY_RUN, RunMode.LIVE):
if self._exchange is None:
raise OperationalException(NO_EXCHANGE_EXCEPTION)
_candle_type = CandleType.from_string(
candle_type) if candle_type != '' else self._config['candle_type_def']
_candle_type = (
CandleType.from_string(candle_type)
if candle_type != ""
else self._config["candle_type_def"]
)
return self._exchange.trades(
(pair, timeframe or self._config['timeframe'], _candle_type),
copy=copy
(pair, timeframe or self._config["timeframe"], _candle_type), copy=copy
)
elif self.runmode in (RunMode.BACKTEST, RunMode.HYPEROPT):
_candle_type = CandleType.from_string(
candle_type) if candle_type != '' else self._config['candle_type_def']
_candle_type = (
CandleType.from_string(candle_type)
if candle_type != ""
else self._config["candle_type_def"]
)
data_handler = get_datahandler(
self._config['datadir'], data_format=self._config['dataformat_trades'])
self._config["datadir"], data_format=self._config["dataformat_trades"]
)
trades_df = data_handler.trades_load(pair, TradingMode.FUTURES)
return trades_df

View File

@ -480,9 +480,7 @@ def _download_trades_history(
return True
except Exception:
logger.exception(
f'Failed to download and store historic trades for pair: "{pair}". '
)
logger.exception(f'Failed to download and store historic trades for pair: "{pair}". ')
return False

View File

@ -2541,19 +2541,20 @@ class Exchange:
# fetch Trade data stuff
def needed_candle_ms(self, timeframe: str, candle_type: CandleType):
one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit(
timeframe, candle_type)
one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit(timeframe, candle_type)
move_to = one_call * self.required_candle_call_count
now = timeframe_to_next_date(timeframe)
return int((now - timedelta(seconds=move_to // 1000)).timestamp() * 1000)
def _process_trades_df(self,
pair: str,
timeframe: str,
c_type: CandleType,
ticks: List[List],
cache: bool,
first_required_candle_date: Optional[int]) -> DataFrame:
def _process_trades_df(
self,
pair: str,
timeframe: str,
c_type: CandleType,
ticks: List[List],
cache: bool,
first_required_candle_date: Optional[int],
) -> DataFrame:
# keeping parsed dataframe in cache
trades_df = trades_list_to_df(ticks, True)
@ -2563,22 +2564,23 @@ class Exchange:
# Reassign so we return the updated, combined df
combined_df = concat([old, trades_df], axis=0)
logger.debug(f"Clean duplicated ticks from Trades data {pair}")
trades_df = DataFrame(trades_df_remove_duplicates(combined_df),
columns=combined_df.columns)
trades_df = DataFrame(
trades_df_remove_duplicates(combined_df), columns=combined_df.columns
)
# Age out old candles
if first_required_candle_date:
# slice of older dates
trades_df = trades_df[
first_required_candle_date < trades_df['timestamp']]
trades_df = trades_df[first_required_candle_date < trades_df["timestamp"]]
trades_df = trades_df.reset_index(drop=True)
self._trades[(pair, timeframe, c_type)] = trades_df
return trades_df
def refresh_latest_trades(self,
pair_list: ListPairsWithTimeframes,
*,
cache: bool = True,
) -> Dict[PairWithTimeframe, DataFrame]:
def refresh_latest_trades(
self,
pair_list: ListPairsWithTimeframes,
*,
cache: bool = True,
) -> Dict[PairWithTimeframe, DataFrame]:
"""
Refresh in-memory TRADES asynchronously and set `_trades` with the result
Loops asynchronously over pair_list and downloads all pairs async (semi-parallel).
@ -2588,24 +2590,27 @@ class Exchange:
:return: Dict of [{(pair, timeframe): Dataframe}]
"""
from freqtrade.data.history import get_datahandler
data_handler = get_datahandler(
self._config['datadir'], data_format=self._config['dataformat_trades']
self._config["datadir"], data_format=self._config["dataformat_trades"]
)
logger.debug("Refreshing TRADES data for %d pairs", len(pair_list))
since_ms = None
results_df = {}
for pair, timeframe, candle_type in set(pair_list):
new_ticks: List = []
all_stored_ticks_df = DataFrame(columns=DEFAULT_TRADES_COLUMNS + ['date'])
all_stored_ticks_df = DataFrame(columns=DEFAULT_TRADES_COLUMNS + ["date"])
first_candle_ms = self.needed_candle_ms(timeframe, candle_type)
# refresh, if
# a. not in _trades
# b. no cache used
# c. need new data
is_in_cache = (pair, timeframe, candle_type) in self._trades
if (not is_in_cache
or not cache
or self._now_is_time_to_refresh_trades(pair, timeframe, candle_type)):
if (
not is_in_cache
or not cache
or self._now_is_time_to_refresh_trades(pair, timeframe, candle_type)
):
logger.debug(f"Refreshing TRADES data for {pair}")
# fetch trades since latest _trades and
# store together with existing trades
@ -2613,26 +2618,29 @@ class Exchange:
until = None
from_id = None
if is_in_cache:
from_id = self._trades[(pair, timeframe, candle_type)].iloc[-1]['id']
from_id = self._trades[(pair, timeframe, candle_type)].iloc[-1]["id"]
until = dt_ts() # now
else:
until = int(timeframe_to_prev_date(timeframe).timestamp()) * 1000
all_stored_ticks_df = data_handler.trades_load(
f"{pair}-cached", self.trading_mode)
f"{pair}-cached", self.trading_mode
)
if not all_stored_ticks_df.empty:
if all_stored_ticks_df.iloc[0]['timestamp'] <= first_candle_ms:
last_cached_ms = all_stored_ticks_df.iloc[-1]['timestamp']
if all_stored_ticks_df.iloc[0]["timestamp"] <= first_candle_ms:
last_cached_ms = all_stored_ticks_df.iloc[-1]["timestamp"]
# only use cached if it's closer than first_candle_ms
since_ms = (
last_cached_ms if last_cached_ms > first_candle_ms
last_cached_ms
if last_cached_ms > first_candle_ms
else first_candle_ms
)
# doesn't go far enough
else:
all_stored_ticks_df = DataFrame(
columns=DEFAULT_TRADES_COLUMNS + ['date'])
columns=DEFAULT_TRADES_COLUMNS + ["date"]
)
# from_id overrules with exchange set to id paginate
# TODO: DEBUG:
@ -2643,7 +2651,7 @@ class Exchange:
pair,
since=since_ms if since_ms else first_candle_ms,
until=until,
from_id=from_id
from_id=from_id,
)
except Exception as e:
@ -2652,8 +2660,9 @@ class Exchange:
raise e
if new_ticks:
all_stored_ticks_list = all_stored_ticks_df[DEFAULT_TRADES_COLUMNS
].values.tolist()
all_stored_ticks_list = all_stored_ticks_df[
DEFAULT_TRADES_COLUMNS
].values.tolist()
all_stored_ticks_list.extend(new_ticks)
trades_df = self._process_trades_df(
pair,
@ -2661,11 +2670,12 @@ class Exchange:
candle_type,
all_stored_ticks_list,
cache,
first_required_candle_date=first_candle_ms
first_required_candle_date=first_candle_ms,
)
results_df[(pair, timeframe, candle_type)] = trades_df
data_handler.trades_store(
f"{pair}-cached", trades_df[DEFAULT_TRADES_COLUMNS], self.trading_mode)
f"{pair}-cached", trades_df[DEFAULT_TRADES_COLUMNS], self.trading_mode
)
else:
raise OperationalException("no new ticks")
@ -2677,8 +2687,10 @@ class Exchange:
) -> bool: # Timeframe in seconds
trades = self.trades((pair, timeframe, candle_type), False)
pair_last_refreshed = int(trades.iloc[-1]["timestamp"])
full_candle = int(timeframe_to_next_date(
timeframe, dt_from_ts(pair_last_refreshed)).timestamp()) * 1000
full_candle = (
int(timeframe_to_next_date(timeframe, dt_from_ts(pair_last_refreshed)).timestamp())
* 1000
)
now = dt_ts()
return full_candle <= now

View File

@ -1594,17 +1594,14 @@ class IStrategy(ABC, HyperStrategyMixin):
return dataframe
def _if_enabled_populate_trades(self, dataframe: DataFrame, metadata: dict):
use_public_trades = self.config.get('exchange', {}).get('use_public_trades', False)
use_public_trades = self.config.get("exchange", {}).get("use_public_trades", False)
if use_public_trades:
trades = self.dp.trades(pair=metadata['pair'], copy=False)
trades = self.dp.trades(pair=metadata["pair"], copy=False)
config = self.config
config['timeframe'] = self.timeframe
config["timeframe"] = self.timeframe
# TODO: slice trades to size of dataframe for faster backtesting
dataframe = populate_dataframe_with_trades(
config,
dataframe,
trades)
dataframe = populate_dataframe_with_trades(config, dataframe, trades)
logger.debug("Populated dataframe with trades.")

View File

@ -11,49 +11,68 @@ from freqtrade.data.converter.trade_converter import trades_list_to_df
BIN_SIZE_SCALE = 0.5
def read_csv(filename, converter_columns: list = ['side', 'type']):
return pd.read_csv(filename, skipinitialspace=True, infer_datetime_format=True, index_col=0,
parse_dates=True, converters={col: str.strip for col in converter_columns})
def read_csv(filename, converter_columns: list = ["side", "type"]):
return pd.read_csv(
filename,
skipinitialspace=True,
infer_datetime_format=True,
index_col=0,
parse_dates=True,
converters={col: str.strip for col in converter_columns},
)
@pytest.fixture
def populate_dataframe_with_trades_dataframe(testdatadir):
return pd.read_feather(testdatadir / 'orderflow/populate_dataframe_with_trades_DF.feather')
return pd.read_feather(testdatadir / "orderflow/populate_dataframe_with_trades_DF.feather")
@pytest.fixture
def populate_dataframe_with_trades_trades(testdatadir):
return pd.read_feather(testdatadir / 'orderflow/populate_dataframe_with_trades_TRADES.feather')
return pd.read_feather(testdatadir / "orderflow/populate_dataframe_with_trades_TRADES.feather")
@pytest.fixture
def candles(testdatadir):
return pd.read_json(testdatadir / 'orderflow/candles.json').copy()
return pd.read_json(testdatadir / "orderflow/candles.json").copy()
@pytest.fixture
def public_trades_list(testdatadir):
return read_csv(testdatadir / 'orderflow/public_trades_list.csv').copy()
return read_csv(testdatadir / "orderflow/public_trades_list.csv").copy()
@pytest.fixture
def public_trades_list_simple(testdatadir):
return read_csv(testdatadir / 'orderflow/public_trades_list_simple_example.csv').copy()
return read_csv(testdatadir / "orderflow/public_trades_list_simple_example.csv").copy()
def test_public_trades_columns_before_change(
populate_dataframe_with_trades_dataframe,
populate_dataframe_with_trades_trades):
populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades
):
assert populate_dataframe_with_trades_dataframe.columns.tolist() == [
'date', 'open', 'high', 'low', 'close', 'volume']
"date",
"open",
"high",
"low",
"close",
"volume",
]
assert populate_dataframe_with_trades_trades.columns.tolist() == [
'timestamp', 'id', 'type', 'side', 'price',
'amount', 'cost', 'date']
"timestamp",
"id",
"type",
"side",
"price",
"amount",
"cost",
"date",
]
def test_public_trades_mock_populate_dataframe_with_trades__check_orderflow(
populate_dataframe_with_trades_dataframe,
populate_dataframe_with_trades_trades):
populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades
):
"""
Tests the `populate_dataframe_with_trades` function's order flow calculation.
@ -64,24 +83,25 @@ def test_public_trades_mock_populate_dataframe_with_trades__check_orderflow(
dataframe = populate_dataframe_with_trades_dataframe.copy()
trades = populate_dataframe_with_trades_trades.copy()
# Convert the 'date' column to datetime format with milliseconds
dataframe['date'] = pd.to_datetime(
dataframe['date'], unit='ms')
dataframe["date"] = pd.to_datetime(dataframe["date"], unit="ms")
# Select the last rows and reset the index (optional, depends on usage)
dataframe = dataframe.copy().tail().reset_index(drop=True)
# Define the configuration for order flow calculation
config = {'timeframe': '5m',
'orderflow': {
'scale': 0.005,
'imbalance_volume': 0,
'imbalance_ratio': 300,
'stacked_imbalance_range': 3
}}
config = {
"timeframe": "5m",
"orderflow": {
"scale": 0.005,
"imbalance_volume": 0,
"imbalance_ratio": 300,
"stacked_imbalance_range": 3,
},
}
# Apply the function to populate the data frame with order flow data
df = populate_dataframe_with_trades(config, dataframe, trades)
# Extract results from the first row of the DataFrame
results = df.iloc[0]
t = results['trades']
of = results['orderflow']
t = results["trades"]
of = results["orderflow"]
# Assert basic properties of the results
assert 0 != len(results)
@ -98,7 +118,7 @@ def test_public_trades_mock_populate_dataframe_with_trades__check_orderflow(
assert [0.0, 1.0, 0.103, 0.0, 0.103, 0.103, 1.0] == of.iloc[-1].values.tolist()
# Extract order flow from the last row of the DataFrame
of = df.iloc[-1]['orderflow']
of = df.iloc[-1]["orderflow"]
# Assert number of order flow data points in the last row
assert 19 == len(of) # Assert expected number of data points
@ -107,42 +127,49 @@ def test_public_trades_mock_populate_dataframe_with_trades__check_orderflow(
assert [1.0, 0.0, -12.536, 12.536, 0.0, 12.536, 1.0] == of.iloc[0].values.tolist()
# Assert specific order flow values at the end of the last row
assert [4.0, 3.0, -40.94800000000001, 59.18200000000001,
18.233999999999998, 77.41600000000001, 7.0] == of.iloc[-1].values.tolist()
assert [
4.0,
3.0,
-40.94800000000001,
59.18200000000001,
18.233999999999998,
77.41600000000001,
7.0,
] == of.iloc[-1].values.tolist()
# --- Delta and Other Results ---
# Assert delta value from the first row
assert -50.519000000000005 == results['delta']
assert -50.519000000000005 == results["delta"]
# Assert min and max delta values from the first row
assert -79.469 == results['min_delta']
assert 17.298 == results['max_delta']
assert -79.469 == results["min_delta"]
assert 17.298 == results["max_delta"]
# Assert that stacked imbalances are NaN (not applicable in this test)
assert np.isnan(results['stacked_imbalances_bid'])
assert np.isnan(results['stacked_imbalances_ask'])
assert np.isnan(results["stacked_imbalances_bid"])
assert np.isnan(results["stacked_imbalances_ask"])
# Repeat assertions for the third from last row
results = df.iloc[-2]
assert -20.86200000000008 == results['delta']
assert -54.55999999999999 == results['min_delta']
assert 82.842 == results['max_delta']
assert 234.99 == results['stacked_imbalances_bid']
assert 234.96 == results['stacked_imbalances_ask']
assert -20.86200000000008 == results["delta"]
assert -54.55999999999999 == results["min_delta"]
assert 82.842 == results["max_delta"]
assert 234.99 == results["stacked_imbalances_bid"]
assert 234.96 == results["stacked_imbalances_ask"]
# Repeat assertions for the last row
results = df.iloc[-1]
assert -49.30200000000002 == results['delta']
assert -70.222 == results['min_delta']
assert 11.213000000000003 == results['max_delta']
assert np.isnan(results['stacked_imbalances_bid'])
assert np.isnan(results['stacked_imbalances_ask'])
assert -49.30200000000002 == results["delta"]
assert -70.222 == results["min_delta"]
assert 11.213000000000003 == results["max_delta"]
assert np.isnan(results["stacked_imbalances_bid"])
assert np.isnan(results["stacked_imbalances_ask"])
def test_public_trades_trades_mock_populate_dataframe_with_trades__check_trades(
populate_dataframe_with_trades_dataframe,
populate_dataframe_with_trades_trades):
populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades
):
"""
Tests the `populate_dataframe_with_trades` function's handling of trades,
ensuring correct integration of trades data into the generated DataFrame.
@ -155,7 +182,7 @@ def test_public_trades_trades_mock_populate_dataframe_with_trades__check_trades(
# --- Data Preparation ---
# Convert the 'date' column to datetime format with milliseconds
dataframe['date'] = pd.to_datetime(dataframe['date'], unit='ms')
dataframe["date"] = pd.to_datetime(dataframe["date"], unit="ms")
# Select the final row of the DataFrame
dataframe = dataframe.tail().reset_index(drop=True)
@ -165,19 +192,19 @@ def test_public_trades_trades_mock_populate_dataframe_with_trades__check_trades(
trades.reset_index(inplace=True, drop=True) # Reset index for clarity
# Assert the first trade ID to ensure filtering worked correctly
assert trades['id'][0] == '313881442'
assert trades["id"][0] == "313881442"
# --- Configuration and Function Call ---
# Define configuration for order flow calculation (used for context)
config = {
'timeframe': '5m',
'orderflow': {
'scale': 0.5,
'imbalance_volume': 0,
'imbalance_ratio': 300,
'stacked_imbalance_range': 3
}
"timeframe": "5m",
"orderflow": {
"scale": 0.5,
"imbalance_volume": 0,
"imbalance_ratio": 300,
"stacked_imbalance_range": 3,
},
}
# Populate the DataFrame with trades and order flow data
@ -190,28 +217,38 @@ def test_public_trades_trades_mock_populate_dataframe_with_trades__check_trades(
# Assert DataFrame structure
assert list(df.columns) == [
# ... (list of expected column names)
'date', 'open', 'high', 'low',
'close', 'volume', 'trades', 'orderflow',
'bid', 'ask', 'delta', 'min_delta',
'max_delta', 'total_trades',
'stacked_imbalances_bid',
'stacked_imbalances_ask'
"date",
"open",
"high",
"low",
"close",
"volume",
"trades",
"orderflow",
"bid",
"ask",
"delta",
"min_delta",
"max_delta",
"total_trades",
"stacked_imbalances_bid",
"stacked_imbalances_ask",
]
# Assert delta, bid, and ask values
assert -50.519 == pytest.approx(row['delta'])
assert 219.961 == row['bid']
assert 169.442 == row['ask']
assert -50.519 == pytest.approx(row["delta"])
assert 219.961 == row["bid"]
assert 169.442 == row["ask"]
# Assert the number of trades
assert 151 == len(row.trades)
# Assert specific details of the first trade
t = row['trades'].iloc[0]
assert trades['id'][0] == t["id"]
assert int(trades['timestamp'][0]) == int(t['timestamp'])
assert 'sell' == t['side']
assert '313881442' == t['id']
assert 234.72 == t['price']
t = row["trades"].iloc[0]
assert trades["id"][0] == t["id"]
assert int(trades["timestamp"][0]) == int(t["timestamp"])
assert "sell" == t["side"]
assert "313881442" == t["id"]
assert 234.72 == t["price"]
def test_public_trades_put_volume_profile_into_ohlcv_candles(public_trades_list_simple, candles):
@ -232,18 +269,19 @@ def test_public_trades_put_volume_profile_into_ohlcv_candles(public_trades_list_
df = trades_to_volumeprofile_with_total_delta_bid_ask(df, scale=BIN_SIZE_SCALE)
# Initialize the 'vp' column in the candles DataFrame with NaNs
candles['vp'] = np.nan
candles["vp"] = np.nan
# Select the second candle (index 1) and attempt to assign the volume profile data
# (as a DataFrame) to the 'vp' element.
candles.loc[candles.index == 1, ['vp']] = candles.loc[candles.index == 1, [
'vp']].applymap(lambda x: pd.DataFrame(df.to_dict()))
candles.loc[candles.index == 1, ["vp"]] = candles.loc[candles.index == 1, ["vp"]].applymap(
lambda x: pd.DataFrame(df.to_dict())
)
# Assert the delta value in the 'vp' element of the second candle
assert 0.14 == candles['vp'][1].values.tolist()[1][2]
assert 0.14 == candles["vp"][1].values.tolist()[1][2]
# Alternative assertion using `.iat` accessor (assuming correct assignment logic)
assert 0.14 == candles['vp'][1]['delta'].iat[1]
assert 0.14 == candles["vp"][1]["delta"].iat[1]
def test_public_trades_binned_big_sample_list(public_trades_list):
@ -262,8 +300,15 @@ def test_public_trades_binned_big_sample_list(public_trades_list):
df = trades_to_volumeprofile_with_total_delta_bid_ask(trades, scale=BIN_SIZE_SCALE)
# Assert that the DataFrame has the expected columns
assert df.columns.tolist() == ['bid', 'ask', 'delta', 'bid_amount',
'ask_amount', 'total_volume', 'total_trades']
assert df.columns.tolist() == [
"bid",
"ask",
"delta",
"bid_amount",
"ask_amount",
"total_volume",
"total_trades",
]
# Assert the number of rows in the DataFrame (expected 23 for this bin size)
assert len(df) == 23
@ -271,22 +316,22 @@ def test_public_trades_binned_big_sample_list(public_trades_list):
# Assert that the index values are in ascending order and spaced correctly
assert all(df.index[i] < df.index[i + 1] for i in range(len(df) - 1))
assert df.index[0] + BIN_SIZE_SCALE == df.index[1]
assert (trades['price'].min() - BIN_SIZE_SCALE) < df.index[0] < trades['price'].max()
assert (trades["price"].min() - BIN_SIZE_SCALE) < df.index[0] < trades["price"].max()
assert (df.index[0] + BIN_SIZE_SCALE) >= df.index[1]
assert (trades['price'].max() - BIN_SIZE_SCALE) < df.index[-1] < trades['price'].max()
assert (trades["price"].max() - BIN_SIZE_SCALE) < df.index[-1] < trades["price"].max()
# Assert specific values in the first and last rows of the DataFrame
assert 32 == df['bid'].iloc[0] # bid price
assert 197.512 == df['bid_amount'].iloc[0] # total bid amount
assert 88.98 == df['ask_amount'].iloc[0] # total ask amount
assert 26 == df['ask'].iloc[0] # ask price
assert -108.532 == pytest.approx(df['delta'].iloc[0]) # delta (bid amount - ask amount)
assert 32 == df["bid"].iloc[0] # bid price
assert 197.512 == df["bid_amount"].iloc[0] # total bid amount
assert 88.98 == df["ask_amount"].iloc[0] # total ask amount
assert 26 == df["ask"].iloc[0] # ask price
assert -108.532 == pytest.approx(df["delta"].iloc[0]) # delta (bid amount - ask amount)
assert 3 == df['bid'].iloc[-1] # bid price
assert 50.659 == df['bid_amount'].iloc[-1] # total bid amount
assert 108.21 == df['ask_amount'].iloc[-1] # total ask amount
assert 44 == df['ask'].iloc[-1] # ask price
assert 57.551 == df['delta'].iloc[-1] # delta (bid amount - ask amount)
assert 3 == df["bid"].iloc[-1] # bid price
assert 50.659 == df["bid_amount"].iloc[-1] # total bid amount
assert 108.21 == df["ask_amount"].iloc[-1] # total ask amount
assert 44 == df["ask"].iloc[-1] # ask price
assert 57.551 == df["delta"].iloc[-1] # delta (bid amount - ask amount)
# Repeat the process with a larger bin size
BIN_SIZE_SCALE = 1
@ -299,43 +344,89 @@ def test_public_trades_binned_big_sample_list(public_trades_list):
# Repeat similar assertions for index ordering and spacing
assert all(df.index[i] < df.index[i + 1] for i in range(len(df) - 1))
assert (trades['price'].min() - BIN_SIZE_SCALE) < df.index[0] < trades['price'].max()
assert (trades["price"].min() - BIN_SIZE_SCALE) < df.index[0] < trades["price"].max()
assert (df.index[0] + BIN_SIZE_SCALE) >= df.index[1]
assert (trades['price'].max() - BIN_SIZE_SCALE) < df.index[-1] < trades['price'].max()
assert (trades["price"].max() - BIN_SIZE_SCALE) < df.index[-1] < trades["price"].max()
# Assert the value in the last row of the DataFrame with the larger bin size
assert 1667.0 == df.index[-1]
assert 710.98 == df['bid_amount'].iat[0]
assert 111 == df['bid'].iat[0]
assert 52.7199999 == pytest.approx(df['delta'].iat[0]) # delta
assert 710.98 == df["bid_amount"].iat[0]
assert 111 == df["bid"].iat[0]
assert 52.7199999 == pytest.approx(df["delta"].iat[0]) # delta
def test_public_trades_testdata_sanity(
candles,
public_trades_list,
public_trades_list_simple,
populate_dataframe_with_trades_dataframe,
populate_dataframe_with_trades_trades):
candles,
public_trades_list,
public_trades_list_simple,
populate_dataframe_with_trades_dataframe,
populate_dataframe_with_trades_trades,
):
assert 10999 == len(candles)
assert 1000 == len(public_trades_list)
assert 999 == len(populate_dataframe_with_trades_dataframe)
assert 293532 == len(populate_dataframe_with_trades_trades)
assert 7 == len(public_trades_list_simple)
assert 5 == public_trades_list_simple.loc[
public_trades_list_simple['side'].str.contains('sell'), 'id'].count()
assert 2 == public_trades_list_simple.loc[
public_trades_list_simple['side'].str.contains('buy'), 'id'].count()
assert (
5
== public_trades_list_simple.loc[
public_trades_list_simple["side"].str.contains("sell"), "id"
].count()
)
assert (
2
== public_trades_list_simple.loc[
public_trades_list_simple["side"].str.contains("buy"), "id"
].count()
)
assert public_trades_list.columns.tolist() == [
'timestamp', 'id', 'type', 'side', 'price',
'amount', 'cost', 'date']
"timestamp",
"id",
"type",
"side",
"price",
"amount",
"cost",
"date",
]
assert public_trades_list.columns.tolist() == [
'timestamp', 'id', 'type', 'side', 'price', 'amount', 'cost', 'date']
"timestamp",
"id",
"type",
"side",
"price",
"amount",
"cost",
"date",
]
assert public_trades_list_simple.columns.tolist() == [
'timestamp', 'id', 'type', 'side', 'price', 'amount', 'cost', 'date']
"timestamp",
"id",
"type",
"side",
"price",
"amount",
"cost",
"date",
]
assert populate_dataframe_with_trades_dataframe.columns.tolist() == [
'date', 'open', 'high', 'low', 'close', 'volume']
"date",
"open",
"high",
"low",
"close",
"volume",
]
assert populate_dataframe_with_trades_trades.columns.tolist() == [
'timestamp', 'id', 'type', 'side', 'price', 'amount', 'cost', 'date']
"timestamp",
"id",
"type",
"side",
"price",
"amount",
"cost",
"date",
]

View File

@ -66,7 +66,7 @@ def test_historic_trades(mocker, default_conf, trades_history_df):
historymock = MagicMock(return_value=trades_history_df)
mocker.patch(
"freqtrade.data.history.datahandlers.featherdatahandler.FeatherDataHandler._trades_load",
historymock
historymock,
)
dp = DataProvider(default_conf, None)
@ -82,8 +82,8 @@ def test_historic_trades(mocker, default_conf, trades_history_df):
assert len(data) == 0
# Switch to backtest mode
default_conf['runmode'] = RunMode.BACKTEST
default_conf['dataformat_trades'] = 'feather'
default_conf["runmode"] = RunMode.BACKTEST
default_conf["dataformat_trades"] = "feather"
exchange = get_patched_exchange(mocker, default_conf)
dp = DataProvider(default_conf, exchange)
data = dp.trades("UNITTEST/BTC", "5m")
@ -91,7 +91,7 @@ def test_historic_trades(mocker, default_conf, trades_history_df):
assert len(data) == len(trades_history_df)
# Random other runmode
default_conf['runmode'] = RunMode.UTIL_EXCHANGE
default_conf["runmode"] = RunMode.UTIL_EXCHANGE
dp = DataProvider(default_conf, None)
data = dp.trades("UNITTEST/BTC", "5m")
assert isinstance(data, DataFrame)
@ -311,7 +311,7 @@ def test_refresh(mocker, default_conf):
# Test with public trades
refresh_mock.reset_mock()
refresh_mock.reset_mock()
default_conf['exchange']['use_public_trades'] = True
default_conf["exchange"]["use_public_trades"] = True
dp.refresh(pairs, pairs_non_trad)
assert mock_refresh_trades.call_count == 1
assert refresh_mock.call_count == 1

View File

@ -674,8 +674,9 @@ def test_download_trades_history(
mocker.patch(f"{EXMS}.get_historic_trades", MagicMock(side_effect=ValueError))
caplog.clear()
assert not _download_trades_history(data_handler=data_handler, exchange=exchange,
pair='ETH/BTC', trading_mode=TradingMode.SPOT)
assert not _download_trades_history(
data_handler=data_handler, exchange=exchange, pair="ETH/BTC", trading_mode=TradingMode.SPOT
)
assert log_has_re('Failed to download and store historic trades for pair: "ETH/BTC".*', caplog)
file2 = tmp_path / "XRP_ETH-trades.json.gz"

View File

@ -2414,8 +2414,7 @@ def test_refresh_latest_trades(mocker, default_conf, caplog, candle_type, tmp_pa
exchange._api_async.fetch_trades = get_mock_coro(trades)
exchange._ft_has["exchange_has_overrides"]["fetchTrades"] = True
pairs = [("IOTA/USDT:USDT", "5m", candle_type),
("XRP/USDT:USDT", "5m", candle_type)]
pairs = [("IOTA/USDT:USDT", "5m", candle_type), ("XRP/USDT:USDT", "5m", candle_type)]
# empty dicts
assert not exchange._trades
res = exchange.refresh_latest_trades(pairs, cache=False)
@ -2442,10 +2441,8 @@ def test_refresh_latest_trades(mocker, default_conf, caplog, candle_type, tmp_pa
# if copy is "True"
assert exchange.trades(pair) is not exchange.trades(pair)
assert exchange.trades(pair) is not exchange.trades(pair, copy=True)
assert exchange.trades(
pair, copy=True) is not exchange.trades(pair, copy=True)
assert exchange.trades(
pair, copy=False) is exchange.trades(pair, copy=False)
assert exchange.trades(pair, copy=True) is not exchange.trades(pair, copy=True)
assert exchange.trades(pair, copy=False) is exchange.trades(pair, copy=False)
# test caching
ohlcv = [
@ -2470,12 +2467,10 @@ def test_refresh_latest_trades(mocker, default_conf, caplog, candle_type, tmp_pa
trades_df = DataFrame(ohlcv, columns=cols)
trades_df["date"] = to_datetime(trades_df["date"], unit="ms", utc=True)
trades_df["date"] = trades_df["date"].apply(
lambda date: timeframe_to_prev_date("5m", date))
trades_df["date"] = trades_df["date"].apply(lambda date: timeframe_to_prev_date("5m", date))
exchange._klines[pair] = trades_df
res = exchange.refresh_latest_trades(
[("IOTA/USDT:USDT", "5m", candle_type),
("XRP/USDT:USDT", "5m", candle_type)]
[("IOTA/USDT:USDT", "5m", candle_type), ("XRP/USDT:USDT", "5m", candle_type)]
)
assert len(res) == 0
assert exchange._api_async.fetch_trades.call_count == 0
@ -2501,12 +2496,10 @@ def test_refresh_latest_trades(mocker, default_conf, caplog, candle_type, tmp_pa
}
]
trades_df = DataFrame(trades)
trades_df["date"] = to_datetime(
trades_df["timestamp"], unit="ms", utc=True)
trades_df["date"] = to_datetime(trades_df["timestamp"], unit="ms", utc=True)
exchange._trades[pair] = trades_df
res = exchange.refresh_latest_trades(
[("IOTA/USDT:USDT", "5m", candle_type),
("XRP/USDT:USDT", "5m", candle_type)]
[("IOTA/USDT:USDT", "5m", candle_type), ("XRP/USDT:USDT", "5m", candle_type)]
)
assert len(res) == len(pairs)

View File

@ -1076,14 +1076,20 @@ def test__validate_consumers(default_conf, caplog) -> None:
def test__validate_orderflow(default_conf) -> None:
conf = deepcopy(default_conf)
conf['exchange']['use_public_trades'] = True
with pytest.raises(ConfigurationError,
match="Orderflow is a required configuration key when using public trades."):
conf["exchange"]["use_public_trades"] = True
with pytest.raises(
ConfigurationError,
match="Orderflow is a required configuration key when using public trades.",
):
validate_config_consistency(conf)
conf.update({'orderflow': {
"scale": 0.5,
}})
conf.update(
{
"orderflow": {
"scale": 0.5,
}
}
)
# Should pass.
validate_config_consistency(conf)