freqtrade_origin/freqtrade/rpc/external_message_consumer.py

392 lines
14 KiB
Python
Raw Permalink Normal View History

2022-08-31 01:21:34 +00:00
"""
ExternalMessageConsumer module
Main purpose is to connect to external bot's message websocket to consume data
from it
"""
2024-05-12 14:51:11 +00:00
2022-08-31 01:21:34 +00:00
import asyncio
import logging
import socket
from threading import Thread
from typing import Any, Callable, TypedDict, Union
2022-08-31 01:21:34 +00:00
import websockets
2022-09-08 19:58:28 +00:00
from pydantic import ValidationError
2022-08-31 01:21:34 +00:00
from freqtrade.constants import FULL_DATAFRAME_THRESHOLD
from freqtrade.data.dataprovider import DataProvider
2022-09-07 21:08:01 +00:00
from freqtrade.enums import RPCMessageType
2022-09-02 05:15:03 +00:00
from freqtrade.misc import remove_entry_exit_signals
from freqtrade.rpc.api_server.ws.channel import WebSocketChannel, create_channel
from freqtrade.rpc.api_server.ws.message_stream import MessageStream
from freqtrade.rpc.api_server.ws_schemas import (
WSAnalyzedDFMessage,
WSAnalyzedDFRequest,
WSMessageSchema,
WSRequestSchema,
WSSubscribeRequest,
WSWhitelistMessage,
WSWhitelistRequest,
)
2022-08-31 01:21:34 +00:00
2022-09-24 14:10:42 +00:00
class Producer(TypedDict):
    """
    Connection settings for a single producer, as listed under the
    ``producers`` key of the ``external_message_consumer`` configuration.
    """

    # Producer name; used as the channel id and as the key of the
    # producer's outgoing message stream.
    name: str
    # Host and port of the producer's websocket endpoint.
    host: str
    port: int
    # When truthy the connection uses the ``wss`` scheme, otherwise ``ws``.
    secure: bool
    # Token passed as the ``token`` query parameter of the websocket URL.
    ws_token: str
2022-08-31 01:21:34 +00:00
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def schema_to_dict(schema: Union[WSMessageSchema, WSRequestSchema]) -> dict[str, Any]:
    """
    Convert a websocket message/request schema into a plain dictionary.

    :param schema: The schema instance to convert
    :return: The result of ``schema.model_dump`` with ``None`` values excluded
    """
    return schema.model_dump(exclude_none=True)
2022-08-31 01:21:34 +00:00
class ExternalMessageConsumer:
    """
    The main controller class for consuming external messages from
    other freqtrade bots.

    On construction it starts a dedicated thread running its own asyncio
    event loop. In that loop one task per configured producer keeps a
    websocket connection open, receives messages and forwards them to the
    DataProvider.
    """

    def __init__(self, config: dict[str, Any], dataprovider: DataProvider):
        """
        Read the ``external_message_consumer`` configuration and start the consumer.

        :param config: The bot configuration dictionary
        :param dataprovider: DataProvider that receives the consumed data
        """
        self._config = config
        self._dp = dataprovider

        self._running = False
        self._thread = None
        self._loop = None
        self._main_task = None
        self._sub_tasks = None

        self._emc_config = self._config.get("external_message_consumer", {})

        self.enabled = self._emc_config.get("enabled", False)
        self.producers: list[Producer] = self._emc_config.get("producers", [])

        self.wait_timeout = self._emc_config.get("wait_timeout", 30)  # in seconds
        self.ping_timeout = self._emc_config.get("ping_timeout", 10)  # in seconds
        self.sleep_time = self._emc_config.get("sleep_time", 10)  # in seconds

        # The amount of candles per dataframe on the initial request
        # NOTE(review): this value is read here but not referenced anywhere
        # else in this class - confirm whether the initial request should use it.
        self.initial_candle_limit = self._emc_config.get("initial_candle_limit", 1500)

        # Message size limit, in megabytes. Default 8mb, Use bitwise operator << 20 to convert
        # as the websockets client expects bytes.
        self.message_size_limit = self._emc_config.get("message_size_limit", 8) << 20

        # Setting these explicitly as they probably shouldn't be changed by a user
        # Unless we somehow integrate this with the strategy to allow creating
        # callbacks for the messages
        self.topics = [RPCMessageType.WHITELIST, RPCMessageType.ANALYZED_DF]

        # Allow setting data for each initial request
        self._initial_requests: list[WSRequestSchema] = [
            WSSubscribeRequest(data=self.topics),
            WSWhitelistRequest(),
            WSAnalyzedDFRequest(),
        ]

        # Specify which function to use for which RPCMessageType
        self._message_handlers: dict[str, Callable[[str, WSMessageSchema], None]] = {
            RPCMessageType.WHITELIST: self._consume_whitelist_message,
            RPCMessageType.ANALYZED_DF: self._consume_analyzed_df_message,
        }

        # Outgoing request streams, keyed by producer name
        self._channel_streams: dict[str, MessageStream] = {}

        self.start()

    def start(self):
        """
        Start the main internal loop in another thread to run coroutines
        """
        if self._thread and self._loop:
            # Already started
            return

        logger.info("Starting ExternalMessageConsumer")

        self._loop = asyncio.new_event_loop()
        self._thread = Thread(target=self._loop.run_forever)
        self._running = True
        self._thread.start()

        self._main_task = asyncio.run_coroutine_threadsafe(self._main(), loop=self._loop)

    def shutdown(self):
        """
        Shutdown the loop, thread, and tasks
        """
        if self._thread and self._loop:
            logger.info("Stopping ExternalMessageConsumer")
            self._running = False

            self._channel_streams = {}

            if self._sub_tasks:
                # Cancel sub tasks.
                # asyncio tasks are not thread-safe and this method runs outside
                # of the loop thread, so the cancellation is scheduled on the loop.
                for task in self._sub_tasks:
                    self._loop.call_soon_threadsafe(task.cancel)

            if self._main_task:
                # Cancel the main task. This is the concurrent future returned by
                # `run_coroutine_threadsafe`, which may be cancelled from any thread.
                self._main_task.cancel()

            self._thread.join()

            # The loop is no longer running once the thread has exited;
            # close it to release its selector and internal sockets.
            self._loop.close()

            self._thread = None
            self._loop = None
            self._sub_tasks = None
            self._main_task = None

    async def _main(self):
        """
        The main task coroutine
        """
        # A single lock shared by all producer connections, held while a
        # received message is being handled
        lock = asyncio.Lock()

        try:
            # Create a connection to each producer
            self._sub_tasks = [
                self._loop.create_task(self._handle_producer_connection(producer, lock))
                for producer in self.producers
            ]

            await asyncio.gather(*self._sub_tasks)
        except asyncio.CancelledError:
            pass
        finally:
            # Stop the loop once we are done
            self._loop.stop()

    async def _handle_producer_connection(self, producer: Producer, lock: asyncio.Lock):
        """
        Main connection loop for the consumer

        :param producer: Dictionary containing producer info
        :param lock: An asyncio Lock
        """
        try:
            await self._create_connection(producer, lock)
        except asyncio.CancelledError:
            # Exit silently
            pass

    async def _create_connection(self, producer: Producer, lock: asyncio.Lock):
        """
        Actually creates and handles the websocket connection, pinging on timeout
        and handling connection errors.

        Loops until the consumer is stopped or the URL turns out to be invalid;
        every other error is followed by a sleep of `sleep_time` seconds and
        another connection attempt.

        :param producer: Dictionary containing producer info
        :param lock: An asyncio Lock
        """
        while self._running:
            try:
                host, port = producer["host"], producer["port"]
                token = producer["ws_token"]
                name = producer["name"]
                scheme = "wss" if producer.get("secure", False) else "ws"
                # NOTE(review): the URL embeds the token and is written to the
                # error log below when it is rejected as invalid.
                ws_url = f"{scheme}://{host}:{port}/api/v1/message/ws?token={token}"

                # This will raise InvalidURI if the url is bad
                async with websockets.connect(
                    ws_url, max_size=self.message_size_limit, ping_interval=None
                ) as ws:
                    async with create_channel(ws, channel_id=name, send_throttle=0.5) as channel:
                        # Create the message stream for this channel
                        self._channel_streams[name] = MessageStream()

                        # Run the channel tasks while connected
                        await channel.run_channel_tasks(
                            self._receive_messages(channel, producer, lock),
                            self._send_requests(channel, self._channel_streams[name]),
                        )

            except (websockets.exceptions.InvalidURI, ValueError) as e:
                # Retrying cannot fix a bad URL, give up on this producer
                logger.error(f"{ws_url} is an invalid WebSocket URL - {e}")
                break

            except (
                socket.gaierror,
                ConnectionRefusedError,
                websockets.exceptions.InvalidStatusCode,
                websockets.exceptions.InvalidMessage,
            ) as e:
                logger.error(f"Connection Refused - {e} retrying in {self.sleep_time}s")
                await asyncio.sleep(self.sleep_time)
                continue

            except (
                websockets.exceptions.ConnectionClosedError,
                websockets.exceptions.ConnectionClosedOK,
            ):
                # Just keep trying to connect again indefinitely
                await asyncio.sleep(self.sleep_time)
                continue

            except Exception as e:
                # An unforeseen error has occurred, log and continue
                logger.error("Unexpected error has occurred:")
                logger.exception(e)
                await asyncio.sleep(self.sleep_time)
                continue

    async def _send_requests(self, channel: WebSocketChannel, channel_stream: MessageStream):
        """
        Send the initial requests, then forward every request published to
        the channel's message stream.

        :param channel: The WebSocketChannel object for the WebSocket
        :param channel_stream: The MessageStream carrying requests for this channel
        """
        # Send the initial requests
        for init_request in self._initial_requests:
            await channel.send(schema_to_dict(init_request))

        # Now send any subsequent requests published to
        # this channel's stream
        async for request, _ in channel_stream:
            logger.debug(f"Sending request to channel - {channel} - {request}")
            await channel.send(request)

    async def _receive_messages(
        self, channel: WebSocketChannel, producer: Producer, lock: asyncio.Lock
    ):
        """
        Loop to handle receiving messages from a Producer

        :param channel: The WebSocketChannel object for the WebSocket
        :param producer: Dictionary containing producer info
        :param lock: An asyncio Lock
        """
        while self._running:
            try:
                message = await asyncio.wait_for(channel.recv(), timeout=self.wait_timeout)

                try:
                    async with lock:
                        # Handle the message
                        self.handle_producer_message(producer, message)
                except Exception as e:
                    # A failing handler must not terminate the receive loop
                    logger.exception(f"Error handling producer message: {e}")

            except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed):
                # We haven't received data yet. Check the connection and continue.
                try:
                    # ping
                    pong = await channel.ping()
                    latency = await asyncio.wait_for(pong, timeout=self.ping_timeout) * 1000

                    logger.info(f"Connection to {channel} still alive, latency: {latency}ms")
                    continue

                except Exception as e:
                    # The ping failed: log it and re-raise so that the caller
                    # can handle the broken connection.
                    logger.warning(f"Ping error {channel} - {e} - retrying in {self.sleep_time}s")
                    logger.debug(e, exc_info=e)
                    raise

    def send_producer_request(
        self, producer_name: str, request: Union[WSRequestSchema, dict[str, Any]]
    ):
        """
        Publish a message to the producer's message stream to be
        sent by the channel task.

        Nothing is published if no message stream exists for the producer.

        :param producer_name: The name of the producer to publish the message to
        :param request: The request to send to the producer
        """
        if isinstance(request, WSRequestSchema):
            request = schema_to_dict(request)

        if channel_stream := self._channel_streams.get(producer_name):
            channel_stream.publish(request)

    def handle_producer_message(self, producer: Producer, message: dict[str, Any]):
        """
        Handles external messages from a Producer

        Invalid, empty and unhandled messages are logged and dropped.

        :param producer: Dictionary containing producer info
        :param message: The raw message received from the producer
        """
        producer_name = producer.get("name", "default")

        try:
            producer_message = WSMessageSchema.model_validate(message)
        except ValidationError as e:
            logger.error(f"Invalid message from `{producer_name}`: {e}")
            return

        if not producer_message.data:
            logger.error(f"Empty message received from `{producer_name}`")
            return

        logger.debug(f"Received message of type `{producer_message.type}` from `{producer_name}`")

        message_handler = self._message_handlers.get(producer_message.type)

        if not message_handler:
            logger.info(f"Received unhandled message: `{producer_message.data}`, ignoring...")
            return

        message_handler(producer_name, producer_message)

    def _consume_whitelist_message(self, producer_name: str, message: WSMessageSchema):
        """
        Validate a whitelist message and store its pairlist in the DataProvider

        :param producer_name: The name of the producer the message came from
        :param message: The message to consume
        """
        try:
            # Validate the message
            whitelist_message = WSWhitelistMessage.model_validate(message.model_dump())
        except ValidationError as e:
            logger.error(f"Invalid message from `{producer_name}`: {e}")
            return

        # Add the pairlist data to the DataProvider
        self._dp._set_producer_pairs(whitelist_message.data, producer_name=producer_name)

        logger.debug(f"Consumed message from `{producer_name}` of type `RPCMessageType.WHITELIST`")

    def _consume_analyzed_df_message(self, producer_name: str, message: WSMessageSchema):
        """
        Validate an analyzed dataframe message and add its candles to the
        DataProvider, requesting more candles from the producer when the
        DataProvider reports that the dataframe was not appended.

        :param producer_name: The name of the producer the message came from
        :param message: The message to consume
        """
        try:
            df_message = WSAnalyzedDFMessage.model_validate(message.model_dump())
        except ValidationError as e:
            logger.error(f"Invalid message from `{producer_name}`: {e}")
            return

        key = df_message.data.key
        df = df_message.data.df
        la = df_message.data.la

        pair, timeframe, candle_type = key

        if df.empty:
            logger.debug(f"Received Empty Dataframe for {key}")
            return

        # If set, remove the Entry and Exit signals from the Producer
        if self._emc_config.get("remove_entry_exit_signals", False):
            df = remove_entry_exit_signals(df)

        logger.debug(f"Received {len(df)} candle(s) for {key}")

        did_append, n_missing = self._dp._add_external_df(
            pair,
            df,
            last_analyzed=la,
            timeframe=timeframe,
            candle_type=candle_type,
            producer_name=producer_name,
        )

        if not did_append:
            # We want an overlap in candles in case some data has changed
            n_missing += 1
            # Request 1500 candles instead once the number of missing candles
            # reaches FULL_DATAFRAME_THRESHOLD
            if n_missing >= FULL_DATAFRAME_THRESHOLD:
                n_missing = 1500

            logger.warning(
                f"Holes in data or no existing df, requesting {n_missing} candles "
                f"for {key} from `{producer_name}`"
            )

            self.send_producer_request(
                producer_name, WSAnalyzedDFRequest(data={"limit": n_missing, "pair": pair})
            )
            return

        logger.debug(
            f"Consumed message from `{producer_name}` "
            f"of type `RPCMessageType.ANALYZED_DF` for {key}"
        )