feat: add hyperopt log handlers to allow for multiprocessing logging

This commit is contained in:
Matthias 2024-11-11 06:25:12 +01:00
parent 119b73ead2
commit 7e96e7af83
2 changed files with 66 additions and 5 deletions

View File

@ -9,6 +9,7 @@ import random
import sys
from datetime import datetime
from math import ceil
from multiprocessing import Manager
from pathlib import Path
from typing import Any
@ -21,6 +22,7 @@ from freqtrade.constants import FTHYPT_FILEVERSION, LAST_BT_RESULT_FN, Config
from freqtrade.enums import HyperoptState
from freqtrade.exceptions import OperationalException
from freqtrade.misc import file_dump_json, plural
from freqtrade.optimize.hyperopt.hyperopt_logger import logging_mp_handle, logging_mp_setup
from freqtrade.optimize.hyperopt.hyperopt_optimizer import HyperOptimizer
from freqtrade.optimize.hyperopt.hyperopt_output import HyperoptOutput
from freqtrade.optimize.hyperopt_tools import (
@ -162,10 +164,16 @@ class Hyperopt:
def run_optimizer_parallel(self, parallel: Parallel, asked: list[list]) -> list[dict[str, Any]]:
"""Start optimizer in a parallel way"""
return parallel(
delayed(wrap_non_picklable_objects(self.hyperopter.generate_optimizer))(v)
for v in asked
)
def optimizer_wrapper(*args, **kwargs):
# global log queue. This must happen in the file that initializes Parallel
logging_mp_setup(
log_queue, logging.INFO if self.config["verbosity"] < 1 else logging.DEBUG
)
return self.hyperopter.generate_optimizer(*args, **kwargs)
return parallel(delayed(wrap_non_picklable_objects(optimizer_wrapper))(v) for v in asked)
def _set_random_state(self, random_state: int | None) -> int:
return random_state or random.randint(1, 2**16 - 1) # noqa: S311
@ -243,6 +251,15 @@ class Hyperopt:
self._save_result(val)
def _setup_logging_mp_workaround(self) -> None:
"""
Workaround for logging in child processes.
local_queue must be a global in the file that initializes Parallel.
"""
global log_queue
m = Manager()
log_queue = m.Queue()
def start(self) -> None:
self.random_state = self._set_random_state(self.config.get("hyperopt_random_state"))
logger.info(f"Using optimizer random state: {self.random_state}")
@ -257,7 +274,7 @@ class Hyperopt:
self.opt = self.hyperopter.get_optimizer(
config_jobs, self.random_state, INITIAL_POINTS, SKOPT_MODEL_QUEUE_SIZE
)
self._setup_logging_mp_workaround()
try:
with Parallel(n_jobs=config_jobs) as parallel:
jobs = parallel._effective_n_jobs()
@ -302,6 +319,7 @@ class Hyperopt:
self.evaluate_result(val, current, is_random[j])
pbar.update(task, advance=1)
logging_mp_handle(log_queue)
except KeyboardInterrupt:
print("User interrupted..")

View File

@ -0,0 +1,43 @@
import logging
from logging.handlers import QueueHandler
from multiprocessing import Queue, current_process
from queue import Empty
logger = logging.getLogger(__name__)
def logging_mp_setup(log_queue: Queue, verbosity: int):
"""
Setup logging in a child process.
Must be called in the child process before logging.
log_queue MUST be passed to the child process via inheritance
Which essentially means that the log_queue must be a global, created in the same
file as Parallel is initialized.
"""
current_proc = current_process().name
if current_proc != "MainProcess":
h = QueueHandler(log_queue)
root = logging.getLogger()
root.setLevel(verbosity)
root.addHandler(h)
def logging_mp_handle(q: Queue):
"""
Handle logging in a child process.
Must be called in the child process after logging.
"""
try:
while True:
record = q.get(block=False)
# logger1 = logging.getLogger(record.name)
if record is None:
break
logger.handle(record)
except Empty:
logger.info("empty")
# print("empty")
pass