From 1d12985b70e91737f1182e2bebf908483a0a062a Mon Sep 17 00:00:00 2001 From: Matthias Date: Sun, 23 Oct 2022 09:49:50 +0200 Subject: [PATCH] Update exchange_ws with cleanup function --- freqtrade/exchange/exchange_ws.py | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/freqtrade/exchange/exchange_ws.py b/freqtrade/exchange/exchange_ws.py index 55b0fb372..1805aedef 100644 --- a/freqtrade/exchange/exchange_ws.py +++ b/freqtrade/exchange/exchange_ws.py @@ -3,7 +3,7 @@ import asyncio import logging import time from threading import Thread -from typing import List, Set, Tuple +from typing import Dict, List, Set, Tuple from freqtrade.constants import Config from freqtrade.enums.candletype import CandleType @@ -20,13 +20,21 @@ class ExchangeWS(): self._background_tasks = set() self._pairs_watching: Set[Tuple[str, str, CandleType]] = set() self._pairs_scheduled: Set[Tuple[str, str, CandleType]] = set() + self.pairs_last_refresh: Dict[Tuple[str, str, CandleType], int] = {} self._thread.start() def start(self) -> None: self._loop = asyncio.new_event_loop() self._loop.run_forever() -## One task per Watch + def cleanup(self) -> None: + logger.debug("Cleanup called - stopping") + self._pairs_watching.clear() + self._loop.stop() + self._thread.join() + logger.debug("Stopped") + +# One task per Watch # async def schedule_schedule(self) -> None: # for p in self._pairs_watching: @@ -58,7 +66,7 @@ class ExchangeWS(): # asyncio.run_coroutine_threadsafe(self.schedule_one_task( # pair, timeframe, candle_type), loop=self._loop) -## End one task epr watch +# End one task epr watch async def schedule_while_true(self) -> None: @@ -73,8 +81,9 @@ class ExchangeWS(): def continuous_stopped(self, task: asyncio.Task): self._background_tasks.discard(task) - pair, timeframe, candle_type, data = task.result() - self._pairs_scheduled.discard(p) + result = task.result() + logger.info(f"Task finished {result}") + # self._pairs_scheduled.discard(pair, timeframe, candle_type) logger.info(f"Task finished {task}") @@ -82,8 +91,10 @@ class ExchangeWS(): self, pair: str, timeframe: str, candle_type: CandleType) -> Tuple[str, str, str, List]: while (pair, timeframe, candle_type) in self._pairs_watching: + logger.info(self._pairs_watching) start = time.time() - data = await self.ccxt_object.watch_ohlcv(pair, timeframe, ) + data = await self.ccxt_object.watch_ohlcv(pair, timeframe) + self.pairs_last_refresh[(pair, timeframe, candle_type)] = time.time() logger.info( f"watch1 done {pair}, {timeframe}, data {len(data)} in {time.time() - start:.2f}s")