Merge pull request #1689 from hroff-1902/main_refactoring

Main.py and freqtradebot refactoring
This commit is contained in:
Misagh 2019-04-04 11:19:15 +02:00 committed by GitHub
commit 9d6d60dcf0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 338 additions and 255 deletions

View File

@ -4,17 +4,15 @@ Freqtrade is the main module of this bot. It contains the class Freqtrade()
import copy
import logging
import time
import traceback
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple
import arrow
from requests.exceptions import RequestException
import sdnotify
from freqtrade import (DependencyException, OperationalException,
TemporaryError, __version__, constants, persistence)
__version__, constants, persistence)
from freqtrade.data.converter import order_book_to_dataframe
from freqtrade.data.dataprovider import DataProvider
from freqtrade.edge import Edge
@ -42,20 +40,14 @@ class FreqtradeBot(object):
to get the config dict.
"""
logger.info(
'Starting freqtrade %s',
__version__,
)
logger.info('Starting freqtrade %s', __version__)
# Init bot states
# Init bot state
self.state = State.STOPPED
# Init objects
self.config = config
self._sd_notify = sdnotify.SystemdNotifier() if \
self.config.get('internals', {}).get('sd_notify', False) else None
self.strategy: IStrategy = StrategyResolver(self.config).strategy
self.rpc: RPCManager = RPCManager(self)
@ -79,29 +71,12 @@ class FreqtradeBot(object):
self.config.get('edge', {}).get('enabled', False) else None
self.active_pair_whitelist: List[str] = self.config['exchange']['pair_whitelist']
self._init_modules()
# Tell the systemd that we completed initialization phase
if self._sd_notify:
logger.debug("sd_notify: READY=1")
self._sd_notify.notify("READY=1")
def _init_modules(self) -> None:
"""
Initializes all modules and updates the config
:return: None
"""
# Initialize all modules
persistence.init(self.config)
# Set initial application state
# Set initial bot state from config
initial_state = self.config.get('initial_state')
if initial_state:
self.state = State[initial_state.upper()]
else:
self.state = State.STOPPED
self.state = State[initial_state.upper()] if initial_state else State.STOPPED
def cleanup(self) -> None:
"""
@ -113,130 +88,50 @@ class FreqtradeBot(object):
self.rpc.cleanup()
persistence.cleanup()
def stopping(self) -> None:
# Tell systemd that we are exiting now
if self._sd_notify:
logger.debug("sd_notify: STOPPING=1")
self._sd_notify.notify("STOPPING=1")
def reconfigure(self) -> None:
# Tell systemd that we initiated reconfiguring
if self._sd_notify:
logger.debug("sd_notify: RELOADING=1")
self._sd_notify.notify("RELOADING=1")
def worker(self, old_state: State = None) -> State:
"""
Trading routine that must be run at each loop
:param old_state: the previous service state from the previous call
:return: current service state
"""
# Log state transition
state = self.state
if state != old_state:
self.rpc.send_msg({
'type': RPCMessageType.STATUS_NOTIFICATION,
'status': f'{state.name.lower()}'
})
logger.info('Changing state to: %s', state.name)
if state == State.RUNNING:
self.rpc.startup_messages(self.config, self.pairlists)
throttle_secs = self.config.get('internals', {}).get(
'process_throttle_secs',
constants.PROCESS_THROTTLE_SECS
)
if state == State.STOPPED:
# Ping systemd watchdog before sleeping in the stopped state
if self._sd_notify:
logger.debug("sd_notify: WATCHDOG=1\\nSTATUS=State: STOPPED.")
self._sd_notify.notify("WATCHDOG=1\nSTATUS=State: STOPPED.")
time.sleep(throttle_secs)
elif state == State.RUNNING:
# Ping systemd watchdog before throttling
if self._sd_notify:
logger.debug("sd_notify: WATCHDOG=1\\nSTATUS=State: RUNNING.")
self._sd_notify.notify("WATCHDOG=1\nSTATUS=State: RUNNING.")
self._throttle(func=self._process, min_secs=throttle_secs)
return state
def _throttle(self, func: Callable[..., Any], min_secs: float, *args, **kwargs) -> Any:
"""
Throttles the given callable that it
takes at least `min_secs` to finish execution.
:param func: Any callable
:param min_secs: minimum execution time in seconds
:return: Any
"""
start = time.time()
result = func(*args, **kwargs)
end = time.time()
duration = max(min_secs - (end - start), 0.0)
logger.debug('Throttling %s for %.2f seconds', func.__name__, duration)
time.sleep(duration)
return result
def _process(self) -> bool:
def process(self) -> bool:
"""
Queries the persistence layer for open trades and handles them,
otherwise a new trade is created.
:return: True if one or more trades has been created or closed, False otherwise
"""
state_changed = False
try:
# Check whether markets have to be reloaded
self.exchange._reload_markets()
# Refresh whitelist
self.pairlists.refresh_pairlist()
self.active_pair_whitelist = self.pairlists.whitelist
# Check whether markets have to be reloaded
self.exchange._reload_markets()
# Calculating Edge positioning
if self.edge:
self.edge.calculate()
self.active_pair_whitelist = self.edge.adjust(self.active_pair_whitelist)
# Refresh whitelist
self.pairlists.refresh_pairlist()
self.active_pair_whitelist = self.pairlists.whitelist
# Query trades from persistence layer
trades = Trade.get_open_trades()
# Calculating Edge positioning
if self.edge:
self.edge.calculate()
self.active_pair_whitelist = self.edge.adjust(self.active_pair_whitelist)
# Extend active-pair whitelist with pairs from open trades
# It ensures that tickers are downloaded for open trades
self._extend_whitelist_with_trades(self.active_pair_whitelist, trades)
# Query trades from persistence layer
trades = Trade.get_open_trades()
# Refreshing candles
self.dataprovider.refresh(self._create_pair_whitelist(self.active_pair_whitelist),
self.strategy.informative_pairs())
# Extend active-pair whitelist with pairs from open trades
# It ensures that tickers are downloaded for open trades
self._extend_whitelist_with_trades(self.active_pair_whitelist, trades)
# First process current opened trades
for trade in trades:
state_changed |= self.process_maybe_execute_sell(trade)
# Refreshing candles
self.dataprovider.refresh(self._create_pair_whitelist(self.active_pair_whitelist),
self.strategy.informative_pairs())
# Then looking for buy opportunities
if len(trades) < self.config['max_open_trades']:
state_changed = self.process_maybe_execute_buy()
# First process current opened trades
for trade in trades:
state_changed |= self.process_maybe_execute_sell(trade)
if 'unfilledtimeout' in self.config:
# Check and handle any timed out open orders
self.check_handle_timedout()
Trade.session.flush()
# Then looking for buy opportunities
if len(trades) < self.config['max_open_trades']:
state_changed = self.process_maybe_execute_buy()
if 'unfilledtimeout' in self.config:
# Check and handle any timed out open orders
self.check_handle_timedout()
Trade.session.flush()
except TemporaryError as error:
logger.warning(f"Error: {error}, retrying in {constants.RETRY_TIMEOUT} seconds...")
time.sleep(constants.RETRY_TIMEOUT)
except OperationalException:
tb = traceback.format_exc()
hint = 'Issue `/start` if you think it is safe to restart.'
self.rpc.send_msg({
'type': RPCMessageType.STATUS_NOTIFICATION,
'status': f'OperationalException:\n```\n{tb}```{hint}'
})
logger.exception('OperationalException. Stopping trader ...')
self.state = State.STOPPED
return state_changed
def _extend_whitelist_with_trades(self, whitelist: List[str], trades: List[Any]):

View File

@ -10,10 +10,9 @@ from typing import List
from freqtrade import OperationalException
from freqtrade.arguments import Arguments
from freqtrade.configuration import Configuration, set_loggers
from freqtrade.freqtradebot import FreqtradeBot
from freqtrade.state import State
from freqtrade.rpc import RPCMessageType
from freqtrade.configuration import set_loggers
from freqtrade.worker import Worker
logger = logging.getLogger('freqtrade')
@ -27,7 +26,7 @@ def main(sysargv: List[str]) -> None:
sysargv,
'Free, open source crypto trading bot'
)
args = arguments.get_parsed_arg()
args: Namespace = arguments.get_parsed_arg()
# A subcommand has been issued.
# Means if Backtesting or Hyperopt have been called we exit the bot
@ -35,20 +34,12 @@ def main(sysargv: List[str]) -> None:
args.func(args)
return
freqtrade = None
worker = None
return_code = 1
try:
# Load and validate configuration
config = Configuration(args, None).get_config()
# Init the bot
freqtrade = FreqtradeBot(config)
state = None
while True:
state = freqtrade.worker(old_state=state)
if state == State.RELOAD_CONF:
freqtrade = reconfigure(freqtrade, args)
# Load and run worker
worker = Worker(args)
worker.run()
except KeyboardInterrupt:
logger.info('SIGINT received, aborting ...')
@ -59,34 +50,11 @@ def main(sysargv: List[str]) -> None:
except BaseException:
logger.exception('Fatal exception!')
finally:
if freqtrade:
freqtrade.stopping()
freqtrade.rpc.send_msg({
'type': RPCMessageType.STATUS_NOTIFICATION,
'status': 'process died'
})
freqtrade.cleanup()
if worker:
worker.exit()
sys.exit(return_code)
def reconfigure(freqtrade: FreqtradeBot, args: Namespace) -> FreqtradeBot:
"""
Cleans up current instance, reloads the configuration and returns the new instance
"""
freqtrade.reconfigure()
# Clean up current modules
freqtrade.cleanup()
# Create new instance
freqtrade = FreqtradeBot(Configuration(args, None).get_config())
freqtrade.rpc.send_msg({
'type': RPCMessageType.STATUS_NOTIFICATION,
'status': 'config reloaded'
})
return freqtrade
if __name__ == '__main__':
set_loggers()
main(sys.argv[1:])

View File

@ -16,6 +16,7 @@ from freqtrade.edge import Edge, PairInfo
from freqtrade.exchange import Exchange
from freqtrade.freqtradebot import FreqtradeBot
from freqtrade.resolvers import ExchangeResolver
from freqtrade.worker import Worker
logging.getLogger('').setLevel(logging.INFO)
@ -87,7 +88,7 @@ def get_patched_edge(mocker, config) -> Edge:
# Functions for recurrent object patching
def get_patched_freqtradebot(mocker, config) -> FreqtradeBot:
def patch_freqtradebot(mocker, config) -> None:
"""
This function patch _init_modules() to not call dependencies
:param mocker: a Mocker object to apply patches
@ -100,9 +101,17 @@ def get_patched_freqtradebot(mocker, config) -> FreqtradeBot:
mocker.patch('freqtrade.freqtradebot.RPCManager._init', MagicMock())
mocker.patch('freqtrade.freqtradebot.RPCManager.send_msg', MagicMock())
def get_patched_freqtradebot(mocker, config) -> FreqtradeBot:
patch_freqtradebot(mocker, config)
return FreqtradeBot(config)
def get_patched_worker(mocker, config) -> Worker:
patch_freqtradebot(mocker, config)
return Worker(args=None, config=config)
@pytest.fixture(autouse=True)
def patch_coinmarketcap(mocker) -> None:
"""

View File

@ -21,12 +21,13 @@ from freqtrade.state import State
from freqtrade.strategy.interface import SellCheckTuple, SellType
from freqtrade.tests.conftest import (log_has, log_has_re, patch_edge,
patch_exchange, patch_wallet)
from freqtrade.worker import Worker
# Functions for recurrent object patching
def get_patched_freqtradebot(mocker, config) -> FreqtradeBot:
def patch_freqtradebot(mocker, config) -> None:
"""
This function patch _init_modules() to not call dependencies
This function patches _init_modules() to not call dependencies
:param mocker: a Mocker object to apply patches
:param config: Config to pass to the bot
:return: None
@ -35,9 +36,29 @@ def get_patched_freqtradebot(mocker, config) -> FreqtradeBot:
mocker.patch('freqtrade.freqtradebot.persistence.init', MagicMock())
patch_exchange(mocker)
def get_patched_freqtradebot(mocker, config) -> FreqtradeBot:
"""
This function patches _init_modules() to not call dependencies
:param mocker: a Mocker object to apply patches
:param config: Config to pass to the bot
:return: FreqtradeBot
"""
patch_freqtradebot(mocker, config)
return FreqtradeBot(config)
def get_patched_worker(mocker, config) -> Worker:
"""
This function patches _init_modules() to not call dependencies
:param mocker: a Mocker object to apply patches
:param config: Config to pass to the bot
:return: Worker
"""
patch_freqtradebot(mocker, config)
return Worker(args=None, config=config)
def patch_get_signal(freqtrade: FreqtradeBot, value=(True, False)) -> None:
"""
:param mocker: mocker to patch IStrategy class
@ -61,7 +82,7 @@ def patch_RPCManager(mocker) -> MagicMock:
# Unit tests
def test_freqtradebot(mocker, default_conf, markets) -> None:
def test_freqtradebot_state(mocker, default_conf, markets) -> None:
mocker.patch('freqtrade.exchange.Exchange.markets', PropertyMock(return_value=markets))
freqtrade = get_patched_freqtradebot(mocker, default_conf)
assert freqtrade.state is State.RUNNING
@ -71,6 +92,16 @@ def test_freqtradebot(mocker, default_conf, markets) -> None:
assert freqtrade.state is State.STOPPED
def test_worker_state(mocker, default_conf, markets) -> None:
mocker.patch('freqtrade.exchange.Exchange.markets', PropertyMock(return_value=markets))
worker = get_patched_worker(mocker, default_conf)
assert worker.state is State.RUNNING
default_conf.pop('initial_state')
worker = Worker(args=None, config=default_conf)
assert worker.state is State.STOPPED
def test_cleanup(mocker, default_conf, caplog) -> None:
mock_cleanup = MagicMock()
mocker.patch('freqtrade.persistence.cleanup', mock_cleanup)
@ -82,28 +113,28 @@ def test_cleanup(mocker, default_conf, caplog) -> None:
def test_worker_running(mocker, default_conf, caplog) -> None:
mock_throttle = MagicMock()
mocker.patch('freqtrade.freqtradebot.FreqtradeBot._throttle', mock_throttle)
mocker.patch('freqtrade.worker.Worker._throttle', mock_throttle)
freqtrade = get_patched_freqtradebot(mocker, default_conf)
worker = get_patched_worker(mocker, default_conf)
state = freqtrade.worker(old_state=None)
state = worker._worker(old_state=None)
assert state is State.RUNNING
assert log_has('Changing state to: RUNNING', caplog.record_tuples)
assert mock_throttle.call_count == 1
# Check strategy is loaded, and received a dataprovider object
assert freqtrade.strategy
assert freqtrade.strategy.dp
assert isinstance(freqtrade.strategy.dp, DataProvider)
assert worker.freqtrade.strategy
assert worker.freqtrade.strategy.dp
assert isinstance(worker.freqtrade.strategy.dp, DataProvider)
def test_worker_stopped(mocker, default_conf, caplog) -> None:
mock_throttle = MagicMock()
mocker.patch('freqtrade.freqtradebot.FreqtradeBot._throttle', mock_throttle)
mocker.patch('freqtrade.worker.Worker._throttle', mock_throttle)
mock_sleep = mocker.patch('time.sleep', return_value=None)
freqtrade = get_patched_freqtradebot(mocker, default_conf)
freqtrade.state = State.STOPPED
state = freqtrade.worker(old_state=State.RUNNING)
worker = get_patched_worker(mocker, default_conf)
worker.state = State.STOPPED
state = worker._worker(old_state=State.RUNNING)
assert state is State.STOPPED
assert log_has('Changing state to: STOPPED', caplog.record_tuples)
assert mock_throttle.call_count == 0
@ -115,17 +146,17 @@ def test_throttle(mocker, default_conf, caplog) -> None:
return 42
caplog.set_level(logging.DEBUG)
freqtrade = get_patched_freqtradebot(mocker, default_conf)
worker = get_patched_worker(mocker, default_conf)
start = time.time()
result = freqtrade._throttle(throttled_func, min_secs=0.1)
result = worker._throttle(throttled_func, min_secs=0.1)
end = time.time()
assert result == 42
assert end - start > 0.1
assert log_has('Throttling throttled_func for 0.10 seconds', caplog.record_tuples)
result = freqtrade._throttle(throttled_func, min_secs=-1)
result = worker._throttle(throttled_func, min_secs=-1)
assert result == 42
@ -133,12 +164,12 @@ def test_throttle_with_assets(mocker, default_conf) -> None:
def throttled_func(nb_assets=-1):
return nb_assets
freqtrade = get_patched_freqtradebot(mocker, default_conf)
worker = get_patched_worker(mocker, default_conf)
result = freqtrade._throttle(throttled_func, min_secs=0.1, nb_assets=666)
result = worker._throttle(throttled_func, min_secs=0.1, nb_assets=666)
assert result == 666
result = freqtrade._throttle(throttled_func, min_secs=0.1)
result = worker._throttle(throttled_func, min_secs=0.1)
assert result == -1
@ -224,7 +255,7 @@ def test_edge_called_in_process(mocker, edge_conf) -> None:
freqtrade = FreqtradeBot(edge_conf)
freqtrade.pairlists._validate_whitelist = _refresh_whitelist
patch_get_signal(freqtrade)
freqtrade._process()
freqtrade.process()
assert freqtrade.active_pair_whitelist == ['NEO/BTC', 'LTC/BTC']
@ -654,7 +685,7 @@ def test_process_trade_creation(default_conf, ticker, limit_buy_order,
trades = Trade.query.filter(Trade.is_open.is_(True)).all()
assert not trades
result = freqtrade._process()
result = freqtrade.process()
assert result is True
trades = Trade.query.filter(Trade.is_open.is_(True)).all()
@ -685,10 +716,10 @@ def test_process_exchange_failures(default_conf, ticker, markets, mocker) -> Non
)
sleep_mock = mocker.patch('time.sleep', side_effect=lambda _: None)
freqtrade = FreqtradeBot(default_conf)
patch_get_signal(freqtrade)
worker = Worker(args=None, config=default_conf)
patch_get_signal(worker.freqtrade)
result = freqtrade._process()
result = worker._process()
assert result is False
assert sleep_mock.has_calls()
@ -702,14 +733,14 @@ def test_process_operational_exception(default_conf, ticker, markets, mocker) ->
markets=PropertyMock(return_value=markets),
buy=MagicMock(side_effect=OperationalException)
)
freqtrade = FreqtradeBot(default_conf)
patch_get_signal(freqtrade)
worker = Worker(args=None, config=default_conf)
patch_get_signal(worker.freqtrade)
assert freqtrade.state == State.RUNNING
assert worker.state == State.RUNNING
result = freqtrade._process()
result = worker._process()
assert result is False
assert freqtrade.state == State.STOPPED
assert worker.state == State.STOPPED
assert 'OperationalException' in msg_mock.call_args_list[-1][0][0]['status']
@ -730,18 +761,18 @@ def test_process_trade_handling(
trades = Trade.query.filter(Trade.is_open.is_(True)).all()
assert not trades
result = freqtrade._process()
result = freqtrade.process()
assert result is True
trades = Trade.query.filter(Trade.is_open.is_(True)).all()
assert len(trades) == 1
result = freqtrade._process()
result = freqtrade.process()
assert result is False
def test_process_trade_no_whitelist_pair(
default_conf, ticker, limit_buy_order, markets, fee, mocker) -> None:
""" Test _process with trade not in pair list """
""" Test process with trade not in pair list """
patch_RPCManager(mocker)
patch_exchange(mocker)
mocker.patch.multiple(
@ -778,7 +809,7 @@ def test_process_trade_no_whitelist_pair(
))
assert pair not in freqtrade.active_pair_whitelist
result = freqtrade._process()
result = freqtrade.process()
assert pair in freqtrade.active_pair_whitelist
# Make sure each pair is only in the list once
assert len(freqtrade.active_pair_whitelist) == len(set(freqtrade.active_pair_whitelist))
@ -808,7 +839,7 @@ def test_process_informative_pairs_added(default_conf, ticker, markets, mocker)
freqtrade.strategy.informative_pairs = inf_pairs
# patch_get_signal(freqtrade)
freqtrade._process()
freqtrade.process()
assert inf_pairs.call_count == 1
assert refresh_mock.call_count == 1
assert ("BTC/ETH", "1m") in refresh_mock.call_args[0][0]
@ -3056,5 +3087,5 @@ def test_startup_messages(default_conf, mocker):
'config': {'number_assets': 20}
}
mocker.patch('freqtrade.exchange.Exchange.exchange_has', MagicMock(return_value=True))
freqtrade = get_patched_freqtradebot(mocker, default_conf)
assert freqtrade.state is State.RUNNING
worker = get_patched_worker(mocker, default_conf)
assert worker.state is State.RUNNING

View File

@ -7,8 +7,8 @@ import pytest
from freqtrade import OperationalException
from freqtrade.arguments import Arguments
from freqtrade.freqtradebot import FreqtradeBot
from freqtrade.main import main, reconfigure
from freqtrade.worker import Worker
from freqtrade.main import main
from freqtrade.state import State
from freqtrade.tests.conftest import log_has, patch_exchange
@ -43,17 +43,14 @@ def test_main_start_hyperopt(mocker) -> None:
def test_main_fatal_exception(mocker, default_conf, caplog) -> None:
patch_exchange(mocker)
mocker.patch.multiple(
'freqtrade.freqtradebot.FreqtradeBot',
_init_modules=MagicMock(),
worker=MagicMock(side_effect=Exception),
cleanup=MagicMock(),
)
mocker.patch('freqtrade.freqtradebot.FreqtradeBot.cleanup', MagicMock())
mocker.patch('freqtrade.worker.Worker._worker', MagicMock(side_effect=Exception))
mocker.patch(
'freqtrade.configuration.Configuration._load_config_file',
lambda *args, **kwargs: default_conf
)
mocker.patch('freqtrade.freqtradebot.RPCManager', MagicMock())
mocker.patch('freqtrade.freqtradebot.persistence.init', MagicMock())
args = ['-c', 'config.json.example']
@ -66,17 +63,14 @@ def test_main_fatal_exception(mocker, default_conf, caplog) -> None:
def test_main_keyboard_interrupt(mocker, default_conf, caplog) -> None:
patch_exchange(mocker)
mocker.patch.multiple(
'freqtrade.freqtradebot.FreqtradeBot',
_init_modules=MagicMock(),
worker=MagicMock(side_effect=KeyboardInterrupt),
cleanup=MagicMock(),
)
mocker.patch('freqtrade.freqtradebot.FreqtradeBot.cleanup', MagicMock())
mocker.patch('freqtrade.worker.Worker._worker', MagicMock(side_effect=KeyboardInterrupt))
mocker.patch(
'freqtrade.configuration.Configuration._load_config_file',
lambda *args, **kwargs: default_conf
)
mocker.patch('freqtrade.freqtradebot.RPCManager', MagicMock())
mocker.patch('freqtrade.freqtradebot.persistence.init', MagicMock())
args = ['-c', 'config.json.example']
@ -89,17 +83,17 @@ def test_main_keyboard_interrupt(mocker, default_conf, caplog) -> None:
def test_main_operational_exception(mocker, default_conf, caplog) -> None:
patch_exchange(mocker)
mocker.patch.multiple(
'freqtrade.freqtradebot.FreqtradeBot',
_init_modules=MagicMock(),
worker=MagicMock(side_effect=OperationalException('Oh snap!')),
cleanup=MagicMock(),
mocker.patch('freqtrade.freqtradebot.FreqtradeBot.cleanup', MagicMock())
mocker.patch(
'freqtrade.worker.Worker._worker',
MagicMock(side_effect=OperationalException('Oh snap!'))
)
mocker.patch(
'freqtrade.configuration.Configuration._load_config_file',
lambda *args, **kwargs: default_conf
)
mocker.patch('freqtrade.freqtradebot.RPCManager', MagicMock())
mocker.patch('freqtrade.freqtradebot.persistence.init', MagicMock())
args = ['-c', 'config.json.example']
@ -112,21 +106,18 @@ def test_main_operational_exception(mocker, default_conf, caplog) -> None:
def test_main_reload_conf(mocker, default_conf, caplog) -> None:
patch_exchange(mocker)
mocker.patch.multiple(
'freqtrade.freqtradebot.FreqtradeBot',
_init_modules=MagicMock(),
worker=MagicMock(return_value=State.RELOAD_CONF),
cleanup=MagicMock(),
)
mocker.patch('freqtrade.freqtradebot.FreqtradeBot.cleanup', MagicMock())
mocker.patch('freqtrade.worker.Worker._worker', MagicMock(return_value=State.RELOAD_CONF))
mocker.patch(
'freqtrade.configuration.Configuration._load_config_file',
lambda *args, **kwargs: default_conf
)
mocker.patch('freqtrade.freqtradebot.RPCManager', MagicMock())
mocker.patch('freqtrade.freqtradebot.persistence.init', MagicMock())
# Raise exception as side effect to avoid endless loop
reconfigure_mock = mocker.patch(
'freqtrade.main.reconfigure', MagicMock(side_effect=Exception)
'freqtrade.main.Worker._reconfigure', MagicMock(side_effect=Exception)
)
with pytest.raises(SystemExit):
@ -138,19 +129,21 @@ def test_main_reload_conf(mocker, default_conf, caplog) -> None:
def test_reconfigure(mocker, default_conf) -> None:
patch_exchange(mocker)
mocker.patch.multiple(
'freqtrade.freqtradebot.FreqtradeBot',
_init_modules=MagicMock(),
worker=MagicMock(side_effect=OperationalException('Oh snap!')),
cleanup=MagicMock(),
mocker.patch('freqtrade.freqtradebot.FreqtradeBot.cleanup', MagicMock())
mocker.patch(
'freqtrade.worker.Worker._worker',
MagicMock(side_effect=OperationalException('Oh snap!'))
)
mocker.patch(
'freqtrade.configuration.Configuration._load_config_file',
lambda *args, **kwargs: default_conf
)
mocker.patch('freqtrade.freqtradebot.RPCManager', MagicMock())
mocker.patch('freqtrade.freqtradebot.persistence.init', MagicMock())
freqtrade = FreqtradeBot(default_conf)
args = Arguments(['-c', 'config.json.example'], '').get_parsed_arg()
worker = Worker(args=args, config=default_conf)
freqtrade = worker.freqtrade
# Renew mock to return modified data
conf = deepcopy(default_conf)
@ -160,11 +153,10 @@ def test_reconfigure(mocker, default_conf) -> None:
lambda *args, **kwargs: conf
)
worker._config = conf
# reconfigure should return a new instance
freqtrade2 = reconfigure(
freqtrade,
Arguments(['-c', 'config.json.example'], '').get_parsed_arg()
)
worker._reconfigure()
freqtrade2 = worker.freqtrade
# Verify we have a new instance with the new config
assert freqtrade is not freqtrade2

188
freqtrade/worker.py Executable file
View File

@ -0,0 +1,188 @@
"""
Main Freqtrade worker class.
"""
import logging
import time
import traceback
from argparse import Namespace
from typing import Any, Callable, Optional
import sdnotify
from freqtrade import (constants, OperationalException, TemporaryError,
__version__)
from freqtrade.configuration import Configuration
from freqtrade.freqtradebot import FreqtradeBot
from freqtrade.state import State
from freqtrade.rpc import RPCMessageType
logger = logging.getLogger(__name__)
class Worker(object):
"""
Freqtradebot worker class
"""
def __init__(self, args: Namespace, config=None) -> None:
"""
Init all variables and objects the bot needs to work
"""
logger.info('Starting worker %s', __version__)
self._args = args
self._config = config
self._init(False)
# Tell systemd that we completed initialization phase
if self._sd_notify:
logger.debug("sd_notify: READY=1")
self._sd_notify.notify("READY=1")
def _init(self, reconfig: bool):
"""
Also called from the _reconfigure() method (with reconfig=True).
"""
if reconfig or self._config is None:
# Load configuration
self._config = Configuration(self._args, None).get_config()
# Init the instance of the bot
self.freqtrade = FreqtradeBot(self._config)
self._throttle_secs = self._config.get('internals', {}).get(
'process_throttle_secs',
constants.PROCESS_THROTTLE_SECS
)
self._sd_notify = sdnotify.SystemdNotifier() if \
self._config.get('internals', {}).get('sd_notify', False) else None
@property
def state(self) -> State:
return self.freqtrade.state
@state.setter
def state(self, value: State):
self.freqtrade.state = value
def run(self):
state = None
while True:
state = self._worker(old_state=state)
if state == State.RELOAD_CONF:
self.freqtrade = self._reconfigure()
def _worker(self, old_state: State, throttle_secs: Optional[float] = None) -> State:
"""
Trading routine that must be run at each loop
:param old_state: the previous service state from the previous call
:return: current service state
"""
state = self.freqtrade.state
if throttle_secs is None:
throttle_secs = self._throttle_secs
# Log state transition
if state != old_state:
self.freqtrade.rpc.send_msg({
'type': RPCMessageType.STATUS_NOTIFICATION,
'status': f'{state.name.lower()}'
})
logger.info('Changing state to: %s', state.name)
if state == State.RUNNING:
self.freqtrade.rpc.startup_messages(self._config, self.freqtrade.pairlists)
if state == State.STOPPED:
# Ping systemd watchdog before sleeping in the stopped state
if self._sd_notify:
logger.debug("sd_notify: WATCHDOG=1\\nSTATUS=State: STOPPED.")
self._sd_notify.notify("WATCHDOG=1\nSTATUS=State: STOPPED.")
time.sleep(throttle_secs)
elif state == State.RUNNING:
# Ping systemd watchdog before throttling
if self._sd_notify:
logger.debug("sd_notify: WATCHDOG=1\\nSTATUS=State: RUNNING.")
self._sd_notify.notify("WATCHDOG=1\nSTATUS=State: RUNNING.")
self._throttle(func=self._process, min_secs=throttle_secs)
return state
def _throttle(self, func: Callable[..., Any], min_secs: float, *args, **kwargs) -> Any:
"""
Throttles the given callable that it
takes at least `min_secs` to finish execution.
:param func: Any callable
:param min_secs: minimum execution time in seconds
:return: Any
"""
start = time.time()
result = func(*args, **kwargs)
end = time.time()
duration = max(min_secs - (end - start), 0.0)
logger.debug('Throttling %s for %.2f seconds', func.__name__, duration)
time.sleep(duration)
return result
def _process(self) -> bool:
state_changed = False
try:
state_changed = self.freqtrade.process()
except TemporaryError as error:
logger.warning(f"Error: {error}, retrying in {constants.RETRY_TIMEOUT} seconds...")
time.sleep(constants.RETRY_TIMEOUT)
except OperationalException:
tb = traceback.format_exc()
hint = 'Issue `/start` if you think it is safe to restart.'
self.freqtrade.rpc.send_msg({
'type': RPCMessageType.STATUS_NOTIFICATION,
'status': f'OperationalException:\n```\n{tb}```{hint}'
})
logger.exception('OperationalException. Stopping trader ...')
self.freqtrade.state = State.STOPPED
# TODO: The return value of _process() is not used apart tests
# and should (could) be eliminated later. See PR #1689.
# state_changed = True
return state_changed
def _reconfigure(self):
"""
Cleans up current freqtradebot instance, reloads the configuration and
replaces it with the new instance
"""
# Tell systemd that we initiated reconfiguration
if self._sd_notify:
logger.debug("sd_notify: RELOADING=1")
self._sd_notify.notify("RELOADING=1")
# Clean up current freqtrade modules
self.freqtrade.cleanup()
# Load and validate config and create new instance of the bot
self._init(True)
self.freqtrade.rpc.send_msg({
'type': RPCMessageType.STATUS_NOTIFICATION,
'status': 'config reloaded'
})
# Tell systemd that we completed reconfiguration
if self._sd_notify:
logger.debug("sd_notify: READY=1")
self._sd_notify.notify("READY=1")
def exit(self):
# Tell systemd that we are exiting now
if self._sd_notify:
logger.debug("sd_notify: STOPPING=1")
self._sd_notify.notify("STOPPING=1")
if self.freqtrade:
self.freqtrade.rpc.send_msg({
'type': RPCMessageType.STATUS_NOTIFICATION,
'status': 'process died'
})
self.freqtrade.cleanup()