mirror of
https://github.com/freqtrade/freqtrade.git
synced 2024-11-14 04:03:55 +00:00
196 lines
7.6 KiB
Python
196 lines
7.6 KiB
Python
import asyncio
|
|
import logging
|
|
import time
|
|
from copy import deepcopy
|
|
from functools import partial
|
|
from threading import Thread
|
|
from typing import Dict, Set
|
|
|
|
import ccxt
|
|
|
|
from freqtrade.constants import Config, PairWithTimeframe
|
|
from freqtrade.enums.candletype import CandleType
|
|
from freqtrade.exchange.exchange import timeframe_to_seconds
|
|
from freqtrade.exchange.exchange_types import OHLCVResponse
|
|
from freqtrade.util import dt_ts, format_ms_time
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ExchangeWS:
|
|
def __init__(self, config: Config, ccxt_object: ccxt.Exchange) -> None:
|
|
self.config = config
|
|
self.ccxt_object = ccxt_object
|
|
self._background_tasks: Set[asyncio.Task] = set()
|
|
|
|
self._klines_watching: Set[PairWithTimeframe] = set()
|
|
self._klines_scheduled: Set[PairWithTimeframe] = set()
|
|
self.klines_last_refresh: Dict[PairWithTimeframe, float] = {}
|
|
self.klines_last_request: Dict[PairWithTimeframe, float] = {}
|
|
self._thread = Thread(name="ccxt_ws", target=self._start_forever)
|
|
self._thread.start()
|
|
self.__cleanup_called = False
|
|
|
|
def _start_forever(self) -> None:
|
|
self._loop = asyncio.new_event_loop()
|
|
try:
|
|
self._loop.run_forever()
|
|
finally:
|
|
if self._loop.is_running():
|
|
self._loop.stop()
|
|
|
|
def cleanup(self) -> None:
|
|
logger.debug("Cleanup called - stopping")
|
|
self._klines_watching.clear()
|
|
for task in self._background_tasks:
|
|
task.cancel()
|
|
if hasattr(self, "_loop") and not self._loop.is_closed():
|
|
self.reset_connections()
|
|
|
|
self._loop.call_soon_threadsafe(self._loop.stop)
|
|
time.sleep(0.1)
|
|
if not self._loop.is_closed():
|
|
self._loop.close()
|
|
|
|
self._thread.join()
|
|
logger.debug("Stopped")
|
|
|
|
def reset_connections(self) -> None:
|
|
"""
|
|
Reset all connections - avoids "connection-reset" errors that happen after ~9 days
|
|
"""
|
|
if hasattr(self, "_loop") and not self._loop.is_closed():
|
|
logger.info("Resetting WS connections.")
|
|
asyncio.run_coroutine_threadsafe(self._cleanup_async(), loop=self._loop)
|
|
while not self.__cleanup_called:
|
|
time.sleep(0.1)
|
|
self.__cleanup_called = False
|
|
|
|
async def _cleanup_async(self) -> None:
|
|
try:
|
|
await self.ccxt_object.close()
|
|
# Clear the cache.
|
|
# Not doing this will cause problems on startup with dynamic pairlists
|
|
self.ccxt_object.ohlcvs.clear()
|
|
except Exception:
|
|
logger.exception("Exception in _cleanup_async")
|
|
finally:
|
|
self.__cleanup_called = True
|
|
|
|
def _pop_history(self, paircomb: PairWithTimeframe) -> None:
|
|
"""
|
|
Remove history for a pair/timeframe combination from ccxt cache
|
|
"""
|
|
self.ccxt_object.ohlcvs.get(paircomb[0], {}).pop(paircomb[1], None)
|
|
|
|
def cleanup_expired(self) -> None:
|
|
"""
|
|
Remove pairs from watchlist if they've not been requested within
|
|
the last timeframe (+ offset)
|
|
"""
|
|
changed = False
|
|
for p in list(self._klines_watching):
|
|
_, timeframe, _ = p
|
|
timeframe_s = timeframe_to_seconds(timeframe)
|
|
last_refresh = self.klines_last_request.get(p, 0)
|
|
if last_refresh > 0 and (dt_ts() - last_refresh) > ((timeframe_s + 20) * 1000):
|
|
logger.info(f"Removing {p} from websocket watchlist.")
|
|
self._klines_watching.discard(p)
|
|
# Pop history to avoid getting stale data
|
|
self._pop_history(p)
|
|
changed = True
|
|
if changed:
|
|
logger.info(f"Removal done: new watch list ({len(self._klines_watching)})")
|
|
|
|
async def _schedule_while_true(self) -> None:
|
|
# For the ones we should be watching
|
|
for p in self._klines_watching:
|
|
# Check if they're already scheduled
|
|
if p not in self._klines_scheduled:
|
|
self._klines_scheduled.add(p)
|
|
pair, timeframe, candle_type = p
|
|
task = asyncio.create_task(
|
|
self._continuously_async_watch_ohlcv(pair, timeframe, candle_type)
|
|
)
|
|
self._background_tasks.add(task)
|
|
task.add_done_callback(
|
|
partial(
|
|
self._continuous_stopped,
|
|
pair=pair,
|
|
timeframe=timeframe,
|
|
candle_type=candle_type,
|
|
)
|
|
)
|
|
|
|
def _continuous_stopped(
|
|
self, task: asyncio.Task, pair: str, timeframe: str, candle_type: CandleType
|
|
):
|
|
self._background_tasks.discard(task)
|
|
result = "done"
|
|
if task.cancelled():
|
|
result = "cancelled"
|
|
else:
|
|
if (result1 := task.result()) is not None:
|
|
result = str(result1)
|
|
|
|
logger.info(f"{pair}, {timeframe}, {candle_type} - Task finished - {result}")
|
|
self._klines_scheduled.discard((pair, timeframe, candle_type))
|
|
self._pop_history((pair, timeframe, candle_type))
|
|
|
|
async def _continuously_async_watch_ohlcv(
|
|
self, pair: str, timeframe: str, candle_type: CandleType
|
|
) -> None:
|
|
try:
|
|
while (pair, timeframe, candle_type) in self._klines_watching:
|
|
start = dt_ts()
|
|
data = await self.ccxt_object.watch_ohlcv(pair, timeframe)
|
|
self.klines_last_refresh[(pair, timeframe, candle_type)] = dt_ts()
|
|
logger.debug(
|
|
f"watch done {pair}, {timeframe}, data {len(data)} "
|
|
f"in {dt_ts() - start:.2f}s"
|
|
)
|
|
except ccxt.ExchangeClosedByUser:
|
|
logger.debug("Exchange connection closed by user")
|
|
except ccxt.BaseError:
|
|
logger.exception(f"Exception in continuously_async_watch_ohlcv for {pair}, {timeframe}")
|
|
finally:
|
|
self._klines_watching.discard((pair, timeframe, candle_type))
|
|
|
|
def schedule_ohlcv(self, pair: str, timeframe: str, candle_type: CandleType) -> None:
|
|
"""
|
|
Schedule a pair/timeframe combination to be watched
|
|
"""
|
|
self._klines_watching.add((pair, timeframe, candle_type))
|
|
self.klines_last_request[(pair, timeframe, candle_type)] = dt_ts()
|
|
# asyncio.run_coroutine_threadsafe(self.schedule_schedule(), loop=self._loop)
|
|
asyncio.run_coroutine_threadsafe(self._schedule_while_true(), loop=self._loop)
|
|
self.cleanup_expired()
|
|
|
|
async def get_ohlcv(
|
|
self,
|
|
pair: str,
|
|
timeframe: str,
|
|
candle_type: CandleType,
|
|
candle_date: int,
|
|
) -> OHLCVResponse:
|
|
"""
|
|
Returns cached klines from ccxt's "watch" cache.
|
|
:param candle_date: timestamp of the end-time of the candle.
|
|
"""
|
|
# Deepcopy the response - as it might be modified in the background as new messages arrive
|
|
candles = deepcopy(self.ccxt_object.ohlcvs.get(pair, {}).get(timeframe))
|
|
refresh_date = self.klines_last_refresh[(pair, timeframe, candle_type)]
|
|
drop_hint = False
|
|
if refresh_date > candle_date:
|
|
# Refreshed after candle was complete.
|
|
# logger.info(f"{candles[-1][0]} >= {candle_date}")
|
|
drop_hint = candles[-1][0] >= candle_date
|
|
logger.debug(
|
|
f"watch result for {pair}, {timeframe} with length {len(candles)}, "
|
|
f"{format_ms_time(candles[-1][0])}, "
|
|
f"lref={format_ms_time(refresh_date)}, "
|
|
f"candle_date={format_ms_time(candle_date)}, {drop_hint=}"
|
|
)
|
|
return pair, timeframe, candle_type, candles, drop_hint
|