2022-10-18 18:48:40 +00:00
|
|
|
|
|
|
|
import asyncio
|
|
|
|
import logging
|
|
|
|
import time
|
|
|
|
from threading import Thread
|
2023-01-05 10:53:37 +00:00
|
|
|
from typing import Dict, Set
|
2022-10-18 18:48:40 +00:00
|
|
|
|
2022-11-07 06:07:15 +00:00
|
|
|
import ccxt
|
|
|
|
|
2022-11-28 18:46:50 +00:00
|
|
|
from freqtrade.constants import Config, PairWithTimeframe
|
2022-10-18 18:48:40 +00:00
|
|
|
from freqtrade.enums.candletype import CandleType
|
2022-10-29 17:44:27 +00:00
|
|
|
from freqtrade.exchange.exchange import timeframe_to_seconds
|
2023-01-05 10:53:37 +00:00
|
|
|
from freqtrade.exchange.types import OHLCVResponse
|
2023-08-11 04:47:14 +00:00
|
|
|
from freqtrade.util.datetime_helpers import dt_from_ts
|
2022-10-18 18:48:40 +00:00
|
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)


class ExchangeWS:
    """
    Keeps ccxt's websocket ("watch") OHLCV cache populated in the background.

    A dedicated thread named "ccxt_ws" runs an asyncio event loop. For every
    (pair, timeframe, candle_type) key registered through `schedule_ohlcv`, one
    task on that loop repeatedly awaits `ccxt_object.watch_ohlcv`. Cached candles
    are read back through `get_ohlcv`.
    """

    def __init__(self, config: Config, ccxt_object: ccxt.Exchange) -> None:
        """
        Store the collaborators and start the event-loop thread.

        :param config: Bot configuration (stored, not otherwise used here).
        :param ccxt_object: ccxt exchange exposing `watch_ohlcv`, `ohlcvs` and `close`.
        """
        self.config = config
        self.ccxt_object = ccxt_object
        self._background_tasks: Set[asyncio.Task] = set()

        # Keys the caller currently wants to have watched.
        self._klines_watching: Set[PairWithTimeframe] = set()
        # Keys which currently own a watcher task.
        self._klines_scheduled: Set[PairWithTimeframe] = set()
        # time.time() of the last completed watch_ohlcv call per key.
        self.klines_last_refresh: Dict[PairWithTimeframe, float] = {}
        # time.time() of the last schedule_ohlcv call per key.
        self.klines_last_request: Dict[PairWithTimeframe, float] = {}

        # Everything the worker thread or cleanup() reads must exist before the
        # thread starts - otherwise an early cleanup()/schedule_ohlcv() call
        # races against the thread's startup.
        self.__cleanup_called = False
        self._loop = asyncio.new_event_loop()
        self._thread = Thread(name="ccxt_ws", target=self._start_forever)
        self._thread.start()

    def _start_forever(self) -> None:
        """Thread target: run the event loop until `cleanup` stops it."""
        if not hasattr(self, "_loop"):
            # Only reachable when the instance was built without __init__.
            self._loop = asyncio.new_event_loop()
        try:
            self._loop.run_forever()
        finally:
            if self._loop.is_running():
                self._loop.stop()

    def cleanup(self) -> None:
        """
        Stop all watchers, close the ccxt object and join the loop thread.

        Safe to call more than once: once the thread has exited, only the
        (immediately returning) join is performed.
        """
        logger.debug("Cleanup called - stopping")
        self._klines_watching.clear()

        loop = getattr(self, "_loop", None)
        if loop is not None and self._thread.is_alive():
            # Snapshot: done-callbacks on the loop thread shrink the set concurrently.
            for task in list(self._background_tasks):
                # Task.cancel() is not thread-safe, so let the loop thread call it.
                loop.call_soon_threadsafe(task.cancel)

            asyncio.run_coroutine_threadsafe(self._cleanup_async(), loop=loop)
            # Wait for the async cleanup, but never longer than the loop thread
            # lives - a dead thread can never set the flag.
            while not self.__cleanup_called and self._thread.is_alive():
                time.sleep(0.1)

            loop.call_soon_threadsafe(loop.stop)

        self._thread.join()
        logger.debug("Stopped")

    async def _cleanup_async(self) -> None:
        """Close the ccxt object on the loop thread and flag completion."""
        try:
            await self.ccxt_object.close()
        except Exception:
            logger.exception("Exception in _cleanup_async")
        finally:
            # Always release the thread waiting in cleanup().
            self.__cleanup_called = True

    def cleanup_expired(self) -> None:
        """
        Remove pairs from watchlist if they've not been requested within
        the last timeframe (+ offset)
        """
        now = time.time()
        # Iterate over a copy, as entries are discarded while looping.
        for key in list(self._klines_watching):
            _, timeframe, _ = key
            max_age = timeframe_to_seconds(timeframe) + 20
            last_request = self.klines_last_request.get(key, 0)
            if last_request > 0 and now - last_request > max_age:
                logger.info(f"Removing {key} from watchlist")
                self._klines_watching.discard(key)

    async def _schedule_while_true(self) -> None:
        """Create a watcher task for every watched key that does not have one yet."""
        # Snapshot: schedule_ohlcv() adds to the set from another thread.
        for key in list(self._klines_watching):
            if key in self._klines_scheduled:
                continue
            self._klines_scheduled.add(key)
            pair, timeframe, candle_type = key
            task = asyncio.create_task(
                self._continuously_async_watch_ohlcv(pair, timeframe, candle_type))
            # Keep a strong reference so the task is not garbage collected.
            self._background_tasks.add(task)
            task.add_done_callback(self._continuous_stopped)

    def _continuous_stopped(self, task: asyncio.Task):
        """
        Done-callback of a watcher task: forget the task and log how it ended.

        Never raises for cancelled or failed tasks - `task.result()` is only
        consulted once both cases have been ruled out.
        """
        self._background_tasks.discard(task)
        if task.cancelled():
            logger.info("Task finished cancelled")
            return
        error = task.exception()
        if error is not None:
            logger.error(f"Task finished with exception {error!r}", exc_info=error)
            return
        logger.info(f"Task finished {task.result()}")

    async def _continuously_async_watch_ohlcv(
            self, pair: str, timeframe: str, candle_type: CandleType) -> None:
        """
        Await `watch_ohlcv` in a loop for as long as the key stays on the watchlist.

        `candle_type` is only part of the bookkeeping key, it is not passed to ccxt.
        """
        key = (pair, timeframe, candle_type)
        try:
            while key in self._klines_watching:
                start = time.time()
                data = await self.ccxt_object.watch_ohlcv(pair, timeframe)
                self.klines_last_refresh[key] = time.time()
                logger.debug(
                    f"watch done {pair}, {timeframe}, data {len(data)} "
                    f"in {time.time() - start:.2f}s")
        except ccxt.BaseError:
            logger.exception("Exception in continuously_async_watch_ohlcv")
        finally:
            self._klines_watching.discard(key)
            # Release the slot, so a later schedule_ohlcv() call can start a new watcher.
            self._klines_scheduled.discard(key)

    def schedule_ohlcv(self, pair: str, timeframe: str, candle_type: CandleType) -> None:
        """
        Schedule a pair/timeframe combination to be watched
        """
        key = (pair, timeframe, candle_type)
        self._klines_watching.add(key)
        self.klines_last_request[key] = time.time()
        asyncio.run_coroutine_threadsafe(self._schedule_while_true(), loop=self._loop)
        self.cleanup_expired()

    async def get_ohlcv(
        self,
        pair: str,
        timeframe: str,
        candle_type: CandleType,
        candle_date: int,
    ) -> OHLCVResponse:
        """
        Returns cached klines from ccxt's "watch" cache.
        :param candle_date: timestamp of the end-time of the candle.
        :return: Tuple of (pair, timeframe, candle_type, candles, drop_hint).
            candles is an empty list (and drop_hint False) if nothing is cached yet.
        :raises KeyError: if the combination has never been refreshed.
        """
        candles = self.ccxt_object.ohlcvs.get(pair, {}).get(timeframe)
        refresh_date = self.klines_last_refresh[(pair, timeframe, candle_type)]
        if not candles:
            # Nothing cached - candles[-1] below would raise TypeError / IndexError.
            logger.info(f"watch result for {pair}, {timeframe} with length 0")
            return pair, timeframe, candle_type, [], False

        # Candle timestamps are divided by 1000 before being compared to candle_date.
        last_candle_ts = candles[-1][0] // 1000
        drop_hint = False
        if refresh_date > candle_date:
            # Refreshed after candle was complete.
            drop_hint = last_candle_ts >= candle_date
        logger.info(
            f"watch result for {pair}, {timeframe} with length {len(candles)}, "
            f"{dt_from_ts(last_candle_ts)}, "
            f"lref={dt_from_ts(refresh_date)}, "
            f"candle_date={dt_from_ts(candle_date)}, {drop_hint=}"
        )
        return pair, timeframe, candle_type, candles, drop_hint
|