freqtrade_origin/freqtrade/rpc/api_server/ws/channel.py

128 lines
3.4 KiB
Python
Raw Normal View History

import asyncio
import logging
2022-11-15 03:27:45 +00:00
from contextlib import asynccontextmanager
from typing import Any, Dict, List, Optional, Type, Union
from uuid import uuid4
from freqtrade.rpc.api_server.ws.proxy import WebSocketProxy
2022-09-02 06:05:36 +00:00
from freqtrade.rpc.api_server.ws.serializer import (HybridJSONWebSocketSerializer,
WebSocketSerializer)
from freqtrade.rpc.api_server.ws.types import WebSocketType
from freqtrade.rpc.api_server.ws_schemas import WSMessageSchemaType
logger = logging.getLogger(__name__)
class WebSocketChannel:
"""
Object to help facilitate managing a websocket connection
"""
def __init__(
self,
websocket: WebSocketType,
channel_id: Optional[str] = None,
2022-09-02 06:05:36 +00:00
serializer_cls: Type[WebSocketSerializer] = HybridJSONWebSocketSerializer
):
self.channel_id = channel_id if channel_id else uuid4().hex[:8]
self._websocket = WebSocketProxy(websocket)
# Internal event to signify a closed websocket
2022-11-02 19:26:27 +00:00
self._closed = asyncio.Event()
2022-11-15 05:21:40 +00:00
# The subscribed message types
self._subscriptions: List[str] = []
2022-11-15 03:27:45 +00:00
# Wrap the WebSocket in the Serializing class
2022-11-13 09:26:27 +00:00
self._wrapped_ws = serializer_cls(self._websocket)
def __repr__(self):
return f"WebSocketChannel({self.channel_id}, {self.remote_addr})"
@property
def raw_websocket(self):
return self._websocket.raw_websocket
@property
def remote_addr(self):
return self._websocket.remote_addr
2022-11-15 03:27:45 +00:00
async def send(self, message: Union[WSMessageSchemaType, Dict[str, Any]]):
"""
2022-11-15 03:27:45 +00:00
Send a message on the wrapped websocket
"""
2022-11-15 03:27:45 +00:00
await self._wrapped_ws.send(message)
async def recv(self):
"""
2022-11-15 03:27:45 +00:00
Receive a message on the wrapped websocket
"""
return await self._wrapped_ws.recv()
async def ping(self):
"""
Ping the websocket
"""
return await self._websocket.ping()
2022-11-15 03:27:45 +00:00
async def accept(self):
"""
Accept the underlying websocket connection
"""
return await self._websocket.accept()
async def close(self):
"""
Close the WebSocketChannel
"""
2022-11-02 19:26:27 +00:00
try:
2022-11-15 03:27:45 +00:00
await self._websocket.close()
2022-11-02 19:26:27 +00:00
except Exception:
pass
self._closed.set()
def is_closed(self) -> bool:
"""
Closed flag
"""
2022-11-02 19:26:27 +00:00
return self._closed.is_set()
def set_subscriptions(self, subscriptions: List[str] = []) -> None:
"""
Set which subscriptions this channel is subscribed to
:param subscriptions: List of subscriptions, List[str]
"""
self._subscriptions = subscriptions
def subscribed_to(self, message_type: str) -> bool:
"""
Check if this channel is subscribed to the message_type
:param message_type: The message type to check
"""
return message_type in self._subscriptions
2022-11-15 03:27:45 +00:00
async def __aiter__(self):
"""
2022-11-15 03:27:45 +00:00
Generator for received messages
"""
2022-11-15 03:27:45 +00:00
while True:
try:
2022-11-15 03:27:45 +00:00
yield await self.recv()
except Exception:
break
2022-11-15 03:27:45 +00:00
@asynccontextmanager
async def connect(self):
"""
2022-11-15 03:27:45 +00:00
Context manager for safely opening and closing the websocket connection
"""
2022-11-15 03:27:45 +00:00
try:
await self.accept()
yield self
finally:
await self.close()