diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py index 3f207eac3..01243b0cc 100644 --- a/freqtrade/rpc/api_server/api_ws.py +++ b/freqtrade/rpc/api_server/api_ws.py @@ -23,18 +23,6 @@ logger = logging.getLogger(__name__) router = APIRouter() -# async def is_websocket_alive(ws: WebSocket) -> bool: -# """ -# Check if a FastAPI Websocket is still open -# """ -# if ( -# ws.application_state == WebSocketState.CONNECTED and -# ws.client_state == WebSocketState.CONNECTED -# ): -# return True -# return False - - class WebSocketChannelClosed(Exception): """ General WebSocket exception to signal closing the channel @@ -153,51 +141,3 @@ async def message_endpoint( finally: logger.info(f"Channel disconnected - {channel}") channel_tasks.cancel() - - -# @router.websocket("/message/ws") -# async def message_endpoint( -# ws: WebSocket, -# rpc: RPC = Depends(get_rpc), -# channel_manager=Depends(get_channel_manager), -# token: str = Depends(validate_ws_token) -# ): -# """ -# Message WebSocket endpoint, facilitates sending RPC messages -# """ -# try: -# channel = await channel_manager.on_connect(ws) -# if await is_websocket_alive(ws): - -# logger.info(f"Consumer connected - {channel}") - -# # Keep connection open until explicitly closed, and process requests -# try: -# while not channel.is_closed(): -# request = await channel.recv() - -# # Process the request here -# await _process_consumer_request(request, channel, rpc, channel_manager) - -# except (WebSocketDisconnect, WebSocketException): -# # Handle client disconnects -# logger.info(f"Consumer disconnected - {channel}") -# except RuntimeError: -# # Handle cases like - -# # RuntimeError('Cannot call "send" once a closed message has been sent') -# pass -# except Exception as e: -# logger.info(f"Consumer connection failed - {channel}: {e}") -# logger.debug(e, exc_info=e) - -# except RuntimeError: -# # WebSocket was closed -# # Do nothing -# pass -# except Exception as e: -# logger.error(f"Failed to serve - {ws.client}") -# # Log tracebacks to keep track of what errors are happening -# logger.exception(e) -# finally: -# if channel: -# await channel_manager.on_disconnect(ws) diff --git a/freqtrade/rpc/api_server/webserver.py b/freqtrade/rpc/api_server/webserver.py index d0695e06d..f100a46ef 100644 --- a/freqtrade/rpc/api_server/webserver.py +++ b/freqtrade/rpc/api_server/webserver.py @@ -45,10 +45,7 @@ class ApiServer(RPCHandler): _config: Config = {} # Exchange - only available in webserver mode. _exchange = None - # websocket message queue stuff - # _ws_channel_manager = None - # _ws_thread = None - # _ws_loop = None + # websocket message stuff _message_stream = None def __new__(cls, *args, **kwargs): @@ -72,8 +69,6 @@ class ApiServer(RPCHandler): api_config = self._config['api_server'] - # ApiServer._ws_channel_manager = ChannelManager() - self.app = FastAPI(title="Freqtrade API", docs_url='/docs' if api_config.get('enable_openapi', False) else None, redoc_url=None, @@ -101,19 +96,6 @@ class ApiServer(RPCHandler): logger.info("Stopping API Server") self._server.cleanup() - # if self._ws_thread and self._ws_loop: - # logger.info("Stopping API Server background tasks") - - # if self._ws_background_task: - # # Cancel the queue task - # self._ws_background_task.cancel() - - # self._ws_thread.join() - - # self._ws_thread = None - # self._ws_loop = None - # self._ws_background_task = None - @classmethod def shutdown(cls): cls.__initialized = False @@ -123,6 +105,9 @@ class ApiServer(RPCHandler): cls._rpc = None def send_msg(self, msg: Dict[str, Any]) -> None: + """ + Publish the message to the message stream + """ if ApiServer._message_stream: ApiServer._message_stream.publish(msg) @@ -173,57 +158,21 @@ class ApiServer(RPCHandler): ) async def _api_startup_event(self): + """ + Creates the MessageStream class on startup + so it has access to the same event loop + as uvicorn + """ if not ApiServer._message_stream: ApiServer._message_stream = MessageStream() async def _api_shutdown_event(self): + """ + Removes the MessageStream class on shutdown + """ if ApiServer._message_stream: ApiServer._message_stream = None - # def start_message_queue(self): - # if self._ws_thread: - # return - - # # Create a new loop, as it'll be just for the background thread - # self._ws_loop = asyncio.new_event_loop() - - # # Start the thread - # self._ws_thread = Thread(target=self._ws_loop.run_forever) - # self._ws_thread.start() - - # # Finally, submit the coro to the thread - # self._ws_background_task = asyncio.run_coroutine_threadsafe( - # self._broadcast_queue_data(), loop=self._ws_loop) - - # async def _broadcast_queue_data(self): - # # Instantiate the queue in this coroutine so it's attached to our loop - # self._ws_queue = ThreadedQueue() - # async_queue = self._ws_queue.async_q - - # try: - # while True: - # logger.debug("Getting queue messages...") - # # Get data from queue - # message: WSMessageSchemaType = await async_queue.get() - # logger.debug(f"Found message of type: {message.get('type')}") - # async_queue.task_done() - # # Broadcast it - # await self._ws_channel_manager.broadcast(message) - # except asyncio.CancelledError: - # pass - - # # For testing, shouldn't happen when stable - # except Exception as e: - # logger.exception(f"Exception happened in background task: {e}") - - # finally: - # # Disconnect channels and stop the loop on cancel - # await self._ws_channel_manager.disconnect_all() - # self._ws_loop.stop() - # # Avoid adding more items to the queue if they aren't - # # going to get broadcasted. - # self._ws_queue = None - def start_api(self): """ Start API ... should be run in thread. diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index 39c8db516..ee16a95c6 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -125,223 +125,3 @@ class WebSocketChannel: yield self finally: await self.close() - - -# class WebSocketChannel: -# """ -# Object to help facilitate managing a websocket connection -# """ - -# def __init__( -# self, -# websocket: WebSocketType, -# channel_id: Optional[str] = None, -# drain_timeout: int = 3, -# throttle: float = 0.01, -# serializer_cls: Type[WebSocketSerializer] = HybridJSONWebSocketSerializer -# ): - -# self.channel_id = channel_id if channel_id else uuid4().hex[:8] - -# # The WebSocket object -# self._websocket = WebSocketProxy(websocket) - -# self.drain_timeout = drain_timeout -# self.throttle = throttle - -# self._subscriptions: List[str] = [] -# # 32 is the size of the receiving queue in websockets package -# self.queue: asyncio.Queue[Dict[str, Any]] = asyncio.Queue(maxsize=32) -# self._relay_task = asyncio.create_task(self.relay()) - -# # Internal event to signify a closed websocket -# self._closed = asyncio.Event() - -# # Wrap the WebSocket in the Serializing class -# self._wrapped_ws = serializer_cls(self._websocket) - -# def __repr__(self): -# return f"WebSocketChannel({self.channel_id}, {self.remote_addr})" - -# @property -# def raw_websocket(self): -# return self._websocket.raw_websocket - -# @property -# def remote_addr(self): -# return self._websocket.remote_addr - -# async def _send(self, data): -# """ -# Send data on the wrapped websocket -# """ -# await self._wrapped_ws.send(data) - -# async def send(self, data) -> bool: -# """ -# Add the data to the queue to be sent. -# :returns: True if data added to queue, False otherwise -# """ - -# # This block only runs if the queue is full, it will wait -# # until self.drain_timeout for the relay to drain the outgoing queue -# # We can't use asyncio.wait_for here because the queue may have been created with a -# # different eventloop -# start = time.time() -# while self.queue.full(): -# await asyncio.sleep(1) -# if (time.time() - start) > self.drain_timeout: -# return False - -# # If for some reason the queue is still full, just return False -# try: -# self.queue.put_nowait(data) -# except asyncio.QueueFull: -# return False - -# # If we got here everything is ok -# return True - -# async def recv(self): -# """ -# Receive data on the wrapped websocket -# """ -# return await self._wrapped_ws.recv() - -# async def ping(self): -# """ -# Ping the websocket -# """ -# return await self._websocket.ping() - -# async def close(self): -# """ -# Close the WebSocketChannel -# """ - -# try: -# await self.raw_websocket.close() -# except Exception: -# pass - -# self._closed.set() -# self._relay_task.cancel() - -# def is_closed(self) -> bool: -# """ -# Closed flag -# """ -# return self._closed.is_set() - -# def set_subscriptions(self, subscriptions: List[str] = []) -> None: -# """ -# Set which subscriptions this channel is subscribed to - -# :param subscriptions: List of subscriptions, List[str] -# """ -# self._subscriptions = subscriptions - -# def subscribed_to(self, message_type: str) -> bool: -# """ -# Check if this channel is subscribed to the message_type - -# :param message_type: The message type to check -# """ -# return message_type in self._subscriptions - -# async def relay(self): -# """ -# Relay messages from the channel's queue and send them out. This is started -# as a task. -# """ -# while not self._closed.is_set(): -# message = await self.queue.get() -# try: -# await self._send(message) -# self.queue.task_done() - -# # Limit messages per sec. -# # Could cause problems with queue size if too low, and -# # problems with network traffik if too high. -# # 0.01 = 100/s -# await asyncio.sleep(self.throttle) -# except RuntimeError: -# # The connection was closed, just exit the task -# return - - -# class ChannelManager: -# def __init__(self): -# self.channels = dict() -# self._lock = RLock() # Re-entrant Lock - -# async def on_connect(self, websocket: WebSocketType): -# """ -# Wrap websocket connection into Channel and add to list - -# :param websocket: The WebSocket object to attach to the Channel -# """ -# if isinstance(websocket, FastAPIWebSocket): -# try: -# await websocket.accept() -# except RuntimeError: -# # The connection was closed before we could accept it -# return - -# ws_channel = WebSocketChannel(websocket) - -# with self._lock: -# self.channels[websocket] = ws_channel - -# return ws_channel - -# async def on_disconnect(self, websocket: WebSocketType): -# """ -# Call close on the channel if it's not, and remove from channel list - -# :param websocket: The WebSocket objet attached to the Channel -# """ -# with self._lock: -# channel = self.channels.get(websocket) -# if channel: -# logger.info(f"Disconnecting channel {channel}") -# if not channel.is_closed(): -# await channel.close() - -# del self.channels[websocket] - -# async def disconnect_all(self): -# """ -# Disconnect all Channels -# """ -# with self._lock: -# for websocket in self.channels.copy().keys(): -# await self.on_disconnect(websocket) - -# async def broadcast(self, message: WSMessageSchemaType): -# """ -# Broadcast a message on all Channels - -# :param message: The message to send -# """ -# with self._lock: -# for channel in self.channels.copy().values(): -# if channel.subscribed_to(message.get('type')): -# await self.send_direct(channel, message) - -# async def send_direct( -# self, channel: WebSocketChannel, message: Union[WSMessageSchemaType, Dict[str, Any]]): -# """ -# Send a message directly through direct_channel only - -# :param direct_channel: The WebSocketChannel object to send the message through -# :param message: The message to send -# """ -# if not await channel.send(message): -# await self.on_disconnect(channel.raw_websocket) - -# def has_channels(self): -# """ -# Flag for more than 0 channels -# """ -# return len(self.channels) > 0 diff --git a/freqtrade/rpc/api_server/ws/serializer.py b/freqtrade/rpc/api_server/ws/serializer.py index 85703136b..625a0990c 100644 --- a/freqtrade/rpc/api_server/ws/serializer.py +++ b/freqtrade/rpc/api_server/ws/serializer.py @@ -34,9 +34,6 @@ class WebSocketSerializer(ABC): return self._deserialize(data) - # async def close(self, code: int = 1000): - # await self._websocket.close(code) - class HybridJSONWebSocketSerializer(WebSocketSerializer): def _serialize(self, data) -> str: