ruff format: update scripts

Matthias 2024-05-12 16:18:55 +02:00
parent dc3a3d1cf9
commit 9121d3af65
2 changed files with 60 additions and 67 deletions
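These changes are mechanical: Ruff's formatter rewrites single-quoted strings to double quotes, adds trailing commas to multi-line calls and literals, and collapses constructs that fit within the line-length limit onto a single line. A diff like this would typically come from running the formatter over the scripts directory, for example (the exact invocation is an assumption, not recorded in the commit):

    ruff format scripts/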

View File

@@ -10,5 +10,5 @@ so it can be used as a standalone script.
 from freqtrade_client.ft_client import main


-if __name__ == '__main__':
+if __name__ == "__main__":
     main()

View File

@@ -6,6 +6,7 @@ a Freqtrade bot's message websocket
 Should not import anything from freqtrade,
 so it can be used as a standalone script.
 """
+
 import argparse
 import asyncio
 import logging
@@ -25,35 +26,33 @@ logger = logging.getLogger("WebSocketClient")
 # ---------------------------------------------------------------------------


 def setup_logging(filename: str):
     logging.basicConfig(
         level=logging.DEBUG,
-        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-        handlers=[
-            logging.FileHandler(filename),
-            logging.StreamHandler()
-        ]
+        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
+        handlers=[logging.FileHandler(filename), logging.StreamHandler()],
     )


 def parse_args():
     parser = argparse.ArgumentParser()
     parser.add_argument(
-        '-c',
-        '--config',
-        help='Specify configuration file (default: %(default)s). ',
-        dest='config',
+        "-c",
+        "--config",
+        help="Specify configuration file (default: %(default)s). ",
+        dest="config",
         type=str,
-        metavar='PATH',
-        default='config.json'
+        metavar="PATH",
+        default="config.json",
     )
     parser.add_argument(
-        '-l',
-        '--logfile',
-        help='The filename to log to.',
-        dest='logfile',
+        "-l",
+        "--logfile",
+        help="The filename to log to.",
+        dest="logfile",
         type=str,
-        default='ws_client.log'
+        default="ws_client.log",
     )

     args = parser.parse_args()
@@ -64,8 +63,9 @@ def load_config(configfile):
     file = Path(configfile)
     if file.is_file():
         with file.open("r") as f:
-            config = rapidjson.load(f, parse_mode=rapidjson.PM_COMMENTS |
-                                    rapidjson.PM_TRAILING_COMMAS)
+            config = rapidjson.load(
+                f, parse_mode=rapidjson.PM_COMMENTS | rapidjson.PM_TRAILING_COMMAS
+            )
         return config
     else:
         logger.warning(f"Could not load config file {file}.")
@@ -84,6 +84,7 @@ def readable_timedelta(delta):
     return f"{int(minutes)}:{int(seconds)}.{int(milliseconds)}"

+
 # ----------------------------------------------------------------------------
@@ -100,16 +101,17 @@ def json_deserialize(message):
     Deserialize JSON to a dict
     :param message: The message to deserialize
     """
+
     def json_to_dataframe(data: str) -> pandas.DataFrame:
-        dataframe = pandas.read_json(data, orient='split')
-        if 'date' in dataframe.columns:
-            dataframe['date'] = pandas.to_datetime(dataframe['date'], unit='ms', utc=True)
+        dataframe = pandas.read_json(data, orient="split")
+        if "date" in dataframe.columns:
+            dataframe["date"] = pandas.to_datetime(dataframe["date"], unit="ms", utc=True)
         return dataframe

     def _json_object_hook(z):
-        if z.get('__type__') == 'dataframe':
-            return json_to_dataframe(z.get('__value__'))
+        if z.get("__type__") == "dataframe":
+            return json_to_dataframe(z.get("__value__"))
         return z

     return rapidjson.loads(message, object_hook=_json_object_hook)
@@ -128,16 +130,13 @@ class ClientProtocol:
         initial_requests = [
             {
                 "type": "subscribe",  # The subscribe request should always be first
-                "data": ["analyzed_df", "whitelist"]  # The message types we want
+                "data": ["analyzed_df", "whitelist"],  # The message types we want
             },
             {
                 "type": "whitelist",
                 "data": None,
             },
-            {
-                "type": "analyzed_df",
-                "data": {"limit": 1500}
-            }
+            {"type": "analyzed_df", "data": {"limit": 1500}},
         ]

         for request in initial_requests:
@@ -147,8 +146,8 @@ class ClientProtocol:
         deserialized = json_deserialize(message)

         message_size = sys.getsizeof(message)
-        message_type = deserialized.get('type')
-        message_data = deserialized.get('data')
+        message_type = deserialized.get("type")
+        message_data = deserialized.get("data")

         self.logger.info(
             f"Received message of type {message_type} [{message_size} bytes] @ [{name}]"
@@ -177,7 +176,7 @@ class ClientProtocol:
         self.logger.info(data)

     async def _handle_analyzed_df(self, name, type, data):
-        key, la, df = data['key'], data['la'], data['df']
+        key, la, df = data["key"], data["la"], data["df"]

         if not df.empty:
             columns = ", ".join([str(column) for column in df.columns])
@@ -196,16 +195,16 @@ class ClientProtocol:


 async def create_client(
     host,
     port,
     token,
-    scheme='ws',
-    name='default',
+    scheme="ws",
+    name="default",
     protocol=None,
     sleep_time=10,
     ping_timeout=10,
     wait_timeout=30,
-    **kwargs
+    **kwargs,
 ):
     """
     Create a websocket client and listen for messages
@@ -231,21 +230,15 @@ async def create_client(
                 # Now listen for messages
                 while 1:
                     try:
-                        message = await asyncio.wait_for(
-                            ws.recv(),
-                            timeout=wait_timeout
-                        )
+                        message = await asyncio.wait_for(ws.recv(), timeout=wait_timeout)
                         await protocol.on_message(ws, name, message)
-                    except (
-                        asyncio.TimeoutError,
-                        websockets.exceptions.WebSocketException
-                    ):
+                    except (asyncio.TimeoutError, websockets.exceptions.WebSocketException):
                         # Try pinging
                         try:
                             pong = await ws.ping()
-                            latency = (await asyncio.wait_for(pong, timeout=ping_timeout) * 1000)
+                            latency = await asyncio.wait_for(pong, timeout=ping_timeout) * 1000

                             logger.info(f"Connection still alive, latency: {latency}ms")
@@ -261,7 +254,7 @@ async def create_client(
            socket.gaierror,
            ConnectionRefusedError,
            websockets.exceptions.InvalidStatusCode,
-           websockets.exceptions.InvalidMessage
+           websockets.exceptions.InvalidMessage,
        ) as e:
            logger.error(f"Connection Refused - {e} retrying in {sleep_time}s")
            await asyncio.sleep(sleep_time)
@@ -270,7 +263,7 @@ async def create_client(
        except (
            websockets.exceptions.ConnectionClosedError,
-           websockets.exceptions.ConnectionClosedOK
+           websockets.exceptions.ConnectionClosedOK,
        ):
            logger.info("Connection was closed")
            # Just keep trying to connect again indefinitely
@@ -291,30 +284,30 @@ async def create_client(
 async def _main(args):
-    setup_logging(args['logfile'])
-    config = load_config(args['config'])
+    setup_logging(args["logfile"])
+    config = load_config(args["config"])

-    emc_config = config.get('external_message_consumer', {})
+    emc_config = config.get("external_message_consumer", {})

-    producers = emc_config.get('producers', [])
+    producers = emc_config.get("producers", [])
     producer = producers[0]

-    wait_timeout = emc_config.get('wait_timeout', 30)
-    ping_timeout = emc_config.get('ping_timeout', 10)
-    sleep_time = emc_config.get('sleep_time', 10)
-    message_size_limit = (emc_config.get('message_size_limit', 8) << 20)
+    wait_timeout = emc_config.get("wait_timeout", 30)
+    ping_timeout = emc_config.get("ping_timeout", 10)
+    sleep_time = emc_config.get("sleep_time", 10)
+    message_size_limit = emc_config.get("message_size_limit", 8) << 20

     await create_client(
-        producer['host'],
-        producer['port'],
-        producer['ws_token'],
-        'wss' if producer.get('secure', False) else 'ws',
-        producer['name'],
+        producer["host"],
+        producer["port"],
+        producer["ws_token"],
+        "wss" if producer.get("secure", False) else "ws",
+        producer["name"],
         sleep_time=sleep_time,
         ping_timeout=ping_timeout,
         wait_timeout=wait_timeout,
         max_size=message_size_limit,
-        ping_interval=None
+        ping_interval=None,
     )
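
For reference, _main() above pulls its connection settings from the external_message_consumer section of the file passed via -c/--config. Below is a minimal sketch of a matching config dict and of the message_size_limit conversion; the name, host, port and token values are placeholders, not part of this commit:

    # Hypothetical config mirroring the keys _main() reads; values are illustrative only.
    example_config = {
        "external_message_consumer": {
            "producers": [
                {
                    "name": "default",
                    "host": "127.0.0.1",
                    "port": 8080,
                    "ws_token": "secret-token",
                    "secure": False,  # False selects the "ws" scheme, True selects "wss"
                },
            ],
            "wait_timeout": 30,
            "ping_timeout": 10,
            "sleep_time": 10,
            "message_size_limit": 8,  # megabytes; the script shifts this left by 20 bits
        }
    }

    # 8 << 20 == 8 * 2**20 == 8_388_608 bytes, i.e. an 8 MiB limit passed as max_size.
    assert (example_config["external_message_consumer"]["message_size_limit"] << 20) == 8_388_608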