diff --git a/freqtrade/optimize/hyperopt/hyperopt.py b/freqtrade/optimize/hyperopt/hyperopt.py index 261e9962e..1fd3bf2ba 100644 --- a/freqtrade/optimize/hyperopt/hyperopt.py +++ b/freqtrade/optimize/hyperopt/hyperopt.py @@ -9,6 +9,7 @@ import random import sys from datetime import datetime from math import ceil +from multiprocessing import Manager from pathlib import Path from typing import Any @@ -21,6 +22,7 @@ from freqtrade.constants import FTHYPT_FILEVERSION, LAST_BT_RESULT_FN, Config from freqtrade.enums import HyperoptState from freqtrade.exceptions import OperationalException from freqtrade.misc import file_dump_json, plural +from freqtrade.optimize.hyperopt.hyperopt_logger import logging_mp_handle, logging_mp_setup from freqtrade.optimize.hyperopt.hyperopt_optimizer import HyperOptimizer from freqtrade.optimize.hyperopt.hyperopt_output import HyperoptOutput from freqtrade.optimize.hyperopt_tools import ( @@ -162,10 +164,16 @@ class Hyperopt: def run_optimizer_parallel(self, parallel: Parallel, asked: list[list]) -> list[dict[str, Any]]: """Start optimizer in a parallel way""" - return parallel( - delayed(wrap_non_picklable_objects(self.hyperopter.generate_optimizer))(v) - for v in asked - ) + + def optimizer_wrapper(*args, **kwargs): + # global log queue. This must happen in the file that initializes Parallel + logging_mp_setup( + log_queue, logging.INFO if self.config["verbosity"] < 1 else logging.DEBUG + ) + + return self.hyperopter.generate_optimizer(*args, **kwargs) + + return parallel(delayed(wrap_non_picklable_objects(optimizer_wrapper))(v) for v in asked) def _set_random_state(self, random_state: int | None) -> int: return random_state or random.randint(1, 2**16 - 1) # noqa: S311 @@ -243,6 +251,15 @@ class Hyperopt: self._save_result(val) + def _setup_logging_mp_workaround(self) -> None: + """ + Workaround for logging in child processes. + local_queue must be a global in the file that initializes Parallel. + """ + global log_queue + m = Manager() + log_queue = m.Queue() + def start(self) -> None: self.random_state = self._set_random_state(self.config.get("hyperopt_random_state")) logger.info(f"Using optimizer random state: {self.random_state}") @@ -257,7 +274,7 @@ class Hyperopt: self.opt = self.hyperopter.get_optimizer( config_jobs, self.random_state, INITIAL_POINTS, SKOPT_MODEL_QUEUE_SIZE ) - + self._setup_logging_mp_workaround() try: with Parallel(n_jobs=config_jobs) as parallel: jobs = parallel._effective_n_jobs() @@ -302,6 +319,7 @@ class Hyperopt: self.evaluate_result(val, current, is_random[j]) pbar.update(task, advance=1) + logging_mp_handle(log_queue) except KeyboardInterrupt: print("User interrupted..") diff --git a/freqtrade/optimize/hyperopt/hyperopt_logger.py b/freqtrade/optimize/hyperopt/hyperopt_logger.py new file mode 100644 index 000000000..e0b55b45d --- /dev/null +++ b/freqtrade/optimize/hyperopt/hyperopt_logger.py @@ -0,0 +1,43 @@ +import logging +from logging.handlers import QueueHandler +from multiprocessing import Queue, current_process +from queue import Empty + + +logger = logging.getLogger(__name__) + + +def logging_mp_setup(log_queue: Queue, verbosity: int): + """ + Setup logging in a child process. + Must be called in the child process before logging. + log_queue MUST be passed to the child process via inheritance + Which essentially means that the log_queue must be a global, created in the same + file as Parallel is initialized. + """ + current_proc = current_process().name + if current_proc != "MainProcess": + h = QueueHandler(log_queue) + root = logging.getLogger() + root.setLevel(verbosity) + root.addHandler(h) + + +def logging_mp_handle(q: Queue): + """ + Handle logging in a child process. + Must be called in the child process after logging. + """ + + try: + while True: + record = q.get(block=False) + # logger1 = logging.getLogger(record.name) + if record is None: + break + logger.handle(record) + + except Empty: + logger.info("empty") + # print("empty") + pass