diff --git a/scripts/rest_client.py b/scripts/rest_client.py index c22dd18ae..88862b044 100755 --- a/scripts/rest_client.py +++ b/scripts/rest_client.py @@ -10,5 +10,5 @@ so it can be used as a standalone script. from freqtrade_client.ft_client import main -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/scripts/ws_client.py b/scripts/ws_client.py index 818426da2..ec6df5742 100755 --- a/scripts/ws_client.py +++ b/scripts/ws_client.py @@ -6,6 +6,7 @@ a Freqtrade bot's message websocket Should not import anything from freqtrade, so it can be used as a standalone script. """ + import argparse import asyncio import logging @@ -25,35 +26,33 @@ logger = logging.getLogger("WebSocketClient") # --------------------------------------------------------------------------- + def setup_logging(filename: str): logging.basicConfig( level=logging.DEBUG, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - handlers=[ - logging.FileHandler(filename), - logging.StreamHandler() - ] + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + handlers=[logging.FileHandler(filename), logging.StreamHandler()], ) def parse_args(): parser = argparse.ArgumentParser() parser.add_argument( - '-c', - '--config', - help='Specify configuration file (default: %(default)s). ', - dest='config', + "-c", + "--config", + help="Specify configuration file (default: %(default)s). ", + dest="config", type=str, - metavar='PATH', - default='config.json' + metavar="PATH", + default="config.json", ) parser.add_argument( - '-l', - '--logfile', - help='The filename to log to.', - dest='logfile', + "-l", + "--logfile", + help="The filename to log to.", + dest="logfile", type=str, - default='ws_client.log' + default="ws_client.log", ) args = parser.parse_args() @@ -64,8 +63,9 @@ def load_config(configfile): file = Path(configfile) if file.is_file(): with file.open("r") as f: - config = rapidjson.load(f, parse_mode=rapidjson.PM_COMMENTS | - rapidjson.PM_TRAILING_COMMAS) + config = rapidjson.load( + f, parse_mode=rapidjson.PM_COMMENTS | rapidjson.PM_TRAILING_COMMAS + ) return config else: logger.warning(f"Could not load config file {file}.") @@ -84,6 +84,7 @@ def readable_timedelta(delta): return f"{int(minutes)}:{int(seconds)}.{int(milliseconds)}" + # ---------------------------------------------------------------------------- @@ -100,16 +101,17 @@ def json_deserialize(message): Deserialize JSON to a dict :param message: The message to deserialize """ + def json_to_dataframe(data: str) -> pandas.DataFrame: - dataframe = pandas.read_json(data, orient='split') - if 'date' in dataframe.columns: - dataframe['date'] = pandas.to_datetime(dataframe['date'], unit='ms', utc=True) + dataframe = pandas.read_json(data, orient="split") + if "date" in dataframe.columns: + dataframe["date"] = pandas.to_datetime(dataframe["date"], unit="ms", utc=True) return dataframe def _json_object_hook(z): - if z.get('__type__') == 'dataframe': - return json_to_dataframe(z.get('__value__')) + if z.get("__type__") == "dataframe": + return json_to_dataframe(z.get("__value__")) return z return rapidjson.loads(message, object_hook=_json_object_hook) @@ -128,16 +130,13 @@ class ClientProtocol: initial_requests = [ { "type": "subscribe", # The subscribe request should always be first - "data": ["analyzed_df", "whitelist"] # The message types we want + "data": ["analyzed_df", "whitelist"], # The message types we want }, { "type": "whitelist", "data": None, }, - { - "type": "analyzed_df", - "data": {"limit": 1500} - } + {"type": "analyzed_df", "data": {"limit": 1500}}, ] for request in initial_requests: @@ -147,8 +146,8 @@ class ClientProtocol: deserialized = json_deserialize(message) message_size = sys.getsizeof(message) - message_type = deserialized.get('type') - message_data = deserialized.get('data') + message_type = deserialized.get("type") + message_data = deserialized.get("data") self.logger.info( f"Received message of type {message_type} [{message_size} bytes] @ [{name}]" @@ -177,7 +176,7 @@ class ClientProtocol: self.logger.info(data) async def _handle_analyzed_df(self, name, type, data): - key, la, df = data['key'], data['la'], data['df'] + key, la, df = data["key"], data["la"], data["df"] if not df.empty: columns = ", ".join([str(column) for column in df.columns]) @@ -196,16 +195,16 @@ class ClientProtocol: async def create_client( - host, - port, - token, - scheme='ws', - name='default', - protocol=None, - sleep_time=10, - ping_timeout=10, - wait_timeout=30, - **kwargs + host, + port, + token, + scheme="ws", + name="default", + protocol=None, + sleep_time=10, + ping_timeout=10, + wait_timeout=30, + **kwargs, ): """ Create a websocket client and listen for messages @@ -231,21 +230,15 @@ async def create_client( # Now listen for messages while 1: try: - message = await asyncio.wait_for( - ws.recv(), - timeout=wait_timeout - ) + message = await asyncio.wait_for(ws.recv(), timeout=wait_timeout) await protocol.on_message(ws, name, message) - except ( - asyncio.TimeoutError, - websockets.exceptions.WebSocketException - ): + except (asyncio.TimeoutError, websockets.exceptions.WebSocketException): # Try pinging try: pong = await ws.ping() - latency = (await asyncio.wait_for(pong, timeout=ping_timeout) * 1000) + latency = await asyncio.wait_for(pong, timeout=ping_timeout) * 1000 logger.info(f"Connection still alive, latency: {latency}ms") @@ -261,7 +254,7 @@ async def create_client( socket.gaierror, ConnectionRefusedError, websockets.exceptions.InvalidStatusCode, - websockets.exceptions.InvalidMessage + websockets.exceptions.InvalidMessage, ) as e: logger.error(f"Connection Refused - {e} retrying in {sleep_time}s") await asyncio.sleep(sleep_time) @@ -270,7 +263,7 @@ async def create_client( except ( websockets.exceptions.ConnectionClosedError, - websockets.exceptions.ConnectionClosedOK + websockets.exceptions.ConnectionClosedOK, ): logger.info("Connection was closed") # Just keep trying to connect again indefinitely @@ -291,30 +284,30 @@ async def create_client( async def _main(args): - setup_logging(args['logfile']) - config = load_config(args['config']) + setup_logging(args["logfile"]) + config = load_config(args["config"]) - emc_config = config.get('external_message_consumer', {}) + emc_config = config.get("external_message_consumer", {}) - producers = emc_config.get('producers', []) + producers = emc_config.get("producers", []) producer = producers[0] - wait_timeout = emc_config.get('wait_timeout', 30) - ping_timeout = emc_config.get('ping_timeout', 10) - sleep_time = emc_config.get('sleep_time', 10) - message_size_limit = (emc_config.get('message_size_limit', 8) << 20) + wait_timeout = emc_config.get("wait_timeout", 30) + ping_timeout = emc_config.get("ping_timeout", 10) + sleep_time = emc_config.get("sleep_time", 10) + message_size_limit = emc_config.get("message_size_limit", 8) << 20 await create_client( - producer['host'], - producer['port'], - producer['ws_token'], - 'wss' if producer.get('secure', False) else 'ws', - producer['name'], + producer["host"], + producer["port"], + producer["ws_token"], + "wss" if producer.get("secure", False) else "ws", + producer["name"], sleep_time=sleep_time, ping_timeout=ping_timeout, wait_timeout=wait_timeout, max_size=message_size_limit, - ping_interval=None + ping_interval=None, )