[ENH] forecasting benchmarking task experiment #176

Draft
fkiraly wants to merge 5 commits into hyperactive-project:main from fkiraly:fcst-benchmark

Conversation


@fkiraly fkiraly commented Aug 24, 2025

This PR adds a SktimeForecastingTask, which defines a full benchmarking run for a forecaster that is passed later in _evaluate.

This object could be used as a "task" in the sktime ForecastingBenchmark.
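
For orientation, a minimal usage sketch (not part of the PR; it assumes BaseExperiment exposes a score method that forwards its params to _evaluate and mirrors its (value, metadata) return, and uses NaiveForecaster purely as an example):

from sktime.datasets import load_airline
from sktime.forecasting.naive import NaiveForecaster
from sktime.split import ExpandingWindowSplitter

# the task fixes the data and the backtesting scheme, but not the forecaster
task = SktimeForecastingTask(
    cv=ExpandingWindowSplitter(initial_window=36, step_length=12, fh=12),
    y=load_airline(),
)

# the forecaster is supplied only at evaluation time, via the score params;
# assuming score returns the same (value, metadata) pair as _evaluate
value, info = task.score({"forecaster": NaiveForecaster(strategy="last")})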

Draft for discussion and reviewing the design:

  • it is quite similar to, and partially duplicative of, SktimeForecastingExperiment, which is used in tuning. How should we deal with the similarity and overlap?
    • we could merge the two into a single class whose behaviour depends on whether a forecaster is passed or not. Not sure where that leads, though.
  • is this a possible 1:1 drop-in (or almost) for the task object in sktime?
@fkiraly fkiraly added the enhancement (New feature or request) label on Aug 24, 2025

@arnavk23 arnavk23 left a comment


I made some corrections to your file here:

# copyright: hyperactive developers, MIT License (see LICENSE file)

import numpy as np

from hyperactive.base import BaseExperiment


class SktimeForecastingTask(BaseExperiment):
    """Experiment adapter for forecast backtesting benchmark run.

    This class is used to perform backtesting experiments using a given
    sktime forecaster. It allows for hyperparameter tuning and evaluation of
    the model's performance.

    The score returned is the summary backtesting score,
    of applying ``sktime`` ``evaluate`` to an estimator passed as ``forecaster``
    in the ``score`` ``params``.

    The backtesting performed is specified by the ``cv`` parameter,
    and the scoring metric is specified by the ``scoring`` parameter.
    The ``X`` and ``y`` parameters are the input data and target values,
    which are used in fit/predict cross-validation.

    Differs from ``SktimeForecastingExperiment`` in that ``forecaster``
    is passed as a parameter directly to ``score`` and not to ``__init__``.
    """

    _tags = {
        "authors": "fkiraly",
        "maintainers": "fkiraly",
        "python_dependencies": "sktime",  # python dependencies
    }

    def __init__(
        self,
        cv,
        y,
        X=None,
        strategy="refit",
        scoring=None,
        error_score=np.nan,
        cv_X=None,
        backend=None,
        backend_params=None,
    ):
        self.X = X
        self.y = y
        self.strategy = strategy
        self.scoring = scoring
        self.cv = cv
        self.error_score = error_score
        self.cv_X = cv_X
        self.backend = backend
        self.backend_params = backend_params

        super().__init__()

        if scoring is None:
            from sktime.performance_metrics.forecasting import (
                MeanAbsolutePercentageError,
            )

            self._scoring = MeanAbsolutePercentageError(symmetric=True)
        else:
            self._scoring = scoring

        # Set a boolean tag indicating whether higher is better.
        # If the metric indicates lower_is_better, set False; otherwise True.
        try:
            lower_is_better = (
                True
                if scoring is None
                else bool(self._scoring.get_tag("lower_is_better", False))
            )
        except Exception:
            # if the metric doesn't expose get_tag, default to lower-is-better,
            # matching the MAPE default and the usual sktime error-metric convention
            lower_is_better = True

        higher_is_better = not lower_is_better
        # Use a conventional boolean tag for the rest of the codebase
        try:
            self.set_tags(**{"higher_is_better": higher_is_better})
        except Exception:
            # If set_tags is not available or fails, ignore tagging but continue.
            pass

    def _paramnames(self):
        """Return the parameter names of the search."""
        return ["forecaster"]

    def _evaluate(self, params):
        """Evaluate the parameters.

        Parameters
        ----------
        params : dict with string keys
            Parameters to evaluate.

        Returns
        -------
        float
            The value of the parameters as per evaluation.
        dict
            Additional metadata about the search.
        """
        from sktime.forecasting.model_evaluation import evaluate

        forecaster = params.get("forecaster", None)
        if forecaster is None:
            raise ValueError(
                "SktimeForecastingTask._evaluate requires params to include "
                "a 'forecaster' entry"
            )

        try:
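            # sktime backtesting: evaluate fits (or updates, per ``strategy``) and
            # predicts the forecaster on each cv fold, one result row per fold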
            results = evaluate(
                forecaster,
                cv=self.cv,
                y=self.y,
                X=self.X,
                strategy=self.strategy,
                scoring=self._scoring,
                error_score=self.error_score,
                cv_X=self.cv_X,
                backend=self.backend,
                backend_params=self.backend_params,
            )
        except Exception as e:
            # If user explicitly wants exceptions to propagate:
            if self.error_score == "raise":
                raise
            # Otherwise return error_score and capture the exception message
            return self.error_score, {"error": str(e)}

        # Determine scoring column name robustly
        scoring_name = getattr(self._scoring, "name", None) or self._scoring.__class__.__name__
        result_name = f"test_{scoring_name}"

        add_info = {"results": results}

        # Results handling robust to DataFrame-like or dict-like outputs
        try:
            # If results is a pandas DataFrame-like object:
            if hasattr(results, "columns"):
                if result_name in results.columns:
                    res_values = results[result_name]
                else:
                    # find a test_* column as fallback
                    test_cols = [c for c in results.columns if str(c).startswith("test_")]
                    if test_cols:
                        res_values = results[test_cols[0]]
                        add_info["warning"] = (
                            f"expected column '{result_name}' not found; using '{test_cols[0]}' instead"
                        )
                    else:
                        raise ValueError(f"No 'test_*' column found in evaluate results; expected '{result_name}'")
            else:
                # dict-like fallback
                if result_name in results:
                    res_values = results[result_name]
                else:
                    test_keys = [k for k in results.keys() if str(k).startswith("test_")]
                    if test_keys:
                        res_values = results[test_keys[0]]
                        add_info["warning"] = (
                            f"expected key '{result_name}' not found; using '{test_keys[0]}' instead"
                        )
                    else:
                        raise ValueError(f"No 'test_*' key found in evaluate results; expected '{result_name}'")
        except Exception as e:
            # Preserve original exception info
            if self.error_score == "raise":
                raise
            return self.error_score, {"error": str(e), **add_info}

        # Compute scalar summary result
        try:
            res_float = float(np.nanmean(res_values))
        except Exception:
            # Last-resort attempt: convert to numpy array and take mean
            try:
                res_float = float(np.nanmean(np.asarray(res_values)))
            except Exception as e:
                if self.error_score == "raise":
                    raise
                return self.error_score, {"error": f"Could not compute mean of results: {e}", **add_info}

        return res_float, add_info

    @classmethod
    def get_test_params(cls, parameter_set="default"):
        """Return testing parameter settings for the skbase object."""
        from sktime.datasets import load_airline, load_longley
        from sktime.split import ExpandingWindowSplitter

        y = load_airline()
        params0 = {
            "cv": ExpandingWindowSplitter(initial_window=36, step_length=12, fh=12),
            "y": y,
        }

        from sktime.performance_metrics.forecasting import MeanAbsolutePercentageError

        y, X = load_longley()
        params1 = {
            "cv": ExpandingWindowSplitter(initial_window=3, step_length=3, fh=1),
            "y": y,
            "X": X,
            "scoring": MeanAbsolutePercentageError(symmetric=False),
        }

        return [params0, params1]

    @classmethod
    def _get_score_params(cls):
        """Return settings for testing score/evaluate functions. Used in tests only."""
        from sktime.forecasting.naive import NaiveForecaster

        val0 = {"forecaster": NaiveForecaster(strategy="last")}
        val1 = {"forecaster": NaiveForecaster(strategy="last")}
        return [val0, val1]

fkiraly commented Nov 22, 2025

@arnavk23, can you kindly explain what you corrected and why?


arnavk23 commented Nov 22, 2025

@arnavk23, can you kindly explain what you corrected and why?

  1. Added validation for forecaster in params
    The original version assumed params["forecaster"] always existed.
    I added an explicit check and a clear error message because missing/incorrect parameters otherwise raise cryptic errors deeper inside sktime.evaluate.

  2. Made scoring metric handling more robust
    The previous code assumed that any scoring object implements get_tag("lower_is_better").
    I wrapped this in a try/except and added correct defaults for both cases (scoring=None or custom metrics).

  3. Safely applied higher_is_better tag
    set_tags() was called without handling the case where it fails or is not supported.

  4. Improved parsing of the output from sktime.evaluate()
    The previous implementation assumed that the result is always a DataFrame and that the scoring column is always named exactly "test_<scoring.name>".
    I added support for both DataFrame-like and dict-like outputs, a fallback to the first available test_* column if the expected name isn't present, and a warning when the fallback is used (see the sketch after this list).

  5. Better error handling during evaluate
    Previously, any exception inside evaluate() could crash the run or create inconsistent behavior.
    Now error_score="raise" re-raises the exception, preserving the expected behavior; otherwise the method returns (error_score, {"error": str(e)}).

  6. Robust conversion of results to a scalar
    The earlier implementation assumed you can always do float(results.mean()).
    I added the use of np.nanmean, a fallback to np.asarray if needed, and structured error reporting if even that fails.
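
As a standalone illustration of point 4: sktime's evaluate returns a DataFrame with one row per fold and columns such as test_MeanAbsolutePercentageError, fit_time and pred_time, so the column lookup reduces to roughly the sketch below (pick_test_column is a hypothetical helper, not part of the patch):

import pandas as pd

def pick_test_column(results: pd.DataFrame, expected: str) -> pd.Series:
    # exact match first, e.g. "test_MeanAbsolutePercentageError"
    if expected in results.columns:
        return results[expected]
    # otherwise fall back to the first "test_*" column (the patch also emits a warning)
    test_cols = [c for c in results.columns if str(c).startswith("test_")]
    if test_cols:
        return results[test_cols[0]]
    raise ValueError(f"no 'test_*' column found; expected '{expected}'")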


fkiraly commented Nov 28, 2025

@arnavk23, is this AI generated?

@arnavk23

@arnavk23, is this AI generated?

Yes, the remark is AI-generated.

