ccxt.pro - first attempt at test

This commit is contained in:
Matthias 2022-11-29 13:51:40 +01:00
parent ec6c54367b
commit 3d6cef3555

View File

@ -1,8 +1,11 @@
import asyncio
import threading
from time import sleep
from unittest.mock import MagicMock
from unittest.mock import AsyncMock, MagicMock
from freqtrade.enums import CandleType
from freqtrade.exchange.exchange_ws import ExchangeWS
@ -13,6 +16,7 @@ def test_exchangews_init(mocker):
mocker.patch("freqtrade.exchange.exchange_ws.ExchangeWS._start_forever", MagicMock())
exchange_ws = ExchangeWS(config, ccxt_object)
sleep(0.1)
assert exchange_ws.config == config
assert exchange_ws.ccxt_object == ccxt_object
@ -25,6 +29,46 @@ def test_exchangews_init(mocker):
assert exchange_ws._ob_watching == set()
assert exchange_ws._ob_scheduled == set()
assert exchange_ws.ob_last_request == {}
sleep(0.1)
# Cleanup
exchange_ws.cleanup()
def patch_eventloop_threading(exchange):
is_init = False
def thread_fuck():
nonlocal is_init
exchange._loop = asyncio.new_event_loop()
is_init = True
exchange._loop.run_forever()
x = threading.Thread(target=thread_fuck, daemon=True)
x.start()
while not is_init:
pass
async def test_exchangews_ohlcv(mocker):
config = MagicMock()
ccxt_object = MagicMock()
ccxt_object.watch_ohlcv = AsyncMock()
mocker.patch("freqtrade.exchange.exchange_ws.ExchangeWS._start_forever", MagicMock())
exchange_ws = ExchangeWS(config, ccxt_object)
patch_eventloop_threading(exchange_ws)
try:
assert exchange_ws._klines_watching == set()
assert exchange_ws._klines_scheduled == set()
exchange_ws.schedule_ohlcv("ETH/BTC", "1m", CandleType.SPOT)
sleep(.5)
assert exchange_ws._klines_watching == {("ETH/BTC", "1m", CandleType.SPOT)}
assert exchange_ws._klines_scheduled == {("ETH/BTC", "1m", CandleType.SPOT)}
sleep(.1)
assert ccxt_object.watch_ohlcv.call_count == 1
except Exception as e:
print(e)
finally:
# Cleanup
exchange_ws.cleanup()