freqtrade_origin/freqtrade/exchange/exchange_ws.py

187 lines
7.2 KiB
Python
Raw Normal View History

2022-10-18 18:48:40 +00:00
import asyncio
import logging
import time
2024-04-26 18:18:39 +00:00
from copy import deepcopy
2024-04-19 16:27:47 +00:00
from functools import partial
2022-10-18 18:48:40 +00:00
from threading import Thread
2023-01-05 10:53:37 +00:00
from typing import Dict, Set
2022-10-18 18:48:40 +00:00
2022-11-07 06:07:15 +00:00
import ccxt
2022-11-28 18:46:50 +00:00
from freqtrade.constants import Config, PairWithTimeframe
2022-10-18 18:48:40 +00:00
from freqtrade.enums.candletype import CandleType
2022-10-29 17:44:27 +00:00
from freqtrade.exchange.exchange import timeframe_to_seconds
2023-01-05 10:53:37 +00:00
from freqtrade.exchange.types import OHLCVResponse
from freqtrade.util import dt_ts, format_ms_time
2022-10-18 18:48:40 +00:00
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
2023-08-10 19:25:23 +00:00
class ExchangeWS:
    """
    Websocket OHLCV watcher running on a dedicated background thread.

    A thread named "ccxt_ws" hosts an asyncio event loop. For every
    (pair, timeframe, candle_type) combination registered via `schedule_ohlcv`, a task
    on that loop repeatedly awaits `ccxt_object.watch_ohlcv`. `get_ohlcv` then reads the
    candles from the `ccxt_object.ohlcvs` cache.

    Timestamps obtained from `dt_ts()` are treated as milliseconds throughout this class
    (they are compared against `seconds * 1000` and formatted with `format_ms_time`).
    """

    def __init__(self, config: Config, ccxt_object: ccxt.Exchange) -> None:
        """
        Store the collaborators and start the background event-loop thread.

        :param config: Configuration object; stored, but not read elsewhere in this class.
        :param ccxt_object: ccxt exchange used for `watch_ohlcv` / `close`, and whose
            `ohlcvs` cache is read by `get_ohlcv`.
        """
        self.config = config
        self.ccxt_object = ccxt_object
        # Watcher tasks currently alive on the background loop.
        self._background_tasks: Set[asyncio.Task] = set()

        # Combinations that should be watched / that already have a watcher task.
        self._klines_watching: Set[PairWithTimeframe] = set()
        self._klines_scheduled: Set[PairWithTimeframe] = set()
        # dt_ts() of the last received update / of the last schedule request, per combination.
        self.klines_last_refresh: Dict[PairWithTimeframe, float] = {}
        self.klines_last_request: Dict[PairWithTimeframe, float] = {}
        # Set all plain state before the thread is started, so nothing running on the
        # background thread can observe a half-initialised instance.
        self.__cleanup_called = False
        self._thread = Thread(name="ccxt_ws", target=self._start_forever)
        self._thread.start()

    def _start_forever(self) -> None:
        """
        Thread target: create the event loop and run it until it is stopped.

        `self._loop` only exists once this method has started executing, which is why
        other methods guard their access with `hasattr(self, "_loop")`.
        """
        self._loop = asyncio.new_event_loop()
        try:
            self._loop.run_forever()
        finally:
            if self._loop.is_running():
                self._loop.stop()

    def cleanup(self) -> None:
        """
        Stop all watchers, close the exchange connections and shut down loop and thread.
        """
        logger.debug("Cleanup called - stopping")
        self._klines_watching.clear()
        loop_open = hasattr(self, "_loop") and not self._loop.is_closed()
        # Iterate over a snapshot: done-callbacks running on the loop thread remove
        # entries from the set while we are walking it.
        for task in list(self._background_tasks):
            if loop_open:
                # Tasks belong to the background loop - cancel them from that thread.
                self._loop.call_soon_threadsafe(task.cancel)
            else:
                task.cancel()
        if loop_open:
            self.reset_connections()
            self._loop.call_soon_threadsafe(self._loop.stop)

        # Wait for run_forever() to return before closing the loop - closing a loop
        # that is still running raises RuntimeError.
        self._thread.join()
        if loop_open and not self._loop.is_closed():
            self._loop.close()
        logger.debug("Stopped")

    def reset_connections(self) -> None:
        """
        Reset all connections - avoids "connection-reset" errors that happen after ~9 days
        """
        if hasattr(self, "_loop") and not self._loop.is_closed():
            logger.info("Resetting WS connections.")
            asyncio.run_coroutine_threadsafe(self._cleanup_async(), loop=self._loop)
            # _cleanup_async() raises the flag in its `finally` clause; block until then.
            while not self.__cleanup_called:
                time.sleep(0.1)
            # Re-arm the flag for the next reset.
            self.__cleanup_called = False

    async def _cleanup_async(self) -> None:
        """
        Close the ccxt object and clear its OHLCV cache (runs on the background loop).

        Exceptions are logged, not raised; the completion flag polled by
        `reset_connections` is set in every case.
        """
        try:
            await self.ccxt_object.close()
            # Clear the cache.
            # Not doing this will cause problems on startup with dynamic pairlists
            self.ccxt_object.ohlcvs.clear()
        except Exception:
            logger.exception("Exception in _cleanup_async")
        finally:
            self.__cleanup_called = True

    def cleanup_expired(self) -> None:
        """
        Remove pairs from watchlist if they've not been requested within
        the last timeframe (+ offset)
        """
        now = dt_ts()
        changed = False
        # Snapshot, as the set is modified inside the loop.
        for p in list(self._klines_watching):
            _, timeframe, _ = p
            timeframe_s = timeframe_to_seconds(timeframe)
            last_request = self.klines_last_request.get(p, 0)
            # Expire after one timeframe plus a 20 second offset (compared in milliseconds).
            if last_request > 0 and (now - last_request) > ((timeframe_s + 20) * 1000):
                logger.info(f"Removing {p} from watchlist")
                self._klines_watching.discard(p)
                changed = True
        if changed:
            logger.info(f"Removal done: new watch list ({len(self._klines_watching)})")

    async def _schedule_while_true(self) -> None:
        """
        Create a watcher task for every watched combination that does not have one yet.
        """
        # Iterate over a snapshot - other threads add to / discard from the watch set
        # (schedule_ohlcv, cleanup_expired, cleanup) while this coroutine runs.
        for p in list(self._klines_watching):
            # Check if they're already scheduled
            if p in self._klines_scheduled:
                continue
            self._klines_scheduled.add(p)
            pair, timeframe, candle_type = p
            task = asyncio.create_task(
                self._continuously_async_watch_ohlcv(pair, timeframe, candle_type)
            )
            # Keep a reference to the task and get notified once it ends.
            self._background_tasks.add(task)
            task.add_done_callback(
                partial(
                    self._continuous_stopped,
                    pair=pair,
                    timeframe=timeframe,
                    candle_type=candle_type,
                )
            )

    def _continuous_stopped(
        self, task: asyncio.Task, pair: str, timeframe: str, candle_type: CandleType
    ) -> None:
        """
        Done-callback of a watcher task: drop its bookkeeping and log how it ended.

        :param task: The finished watcher task.
        :param pair: Pair the task was watching.
        :param timeframe: Timeframe the task was watching.
        :param candle_type: Candle type of the watched combination.
        """
        self._background_tasks.discard(task)
        # Release the slot first, so the combination can be scheduled again no matter
        # how the task ended.
        self._klines_scheduled.discard((pair, timeframe, candle_type))

        result = "done"
        if task.cancelled():
            result = "cancelled"
        elif (exc := task.exception()) is not None:
            # task.result() would re-raise the exception inside this callback.
            result = f"failed: {exc!r}"
            logger.error(
                f"{pair}, {timeframe}, {candle_type} - Task raised an exception",
                exc_info=exc,
            )
        elif (result1 := task.result()) is not None:
            result = str(result1)

        logger.info(f"{pair}, {timeframe}, {candle_type} - Task finished - {result}")

    async def _continuously_async_watch_ohlcv(
        self, pair: str, timeframe: str, candle_type: CandleType
    ) -> None:
        """
        Await `watch_ohlcv` in a loop for as long as the combination is being watched.

        `candle_type` is only part of the bookkeeping key; it is not passed to
        `watch_ohlcv`. ccxt errors end the loop and are logged; in every case the
        combination is removed from the watch set on exit.
        """
        key = (pair, timeframe, candle_type)
        try:
            while key in self._klines_watching:
                start = dt_ts()
                data = await self.ccxt_object.watch_ohlcv(pair, timeframe)
                self.klines_last_refresh[key] = dt_ts()
                logger.debug(
                    f"watch done {pair}, {timeframe}, data {len(data)} "
                    # dt_ts() deltas are milliseconds - convert before labelling as seconds.
                    f"in {(dt_ts() - start) / 1000:.2f}s"
                )
        except ccxt.ExchangeClosedByUser:
            logger.debug("Exchange connection closed by user")
        except ccxt.BaseError:
            logger.exception(f"Exception in continuously_async_watch_ohlcv for {pair}, {timeframe}")
        finally:
            self._klines_watching.discard(key)

    def schedule_ohlcv(self, pair: str, timeframe: str, candle_type: CandleType) -> None:
        """
        Schedule a pair/timeframe combination to be watched
        """
        key = (pair, timeframe, candle_type)
        self._klines_watching.add(key)
        self.klines_last_request[key] = dt_ts()
        # NOTE(review): self._loop is assigned by the background thread; a call made
        # immediately after construction could race with that assignment.
        asyncio.run_coroutine_threadsafe(self._schedule_while_true(), loop=self._loop)
        self.cleanup_expired()

    async def get_ohlcv(
        self,
        pair: str,
        timeframe: str,
        candle_type: CandleType,
        candle_date: int,
    ) -> OHLCVResponse:
        """
        Returns cached klines from ccxt's "watch" cache.
        :param pair: Pair to return candles for.
        :param timeframe: Timeframe to return candles for.
        :param candle_type: Candle type, part of the last-refresh lookup key.
        :param candle_date: timestamp of the end-time of the candle.
        :return: Tuple of pair, timeframe, candle_type, candles and drop_hint. The candle
            list is empty when ccxt has nothing cached for this pair / timeframe.
        :raises KeyError: if no update was ever received for this combination.
        """
        # Deepcopy the response - as it might be modified in the background as new messages arrive
        candles = deepcopy(self.ccxt_object.ohlcvs.get(pair, {}).get(timeframe)) or []
        refresh_date = self.klines_last_refresh[(pair, timeframe, candle_type)]
        # Open timestamp of the newest cached candle - None if nothing is cached.
        last_candle_ts = candles[-1][0] if candles else None
        drop_hint = False
        if last_candle_ts is not None and refresh_date > candle_date:
            # Refreshed after candle was complete.
            drop_hint = last_candle_ts >= candle_date
        last_candle_str = format_ms_time(last_candle_ts) if last_candle_ts is not None else "n/a"
        logger.debug(
            f"watch result for {pair}, {timeframe} with length {len(candles)}, "
            f"{last_candle_str}, "
            f"lref={format_ms_time(refresh_date)}, "
            f"candle_date={format_ms_time(candle_date)}, {drop_hint=}"
        )
        return pair, timeframe, candle_type, candles, drop_hint