Apply ruff formatting to ws branch

This commit is contained in:
Matthias 2024-05-14 06:35:49 +02:00
parent cabd36253e
commit f33c4db572
4 changed files with 28 additions and 31 deletions

View File

@ -93,7 +93,7 @@ EXCHANGE_HAS_OPTIONAL = [
# 'fetchOpenOrder', 'fetchClosedOrder', # replacement for fetchOrder # 'fetchOpenOrder', 'fetchClosedOrder', # replacement for fetchOrder
# 'fetchOpenOrders', 'fetchClosedOrders', # 'fetchOrders', # Refinding balance... # 'fetchOpenOrders', 'fetchClosedOrders', # 'fetchOrders', # Refinding balance...
# ccxt.pro # ccxt.pro
'watchOHLCV' "watchOHLCV",
] ]

View File

@ -1,4 +1,3 @@
import asyncio import asyncio
import logging import logging
import time import time
@ -46,7 +45,7 @@ class ExchangeWS:
self._klines_watching.clear() self._klines_watching.clear()
for task in self._background_tasks: for task in self._background_tasks:
task.cancel() task.cancel()
if hasattr(self, '_loop'): if hasattr(self, "_loop"):
asyncio.run_coroutine_threadsafe(self._cleanup_async(), loop=self._loop) asyncio.run_coroutine_threadsafe(self._cleanup_async(), loop=self._loop)
while not self.__cleanup_called: while not self.__cleanup_called:
time.sleep(0.1) time.sleep(0.1)
@ -74,10 +73,7 @@ class ExchangeWS:
_, timeframe, _ = p _, timeframe, _ = p
timeframe_s = timeframe_to_seconds(timeframe) timeframe_s = timeframe_to_seconds(timeframe)
last_refresh = self.klines_last_request.get(p, 0) last_refresh = self.klines_last_request.get(p, 0)
if ( if last_refresh > 0 and (dt_ts() - last_refresh) > ((timeframe_s + 20) * 1000):
last_refresh > 0
and (dt_ts() - last_refresh) > ((timeframe_s + 20) * 1000)
):
logger.info(f"Removing {p} from watchlist") logger.info(f"Removing {p} from watchlist")
self._klines_watching.discard(p) self._klines_watching.discard(p)
changed = True changed = True
@ -92,19 +88,21 @@ class ExchangeWS:
self._klines_scheduled.add(p) self._klines_scheduled.add(p)
pair, timeframe, candle_type = p pair, timeframe, candle_type = p
task = asyncio.create_task( task = asyncio.create_task(
self._continuously_async_watch_ohlcv(pair, timeframe, candle_type)) self._continuously_async_watch_ohlcv(pair, timeframe, candle_type)
)
self._background_tasks.add(task) self._background_tasks.add(task)
task.add_done_callback( task.add_done_callback(
partial( partial(
self._continuous_stopped, self._continuous_stopped,
pair=pair, pair=pair,
timeframe=timeframe, timeframe=timeframe,
candle_type=candle_type candle_type=candle_type,
) )
) )
def _continuous_stopped( def _continuous_stopped(
self, task: asyncio.Task, pair: str, timeframe: str, candle_type: CandleType): self, task: asyncio.Task, pair: str, timeframe: str, candle_type: CandleType
):
self._background_tasks.discard(task) self._background_tasks.discard(task)
if task.cancelled(): if task.cancelled():
result = "cancelled" result = "cancelled"
@ -115,7 +113,8 @@ class ExchangeWS:
self._klines_scheduled.discard((pair, timeframe, candle_type)) self._klines_scheduled.discard((pair, timeframe, candle_type))
async def _continuously_async_watch_ohlcv( async def _continuously_async_watch_ohlcv(
self, pair: str, timeframe: str, candle_type: CandleType) -> None: self, pair: str, timeframe: str, candle_type: CandleType
) -> None:
try: try:
while (pair, timeframe, candle_type) in self._klines_watching: while (pair, timeframe, candle_type) in self._klines_watching:
start = dt_ts() start = dt_ts()
@ -123,10 +122,10 @@ class ExchangeWS:
self.klines_last_refresh[(pair, timeframe, candle_type)] = dt_ts() self.klines_last_refresh[(pair, timeframe, candle_type)] = dt_ts()
logger.debug( logger.debug(
f"watch done {pair}, {timeframe}, data {len(data)} " f"watch done {pair}, {timeframe}, data {len(data)} "
f"in {dt_ts() - start:.2f}s") f"in {dt_ts() - start:.2f}s"
)
except ccxt.BaseError: except ccxt.BaseError:
logger.exception( logger.exception(f"Exception in continuously_async_watch_ohlcv for {pair}, {timeframe}")
f"Exception in continuously_async_watch_ohlcv for {pair}, {timeframe}")
finally: finally:
self._klines_watching.discard((pair, timeframe, candle_type)) self._klines_watching.discard((pair, timeframe, candle_type))
@ -141,11 +140,11 @@ class ExchangeWS:
self.cleanup_expired() self.cleanup_expired()
async def get_ohlcv( async def get_ohlcv(
self, self,
pair: str, pair: str,
timeframe: str, timeframe: str,
candle_type: CandleType, candle_type: CandleType,
candle_date: int, candle_date: int,
) -> OHLCVResponse: ) -> OHLCVResponse:
""" """
Returns cached klines from ccxt's "watch" cache. Returns cached klines from ccxt's "watch" cache.
@ -164,5 +163,5 @@ class ExchangeWS:
f"{format_ms_time(candles[-1][0])}, " f"{format_ms_time(candles[-1][0])}, "
f"lref={format_ms_time(refresh_date)}, " f"lref={format_ms_time(refresh_date)}, "
f"candle_date={format_ms_time(candle_date)}, {drop_hint=}" f"candle_date={format_ms_time(candle_date)}, {drop_hint=}"
) )
return pair, timeframe, candle_type, candles, drop_hint return pair, timeframe, candle_type, candles, drop_hint

View File

@ -8,7 +8,6 @@ from freqtrade.exchange.exchange_ws import ExchangeWS
def test_exchangews_init(mocker): def test_exchangews_init(mocker):
config = MagicMock() config = MagicMock()
ccxt_object = MagicMock() ccxt_object = MagicMock()
mocker.patch("freqtrade.exchange.exchange_ws.ExchangeWS._start_forever", MagicMock()) mocker.patch("freqtrade.exchange.exchange_ws.ExchangeWS._start_forever", MagicMock())
@ -36,6 +35,7 @@ def patch_eventloop_threading(exchange):
exchange._loop = asyncio.new_event_loop() exchange._loop = asyncio.new_event_loop()
is_init = True is_init = True
exchange._loop.run_forever() exchange._loop.run_forever()
x = threading.Thread(target=thread_fuck, daemon=True) x = threading.Thread(target=thread_fuck, daemon=True)
x.start() x.start()
while not is_init: while not is_init:
@ -52,16 +52,15 @@ async def test_exchangews_ohlcv(mocker):
exchange_ws = ExchangeWS(config, ccxt_object) exchange_ws = ExchangeWS(config, ccxt_object)
patch_eventloop_threading(exchange_ws) patch_eventloop_threading(exchange_ws)
try: try:
assert exchange_ws._klines_watching == set() assert exchange_ws._klines_watching == set()
assert exchange_ws._klines_scheduled == set() assert exchange_ws._klines_scheduled == set()
exchange_ws.schedule_ohlcv("ETH/BTC", "1m", CandleType.SPOT) exchange_ws.schedule_ohlcv("ETH/BTC", "1m", CandleType.SPOT)
sleep(.5) sleep(0.5)
assert exchange_ws._klines_watching == {("ETH/BTC", "1m", CandleType.SPOT)} assert exchange_ws._klines_watching == {("ETH/BTC", "1m", CandleType.SPOT)}
assert exchange_ws._klines_scheduled == {("ETH/BTC", "1m", CandleType.SPOT)} assert exchange_ws._klines_scheduled == {("ETH/BTC", "1m", CandleType.SPOT)}
sleep(.1) sleep(0.1)
assert ccxt_object.watch_ohlcv.call_count == 1 assert ccxt_object.watch_ohlcv.call_count == 1
except Exception as e: except Exception as e:
print(e) print(e)

View File

@ -20,15 +20,14 @@ from tests.exchange_online.conftest import EXCHANGE_WS_FIXTURE_TYPE
@pytest.mark.longrun @pytest.mark.longrun
class TestCCXTExchangeWs: class TestCCXTExchangeWs:
def test_ccxt_ohlcv(self, exchange_ws: EXCHANGE_WS_FIXTURE_TYPE, caplog, mocker): def test_ccxt_ohlcv(self, exchange_ws: EXCHANGE_WS_FIXTURE_TYPE, caplog, mocker):
exch, exchangename, pair = exchange_ws exch, exchangename, pair = exchange_ws
assert exch._ws_async is not None assert exch._ws_async is not None
timeframe = '1m' timeframe = "1m"
pair_tf = (pair, timeframe, CandleType.SPOT) pair_tf = (pair, timeframe, CandleType.SPOT)
m_hist = mocker.spy(exch, '_async_get_historic_ohlcv') m_hist = mocker.spy(exch, "_async_get_historic_ohlcv")
m_cand = mocker.spy(exch, '_async_get_candle_history') m_cand = mocker.spy(exch, "_async_get_candle_history")
res = exch.refresh_latest_ohlcv([pair_tf]) res = exch.refresh_latest_ohlcv([pair_tf])
assert m_cand.call_count == 1 assert m_cand.call_count == 1
@ -45,7 +44,7 @@ class TestCCXTExchangeWs:
df1 = res[pair_tf] df1 = res[pair_tf]
caplog.set_level(logging.DEBUG) caplog.set_level(logging.DEBUG)
set_loggers(1) set_loggers(1)
assert df1.iloc[-1]['date'] == curr_candle assert df1.iloc[-1]["date"] == curr_candle
# Wait until the next candle (might be up to 1 minute). # Wait until the next candle (might be up to 1 minute).
while True: while True:
@ -53,9 +52,9 @@ class TestCCXTExchangeWs:
res = exch.refresh_latest_ohlcv([pair_tf]) res = exch.refresh_latest_ohlcv([pair_tf])
df2 = res[pair_tf] df2 = res[pair_tf]
assert df2 is not None assert df2 is not None
if df2.iloc[-1]['date'] == next_candle: if df2.iloc[-1]["date"] == next_candle:
break break
assert df2.iloc[-1]['date'] == curr_candle assert df2.iloc[-1]["date"] == curr_candle
sleep(1) sleep(1)
assert m_hist.call_count == 0 assert m_hist.call_count == 0