From 510cf4f30507ed4763d13e12a41e12ceb59a6748 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Wed, 31 Aug 2022 10:40:26 -0600 Subject: [PATCH] remove data waiting, remove explicit analyzing of external df --- freqtrade/data/dataprovider.py | 107 +++++++++---------- freqtrade/enums/__init__.py | 1 - freqtrade/enums/externalmessages.py | 7 -- freqtrade/freqtradebot.py | 16 ++- freqtrade/rpc/api_server/api_ws.py | 37 +++++-- freqtrade/rpc/emc.py | 153 +++++++++++++++------------- freqtrade/rpc/rpc.py | 14 +-- freqtrade/strategy/interface.py | 54 ++-------- 8 files changed, 182 insertions(+), 207 deletions(-) delete mode 100644 freqtrade/enums/externalmessages.py diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index 2d473683c..9376c0b33 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -7,7 +7,6 @@ Common Interface for bot and strategy to access data. import logging from collections import deque from datetime import datetime, timezone -from threading import Event from typing import Any, Dict, List, Optional, Tuple from pandas import DataFrame @@ -15,9 +14,11 @@ from pandas import DataFrame from freqtrade.configuration import TimeRange from freqtrade.constants import ListPairsWithTimeframes, PairWithTimeframe from freqtrade.data.history import load_pair_history -from freqtrade.enums import CandleType, RunMode, WaitDataPolicy +from freqtrade.enums import CandleType, RPCMessageType, RunMode from freqtrade.exceptions import ExchangeError, OperationalException from freqtrade.exchange import Exchange, timeframe_to_seconds +from freqtrade.misc import dataframe_to_json +from freqtrade.rpc import RPCManager from freqtrade.util import PeriodicCache @@ -33,16 +34,18 @@ class DataProvider: self, config: dict, exchange: Optional[Exchange], + rpc: Optional[RPCManager] = None, pairlists=None ) -> None: self._config = config self._exchange = exchange self._pairlists = pairlists + self.__rpc = rpc self.__cached_pairs: Dict[PairWithTimeframe, Tuple[DataFrame, datetime]] = {} self.__slice_index: Optional[int] = None self.__cached_pairs_backtesting: Dict[PairWithTimeframe, DataFrame] = {} self.__external_pairs_df: Dict[PairWithTimeframe, Tuple[DataFrame, datetime]] = {} - self.__external_pairs_event: Dict[PairWithTimeframe, Tuple[int, Event]] = {} + self.__producer_pairs: List[str] = [] self._msg_queue: deque = deque() self.__msg_cache = PeriodicCache( @@ -51,10 +54,7 @@ class DataProvider: self._num_sources = len( self._config.get('external_message_consumer', {}).get('producers', []) ) - self._wait_data_policy = self._config.get('external_message_consumer', {}).get( - 'wait_data_policy', WaitDataPolicy.all) - self._wait_data_timeout = self._config.get('external_message_consumer', {}).get( - 'wait_data_timeout', 5) + self.external_data_enabled = self._num_sources > 0 def _set_dataframe_max_index(self, limit_index: int): """ @@ -83,6 +83,46 @@ class DataProvider: self.__cached_pairs[pair_key] = ( dataframe, datetime.now(timezone.utc)) + # For multiple producers we will want to merge the pairlists instead of overwriting + def set_producer_pairs(self, pairlist: List[str]): + """ + Set the pairs received to later be used. + This only supports 1 Producer right now. + + :param pairlist: List of pairs + """ + self.__producer_pairs = pairlist.copy() + + def get_producer_pairs(self) -> List[str]: + """ + Get the pairs cached from the producer + + :returns: List of pairs + """ + return self.__producer_pairs + + def emit_df( + self, + pair_key: PairWithTimeframe, + dataframe: DataFrame + ) -> None: + """ + Send this dataframe as an ANALYZED_DF message to RPC + + :param pair_key: PairWithTimeframe tuple + :param data: Tuple containing the DataFrame and the datetime it was cached + """ + if self.__rpc: + self.__rpc.send_msg( + { + 'type': RPCMessageType.ANALYZED_DF, + 'data': { + 'key': pair_key, + 'value': dataframe_to_json(dataframe) + } + } + ) + def add_external_df( self, pair: str, @@ -101,7 +141,6 @@ class DataProvider: # For multiple leaders, if the data already exists, we'd merge self.__external_pairs_df[pair_key] = (dataframe, datetime.now(timezone.utc)) - self._set_data_event(pair_key) def get_external_df( self, @@ -120,59 +159,11 @@ class DataProvider: pair_key = (pair, timeframe, candle_type) if pair_key not in self.__external_pairs_df: - self._wait_on_data(pair_key) - - if pair_key not in self.__external_pairs_df: - return (DataFrame(), datetime.fromtimestamp(0, tz=timezone.utc)) + # We don't have this data yet, return empty DataFrame and datetime (01-01-1970) + return (DataFrame(), datetime.fromtimestamp(0, tz=timezone.utc)) return self.__external_pairs_df[pair_key] - def _set_data_event(self, key: PairWithTimeframe): - """ - Depending on the WaitDataPolicy, if an event exists for this PairWithTimeframe - then set the event to release main thread from waiting. - - :param key: PairWithTimeframe - """ - pair_event = self.__external_pairs_event.get(key) - - if pair_event: - num_concat, event = pair_event - self.__external_pairs_event[key] = (num_concat + 1, event) - - if self._wait_data_policy == WaitDataPolicy.one: - logger.debug("Setting Data as policy is One") - event.set() - elif self._wait_data_policy == WaitDataPolicy.all and num_concat == self._num_sources: - logger.debug("Setting Data as policy is all, and is complete") - event.set() - - del self.__external_pairs_event[key] - - def _wait_on_data(self, key: PairWithTimeframe): - """ - Depending on the WaitDataPolicy, we will create and wait on an event until - set that determines the full amount of data is available - - :param key: PairWithTimeframe - """ - if self._wait_data_policy is not WaitDataPolicy.none: - pair, timeframe, candle_type = key - - pair_event = Event() - self.__external_pairs_event[key] = (0, pair_event) - - timeout = self._wait_data_timeout \ - if self._wait_data_policy is not WaitDataPolicy.all else 0 - - timeout_str = f"for {timeout} seconds" if timeout > 0 else "indefinitely" - logger.debug(f"Waiting for external data on {pair} for {timeout_str}") - - if timeout > 0: - pair_event.wait(timeout=timeout) - else: - pair_event.wait() - def add_pairlisthandler(self, pairlists) -> None: """ Allow adding pairlisthandler after initialization diff --git a/freqtrade/enums/__init__.py b/freqtrade/enums/__init__.py index 229d770ce..d32e04e17 100644 --- a/freqtrade/enums/__init__.py +++ b/freqtrade/enums/__init__.py @@ -3,7 +3,6 @@ from freqtrade.enums.backteststate import BacktestState from freqtrade.enums.candletype import CandleType from freqtrade.enums.exitchecktuple import ExitCheckTuple from freqtrade.enums.exittype import ExitType -from freqtrade.enums.externalmessages import WaitDataPolicy from freqtrade.enums.marginmode import MarginMode from freqtrade.enums.ordertypevalue import OrderTypeValues from freqtrade.enums.rpcmessagetype import RPCMessageType, RPCRequestType diff --git a/freqtrade/enums/externalmessages.py b/freqtrade/enums/externalmessages.py deleted file mode 100644 index e43899ab5..000000000 --- a/freqtrade/enums/externalmessages.py +++ /dev/null @@ -1,7 +0,0 @@ -from enum import Enum - - -class WaitDataPolicy(str, Enum): - none = "none" - one = "one" - all = "all" diff --git a/freqtrade/freqtradebot.py b/freqtrade/freqtradebot.py index c0d658c61..f7b7ad80b 100644 --- a/freqtrade/freqtradebot.py +++ b/freqtrade/freqtradebot.py @@ -85,21 +85,19 @@ class FreqtradeBot(LoggingMixin): # Keep this at the end of this initialization method. self.rpc: RPCManager = RPCManager(self) - self.dataprovider = DataProvider(self.config, self.exchange, self.pairlists) + self.dataprovider = DataProvider(self.config, self.exchange, self.rpc, self.pairlists) # Attach Dataprovider to strategy instance self.strategy.dp = self.dataprovider # Attach Wallets to strategy instance self.strategy.wallets = self.wallets - # Attach rpc to strategy instance - self.strategy.rpc = self.rpc # Initializing Edge only if enabled self.edge = Edge(self.config, self.exchange, self.strategy) if \ self.config.get('edge', {}).get('enabled', False) else None # Init ExternalMessageConsumer if enabled - self.emc = ExternalMessageConsumer(self.rpc._rpc, self.config) if \ + self.emc = ExternalMessageConsumer(self.config, self.dataprovider) if \ self.config.get('external_message_consumer', {}).get('enabled', False) else None self.active_pair_whitelist = self._refresh_active_whitelist() @@ -201,11 +199,11 @@ class FreqtradeBot(LoggingMixin): strategy_safe_wrapper(self.strategy.bot_loop_start, supress_error=True)() - if self.emc: - leader_pairs = self.pairlists._whitelist - self.strategy.analyze_external(self.active_pair_whitelist, leader_pairs) - else: - self.strategy.analyze(self.active_pair_whitelist) + # This just means we won't broadcast dataframes if we're listening to a producer + # Doesn't necessarily NEED to be this way, as maybe we'd like to broadcast + # even if we are using external dataframes in the future. + self.strategy.analyze(self.active_pair_whitelist, + external_data=self.dataprovider.external_data_enabled) with self._exit_lock: # Check for exchange cancelations, timeouts and user requested replace diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py index 88bae099a..d7c7239d1 100644 --- a/freqtrade/rpc/api_server/api_ws.py +++ b/freqtrade/rpc/api_server/api_ws.py @@ -1,22 +1,48 @@ import logging +from typing import Any, Dict from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect -from freqtrade.rpc.api_server.deps import get_channel_manager, get_rpc_optional +from freqtrade.enums import RPCMessageType, RPCRequestType +from freqtrade.rpc.api_server.deps import get_channel_manager +from freqtrade.rpc.api_server.ws.channel import WebSocketChannel from freqtrade.rpc.api_server.ws.utils import is_websocket_alive +# from typing import Any, Dict + + logger = logging.getLogger(__name__) # Private router, protected by API Key authentication router = APIRouter() +# We are passed a Channel object, we can only do sync functions on that channel object +def _process_consumer_request(request: Dict[str, Any], channel: WebSocketChannel): + type, data = request.get('type'), request.get('data') + + # If the request is empty, do nothing + if not data: + return + + # If we have a request of type SUBSCRIBE, set the topics in this channel + if type == RPCRequestType.SUBSCRIBE: + if isinstance(data, list): + logger.error(f"Improper request from channel: {channel} - {request}") + return + + # If all topics passed are a valid RPCMessageType, set subscriptions on channel + if all([any(x.value == topic for x in RPCMessageType) for topic in data]): + + logger.debug(f"{channel} subscribed to topics: {data}") + channel.set_subscriptions(data) + + @router.websocket("/message/ws") async def message_endpoint( ws: WebSocket, - channel_manager=Depends(get_channel_manager), - rpc=Depends(get_rpc_optional) + channel_manager=Depends(get_channel_manager) ): try: if is_websocket_alive(ws): @@ -32,9 +58,8 @@ async def message_endpoint( request = await channel.recv() # Process the request here. Should this be a method of RPC? - if rpc: - logger.info(f"Request: {request}") - rpc._process_consumer_request(request, channel) + logger.info(f"Request: {request}") + _process_consumer_request(request, channel) except WebSocketDisconnect: # Handle client disconnects diff --git a/freqtrade/rpc/emc.py b/freqtrade/rpc/emc.py index 42061079a..3d78bc257 100644 --- a/freqtrade/rpc/emc.py +++ b/freqtrade/rpc/emc.py @@ -12,9 +12,9 @@ from typing import Any, Dict import websockets +from freqtrade.data.dataprovider import DataProvider from freqtrade.enums import RPCMessageType, RPCRequestType from freqtrade.misc import json_to_dataframe, remove_entry_exit_signals -from freqtrade.rpc import RPC from freqtrade.rpc.api_server.ws.channel import WebSocketChannel @@ -29,11 +29,11 @@ class ExternalMessageConsumer: def __init__( self, - rpc: RPC, config: Dict[str, Any], + dataprovider: DataProvider ): - self._rpc = rpc self._config = config + self._dp = dataprovider self._running = False self._thread = None @@ -99,12 +99,12 @@ class ExternalMessageConsumer: """ The main task coroutine """ - rpc_lock = asyncio.Lock() + lock = asyncio.Lock() try: # Create a connection to each producer self._sub_tasks = [ - self._loop.create_task(self._handle_producer_connection(producer, rpc_lock)) + self._loop.create_task(self._handle_producer_connection(producer, lock)) for producer in self.producers ] @@ -115,73 +115,90 @@ class ExternalMessageConsumer: # Stop the loop once we are done self._loop.stop() - async def _handle_producer_connection(self, producer, lock): + async def _handle_producer_connection(self, producer: Dict[str, Any], lock: asyncio.Lock): """ Main connection loop for the consumer + + :param producer: Dictionary containing producer info: {'url': '', 'ws_token': ''} + :param lock: An asyncio Lock """ try: - while True: - try: - url, token = producer['url'], producer['ws_token'] - ws_url = f"{url}?token={token}" - - async with websockets.connect(ws_url) as ws: - logger.info("Connection successful") - channel = WebSocketChannel(ws) - - # Tell the producer we only want these topics - # Should always be the first thing we send - await channel.send( - self.compose_consumer_request(RPCRequestType.SUBSCRIBE, self.topics) - ) - - # Now receive data, if none is within the time limit, ping - while True: - try: - message = await asyncio.wait_for( - channel.recv(), - timeout=5 - ) - - async with lock: - # Handle the data here - # We use a lock because it will call RPC methods - self.handle_producer_message(message) - - except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed): - # We haven't received data yet. Check the connection and continue. - try: - # ping - ping = await channel.ping() - - await asyncio.wait_for(ping, timeout=self.ping_timeout) - logger.debug(f"Connection to {url} still alive...") - - continue - except Exception: - logger.info( - f"Ping error {url} - retrying in {self.sleep_time}s") - await asyncio.sleep(self.sleep_time) - - break - except Exception as e: - logger.exception(e) - continue - except ( - socket.gaierror, - ConnectionRefusedError, - websockets.exceptions.InvalidStatusCode - ) as e: - logger.error(f"Connection Refused - {e} retrying in {self.sleep_time}s") - await asyncio.sleep(self.sleep_time) - - continue - + await self._create_connection(producer, lock) except asyncio.CancelledError: # Exit silently pass - def compose_consumer_request(self, type_: str, data: Any) -> Dict[str, Any]: + async def _create_connection(self, producer: Dict[str, Any], lock: asyncio.Lock): + """ + Actually creates and handles the websocket connection, pinging on timeout + and handling connection errors. + + :param producer: Dictionary containing producer info: {'url': '', 'ws_token': ''} + :param lock: An asyncio Lock + """ + while self._running: + try: + url, token = producer['url'], producer['ws_token'] + ws_url = f"{url}?token={token}" + + # This will raise InvalidURI if the url is bad + async with websockets.connect(ws_url) as ws: + logger.info("Connection successful") + channel = WebSocketChannel(ws) + + # Tell the producer we only want these topics + # Should always be the first thing we send + await channel.send( + self.compose_consumer_request(RPCRequestType.SUBSCRIBE, self.topics) + ) + + # Now receive data, if none is within the time limit, ping + while True: + try: + message = await asyncio.wait_for( + channel.recv(), + timeout=self.reply_timeout + ) + + async with lock: + # Handle the message + self.handle_producer_message(message) + + except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed): + # We haven't received data yet. Check the connection and continue. + try: + # ping + ping = await channel.ping() + + await asyncio.wait_for(ping, timeout=self.ping_timeout) + logger.debug(f"Connection to {url} still alive...") + + continue + except Exception: + logger.info( + f"Ping error {url} - retrying in {self.sleep_time}s") + await asyncio.sleep(self.sleep_time) + + break + except Exception as e: + logger.exception(e) + continue + except ( + socket.gaierror, + ConnectionRefusedError, + websockets.exceptions.InvalidStatusCode + ) as e: + logger.error(f"Connection Refused - {e} retrying in {self.sleep_time}s") + await asyncio.sleep(self.sleep_time) + + continue + + # Catch invalid ws_url, and break the loop + except websockets.exceptions.InvalidURI as e: + logger.error(f"{ws_url} is an invalid WebSocket URL - {e}") + break + + def compose_consumer_request(self, type_: RPCRequestType, data: Any) -> Dict[str, Any]: """ Create a request for sending to a producer @@ -211,9 +228,8 @@ class ExternalMessageConsumer: if message_type == RPCMessageType.WHITELIST: pairlist = message_data - # Add the pairlist data to the ExternalPairlist plugin - external_pairlist = self._rpc._freqtrade.pairlists._pairlist_handlers[0] - external_pairlist.add_pairlist_data(pairlist) + # Add the pairlist data to the DataProvider + self._dp.set_producer_pairs(pairlist) # Handle analyzed dataframes elif message_type == RPCMessageType.ANALYZED_DF: @@ -230,5 +246,4 @@ class ExternalMessageConsumer: dataframe = remove_entry_exit_signals(dataframe) # Add the dataframe to the dataprovider - dataprovider = self._rpc._freqtrade.dataprovider - dataprovider.add_external_df(pair, timeframe, dataframe, candle_type) + self._dp.add_external_df(pair, timeframe, dataframe, candle_type) diff --git a/freqtrade/rpc/rpc.py b/freqtrade/rpc/rpc.py index a41d08d55..ed7f13a96 100644 --- a/freqtrade/rpc/rpc.py +++ b/freqtrade/rpc/rpc.py @@ -19,8 +19,8 @@ from freqtrade.configuration.timerange import TimeRange from freqtrade.constants import CANCEL_REASON, DATETIME_PRINT_FORMAT from freqtrade.data.history import load_data from freqtrade.data.metrics import calculate_max_drawdown -from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, RPCMessageType, RPCRequestType, - SignalDirection, State, TradingMode) +from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, SignalDirection, State, + TradingMode) from freqtrade.exceptions import ExchangeError, PricingError from freqtrade.exchange import timeframe_to_minutes, timeframe_to_msecs from freqtrade.loggers import bufferHandler @@ -1089,13 +1089,3 @@ class RPC: 'last_process_loc': last_p.astimezone(tzlocal()).strftime(DATETIME_PRINT_FORMAT), 'last_process_ts': int(last_p.timestamp()), } - - # We are passed a Channel object, we can only do sync functions on that channel object - def _process_consumer_request(self, request, channel): - # Should we ensure that request is Dict[str, Any]? - type, data = request.get('type'), request.get('data') - - if type == RPCRequestType.SUBSCRIBE: - if all([any(x.value == topic for x in RPCMessageType) for topic in data]): - logger.debug(f"{channel} subscribed to topics: {data}") - channel.set_subscriptions(data) diff --git a/freqtrade/strategy/interface.py b/freqtrade/strategy/interface.py index 7120928ff..a06b6506e 100644 --- a/freqtrade/strategy/interface.py +++ b/freqtrade/strategy/interface.py @@ -12,14 +12,13 @@ from pandas import DataFrame from freqtrade.constants import ListPairsWithTimeframes from freqtrade.data.dataprovider import DataProvider -from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, RPCMessageType, SignalDirection, - SignalTagType, SignalType, TradingMode) +from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, SignalDirection, SignalTagType, + SignalType, TradingMode) from freqtrade.enums.runmode import RunMode from freqtrade.exceptions import OperationalException, StrategyError from freqtrade.exchange import timeframe_to_minutes, timeframe_to_next_date, timeframe_to_seconds -from freqtrade.misc import dataframe_to_json, remove_entry_exit_signals +from freqtrade.misc import remove_entry_exit_signals from freqtrade.persistence import Order, PairLocks, Trade -from freqtrade.rpc import RPCManager from freqtrade.strategy.hyper import HyperStrategyMixin from freqtrade.strategy.informative_decorator import (InformativeData, PopulateIndicators, _create_and_merge_informative_pair, @@ -113,7 +112,6 @@ class IStrategy(ABC, HyperStrategyMixin): # and wallets - access to the current balance. dp: DataProvider wallets: Optional[Wallets] = None - rpc: RPCManager # Filled from configuration stake_currency: str # container variable for strategy source code @@ -731,16 +729,8 @@ class IStrategy(ABC, HyperStrategyMixin): candle_type = self.config.get('candle_type_def', CandleType.SPOT) self.dp._set_cached_df(pair, self.timeframe, dataframe, candle_type=candle_type) - if not external_data: - self.rpc.send_msg( - { - 'type': RPCMessageType.ANALYZED_DF, - 'data': { - 'key': (pair, self.timeframe, candle_type), - 'value': dataframe_to_json(dataframe) - } - } - ) + if populate_indicators: + self.dp.emit_df((pair, self.timeframe, candle_type), dataframe) else: logger.debug("Skipping TA Analysis for already analyzed candle") @@ -763,10 +753,7 @@ class IStrategy(ABC, HyperStrategyMixin): """ candle_type = self.config.get('candle_type_def', CandleType.SPOT) - if not external_data: - dataframe = self.dp.ohlcv(pair, self.timeframe, candle_type) - else: - dataframe, _ = self.dp.get_external_df(pair, self.timeframe, candle_type) + dataframe = self.dp.ohlcv(pair, self.timeframe, candle_type) if not isinstance(dataframe, DataFrame) or dataframe.empty: logger.warning('Empty candle (OHLCV) data for pair %s', pair) @@ -790,38 +777,15 @@ class IStrategy(ABC, HyperStrategyMixin): def analyze( self, - pairs: List[str] + pairs: List[str], + external_data: bool = False ) -> None: """ Analyze all pairs using analyze_pair(). :param pairs: List of pairs to analyze """ for pair in pairs: - self.analyze_pair(pair) - - def analyze_external(self, pairs: List[str], leader_pairs: List[str]) -> None: - """ - Analyze the pre-populated dataframes from the Leader - - :param pairs: The active pair whitelist - :param leader_pairs: The list of pairs from the Leaders - """ - - # Get the extra pairs not listed in Leader pairs, and process - # them normally. - # List order is not preserved when doing this! - # We use ^ instead of - for symmetric difference - extra_pairs = list(set(pairs) ^ set(leader_pairs)) - # These would be the pairs that we have trades in, which means - # we would have to analyze them normally - # Eventually maybe request data from the Leader if we don't have it? - - for pair in leader_pairs: - # Analyze the pairs, but get the dataframe from the external data - self.analyze_pair(pair, external_data=True) - - for pair in extra_pairs: - self.analyze_pair(pair) + self.analyze_pair(pair, external_data) @ staticmethod def preserve_df(dataframe: DataFrame) -> Tuple[int, float, datetime]: