# freqtrade_origin/freqtrade/exchange/exchange_ws.py
# 2024-05-31 06:40:14 +02:00
#
# 169 lines
# 6.4 KiB
# Python

import asyncio
import logging
import time
from copy import deepcopy
from functools import partial
from threading import Thread
from typing import Dict, Set
import ccxt
from freqtrade.constants import Config, PairWithTimeframe
from freqtrade.enums.candletype import CandleType
from freqtrade.exchange.exchange import timeframe_to_seconds
from freqtrade.exchange.types import OHLCVResponse
from freqtrade.util import dt_ts, format_ms_time
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class ExchangeWS:
    """
    Keep ccxt websocket OHLCV subscriptions running on a dedicated thread.

    A private asyncio event loop is created and run inside the ``ccxt_ws`` thread.
    Pair / timeframe / candle-type combinations registered via ``schedule_ohlcv``
    each get one task that repeatedly awaits ``ccxt_object.watch_ohlcv`` until the
    combination leaves the watchlist, the task is cancelled, or it fails.
    """

    def __init__(self, config: Config, ccxt_object: ccxt.Exchange) -> None:
        """
        Store the collaborators, initialise bookkeeping and start the loop thread.

        :param config: Configuration object (only stored by this class).
        :param ccxt_object: ccxt exchange used for ``watch_ohlcv``, ``ohlcvs`` and ``close``.
        """
        self.config = config
        self.ccxt_object = ccxt_object
        self._background_tasks: Set[asyncio.Task] = set()

        # Combinations that should be watched / that currently own a running task.
        self._klines_watching: Set[PairWithTimeframe] = set()
        self._klines_scheduled: Set[PairWithTimeframe] = set()
        # dt_ts() stamps of the last received update / the last schedule request.
        self.klines_last_refresh: Dict[PairWithTimeframe, float] = {}
        self.klines_last_request: Dict[PairWithTimeframe, float] = {}
        # Set by _cleanup_async once closing the ccxt object has been attempted.
        self.__cleanup_called = False

        self._thread = Thread(name="ccxt_ws", target=self._start_forever)
        self._thread.start()

    def _start_forever(self) -> None:
        """
        Thread target: create the event loop and run it until it is stopped.
        """
        self._loop = asyncio.new_event_loop()
        try:
            self._loop.run_forever()
        finally:
            if self._loop.is_running():
                self._loop.stop()

    def cleanup(self) -> None:
        """
        Cancel all watch tasks, close the ccxt object, stop the loop and join the thread.
        """
        logger.debug("Cleanup called - stopping")
        self._klines_watching.clear()
        # Iterate a snapshot: done-callbacks on the loop thread discard from this set.
        for task in list(self._background_tasks):
            task.cancel()
        # _loop only exists once the background thread has started executing.
        if hasattr(self, "_loop"):
            asyncio.run_coroutine_threadsafe(self._cleanup_async(), loop=self._loop)
            # Wait for _cleanup_async to finish - but only as long as the loop thread
            # is still around to execute it, otherwise this would spin forever.
            while not self.__cleanup_called and self._thread.is_alive():
                time.sleep(0.1)
            self._loop.call_soon_threadsafe(self._loop.stop)

        self._thread.join()
        logger.debug("Stopped")

    async def _cleanup_async(self) -> None:
        """
        Close the ccxt object on the loop thread and flag completion for ``cleanup``.

        Exceptions are logged, never propagated, so the flag is always set.
        """
        try:
            await self.ccxt_object.close()
        except Exception:
            logger.exception("Exception in _cleanup_async")
        finally:
            self.__cleanup_called = True

    def cleanup_expired(self) -> None:
        """
        Remove pairs from watchlist if they've not been requested within
        the last timeframe (+ offset)
        """
        now = dt_ts()
        removed_any = False
        # Snapshot, as the set is modified while looping.
        for paircomb in list(self._klines_watching):
            timeframe = paircomb[1]
            last_request = self.klines_last_request.get(paircomb, 0)
            # dt_ts() differences are compared against seconds * 1000, plus 20s offset.
            max_age = (timeframe_to_seconds(timeframe) + 20) * 1000
            if last_request > 0 and (now - last_request) > max_age:
                logger.info(f"Removing {paircomb} from watchlist")
                self._klines_watching.discard(paircomb)
                removed_any = True
        if removed_any:
            logger.info(f"Removal done: new watch list ({len(self._klines_watching)})")

    async def _schedule_while_true(self) -> None:
        """
        Create a watch task for every watched combination that has none yet.

        Runs on the loop thread. Each task is tracked in ``_background_tasks`` and
        reports back through ``_continuous_stopped`` once it ends.
        """
        # Snapshot: other threads add to / discard from the watchlist concurrently,
        # which would otherwise raise "Set changed size during iteration".
        for paircomb in list(self._klines_watching):
            if paircomb in self._klines_scheduled:
                continue
            self._klines_scheduled.add(paircomb)
            pair, timeframe, candle_type = paircomb
            task = asyncio.create_task(
                self._continuously_async_watch_ohlcv(pair, timeframe, candle_type)
            )
            self._background_tasks.add(task)
            task.add_done_callback(
                partial(
                    self._continuous_stopped,
                    pair=pair,
                    timeframe=timeframe,
                    candle_type=candle_type,
                )
            )

    def _continuous_stopped(
        self, task: asyncio.Task, pair: str, timeframe: str, candle_type: CandleType
    ):
        """
        Done-callback of a watch task: drop its bookkeeping and log the outcome.

        :param task: The finished task.
        :param pair: Pair the task was watching.
        :param timeframe: Timeframe the task was watching.
        :param candle_type: Candle type the task was registered with.
        """
        # Clear the bookkeeping first so that nothing below can leave the combination
        # marked as scheduled - that would prevent it from ever being scheduled again.
        self._background_tasks.discard(task)
        self._klines_scheduled.discard((pair, timeframe, candle_type))

        if task.cancelled():
            result = "cancelled"
        else:
            # task.result() would re-raise an exception the task died with,
            # so look at the exception explicitly instead.
            error = task.exception()
            result = task.result() if error is None else f"failed with {error!r}"
        logger.info(f"{pair}, {timeframe} Task finished: {result}")

    async def _continuously_async_watch_ohlcv(
        self, pair: str, timeframe: str, candle_type: CandleType
    ) -> None:
        """
        Await ``watch_ohlcv`` in a loop for as long as the combination is watched.

        ccxt errors end the loop and are logged. In every case the combination is
        removed from the watchlist on exit.

        :param pair: Pair to watch.
        :param timeframe: Timeframe to watch.
        :param candle_type: Only used as part of the bookkeeping key,
            it is not passed on to ``watch_ohlcv``.
        """
        paircomb = (pair, timeframe, candle_type)
        try:
            while paircomb in self._klines_watching:
                start = dt_ts()
                data = await self.ccxt_object.watch_ohlcv(pair, timeframe)
                self.klines_last_refresh[paircomb] = dt_ts()
                # dt_ts() values are handled as milliseconds throughout this class,
                # so convert before labelling the duration as seconds.
                logger.debug(
                    f"watch done {pair}, {timeframe}, data {len(data)} "
                    f"in {(dt_ts() - start) / 1000:.2f}s"
                )
        except ccxt.BaseError:
            logger.exception(
                f"Exception in continuously_async_watch_ohlcv for {pair}, {timeframe}"
            )
        finally:
            self._klines_watching.discard(paircomb)

    def schedule_ohlcv(self, pair: str, timeframe: str, candle_type: CandleType) -> None:
        """
        Schedule a pair/timeframe combination to be watched

        Also records the request time and expires combinations that have not
        been requested recently.
        """
        paircomb = (pair, timeframe, candle_type)
        self._klines_watching.add(paircomb)
        self.klines_last_request[paircomb] = dt_ts()
        # Task creation has to happen on the loop thread.
        asyncio.run_coroutine_threadsafe(self._schedule_while_true(), loop=self._loop)
        self.cleanup_expired()

    async def get_ohlcv(
        self,
        pair: str,
        timeframe: str,
        candle_type: CandleType,
        candle_date: int,
    ) -> OHLCVResponse:
        """
        Returns cached klines from ccxt's "watch" cache.
        :param pair: Pair to return candles for.
        :param timeframe: Timeframe to return candles for.
        :param candle_type: Candle type the combination was scheduled with.
        :param candle_date: timestamp of the end-time of the candle.
        :return: Tuple of (pair, timeframe, candle_type, candles, drop_hint).
            candles is an empty list if the cache holds nothing for this combination.
            drop_hint is True if the last refresh happened after candle_date and the
            newest cached candle has a timestamp >= candle_date.
            NOTE(review): presumably tells the caller to drop the newest (incomplete)
            candle - confirm against the caller.
        :raises KeyError: If no refresh has been recorded for this combination yet.
        """
        # Deepcopy the response - as it might be modified in the background as new messages arrive
        candles = deepcopy(self.ccxt_object.ohlcvs.get(pair, {}).get(timeframe)) or []
        refresh_date = self.klines_last_refresh[(pair, timeframe, candle_type)]

        # Only look at the newest candle if there is one - and only trust it
        # if the refresh happened after the candle was complete.
        drop_hint = (
            bool(candles) and refresh_date > candle_date and candles[-1][0] >= candle_date
        )
        newest_candle = format_ms_time(candles[-1][0]) if candles else "n/a"
        logger.info(
            f"watch result for {pair}, {timeframe} with length {len(candles)}, "
            f"{newest_candle}, "
            f"lref={format_ms_time(refresh_date)}, "
            f"candle_date={format_ms_time(candle_date)}, {drop_hint=}"
        )
        return pair, timeframe, candle_type, candles, drop_hint