fix dataframe serializing

This commit is contained in:
Timothy Pogue 2022-09-01 23:15:03 -06:00
parent 00f35f4870
commit dccde88c83
6 changed files with 37 additions and 26 deletions

View File

@ -17,7 +17,6 @@ from freqtrade.data.history import load_pair_history
from freqtrade.enums import CandleType, RPCMessageType, RunMode from freqtrade.enums import CandleType, RPCMessageType, RunMode
from freqtrade.exceptions import ExchangeError, OperationalException from freqtrade.exceptions import ExchangeError, OperationalException
from freqtrade.exchange import Exchange, timeframe_to_seconds from freqtrade.exchange import Exchange, timeframe_to_seconds
from freqtrade.misc import dataframe_to_json
from freqtrade.rpc import RPCManager from freqtrade.rpc import RPCManager
from freqtrade.util import PeriodicCache from freqtrade.util import PeriodicCache
@ -119,7 +118,7 @@ class DataProvider:
'type': RPCMessageType.ANALYZED_DF, 'type': RPCMessageType.ANALYZED_DF,
'data': { 'data': {
'key': pair_key, 'key': pair_key,
'value': dataframe_to_json(dataframe) 'value': dataframe
} }
} }
) )

View File

@ -269,6 +269,7 @@ def json_to_dataframe(data: str) -> pandas.DataFrame:
:returns: A pandas DataFrame from the JSON string :returns: A pandas DataFrame from the JSON string
""" """
dataframe = pandas.read_json(data) dataframe = pandas.read_json(data)
if 'date' in dataframe.columns:
dataframe['date'] = pandas.to_datetime(dataframe['date'], unit='ms', utc=True) dataframe['date'] = pandas.to_datetime(dataframe['date'], unit='ms', utc=True)
return dataframe return dataframe

View File

@ -3,7 +3,7 @@ from threading import RLock
from typing import List, Type from typing import List, Type
from freqtrade.rpc.api_server.ws.proxy import WebSocketProxy from freqtrade.rpc.api_server.ws.proxy import WebSocketProxy
from freqtrade.rpc.api_server.ws.serializer import ORJSONWebSocketSerializer, WebSocketSerializer from freqtrade.rpc.api_server.ws.serializer import RapidJSONWebSocketSerializer, WebSocketSerializer
from freqtrade.rpc.api_server.ws.types import WebSocketType from freqtrade.rpc.api_server.ws.types import WebSocketType
@ -18,7 +18,7 @@ class WebSocketChannel:
def __init__( def __init__(
self, self,
websocket: WebSocketType, websocket: WebSocketType,
serializer_cls: Type[WebSocketSerializer] = ORJSONWebSocketSerializer serializer_cls: Type[WebSocketSerializer] = RapidJSONWebSocketSerializer
): ):
# The WebSocket object # The WebSocket object
self._websocket = WebSocketProxy(websocket) self._websocket = WebSocketProxy(websocket)

View File

@ -3,8 +3,10 @@ import logging
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
import msgpack import msgpack
import orjson import rapidjson
from pandas import DataFrame
from freqtrade.misc import dataframe_to_json, json_to_dataframe
from freqtrade.rpc.api_server.ws.proxy import WebSocketProxy from freqtrade.rpc.api_server.ws.proxy import WebSocketProxy
@ -34,27 +36,23 @@ class WebSocketSerializer(ABC):
async def close(self, code: int = 1000): async def close(self, code: int = 1000):
await self._websocket.close(code) await self._websocket.close(code)
# Going to explore using MsgPack as the serialization,
# as that might be the best method for sending pandas
# dataframes over the wire
class JSONWebSocketSerializer(WebSocketSerializer): class JSONWebSocketSerializer(WebSocketSerializer):
def _serialize(self, data): def _serialize(self, data):
return json.dumps(data) return json.dumps(data, default=_json_default)
def _deserialize(self, data): def _deserialize(self, data):
return json.loads(data) return json.loads(data, object_hook=_json_object_hook)
class ORJSONWebSocketSerializer(WebSocketSerializer): # ORJSON does not support .loads(object_hook=x) parameter, so we must use RapidJSON
ORJSON_OPTIONS = orjson.OPT_NAIVE_UTC | orjson.OPT_SERIALIZE_NUMPY
class RapidJSONWebSocketSerializer(WebSocketSerializer):
def _serialize(self, data): def _serialize(self, data):
return orjson.dumps(data, option=self.ORJSON_OPTIONS) return rapidjson.dumps(data, default=_json_default)
def _deserialize(self, data): def _deserialize(self, data):
return orjson.loads(data) return rapidjson.loads(data, object_hook=_json_object_hook)
class MsgPackWebSocketSerializer(WebSocketSerializer): class MsgPackWebSocketSerializer(WebSocketSerializer):
@ -63,3 +61,20 @@ class MsgPackWebSocketSerializer(WebSocketSerializer):
def _deserialize(self, data): def _deserialize(self, data):
return msgpack.unpackb(data, raw=False) return msgpack.unpackb(data, raw=False)
# Support serializing pandas DataFrames
def _json_default(z):
if isinstance(z, DataFrame):
return {
'__type__': 'dataframe',
'__value__': dataframe_to_json(z)
}
raise TypeError
# Support deserializing JSON to pandas DataFrames
def _json_object_hook(z):
if z.get('__type__') == 'dataframe':
return json_to_dataframe(z.get('__value__'))
return z

View File

@ -10,11 +10,12 @@ import socket
from threading import Thread from threading import Thread
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
import pandas
import websockets import websockets
from freqtrade.data.dataprovider import DataProvider from freqtrade.data.dataprovider import DataProvider
from freqtrade.enums import RPCMessageType, RPCRequestType from freqtrade.enums import RPCMessageType, RPCRequestType
from freqtrade.misc import json_to_dataframe, remove_entry_exit_signals from freqtrade.misc import remove_entry_exit_signals
from freqtrade.rpc.api_server.ws.channel import WebSocketChannel from freqtrade.rpc.api_server.ws.channel import WebSocketChannel
@ -262,11 +263,9 @@ class ExternalMessageConsumer:
key, value = message_data.get('key'), message_data.get('value') key, value = message_data.get('key'), message_data.get('value')
if key and value: if key and isinstance(value, pandas.DataFrame):
pair, timeframe, candle_type = key pair, timeframe, candle_type = key
dataframe = value
# Convert the JSON to a pandas DataFrame
dataframe = json_to_dataframe(value)
# If set, remove the Entry and Exit signals from the Producer # If set, remove the Entry and Exit signals from the Producer
if self._emc_config.get('remove_entry_exit_signals', False): if self._emc_config.get('remove_entry_exit_signals', False):

View File

@ -24,7 +24,7 @@ from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, SignalDirecti
from freqtrade.exceptions import ExchangeError, PricingError from freqtrade.exceptions import ExchangeError, PricingError
from freqtrade.exchange import timeframe_to_minutes, timeframe_to_msecs from freqtrade.exchange import timeframe_to_minutes, timeframe_to_msecs
from freqtrade.loggers import bufferHandler from freqtrade.loggers import bufferHandler
from freqtrade.misc import dataframe_to_json, decimals_per_coin, shorten_date from freqtrade.misc import decimals_per_coin, shorten_date
from freqtrade.persistence import PairLocks, Trade from freqtrade.persistence import PairLocks, Trade
from freqtrade.persistence.models import PairLock from freqtrade.persistence.models import PairLock
from freqtrade.plugins.pairlist.pairlist_helpers import expand_pairlist from freqtrade.plugins.pairlist.pairlist_helpers import expand_pairlist
@ -1064,10 +1064,7 @@ class RPC:
for pair in pairlist: for pair in pairlist:
dataframe, last_analyzed = self.__rpc_analysed_dataframe_raw(pair, timeframe, limit) dataframe, last_analyzed = self.__rpc_analysed_dataframe_raw(pair, timeframe, limit)
_data[pair] = { _data[pair] = {"key": (pair, timeframe, candle_type), "value": dataframe}
"key": (pair, timeframe, candle_type),
"value": dataframe_to_json(dataframe)
}
return _data return _data