diff --git a/freqtrade/constants.py b/freqtrade/constants.py index ca1be1d6a..ff6cc7c67 100644 --- a/freqtrade/constants.py +++ b/freqtrade/constants.py @@ -61,6 +61,7 @@ USERPATH_FREQAIMODELS = 'freqaimodels' TELEGRAM_SETTING_OPTIONS = ['on', 'off', 'silent'] WEBHOOK_FORMAT_OPTIONS = ['form', 'json', 'raw'] +FULL_DATAFRAME_THRESHOLD = 100 ENV_VAR_PREFIX = 'FREQTRADE__' diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index 6b220c8b4..df4a4c898 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -9,14 +9,16 @@ from collections import deque from datetime import datetime, timezone from typing import Any, Dict, List, Optional, Tuple -from pandas import DataFrame +from pandas import DataFrame, to_timedelta from freqtrade.configuration import TimeRange -from freqtrade.constants import Config, ListPairsWithTimeframes, PairWithTimeframe +from freqtrade.constants import (FULL_DATAFRAME_THRESHOLD, Config, ListPairsWithTimeframes, + PairWithTimeframe) from freqtrade.data.history import load_pair_history from freqtrade.enums import CandleType, RPCMessageType, RunMode from freqtrade.exceptions import ExchangeError, OperationalException from freqtrade.exchange import Exchange, timeframe_to_seconds +from freqtrade.misc import append_candles_to_dataframe from freqtrade.rpc import RPCManager from freqtrade.util import PeriodicCache @@ -120,7 +122,7 @@ class DataProvider: 'type': RPCMessageType.ANALYZED_DF, 'data': { 'key': pair_key, - 'df': dataframe, + 'df': dataframe.tail(1), 'la': datetime.now(timezone.utc) } } @@ -131,7 +133,7 @@ class DataProvider: 'data': pair_key, }) - def _add_external_df( + def _replace_external_df( self, pair: str, dataframe: DataFrame, @@ -157,6 +159,85 @@ class DataProvider: self.__producer_pairs_df[producer_name][pair_key] = (dataframe, _last_analyzed) logger.debug(f"External DataFrame for {pair_key} from {producer_name} added.") + def _add_external_df( + self, + pair: str, + dataframe: DataFrame, + last_analyzed: datetime, + timeframe: str, + candle_type: CandleType, + producer_name: str = "default" + ) -> Tuple[bool, int]: + """ + Append a candle to the existing external dataframe. The incoming dataframe + must have at least 1 candle. + + :param pair: pair to get the data for + :param timeframe: Timeframe to get data for + :param candle_type: Any of the enum CandleType (must match trading mode!) + :returns: False if the candle could not be appended, or the int number of missing candles. + """ + pair_key = (pair, timeframe, candle_type) + + if dataframe.empty: + # The incoming dataframe must have at least 1 candle + return (False, 0) + + if len(dataframe) >= FULL_DATAFRAME_THRESHOLD: + # This is likely a full dataframe + # Add the dataframe to the dataprovider + self._replace_external_df( + pair, + dataframe, + last_analyzed=last_analyzed, + timeframe=timeframe, + candle_type=candle_type, + producer_name=producer_name + ) + return (True, 0) + + if (producer_name not in self.__producer_pairs_df + or pair_key not in self.__producer_pairs_df[producer_name]): + # We don't have data from this producer yet, + # or we don't have data for this pair_key + # return False and 1000 for the full df + return (False, 1000) + + existing_df, _ = self.__producer_pairs_df[producer_name][pair_key] + + # CHECK FOR MISSING CANDLES + timeframe_delta = to_timedelta(timeframe) # Convert the timeframe to a timedelta for pandas + local_last = existing_df.iloc[-1]['date'] # We want the last date from our copy + incoming_first = dataframe.iloc[0]['date'] # We want the first date from the incoming + + # Remove existing candles that are newer than the incoming first candle + existing_df1 = existing_df[existing_df['date'] < incoming_first] + + candle_difference = (incoming_first - local_last) / timeframe_delta + + # If the difference divided by the timeframe is 1, then this + # is the candle we want and the incoming data isn't missing any. + # If the candle_difference is more than 1, that means + # we missed some candles between our data and the incoming + # so return False and candle_difference. + if candle_difference > 1: + return (False, candle_difference) + if existing_df1.empty: + appended_df = dataframe + else: + appended_df = append_candles_to_dataframe(existing_df1, dataframe) + + # Everything is good, we appended + self._replace_external_df( + pair, + appended_df, + last_analyzed=last_analyzed, + timeframe=timeframe, + candle_type=candle_type, + producer_name=producer_name + ) + return (True, 0) + def get_producer_df( self, pair: str, diff --git a/freqtrade/misc.py b/freqtrade/misc.py index 2d2c7513a..93e8da6dd 100644 --- a/freqtrade/misc.py +++ b/freqtrade/misc.py @@ -301,3 +301,21 @@ def remove_entry_exit_signals(dataframe: pd.DataFrame): dataframe[SignalTagType.EXIT_TAG.value] = None return dataframe + + +def append_candles_to_dataframe(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame: + """ + Append the `right` dataframe to the `left` dataframe + + :param left: The full dataframe you want appended to + :param right: The new dataframe containing the data you want appended + :returns: The dataframe with the right data in it + """ + if left.iloc[-1]['date'] != right.iloc[-1]['date']: + left = pd.concat([left, right]) + + # Only keep the last 1500 candles in memory + left = left[-1500:] if len(left) > 1500 else left + left.reset_index(drop=True, inplace=True) + + return left diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py index e183cd7e7..18714f15f 100644 --- a/freqtrade/rpc/api_server/api_ws.py +++ b/freqtrade/rpc/api_server/api_ws.py @@ -91,9 +91,10 @@ async def _process_consumer_request( elif type == RPCRequestType.ANALYZED_DF: # Limit the amount of candles per dataframe to 'limit' or 1500 limit = min(data.get('limit', 1500), 1500) if data else None + pair = data.get('pair', None) if data else None # For every pair in the generator, send a separate message - for message in rpc._ws_request_analyzed_df(limit): + for message in rpc._ws_request_analyzed_df(limit, pair): # Format response response = WSAnalyzedDFMessage(data=message) await channel.send(response.dict(exclude_none=True)) diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index c50aff8be..3c0a833d8 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -27,7 +27,8 @@ class WebSocketChannel: self, websocket: WebSocketType, channel_id: Optional[str] = None, - serializer_cls: Type[WebSocketSerializer] = HybridJSONWebSocketSerializer + serializer_cls: Type[WebSocketSerializer] = HybridJSONWebSocketSerializer, + send_throttle: float = 0.01 ): self.channel_id = channel_id if channel_id else uuid4().hex[:8] self._websocket = WebSocketProxy(websocket) @@ -41,6 +42,7 @@ class WebSocketChannel: self._send_times: Deque[float] = deque([], maxlen=10) # High limit defaults to 3 to start self._send_high_limit = 3 + self._send_throttle = send_throttle # The subscribed message types self._subscriptions: List[str] = [] @@ -106,7 +108,8 @@ class WebSocketChannel: # Explicitly give control back to event loop as # websockets.send does not - await asyncio.sleep(0.01) + # Also throttles how fast we send + await asyncio.sleep(self._send_throttle) async def recv(self): """ diff --git a/freqtrade/rpc/api_server/ws_schemas.py b/freqtrade/rpc/api_server/ws_schemas.py index 877232213..292672b60 100644 --- a/freqtrade/rpc/api_server/ws_schemas.py +++ b/freqtrade/rpc/api_server/ws_schemas.py @@ -47,7 +47,7 @@ class WSWhitelistRequest(WSRequestSchema): class WSAnalyzedDFRequest(WSRequestSchema): type: RPCRequestType = RPCRequestType.ANALYZED_DF - data: Dict[str, Any] = {"limit": 1500} + data: Dict[str, Any] = {"limit": 1500, "pair": None} # ------------------------------ MESSAGE SCHEMAS ---------------------------- diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index 6078efd07..e888191ea 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -8,15 +8,17 @@ import asyncio import logging import socket from threading import Thread -from typing import TYPE_CHECKING, Any, Callable, Dict, List, TypedDict +from typing import TYPE_CHECKING, Any, Callable, Dict, List, TypedDict, Union import websockets from pydantic import ValidationError +from freqtrade.constants import FULL_DATAFRAME_THRESHOLD from freqtrade.data.dataprovider import DataProvider from freqtrade.enums import RPCMessageType from freqtrade.misc import remove_entry_exit_signals -from freqtrade.rpc.api_server.ws import WebSocketChannel +from freqtrade.rpc.api_server.ws.channel import WebSocketChannel, create_channel +from freqtrade.rpc.api_server.ws.message_stream import MessageStream from freqtrade.rpc.api_server.ws_schemas import (WSAnalyzedDFMessage, WSAnalyzedDFRequest, WSMessageSchema, WSRequestSchema, WSSubscribeRequest, WSWhitelistMessage, @@ -38,6 +40,10 @@ class Producer(TypedDict): logger = logging.getLogger(__name__) +def schema_to_dict(schema: Union[WSMessageSchema, WSRequestSchema]): + return schema.dict(exclude_none=True) + + class ExternalMessageConsumer: """ The main controller class for consuming external messages from @@ -92,6 +98,8 @@ class ExternalMessageConsumer: RPCMessageType.ANALYZED_DF: self._consume_analyzed_df_message, } + self._channel_streams: Dict[str, MessageStream] = {} + self.start() def start(self): @@ -118,6 +126,8 @@ class ExternalMessageConsumer: logger.info("Stopping ExternalMessageConsumer") self._running = False + self._channel_streams = {} + if self._sub_tasks: # Cancel sub tasks for task in self._sub_tasks: @@ -175,7 +185,6 @@ class ExternalMessageConsumer: :param producer: Dictionary containing producer info :param lock: An asyncio Lock """ - channel = None while self._running: try: host, port = producer['host'], producer['port'] @@ -190,19 +199,21 @@ class ExternalMessageConsumer: max_size=self.message_size_limit, ping_interval=None ) as ws: - channel = WebSocketChannel(ws, channel_id=name) + async with create_channel( + ws, + channel_id=name, + send_throttle=0.5 + ) as channel: - logger.info(f"Producer connection success - {channel}") + # Create the message stream for this channel + self._channel_streams[name] = MessageStream() - # Now request the initial data from this Producer - for request in self._initial_requests: - await channel.send( - request.dict(exclude_none=True) + # Run the channel tasks while connected + await channel.run_channel_tasks( + self._receive_messages(channel, producer, lock), + self._send_requests(channel, self._channel_streams[name]) ) - # Now receive data, if none is within the time limit, ping - await self._receive_messages(channel, producer, lock) - except (websockets.exceptions.InvalidURI, ValueError) as e: logger.error(f"{ws_url} is an invalid WebSocket URL - {e}") break @@ -229,11 +240,19 @@ class ExternalMessageConsumer: # An unforseen error has occurred, log and continue logger.error("Unexpected error has occurred:") logger.exception(e) + await asyncio.sleep(self.sleep_time) continue - finally: - if channel: - await channel.close() + async def _send_requests(self, channel: WebSocketChannel, channel_stream: MessageStream): + # Send the initial requests + for init_request in self._initial_requests: + await channel.send(schema_to_dict(init_request)) + + # Now send any subsequent requests published to + # this channel's stream + async for request, _ in channel_stream: + logger.debug(f"Sending request to channel - {channel} - {request}") + await channel.send(request) async def _receive_messages( self, @@ -270,19 +289,31 @@ class ExternalMessageConsumer: latency = (await asyncio.wait_for(pong, timeout=self.ping_timeout) * 1000) logger.info(f"Connection to {channel} still alive, latency: {latency}ms") - continue - except (websockets.exceptions.ConnectionClosed): - # Just eat the error and continue reconnecting - logger.warning(f"Disconnection in {channel} - retrying in {self.sleep_time}s") - await asyncio.sleep(self.sleep_time) - break + except Exception as e: + # Just eat the error and continue reconnecting logger.warning(f"Ping error {channel} - {e} - retrying in {self.sleep_time}s") logger.debug(e, exc_info=e) - await asyncio.sleep(self.sleep_time) + raise - break + def send_producer_request( + self, + producer_name: str, + request: Union[WSRequestSchema, Dict[str, Any]] + ): + """ + Publish a message to the producer's message stream to be + sent by the channel task. + + :param producer_name: The name of the producer to publish the message to + :param request: The request to send to the producer + """ + if isinstance(request, WSRequestSchema): + request = schema_to_dict(request) + + if channel_stream := self._channel_streams.get(producer_name): + channel_stream.publish(request) def handle_producer_message(self, producer: Producer, message: Dict[str, Any]): """ @@ -336,16 +367,45 @@ class ExternalMessageConsumer: pair, timeframe, candle_type = key + if df.empty: + logger.debug(f"Received Empty Dataframe for {key}") + return + # If set, remove the Entry and Exit signals from the Producer if self._emc_config.get('remove_entry_exit_signals', False): df = remove_entry_exit_signals(df) - # Add the dataframe to the dataprovider - self._dp._add_external_df(pair, df, - last_analyzed=la, - timeframe=timeframe, - candle_type=candle_type, - producer_name=producer_name) + logger.debug(f"Received {len(df)} candle(s) for {key}") + + did_append, n_missing = self._dp._add_external_df( + pair, + df, + last_analyzed=la, + timeframe=timeframe, + candle_type=candle_type, + producer_name=producer_name + ) + + if not did_append: + # We want an overlap in candles incase some data has changed + n_missing += 1 + # Set to None for all candles if we missed a full df's worth of candles + n_missing = n_missing if n_missing < FULL_DATAFRAME_THRESHOLD else 1500 + + logger.warning(f"Holes in data or no existing df, requesting {n_missing} candles " + f"for {key} from `{producer_name}`") + + self.send_producer_request( + producer_name, + WSAnalyzedDFRequest( + data={ + "limit": n_missing, + "pair": pair + } + ) + ) + return logger.debug( - f"Consumed message from `{producer_name}` of type `RPCMessageType.ANALYZED_DF`") + f"Consumed message from `{producer_name}` " + f"of type `RPCMessageType.ANALYZED_DF` for {key}") diff --git a/freqtrade/rpc/rpc.py b/freqtrade/rpc/rpc.py index dae23d388..ed905d844 100644 --- a/freqtrade/rpc/rpc.py +++ b/freqtrade/rpc/rpc.py @@ -1062,15 +1062,26 @@ class RPC: return self._convert_dataframe_to_dict(self._freqtrade.config['strategy'], pair, timeframe, _data, last_analyzed) - def __rpc_analysed_dataframe_raw(self, pair: str, timeframe: str, - limit: Optional[int]) -> Tuple[DataFrame, datetime]: - """ Get the dataframe and last analyze from the dataprovider """ + def __rpc_analysed_dataframe_raw( + self, + pair: str, + timeframe: str, + limit: Optional[int] + ) -> Tuple[DataFrame, datetime]: + """ + Get the dataframe and last analyze from the dataprovider + + :param pair: The pair to get + :param timeframe: The timeframe of data to get + :param limit: The amount of candles in the dataframe + """ _data, last_analyzed = self._freqtrade.dataprovider.get_analyzed_dataframe( pair, timeframe) _data = _data.copy() if limit: _data = _data.iloc[-limit:] + return _data, last_analyzed def _ws_all_analysed_dataframes( @@ -1078,7 +1089,16 @@ class RPC: pairlist: List[str], limit: Optional[int] ) -> Generator[Dict[str, Any], None, None]: - """ Get the analysed dataframes of each pair in the pairlist """ + """ + Get the analysed dataframes of each pair in the pairlist. + If specified, only return the most recent `limit` candles for + each dataframe. + + :param pairlist: A list of pairs to get + :param limit: If an integer, limits the size of dataframe + If a list of string date times, only returns those candles + :returns: A generator of dictionaries with the key, dataframe, and last analyzed timestamp + """ timeframe = self._freqtrade.config['timeframe'] candle_type = self._freqtrade.config.get('candle_type_def', CandleType.SPOT) @@ -1091,10 +1111,15 @@ class RPC: "la": last_analyzed } - def _ws_request_analyzed_df(self, limit: Optional[int]): + def _ws_request_analyzed_df( + self, + limit: Optional[int] = None, + pair: Optional[str] = None + ): """ Historical Analyzed Dataframes for WebSocket """ - whitelist = self._freqtrade.active_pair_whitelist - return self._ws_all_analysed_dataframes(whitelist, limit) + pairlist = [pair] if pair else self._freqtrade.active_pair_whitelist + + return self._ws_all_analysed_dataframes(pairlist, limit) def _ws_request_whitelist(self): """ Whitelist data for WebSocket """ diff --git a/tests/data/test_dataprovider.py b/tests/data/test_dataprovider.py index 025e6d08a..7d61a22be 100644 --- a/tests/data/test_dataprovider.py +++ b/tests/data/test_dataprovider.py @@ -2,13 +2,13 @@ from datetime import datetime, timezone from unittest.mock import MagicMock import pytest -from pandas import DataFrame +from pandas import DataFrame, Timestamp from freqtrade.data.dataprovider import DataProvider from freqtrade.enums import CandleType, RunMode from freqtrade.exceptions import ExchangeError, OperationalException from freqtrade.plugins.pairlistmanager import PairListManager -from tests.conftest import get_patched_exchange +from tests.conftest import generate_test_data, get_patched_exchange @pytest.mark.parametrize('candle_type', [ @@ -161,9 +161,9 @@ def test_producer_pairs(mocker, default_conf, ohlcv_history): assert dataprovider.get_producer_pairs("bad") == [] -def test_get_producer_df(mocker, default_conf, ohlcv_history): +def test_get_producer_df(mocker, default_conf): dataprovider = DataProvider(default_conf, None) - + ohlcv_history = generate_test_data('5m', 150) pair = 'BTC/USDT' timeframe = default_conf['timeframe'] candle_type = CandleType.SPOT @@ -412,3 +412,80 @@ def test_dp_send_msg(default_conf): dp = DataProvider(default_conf, None) dp.send_msg(msg, always_send=True) assert msg not in dp._msg_queue + + +def test_dp__add_external_df(default_conf_usdt): + timeframe = '1h' + default_conf_usdt["timeframe"] = timeframe + dp = DataProvider(default_conf_usdt, None) + df = generate_test_data(timeframe, 24, '2022-01-01 00:00:00+00:00') + last_analyzed = datetime.now(timezone.utc) + + res = dp._add_external_df('ETH/USDT', df, last_analyzed, timeframe, CandleType.SPOT) + assert res[0] is False + # Why 1000 ?? + assert res[1] == 1000 + + # Hard add dataframe + dp._replace_external_df('ETH/USDT', df, last_analyzed, timeframe, CandleType.SPOT) + # BTC is not stored yet + res = dp._add_external_df('BTC/USDT', df, last_analyzed, timeframe, CandleType.SPOT) + assert res[0] is False + df_res, _ = dp.get_producer_df('ETH/USDT', timeframe, CandleType.SPOT) + assert len(df_res) == 24 + + # Add the same dataframe again - dataframe size shall not change. + res = dp._add_external_df('ETH/USDT', df, last_analyzed, timeframe, CandleType.SPOT) + assert res[0] is True + assert res[1] == 0 + df, _ = dp.get_producer_df('ETH/USDT', timeframe, CandleType.SPOT) + assert len(df) == 24 + + # Add a new day. + df2 = generate_test_data(timeframe, 24, '2022-01-02 00:00:00+00:00') + + res = dp._add_external_df('ETH/USDT', df2, last_analyzed, timeframe, CandleType.SPOT) + assert res[0] is True + assert res[1] == 0 + df, _ = dp.get_producer_df('ETH/USDT', timeframe, CandleType.SPOT) + assert len(df) == 48 + + # Add a dataframe with a 12 hour offset - so 12 candles are overlapping, and 12 valid. + df3 = generate_test_data(timeframe, 24, '2022-01-02 12:00:00+00:00') + + res = dp._add_external_df('ETH/USDT', df3, last_analyzed, timeframe, CandleType.SPOT) + assert res[0] is True + assert res[1] == 0 + df, _ = dp.get_producer_df('ETH/USDT', timeframe, CandleType.SPOT) + # New length = 48 + 12 (since we have a 12 hour offset). + assert len(df) == 60 + assert df.iloc[-1]['date'] == df3.iloc[-1]['date'] + assert df.iloc[-1]['date'] == Timestamp('2022-01-03 11:00:00+00:00') + + # Generate 1 new candle + df4 = generate_test_data(timeframe, 1, '2022-01-03 12:00:00+00:00') + res = dp._add_external_df('ETH/USDT', df4, last_analyzed, timeframe, CandleType.SPOT) + # assert res[0] is True + # assert res[1] == 0 + df, _ = dp.get_producer_df('ETH/USDT', timeframe, CandleType.SPOT) + # New length = 61 + 1 + assert len(df) == 61 + assert df.iloc[-2]['date'] == Timestamp('2022-01-03 11:00:00+00:00') + assert df.iloc[-1]['date'] == Timestamp('2022-01-03 12:00:00+00:00') + + # Gap in the data ... + df4 = generate_test_data(timeframe, 1, '2022-01-05 00:00:00+00:00') + res = dp._add_external_df('ETH/USDT', df4, last_analyzed, timeframe, CandleType.SPOT) + assert res[0] is False + # 36 hours - from 2022-01-03 12:00:00+00:00 to 2022-01-05 00:00:00+00:00 + assert res[1] == 36 + df, _ = dp.get_producer_df('ETH/USDT', timeframe, CandleType.SPOT) + # New length = 61 + 1 + assert len(df) == 61 + + # Empty dataframe + df4 = generate_test_data(timeframe, 0, '2022-01-05 00:00:00+00:00') + res = dp._add_external_df('ETH/USDT', df4, last_analyzed, timeframe, CandleType.SPOT) + assert res[0] is False + # 36 hours - from 2022-01-03 12:00:00+00:00 to 2022-01-05 00:00:00+00:00 + assert res[1] == 0 diff --git a/tests/rpc/test_rpc_emc.py b/tests/rpc/test_rpc_emc.py index 93ae829d5..e1537ec9e 100644 --- a/tests/rpc/test_rpc_emc.py +++ b/tests/rpc/test_rpc_emc.py @@ -83,6 +83,7 @@ def test_emc_init(patched_emc): def test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history): test_producer = {"name": "test", "url": "ws://test", "ws_token": "test"} producer_name = test_producer['name'] + invalid_msg = r"Invalid message .+" caplog.set_level(logging.DEBUG) @@ -94,7 +95,7 @@ def test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history): assert log_has( f"Consumed message from `{producer_name}` of type `RPCMessageType.WHITELIST`", caplog) - # Test handle analyzed_df message + # Test handle analyzed_df single candle message df_message = { "type": "analyzed_df", "data": { @@ -106,8 +107,7 @@ def test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history): patched_emc.handle_producer_message(test_producer, df_message) assert log_has(f"Received message of type `analyzed_df` from `{producer_name}`", caplog) - assert log_has( - f"Consumed message from `{producer_name}` of type `RPCMessageType.ANALYZED_DF`", caplog) + assert log_has_re(r"Holes in data or no existing df,.+", caplog) # Test unhandled message unhandled_message = {"type": "status", "data": "RUNNING"} @@ -120,7 +120,8 @@ def test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history): malformed_message = {"type": "whitelist", "data": {"pair": "BTC/USDT"}} patched_emc.handle_producer_message(test_producer, malformed_message) - assert log_has_re(r"Invalid message .+", caplog) + assert log_has_re(invalid_msg, caplog) + caplog.clear() malformed_message = { "type": "analyzed_df", @@ -133,13 +134,30 @@ def test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history): patched_emc.handle_producer_message(test_producer, malformed_message) assert log_has(f"Received message of type `analyzed_df` from `{producer_name}`", caplog) - assert log_has_re(r"Invalid message .+", caplog) + assert log_has_re(invalid_msg, caplog) + caplog.clear() + + # Empty dataframe + malformed_message = { + "type": "analyzed_df", + "data": { + "key": ("BTC/USDT", "5m", "spot"), + "df": ohlcv_history.loc[ohlcv_history['open'] < 0], + "la": datetime.now(timezone.utc) + } + } + patched_emc.handle_producer_message(test_producer, malformed_message) + + assert log_has(f"Received message of type `analyzed_df` from `{producer_name}`", caplog) + assert not log_has_re(invalid_msg, caplog) + assert log_has_re(r"Received Empty Dataframe for.+", caplog) caplog.clear() malformed_message = {"some": "stuff"} patched_emc.handle_producer_message(test_producer, malformed_message) - assert log_has_re(r"Invalid message .+", caplog) + assert log_has_re(invalid_msg, caplog) + caplog.clear() caplog.clear() malformed_message = {"type": "whitelist", "data": None} @@ -183,7 +201,7 @@ async def test_emc_create_connection_success(default_conf, caplog, mocker): async with websockets.serve(eat, _TEST_WS_HOST, _TEST_WS_PORT): await emc._create_connection(test_producer, lock) - assert log_has_re(r"Producer connection success.+", caplog) + assert log_has_re(r"Connected to channel.+", caplog) finally: emc.shutdown() @@ -212,7 +230,8 @@ async def test_emc_create_connection_invalid_url(default_conf, caplog, mocker, h dp = DataProvider(default_conf, None, None, None) # Handle start explicitly to avoid messing with threading in tests - mocker.patch("freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start",) + mocker.patch("freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start") + mocker.patch("freqtrade.rpc.api_server.ws.channel.create_channel") emc = ExternalMessageConsumer(default_conf, dp) try: @@ -390,7 +409,9 @@ async def test_emc_receive_messages_timeout(default_conf, caplog, mocker): try: change_running(emc) loop.call_soon(functools.partial(change_running, emc=emc)) - await emc._receive_messages(TestChannel(), test_producer, lock) + + with pytest.raises(asyncio.TimeoutError): + await emc._receive_messages(TestChannel(), test_producer, lock) assert log_has_re(r"Ping error.+", caplog) finally: