From e87ac51ecfef39e92d6de0ec6c27a81458414853 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Paugam?= <35327799+FrancoisPgm@users.noreply.github.com> Date: Mon, 15 Sep 2025 14:26:16 +0200 Subject: [PATCH 1/7] add MetricMonitor callback --- sklearn/callback/__init__.py | 2 + sklearn/callback/_metric_monitor.py | 50 +++++++++++++++++++ sklearn/callback/_mixin.py | 24 +++++++++ sklearn/callback/tests/_utils.py | 32 ++++++++++++ sklearn/callback/tests/test_metric_monitor.py | 38 ++++++++++++++ 5 files changed, 146 insertions(+) create mode 100644 sklearn/callback/_metric_monitor.py create mode 100644 sklearn/callback/tests/test_metric_monitor.py diff --git a/sklearn/callback/__init__.py b/sklearn/callback/__init__.py index 1f0d661cb2381..3ffd96f1948c0 100644 --- a/sklearn/callback/__init__.py +++ b/sklearn/callback/__init__.py @@ -8,6 +8,7 @@ from sklearn.callback._base import AutoPropagatedProtocol, CallbackProtocol from sklearn.callback._callback_context import CallbackContext +from sklearn.callback._metric_monitor import MetricMonitor from sklearn.callback._mixin import CallbackSupportMixin from sklearn.callback._progressbar import ProgressBar @@ -16,5 +17,6 @@ "CallbackContext", "CallbackProtocol", "CallbackSupportMixin", + "MetricMonitor", "ProgressBar", ] diff --git a/sklearn/callback/_metric_monitor.py b/sklearn/callback/_metric_monitor.py new file mode 100644 index 0000000000000..e7c9ebf846fa2 --- /dev/null +++ b/sklearn/callback/_metric_monitor.py @@ -0,0 +1,50 @@ +# Authors: The scikit-learn developers +# SPDX-License-Identifier: BSD-3-Clause + +import inspect +from functools import partial + + +class MetricMonitor: + """Callback that monitors a metric for each iterative steps of an estimator. + + The specified metric function is called on the target values `y` and the predicted + values on the samples `y_pred = estimator.predict(X)` at each iterative step of the + estimator. + + Parameters + ---------- + metric : function + The metric to compute. + metric_kwargs : dict + Keyword argumments for the metric. + """ + + def __init__(self, metric, metric_kwargs): + valid_params = inspect.signature(metric).parameters + invalid_params = [arg for arg in metric_kwargs if arg not in valid_params] + if invalid_params: + raise ValueError( + f"The parameters '{invalid_params}' cannot be used with the function" + f"{metric.__module__}.{metric.__name__}." + ) + self.metric_func = partial(metric, **metric_kwargs) + self.log = [] + + def _on_fit_begin(self, estimator, *, data): + pass + + def _on_fit_iter_end( + self, estimator, task_info, data, from_reconstruction_attributes, **kwargs + ): + if not hasattr(estimator, "predict"): + raise ValueError( + f"Estimator {estimator.__class__} does not have a predict method, which" + "is necessary to use a MetricMonitor callback." 
+ ) + y_pred = from_reconstruction_attributes().predict(data["X_train"]) + metric_value = self.metric_func(data["y_train"], y_pred) + self.log.append((task_info["task_id"], metric_value)) + + def _on_fit_end(self, estimator, task_info): + pass diff --git a/sklearn/callback/_mixin.py b/sklearn/callback/_mixin.py index 144632ed69971..281c6795692e7 100644 --- a/sklearn/callback/_mixin.py +++ b/sklearn/callback/_mixin.py @@ -1,6 +1,8 @@ # Authors: The scikit-learn developers # SPDX-License-Identifier: BSD-3-Clause +import copy + from sklearn.callback._base import CallbackProtocol from sklearn.callback._callback_context import CallbackContext @@ -57,3 +59,25 @@ def init_callback_context(self, task_name="fit", max_subtasks=None): ) return self._callback_fit_ctx + + def _from_reconstruction_attributes(self, *, reconstruction_attributes): + """Return an as if fitted copy of this estimator + + Parameters + ---------- + reconstruction_attributes : callable + A callable that has no arguments and returns the necessary fitted attributes + to create a working fitted estimator from this instance. + + Using a callable allows lazy evaluation of the potentially costly + reconstruction attributes. + + Returns + ------- + fitted_estimator : estimator instance + The fitted copy of this estimator. + """ + new_estimator = copy.copy(self) # XXX deepcopy ? + for key, val in reconstruction_attributes().items(): + setattr(new_estimator, key, val) + return new_estimator diff --git a/sklearn/callback/tests/_utils.py b/sklearn/callback/tests/_utils.py index 554ac94ccb94d..79475b7908951 100644 --- a/sklearn/callback/tests/_utils.py +++ b/sklearn/callback/tests/_utils.py @@ -2,6 +2,9 @@ # SPDX-License-Identifier: BSD-3-Clause import time +from functools import partial + +import numpy as np from sklearn.base import BaseEstimator, _fit_context, clone from sklearn.callback import CallbackSupportMixin @@ -54,6 +57,11 @@ def fit(self, X=None, y=None): if subcontext.eval_on_fit_iter_end( estimator=self, data={"X_train": X, "y_train": y}, + fit_state={}, + from_reconstruction_attributes=partial( + self._from_reconstruction_attributes, + reconstruction_attributes=lambda: {"n_iter_": i + 1}, + ), ): break @@ -61,6 +69,30 @@ def fit(self, X=None, y=None): return self + def predict(self, X): + return np.mean(X, axis=1) * self.n_iter_ + + +class EstimatorWithoutPredict(CallbackSupportMixin, BaseEstimator): + _parameter_constraints: dict = {} + + def fit(self, X=None, y=None): + callback_ctx = self.init_callback_context().eval_on_fit_begin( + estimator=self, data={"X_train": X, "y_train": y} + ) + subcontext = callback_ctx.subcontext(task_id=0) + subcontext.eval_on_fit_iter_end( + estimator=self, + data={"X_train": X, "y_train": y}, + fit_state={}, + from_reconstruction_attributes=partial( + self._from_reconstruction_attributes, + reconstruction_attributes=lambda: {}, + ), + ) + + return self + class WhileEstimator(CallbackSupportMixin, BaseEstimator): _parameter_constraints: dict = {} diff --git a/sklearn/callback/tests/test_metric_monitor.py b/sklearn/callback/tests/test_metric_monitor.py new file mode 100644 index 0000000000000..ab6ffc42c2d87 --- /dev/null +++ b/sklearn/callback/tests/test_metric_monitor.py @@ -0,0 +1,38 @@ +# Authors: The scikit-learn developers +# SPDX-License-Identifier: BSD-3-Clause + +import numpy as np +import pytest + +from sklearn.callback import MetricMonitor +from sklearn.callback.tests._utils import Estimator, EstimatorWithoutPredict +from sklearn.metrics import mean_pinball_loss + + +def 
test_metric_monitor(): + max_iter = 10 + n_dim = 5 + n_samples = 3 + alpha = 0.6 + estimator = Estimator(max_iter=max_iter) + callback = MetricMonitor(mean_pinball_loss, metric_kwargs={"alpha": alpha}) + estimator.set_callbacks(callback) + X, y = np.ones((n_dim, n_samples)), np.ones(n_dim) + + estimator.fit(X, y) + + expected_log = [ + (i, mean_pinball_loss(y, X.mean(axis=1) * (i + 1), alpha=alpha)) + for i in range(max_iter) + ] + + assert np.array_equal(callback.log, expected_log) + + +def test_no_predict_error(): + estimator = EstimatorWithoutPredict() + callback = MetricMonitor(mean_pinball_loss, metric_kwargs={"alpha": 0.6}) + estimator.set_callbacks(callback) + + with pytest.raises(ValueError, match="does not have a predict method"): + estimator.fit() From 2dc3ed31540211e2575f7cc128c376f1d63669e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Paugam?= <35327799+FrancoisPgm@users.noreply.github.com> Date: Tue, 16 Sep 2025 10:51:23 +0200 Subject: [PATCH 2/7] move the check for a predict method to _on_fit_begin to do it just once --- sklearn/callback/_metric_monitor.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sklearn/callback/_metric_monitor.py b/sklearn/callback/_metric_monitor.py index e7c9ebf846fa2..5e4e04dd5fbf9 100644 --- a/sklearn/callback/_metric_monitor.py +++ b/sklearn/callback/_metric_monitor.py @@ -32,16 +32,15 @@ def __init__(self, metric, metric_kwargs): self.log = [] def _on_fit_begin(self, estimator, *, data): - pass - - def _on_fit_iter_end( - self, estimator, task_info, data, from_reconstruction_attributes, **kwargs - ): if not hasattr(estimator, "predict"): raise ValueError( f"Estimator {estimator.__class__} does not have a predict method, which" "is necessary to use a MetricMonitor callback." 
) + + def _on_fit_iter_end( + self, estimator, task_info, data, from_reconstruction_attributes, **kwargs + ): y_pred = from_reconstruction_attributes().predict(data["X_train"]) metric_value = self.metric_func(data["y_train"], y_pred) self.log.append((task_info["task_id"], metric_value)) From 6f4320f171dbe700faf07d76d95366b89f3566ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Paugam?= <35327799+FrancoisPgm@users.noreply.github.com> Date: Thu, 18 Sep 2025 16:04:53 +0200 Subject: [PATCH 3/7] make MetricMonitor callback support being parallelized in a meta-estimator --- sklearn/base.py | 2 +- sklearn/callback/_callback_context.py | 32 +++- sklearn/callback/_metric_monitor.py | 74 ++++++-- sklearn/callback/_progressbar.py | 21 +-- sklearn/callback/tests/_utils.py | 90 ++++++---- .../callback/tests/test_callback_context.py | 2 +- sklearn/callback/tests/test_metric_monitor.py | 166 ++++++++++++++++-- 7 files changed, 287 insertions(+), 100 deletions(-) diff --git a/sklearn/base.py b/sklearn/base.py index 3bc76bd7a4a3c..3f9cc68ea9acf 100644 --- a/sklearn/base.py +++ b/sklearn/base.py @@ -136,7 +136,7 @@ def _clone_parametrized(estimator, *, safe=True): # attach callbacks to the new estimator if hasattr(estimator, "_skl_callbacks"): - new_object._skl_callbacks = clone(estimator._skl_callbacks, safe=False) + new_object._skl_callbacks = estimator._skl_callbacks # quick sanity check of the parameters of the clone for name in new_object_params: diff --git a/sklearn/callback/_callback_context.py b/sklearn/callback/_callback_context.py index aa16e632a365a..a6c86e176a2ab 100644 --- a/sklearn/callback/_callback_context.py +++ b/sklearn/callback/_callback_context.py @@ -179,7 +179,7 @@ def _from_parent(cls, parent_context, *, task_name, task_id, max_subtasks=None): @property def task_info(self): - """Information about the corresponding computation task. the keys are + """Information about the corresponding computation task. The keys are - depth : int The depth of the task in the task tree. @@ -392,15 +392,33 @@ def propagate_callbacks(self, sub_estimator): ) ] - if not callbacks_to_propagate: - return self - # We store the parent context in the sub-estimator to be able to merge the # task trees of the sub-estimator and the meta-estimator. sub_estimator._parent_callback_ctx = self - sub_estimator.set_callbacks( - getattr(sub_estimator, "_skl_callbacks", []) + callbacks_to_propagate - ) + if callbacks_to_propagate: + sub_estimator.set_callbacks( + getattr(sub_estimator, "_skl_callbacks", []) + callbacks_to_propagate + ) return self + + +def _get_context_path(task_info): + """Helper function to get the path of task info from this task to the root task. + + Parameters + ---------- + task_info : dict + The dictionary representations of a CallbackContext's task node. + + Returns + ------- + list of dict + The list of dictionary representations of the parents of the given task. 
+ """ + return ( + [task_info] + if task_info["parent"] is None + else _get_context_path(task_info["parent"]) + [task_info] + ) diff --git a/sklearn/callback/_metric_monitor.py b/sklearn/callback/_metric_monitor.py index 5e4e04dd5fbf9..fbc6d81c3d242 100644 --- a/sklearn/callback/_metric_monitor.py +++ b/sklearn/callback/_metric_monitor.py @@ -2,7 +2,11 @@ # SPDX-License-Identifier: BSD-3-Clause import inspect -from functools import partial +from multiprocessing import Manager + +import pandas as pd + +from sklearn.callback._callback_context import _get_context_path class MetricMonitor: @@ -16,34 +20,68 @@ class MetricMonitor: ---------- metric : function The metric to compute. - metric_kwargs : dict - Keyword argumments for the metric. + metric_kwargs : dict or None, default=None + Keyword argumments for the metric, None means no argument is used. + on_validation : bool, default=True + Whether to compute the metric on validation data (if True) or training data + (if False). """ - def __init__(self, metric, metric_kwargs): - valid_params = inspect.signature(metric).parameters - invalid_params = [arg for arg in metric_kwargs if arg not in valid_params] - if invalid_params: - raise ValueError( - f"The parameters '{invalid_params}' cannot be used with the function" - f"{metric.__module__}.{metric.__name__}." - ) - self.metric_func = partial(metric, **metric_kwargs) - self.log = [] + def __init__(self, metric, metric_params=None, on_validation=True): + self.on_validation = on_validation + self.metric_params = metric_params or dict() + if metric_params is not None: + valid_params = inspect.signature(metric).parameters + invalid_params = [arg for arg in metric_params if arg not in valid_params] + if invalid_params: + raise ValueError( + f"The parameters '{invalid_params}' cannot be used with the" + f" function {metric.__module__}.{metric.__name__}." + ) + self.metric_func = metric + self._shared_log = Manager().list() def _on_fit_begin(self, estimator, *, data): if not hasattr(estimator, "predict"): raise ValueError( f"Estimator {estimator.__class__} does not have a predict method, which" - "is necessary to use a MetricMonitor callback." + " is necessary to use a MetricMonitor callback." ) + self._fit_log = Manager().Queue() def _on_fit_iter_end( self, estimator, task_info, data, from_reconstruction_attributes, **kwargs ): - y_pred = from_reconstruction_attributes().predict(data["X_train"]) - metric_value = self.metric_func(data["y_train"], y_pred) - self.log.append((task_info["task_id"], metric_value)) + # TODO: add check to verify we're on the innermost level of the fit loop + # e.g. 
for the KMeans + X, y = ( + (data["X_val"], data["y_val"]) + if self.on_validation + else (data["X_train"], data["y_train"]) + ) + y_pred = from_reconstruction_attributes().predict(X) + metric_value = self.metric_func(y, y_pred, **self.metric_params) + log_item = {self.metric_func.__name__: metric_value} + for node_info in _get_context_path(task_info): + prev_task_str = ( + f"{node_info['prev_estimator_name']}_{node_info['prev_task_name']}|" + if node_info["prev_estimator_name"] is not None + else "" + ) + log_item[ + f"{node_info['depth']}_{prev_task_str}{node_info['estimator_name']}_" + f"{node_info['task_name']}" + ] = node_info["task_id"] + self._fit_log.put(log_item) def _on_fit_end(self, estimator, task_info): - pass + while not self._fit_log.empty(): + self._shared_log.append(self._fit_log.get()) + + def get_logs(self): + self.log = pd.DataFrame(list(self._shared_log)) + if not self.log.empty: + self.log = self.log.set_index( + [col for col in self.log.columns if col != self.metric_func.__name__] + ).sort_index() + return self.log.copy() diff --git a/sklearn/callback/_progressbar.py b/sklearn/callback/_progressbar.py index 22df8324d5d52..0273e838e0bfe 100644 --- a/sklearn/callback/_progressbar.py +++ b/sklearn/callback/_progressbar.py @@ -4,6 +4,7 @@ from multiprocessing import Manager from threading import Thread +from sklearn.callback._callback_context import _get_context_path from sklearn.utils._optional_dependencies import check_rich_support @@ -212,23 +213,3 @@ def __iter__(self): yield self for child in self.children.values(): yield from child - - -def _get_context_path(task_info): - """Helper function to get the path of task info from this task to the root task. - - Parameters - ---------- - task_info : dict - The dictionary representations of a CallbackContext's task node. - - Returns - ------- - list of dict - The list of dictionary representations of the parents of the given task. 
- """ - return ( - [task_info] - if task_info["parent"] is None - else _get_context_path(task_info["parent"]) + [task_info] - ) diff --git a/sklearn/callback/tests/_utils.py b/sklearn/callback/tests/_utils.py index 79475b7908951..7f9166398aab3 100644 --- a/sklearn/callback/tests/_utils.py +++ b/sklearn/callback/tests/_utils.py @@ -39,57 +39,51 @@ def _on_fit_iter_end(self, estimator, node, **kwargs): class Estimator(CallbackSupportMixin, BaseEstimator): _parameter_constraints: dict = {} - def __init__(self, max_iter=20, computation_intensity=0.001): + def __init__(self, intercept=0, max_iter=20, computation_intensity=0.001): self.max_iter = max_iter self.computation_intensity = computation_intensity + self.intercept = intercept @_fit_context(prefer_skip_nested_validation=False) - def fit(self, X=None, y=None): + def fit(self, X_train=None, y_train=None, X_val=None, y_val=None): + data = { + "X_train": X_train, + "y_train": y_train, + "X_val": X_val, + "y_val": y_val, + } callback_ctx = self.init_callback_context( max_subtasks=self.max_iter - ).eval_on_fit_begin(estimator=self, data={"X_train": X, "y_train": y}) + ).eval_on_fit_begin(estimator=self, data=data) for i in range(self.max_iter): - subcontext = callback_ctx.subcontext(task_id=i) + subcontext = callback_ctx.subcontext(task_id=i, task_name="fit_iter") time.sleep(self.computation_intensity) # Computation intensive task if subcontext.eval_on_fit_iter_end( estimator=self, - data={"X_train": X, "y_train": y}, - fit_state={}, + data=data, from_reconstruction_attributes=partial( self._from_reconstruction_attributes, - reconstruction_attributes=lambda: {"n_iter_": i + 1}, + reconstruction_attributes=lambda: {"coef_": i + 1}, ), ): break - self.n_iter_ = i + 1 + self.coef_ = i + 1 return self def predict(self, X): - return np.mean(X, axis=1) * self.n_iter_ + return np.mean(X, axis=1) * self.coef_ + self.intercept class EstimatorWithoutPredict(CallbackSupportMixin, BaseEstimator): _parameter_constraints: dict = {} - def fit(self, X=None, y=None): - callback_ctx = self.init_callback_context().eval_on_fit_begin( - estimator=self, data={"X_train": X, "y_train": y} - ) - subcontext = callback_ctx.subcontext(task_id=0) - subcontext.eval_on_fit_iter_end( - estimator=self, - data={"X_train": X, "y_train": y}, - fit_state={}, - from_reconstruction_attributes=partial( - self._from_reconstruction_attributes, - reconstruction_attributes=lambda: {}, - ), - ) + def fit(self): + self.init_callback_context().eval_on_fit_begin(estimator=self, data=None) return self @@ -97,34 +91,51 @@ def fit(self, X=None, y=None): class WhileEstimator(CallbackSupportMixin, BaseEstimator): _parameter_constraints: dict = {} - def __init__(self, computation_intensity=0.001): + def __init__(self, intercept=0, max_iter=20, computation_intensity=0.001): + self.intercept = intercept self.computation_intensity = computation_intensity + self.max_iter = max_iter @_fit_context(prefer_skip_nested_validation=False) - def fit(self, X=None, y=None): + def fit(self, X_train=None, y_train=None, X_val=None, y_val=None): + data = { + "X_train": X_train, + "y_train": y_train, + "X_val": X_val, + "y_val": y_val, + } callback_ctx = self.init_callback_context().eval_on_fit_begin( - estimator=self, data={"X_train": X, "y_train": y} + estimator=self, data=data ) i = 0 while True: - subcontext = callback_ctx.subcontext(task_id=i) + subcontext = callback_ctx.subcontext(task_id=i, task_name="fit_iter") time.sleep(self.computation_intensity) # Computation intensive task if 
subcontext.eval_on_fit_iter_end( estimator=self, - data={"X_train": X, "y_train": y}, + data=data, + from_reconstruction_attributes=partial( + self._from_reconstruction_attributes, + reconstruction_attributes=lambda: {"coef_": i + 1}, + ), ): break - if i == 20: + if i == self.max_iter - 1: break i += 1 + self.coef_ = i + 1 + return self + def predict(self, X): + return np.mean(X, axis=1) * self.coef_ + self.intercept + class MetaEstimator(CallbackSupportMixin, BaseEstimator): _parameter_constraints: dict = {} @@ -139,17 +150,22 @@ def __init__( self.prefer = prefer @_fit_context(prefer_skip_nested_validation=False) - def fit(self, X=None, y=None): + def fit(self, X_train=None, y_train=None, X_val=None, y_val=None): + data = { + "X_train": X_train, + "y_train": y_train, + "X_val": X_val, + "y_val": y_val, + } callback_ctx = self.init_callback_context( max_subtasks=self.n_outer - ).eval_on_fit_begin(estimator=self, data={"X_train": X, "y_train": y}) + ).eval_on_fit_begin(estimator=self, data=data) Parallel(n_jobs=self.n_jobs, prefer=self.prefer)( delayed(_func)( self, self.estimator, - X, - y, + data, callback_ctx=callback_ctx.subcontext( task_name="outer", task_id=i, max_subtasks=self.n_inner ), @@ -160,22 +176,22 @@ def fit(self, X=None, y=None): return self -def _func(meta_estimator, inner_estimator, X, y, *, callback_ctx): +def _func(meta_estimator, inner_estimator, data, *, callback_ctx): for i in range(meta_estimator.n_inner): est = clone(inner_estimator) + iter_id = callback_ctx.task_id * meta_estimator.n_inner + i + est.intercept = iter_id inner_ctx = callback_ctx.subcontext( task_name="inner", task_id=i ).propagate_callbacks(sub_estimator=est) - est.fit(X, y) + est.fit(**data) inner_ctx.eval_on_fit_iter_end( estimator=meta_estimator, - data={"X_train": X, "y_train": y}, ) callback_ctx.eval_on_fit_iter_end( estimator=meta_estimator, - data={"X_train": X, "y_train": y}, ) diff --git a/sklearn/callback/tests/test_callback_context.py b/sklearn/callback/tests/test_callback_context.py index 7c6a28e8c1eb7..e39919da64003 100644 --- a/sklearn/callback/tests/test_callback_context.py +++ b/sklearn/callback/tests/test_callback_context.py @@ -98,7 +98,7 @@ def test_auto_propagated_callbacks(): r"sub-estimator .*of a meta-estimator .*can't have auto-propagated callbacks" ) with pytest.raises(TypeError, match=match): - meta_estimator.fit(X=None, y=None) + meta_estimator.fit() def _make_task_tree(n_children, n_grandchildren): diff --git a/sklearn/callback/tests/test_metric_monitor.py b/sklearn/callback/tests/test_metric_monitor.py index ab6ffc42c2d87..2aca882842b9f 100644 --- a/sklearn/callback/tests/test_metric_monitor.py +++ b/sklearn/callback/tests/test_metric_monitor.py @@ -1,38 +1,172 @@ # Authors: The scikit-learn developers # SPDX-License-Identifier: BSD-3-Clause +from itertools import product + import numpy as np +import pandas as pd import pytest from sklearn.callback import MetricMonitor -from sklearn.callback.tests._utils import Estimator, EstimatorWithoutPredict -from sklearn.metrics import mean_pinball_loss +from sklearn.callback.tests._utils import ( + Estimator, + EstimatorWithoutPredict, + MetaEstimator, + WhileEstimator, +) +from sklearn.metrics import mean_pinball_loss, mean_squared_error -def test_metric_monitor(): - max_iter = 10 +@pytest.mark.parametrize("EstimatorClass", [Estimator, WhileEstimator]) +@pytest.mark.parametrize( + "metric, metric_params", + [(mean_squared_error, None), (mean_pinball_loss, {"alpha": 0.6})], +) +def test_metric_monitor(EstimatorClass, metric, 
metric_params): + max_iter = 3 n_dim = 5 n_samples = 3 - alpha = 0.6 - estimator = Estimator(max_iter=max_iter) - callback = MetricMonitor(mean_pinball_loss, metric_kwargs={"alpha": alpha}) - estimator.set_callbacks(callback) - X, y = np.ones((n_dim, n_samples)), np.ones(n_dim) + intercept = 1 + estimator = EstimatorClass(intercept=intercept, max_iter=max_iter) + callback_train = MetricMonitor( + metric, metric_params=metric_params, on_validation=False + ) + callback_val = MetricMonitor( + metric, metric_params=metric_params, on_validation=True + ) + estimator.set_callbacks([callback_train, callback_val]) + rng = np.random.RandomState(0) + X_train, y_train = rng.uniform(size=(n_dim, n_samples)), rng.uniform(size=n_dim) + X_val, y_val = rng.uniform(size=(n_dim, n_samples)), rng.uniform(size=n_dim) - estimator.fit(X, y) + estimator.fit(X_train=X_train, y_train=y_train, X_val=X_val, y_val=y_val) - expected_log = [ - (i, mean_pinball_loss(y, X.mean(axis=1) * (i + 1), alpha=alpha)) - for i in range(max_iter) - ] + metric_params = metric_params or dict() + log_train = callback_train.get_logs() + expected_log_train = pd.DataFrame( + [ + { + f"0_{estimator.__class__.__name__}_fit": 0, + f"1_{estimator.__class__.__name__}_fit_iter": i, + metric.__name__: metric( + y_train, X_train.mean(axis=1) * (i + 1) + intercept, **metric_params + ), + } + for i in range(max_iter) + ] + ) + expected_log_train = expected_log_train.set_index( + [col for col in expected_log_train.columns if col != metric.__name__] + ) + assert log_train.equals(expected_log_train) + assert np.array_equal(log_train.index.names, expected_log_train.index.names) - assert np.array_equal(callback.log, expected_log) + log_val = callback_val.get_logs() + expected_log_val = pd.DataFrame( + [ + { + f"0_{estimator.__class__.__name__}_fit": 0, + f"1_{estimator.__class__.__name__}_fit_iter": i, + metric.__name__: metric( + y_val, X_val.mean(axis=1) * (i + 1) + intercept, **metric_params + ), + } + for i in range(max_iter) + ] + ) + expected_log_val = expected_log_val.set_index( + [col for col in expected_log_val.columns if col != metric.__name__] + ) + assert log_val.equals(expected_log_val) + assert np.array_equal(log_val.index.names, expected_log_val.index.names) def test_no_predict_error(): estimator = EstimatorWithoutPredict() - callback = MetricMonitor(mean_pinball_loss, metric_kwargs={"alpha": 0.6}) + callback = MetricMonitor(mean_pinball_loss, metric_params={"alpha": 0.6}) estimator.set_callbacks(callback) with pytest.raises(ValueError, match="does not have a predict method"): estimator.fit() + + +def test_wrong_kwarg_error(): + with pytest.raises(ValueError, match="cannot be used with the function"): + MetricMonitor(mean_pinball_loss, metric_params={"wrong_name": 0.6}) + + +@pytest.mark.parametrize("prefer", ["processes", "threads"]) +@pytest.mark.parametrize( + "metric, metric_params", + [(mean_squared_error, None), (mean_pinball_loss, {"alpha": 0.6})], +) +def test_within_meta_estimator(prefer, metric, metric_params): + n_outer = 3 + n_inner = 2 + max_iter = 4 + n_dim = 5 + n_samples = 3 + rng = np.random.RandomState(0) + X_train, y_train = rng.uniform(size=(n_dim, n_samples)), rng.uniform(size=n_dim) + X_val, y_val = rng.uniform(size=(n_dim, n_samples)), rng.uniform(size=n_dim) + callback_train = MetricMonitor( + metric, metric_params=metric_params, on_validation=False + ) + callback_val = MetricMonitor( + metric, metric_params=metric_params, on_validation=True + ) + est = Estimator(max_iter=max_iter) + 
est.set_callbacks([callback_train, callback_val]) + meta_est = MetaEstimator( + est, n_outer=n_outer, n_inner=n_inner, n_jobs=2, prefer=prefer + ) + + meta_est.fit(X_train=X_train, y_train=y_train, X_val=X_val, y_val=y_val) + + metric_params = metric_params or dict() + expected_log_train = [] + expected_log_val = [] + for i_outer, i_inner in product(range(n_outer), range(n_inner)): + est = Estimator(intercept=i_outer * n_inner + i_inner) + for i_estimator_fit_iter in range(max_iter): + setattr(est, "coef_", i_estimator_fit_iter + 1) + expected_log_train.append( + { + metric.__name__: metric( + y_train, est.predict(X_train), **metric_params + ), + f"0_{meta_est.__class__.__name__}_fit": 0, + f"1_{meta_est.__class__.__name__}_outer": i_outer, + f"2_{meta_est.__class__.__name__}_inner|" + f"{est.__class__.__name__}_fit": i_inner, + f"3_{est.__class__.__name__}_fit_iter": i_estimator_fit_iter, + } + ) + expected_log_val.append( + { + metric.__name__: metric(y_val, est.predict(X_val), **metric_params), + f"0_{meta_est.__class__.__name__}_fit": 0, + f"1_{meta_est.__class__.__name__}_outer": i_outer, + f"2_{meta_est.__class__.__name__}_inner|" + f"{est.__class__.__name__}_fit": i_inner, + f"3_{est.__class__.__name__}_fit_iter": i_estimator_fit_iter, + } + ) + expected_log_train = pd.DataFrame(expected_log_train) + expected_log_train = expected_log_train.set_index( + [col for col in expected_log_train.columns if col != metric.__name__] + ) + expected_log_val = pd.DataFrame(expected_log_val) + expected_log_val = expected_log_val.set_index( + [col for col in expected_log_val.columns if col != metric.__name__] + ) + + log_train = callback_train.get_logs() + log_val = callback_val.get_logs() + + assert len(log_train) == len(expected_log_train) + assert len(log_val) == len(expected_log_val) + assert log_train.equals(expected_log_train) + assert np.array_equal(log_train.index.names, expected_log_train.index.names) + assert log_val.equals(expected_log_val) + assert np.array_equal(log_val.index.names, expected_log_val.index.names) From 20a5b8bfb2dcf5e33f2c7b5ddfe353db97249dc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Paugam?= <35327799+FrancoisPgm@users.noreply.github.com> Date: Fri, 19 Sep 2025 11:02:03 +0200 Subject: [PATCH 4/7] update task_id to _task_id --- sklearn/callback/tests/_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sklearn/callback/tests/_utils.py b/sklearn/callback/tests/_utils.py index cac7867dadd76..17128cafb2919 100644 --- a/sklearn/callback/tests/_utils.py +++ b/sklearn/callback/tests/_utils.py @@ -197,7 +197,7 @@ def fit(self, X_train=None, y_train=None, X_val=None, y_val=None): def _func(meta_estimator, inner_estimator, data, *, callback_ctx): for i in range(meta_estimator.n_inner): est = clone(inner_estimator) - iter_id = callback_ctx.task_id * meta_estimator.n_inner + i + iter_id = callback_ctx._task_id * meta_estimator.n_inner + i est.intercept = iter_id inner_ctx = callback_ctx.subcontext( From beadfb76cf1d36a13d29b2fa347608e3a989ed4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Paugam?= <35327799+FrancoisPgm@users.noreply.github.com> Date: Fri, 19 Sep 2025 11:52:07 +0200 Subject: [PATCH 5/7] fix race-condition error when nesting parallelism on MetricMonitor --- sklearn/callback/_metric_monitor.py | 17 +++++++++++------ sklearn/callback/tests/test_metric_monitor.py | 4 ++-- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/sklearn/callback/_metric_monitor.py b/sklearn/callback/_metric_monitor.py index 
fae0ce36d1010..f1e6027f4f39a 100644 --- a/sklearn/callback/_metric_monitor.py +++ b/sklearn/callback/_metric_monitor.py @@ -39,7 +39,7 @@ def __init__(self, metric, metric_params=None, on_validation=True): f" function {metric.__module__}.{metric.__name__}." ) self.metric_func = metric - self._shared_log = Manager().list() + self._shared_mem_log = Manager().list() def _on_fit_begin(self, estimator): if not hasattr(estimator, "predict"): @@ -47,7 +47,6 @@ def _on_fit_begin(self, estimator): f"Estimator {estimator.__class__} does not have a predict method, which" " is necessary to use a MetricMonitor callback." ) - self._fit_log = Manager().Queue() def _on_fit_task_end( self, estimator, task_info, data, from_reconstruction_attributes, **kwargs @@ -72,14 +71,20 @@ def _on_fit_task_end( f"{node_info['depth']}_{prev_task_str}{node_info['estimator_name']}_" f"{node_info['task_name']}" ] = node_info["task_id"] - self._fit_log.put(log_item) + self._shared_mem_log.append(log_item) def _on_fit_end(self, estimator, task_info): - while not self._fit_log.empty(): - self._shared_log.append(self._fit_log.get()) + pass def get_logs(self): - self.log = pd.DataFrame(list(self._shared_log)) + """Generate a pandas Dataframe with the logged values. + + Returns + ------- + pandas.DataFrame + Multi-index DataFrame with indices corresponding to the task tree. + """ + self.log = pd.DataFrame(list(self._shared_mem_log)) if not self.log.empty: self.log = self.log.set_index( [col for col in self.log.columns if col != self.metric_func.__name__] diff --git a/sklearn/callback/tests/test_metric_monitor.py b/sklearn/callback/tests/test_metric_monitor.py index 2aca882842b9f..e837c81bfa548 100644 --- a/sklearn/callback/tests/test_metric_monitor.py +++ b/sklearn/callback/tests/test_metric_monitor.py @@ -166,7 +166,7 @@ def test_within_meta_estimator(prefer, metric, metric_params): assert len(log_train) == len(expected_log_train) assert len(log_val) == len(expected_log_val) - assert log_train.equals(expected_log_train) assert np.array_equal(log_train.index.names, expected_log_train.index.names) - assert log_val.equals(expected_log_val) assert np.array_equal(log_val.index.names, expected_log_val.index.names) + assert log_train.equals(expected_log_train) + assert log_val.equals(expected_log_val) From 67d6bafe8977993f8c6d156af131305d705f294d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Paugam?= <35327799+FrancoisPgm@users.noreply.github.com> Date: Tue, 23 Sep 2025 11:35:58 +0200 Subject: [PATCH 6/7] fix conflicts --- sklearn/callback/_metric_monitor.py | 17 ++++++++--------- sklearn/callback/tests/_utils.py | 2 +- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/sklearn/callback/_metric_monitor.py b/sklearn/callback/_metric_monitor.py index 69cadbb5518c7..aca46c7a24c5b 100644 --- a/sklearn/callback/_metric_monitor.py +++ b/sklearn/callback/_metric_monitor.py @@ -6,7 +6,7 @@ import pandas as pd -from sklearn.callback._callback_context import get_task_info_path +from sklearn.callback._callback_context import get_context_path class MetricMonitor: @@ -49,7 +49,7 @@ def _on_fit_begin(self, estimator): ) def _on_fit_task_end( - self, estimator, task_info, data, from_reconstruction_attributes, **kwargs + self, estimator, context, data, from_reconstruction_attributes, **kwargs ): # TODO: add check to verify we're on the innermost level of the fit loop # e.g. 
for the KMeans @@ -61,16 +61,15 @@ def _on_fit_task_end( y_pred = from_reconstruction_attributes().predict(X) metric_value = self.metric_func(y, y_pred, **self.metric_params) log_item = {self.metric_func.__name__: metric_value} - for depth, node_info in enumerate(get_task_info_path(task_info)): + for depth, ctx in enumerate(get_context_path(context)): prev_task_str = ( - f"{node_info['prev_estimator_name']}_{node_info['prev_task_name']}|" - if node_info["prev_estimator_name"] is not None + f"{ctx.prev_estimator_name}_{ctx.prev_task_name}|" + if ctx.prev_estimator_name is not None else "" ) - log_item[ - f"{depth}_{prev_task_str}{node_info['estimator_name']}_" - f"{node_info['task_name']}" - ] = node_info["task_id"] + log_item[f"{depth}_{prev_task_str}{ctx.estimator_name}_{ctx.task_name}"] = ( + ctx.task_id + ) self._shared_mem_log.append(log_item) def _on_fit_end(self, estimator, task_info): diff --git a/sklearn/callback/tests/_utils.py b/sklearn/callback/tests/_utils.py index 5f070b618f92a..c93b02672af79 100644 --- a/sklearn/callback/tests/_utils.py +++ b/sklearn/callback/tests/_utils.py @@ -197,7 +197,7 @@ def fit(self, X_train=None, y_train=None, X_val=None, y_val=None): def _func(meta_estimator, inner_estimator, data, *, callback_ctx): for i in range(meta_estimator.n_inner): est = clone(inner_estimator) - iter_id = callback_ctx._task_id * meta_estimator.n_inner + i + iter_id = callback_ctx.task_id * meta_estimator.n_inner + i est.intercept = iter_id inner_ctx = callback_ctx.subcontext( From 1f0608a51a903465cf35552f0c4f9dbf6e82564d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Paugam?= <35327799+FrancoisPgm@users.noreply.github.com> Date: Tue, 23 Sep 2025 17:00:49 +0200 Subject: [PATCH 7/7] Distinguish logs from successive runs of fit with the same callback, by returning the logs as a dict with a run identifier as the key --- sklearn/callback/_callback_context.py | 6 ++++ sklearn/callback/_metric_monitor.py | 31 ++++++++++++++----- sklearn/callback/tests/test_metric_monitor.py | 14 +++++++++ 3 files changed, 44 insertions(+), 7 deletions(-) diff --git a/sklearn/callback/_callback_context.py b/sklearn/callback/_callback_context.py index c78c8d008429e..93abb9f2c0de8 100644 --- a/sklearn/callback/_callback_context.py +++ b/sklearn/callback/_callback_context.py @@ -1,6 +1,8 @@ # Authors: The scikit-learn developers # SPDX-License-Identifier: BSD-3-Clause +import time + from sklearn.callback import AutoPropagatedCallback # TODO(callbacks): move these explanations into a dedicated user guide. @@ -115,8 +117,10 @@ def _from_estimator(cls, estimator, *, task_name, task_id, max_subtasks=None): # We don't store the estimator in the context to avoid circular references # because the estimator already holds a reference to the context. 
+ new_ctx.init_time = time.time() new_ctx._callbacks = getattr(estimator, "_skl_callbacks", []) new_ctx.estimator_name = estimator.__class__.__name__ + new_ctx.estimator_id = id(estimator) new_ctx.task_name = task_name new_ctx.task_id = task_id new_ctx.parent = None @@ -161,8 +165,10 @@ def _from_parent(cls, parent_context, *, task_name, task_id, max_subtasks=None): """ new_ctx = cls.__new__(cls) + new_ctx.init_time = time.time() new_ctx._callbacks = parent_context._callbacks new_ctx.estimator_name = parent_context.estimator_name + new_ctx.estimator_id = parent_context.estimator_id new_ctx._estimator_depth = parent_context._estimator_depth new_ctx.task_name = task_name new_ctx.task_id = task_id diff --git a/sklearn/callback/_metric_monitor.py b/sklearn/callback/_metric_monitor.py index aca46c7a24c5b..430da043ce9b1 100644 --- a/sklearn/callback/_metric_monitor.py +++ b/sklearn/callback/_metric_monitor.py @@ -2,6 +2,7 @@ # SPDX-License-Identifier: BSD-3-Clause import inspect +import time from multiprocessing import Manager import pandas as pd @@ -62,6 +63,13 @@ def _on_fit_task_end( metric_value = self.metric_func(y, y_pred, **self.metric_params) log_item = {self.metric_func.__name__: metric_value} for depth, ctx in enumerate(get_context_path(context)): + if depth == 0: + timestamp = time.strftime( + "%Y-%m-%d_%H:%M:%S", time.localtime(ctx.init_time) + ) + log_item["_run"] = ( + f"{ctx.estimator_name}_{ctx.estimator_id}_{timestamp}" + ) prev_task_str = ( f"{ctx.prev_estimator_name}_{ctx.prev_task_name}|" if ctx.prev_estimator_name is not None @@ -72,7 +80,7 @@ def _on_fit_task_end( ) self._shared_mem_log.append(log_item) - def _on_fit_end(self, estimator, task_info): + def _on_fit_end(self, estimator, context): pass def get_logs(self): @@ -83,9 +91,18 @@ def get_logs(self): pandas.DataFrame Multi-index DataFrame with indices corresponding to the task tree. """ - self.log = pd.DataFrame(list(self._shared_mem_log)) - if not self.log.empty: - self.log = self.log.set_index( - [col for col in self.log.columns if col != self.metric_func.__name__] - ).sort_index() - return self.log.copy() + logs = pd.DataFrame(list(self._shared_mem_log)) + log_dict = {} + if not logs.empty: + for run_id in logs["_run"].unique(): + run_log = logs.loc[logs["_run"] == run_id].copy() + # Drop columns that correspond to other runs task_id and are filled with + # NaNs, and the run column, but always keep the metric column. 
+ columns_to_keep = ~(run_log.isnull().all()) + columns_to_keep["_run"] = False + columns_to_keep[self.metric_func.__name__] = True + run_log = run_log.loc[:, columns_to_keep] + log_dict[run_id] = run_log.set_index( + [col for col in run_log.columns if col != self.metric_func.__name__] + ).sort_index() + return log_dict diff --git a/sklearn/callback/tests/test_metric_monitor.py b/sklearn/callback/tests/test_metric_monitor.py index e837c81bfa548..003f3675541fb 100644 --- a/sklearn/callback/tests/test_metric_monitor.py +++ b/sklearn/callback/tests/test_metric_monitor.py @@ -43,6 +43,10 @@ def test_metric_monitor(EstimatorClass, metric, metric_params): metric_params = metric_params or dict() log_train = callback_train.get_logs() + assert len(log_train) == 1 + run_id_train, log_train = next(iter(log_train.items())) + assert f"{estimator.__class__.__name__}_{id(estimator)}_" in run_id_train + expected_log_train = pd.DataFrame( [ { @@ -62,6 +66,10 @@ def test_metric_monitor(EstimatorClass, metric, metric_params): assert np.array_equal(log_train.index.names, expected_log_train.index.names) log_val = callback_val.get_logs() + assert len(log_val) == 1 + run_id_val, log_val = next(iter(log_val.items())) + assert f"{estimator.__class__.__name__}_{id(estimator)}_" in run_id_val + expected_log_val = pd.DataFrame( [ { @@ -162,8 +170,14 @@ def test_within_meta_estimator(prefer, metric, metric_params): ) log_train = callback_train.get_logs() + assert len(log_train) == 1 + run_id_train, log_train = next(iter(log_train.items())) log_val = callback_val.get_logs() + assert len(log_val) == 1 + run_id_val, log_val = next(iter(log_val.items())) + assert f"{meta_est.__class__.__name__}_{id(meta_est)}_" in run_id_train + assert f"{meta_est.__class__.__name__}_{id(meta_est)}_" in run_id_val assert len(log_train) == len(expected_log_train) assert len(log_val) == len(expected_log_val) assert np.array_equal(log_train.index.names, expected_log_train.index.names)
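
A minimal usage sketch of the MetricMonitor added by this series, modeled on the tests above. It relies on the toy Estimator from sklearn/callback/tests/_utils.py (built-in scikit-learn estimators would first need the callback hooks wired into their fit loops) and on the fit(X_train=..., y_train=..., X_val=..., y_val=...) signature of that test helper; the data shapes and the choice of mean_squared_error are purely illustrative.

    import numpy as np

    from sklearn.callback import MetricMonitor
    from sklearn.callback.tests._utils import Estimator
    from sklearn.metrics import mean_squared_error

    rng = np.random.RandomState(0)
    X_train, y_train = rng.uniform(size=(20, 5)), rng.uniform(size=20)
    X_val, y_val = rng.uniform(size=(20, 5)), rng.uniform(size=20)

    # Track the validation MSE at the end of every fit iteration.
    monitor = MetricMonitor(mean_squared_error, on_validation=True)

    est = Estimator(max_iter=10)
    est.set_callbacks(monitor)
    est.fit(X_train=X_train, y_train=y_train, X_val=X_val, y_val=y_val)

    # get_logs() returns one multi-index DataFrame per run of fit, keyed by a
    # run identifier built from the estimator name, its id and a timestamp.
    for run_id, log in monitor.get_logs().items():
        print(run_id)
        print(log)  # index levels follow the task tree, single column named after the metric

Because the logged values are appended to a multiprocessing Manager list, the same pattern applies when the monitored estimator is fitted inside a meta-estimator with joblib parallelism (see test_within_meta_estimator), with the task-tree index levels of the returned DataFrame identifying which outer and inner loop iteration each value belongs to.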