mirror of
https://github.com/freqtrade/freqtrade.git
synced 2024-11-10 10:21:59 +00:00
Improve telegram async tests
This commit is contained in:
parent
516b49ff50
commit
c06759223e
|
@ -1796,7 +1796,7 @@ class Telegram(RPCHandler):
|
|||
if new_market_dir is not None:
|
||||
self._rpc._update_market_direction(new_market_dir)
|
||||
await self._send_msg("Successfully updated market direction"
|
||||
f" from *{old_market_dir}* to *{new_market_dir}*.")
|
||||
f" from *{old_market_dir}* to *{new_market_dir}*.")
|
||||
else:
|
||||
raise RPCException("Invalid market direction provided. \n"
|
||||
"Valid market directions: *long, short, even, none*")
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
import asyncio
|
||||
import logging
|
||||
import re
|
||||
import threading
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from functools import reduce
|
||||
from random import choice, randint
|
||||
|
@ -57,6 +58,20 @@ def update():
|
|||
return _update
|
||||
|
||||
|
||||
def patch_eventloop_threading(telegrambot):
|
||||
is_init = False
|
||||
|
||||
def thread_fuck():
|
||||
nonlocal is_init
|
||||
telegrambot._loop = asyncio.new_event_loop()
|
||||
is_init = True
|
||||
telegrambot._loop.run_forever()
|
||||
x = threading.Thread(target=thread_fuck, daemon=True)
|
||||
x.start()
|
||||
while not is_init:
|
||||
pass
|
||||
|
||||
|
||||
class DummyCls(Telegram):
|
||||
"""
|
||||
Dummy class for testing the Telegram @authorized_only decorator
|
||||
|
@ -99,6 +114,7 @@ def get_telegram_testobject(mocker, default_conf, mock=True, ftbot=None):
|
|||
rpc = RPC(ftbot)
|
||||
telegram = Telegram(rpc, default_conf)
|
||||
telegram._loop = MagicMock()
|
||||
patch_eventloop_threading(telegram)
|
||||
|
||||
return telegram, ftbot, msg_mock
|
||||
|
||||
|
@ -2216,7 +2232,7 @@ def test_send_msg_status_notification(default_conf, mocker) -> None:
|
|||
assert msg_mock.call_args[0][0] == '*Status:* `running`'
|
||||
|
||||
|
||||
def test_warning_notification(default_conf, mocker) -> None:
|
||||
async def test_warning_notification(default_conf, mocker) -> None:
|
||||
telegram, _, msg_mock = get_telegram_testobject(mocker, default_conf)
|
||||
telegram.send_msg({
|
||||
'type': RPCMessageType.WARNING,
|
||||
|
@ -2412,6 +2428,7 @@ async def test__send_msg_network_error(default_conf, mocker, caplog) -> None:
|
|||
assert log_has('Telegram NetworkError: Oh snap! Trying one more time.', caplog)
|
||||
|
||||
|
||||
@pytest.mark.filterwarnings("ignore:.*ChatPermissions")
|
||||
async def test__send_msg_keyboard(default_conf, mocker, caplog) -> None:
|
||||
mocker.patch('freqtrade.rpc.telegram.Telegram._init', MagicMock())
|
||||
bot = MagicMock()
|
||||
|
|
Loading…
Reference in New Issue
Block a user