Document for sklearn_ensemble_cv

sklearn_ensemble_cv.ensemble API documentation

Module sklearn_ensemble_cv.ensemble

Expand source code
from typing import List
from sklearn.ensemble import BaggingRegressor
from sklearn_ensemble_cv.utils import risk_estimate, degree_of_freedom, avg_sq_err

from joblib import Parallel, delayed
n_jobs = 16

import numpy as np
import pandas as pd
from functools import reduce
import itertools

import warnings
warnings.filterwarnings('ignore') 



class Ensemble(BaggingRegressor):
    '''
    Ensemble class is built on top of sklearn.ensemble.BaggingRegressor.
    It provides additional methods for computing ECV estimates.
    '''
    def __init__(self, **kwargs):
        super(BaggingRegressor, self).__init__(**kwargs)

    # def get_coef(self, M=-1):
    #     if M < 0:
    #         M = self.n_estimators
    #     coef_ = np.mean(np.array([self.estimators_[i].coef_ for i in range(M)]), axis=0)
    #     return coef_

    def predict_individual(self: BaggingRegressor, X: np.ndarray, M: int=-1, n_jobs: int=-1, verbose: bool=0) -> np.ndarray:
        '''
        Predicts the target values for the given input data using the provided BaggingRegressor model.

        Parameters
        ----------
            regr : BaggingRegressor
                The BaggingRegressor model to use for prediction.
            X : np.ndarray
                [n, p] The input data to predict target values for.

        Returns
        ----------
            Y_hat : np.ndarray
                [n, M] The predicted target values of all $M$ estimators for the input data.
        '''
        if M < 0:
            M = self.n_estimators
        with Parallel(n_jobs=n_jobs, max_nbytes=None, verbose=verbose) as parallel:
            Y_hat = parallel(
                delayed(lambda reg,X: reg.predict(X[:,features]))(reg, X)
                for reg,features in zip(self.estimators_[:M],self.estimators_features_[:M])
            )
        Y_hat = np.stack(Y_hat, axis=-1)
        return Y_hat
    

    def compute_risk(
            self, X, Y, M_test=None, return_df=False, 
            avg=True, n_jobs=-1, verbose=0, **kwargs_est):
        if M_test is None:
            M_test = self.n_estimators
        if M_test>self.n_estimators:
            raise ValueError('The ensemble size M must be less than or equal to the number of estimators in the BaggingRegressor model.')        

        Y_hat = self.predict_individual(X, M_test, n_jobs, verbose)
        if Y_hat.ndim>2:
            Y_hat = Y_hat.reshape((-1,Y_hat.shape[-1]))
        Y = Y.reshape((*Y_hat.shape[:-1],1))

        if avg:
            err_eval = avg_sq_err(Y_hat - Y)
        else:
            Y_hat = np.cumsum(Y_hat, axis=1) / np.arange(1, M_test+1)
            err_eval = (Y_hat - Y)**2
        risk = risk_estimate(err_eval, axis=0, **kwargs_est)

        if return_df:
            df = pd.DataFrame({'M':np.arange(1,M_test+1), 'risk':risk})
            return df
        else:
            return risk
    
    
    def compute_ecv_estimate(self, X_train, Y_train, M_test=None, M0=None, return_df=False, n_jobs=-1, verbose=0, **kwargs_est):
        '''
        Computes the ECV estimate for the given input data using the provided BaggingRegressor model.

        Parameters
        ----------
        X_train : np.ndarray
            [n, p] The input covariates.
        Y_train : np.ndarray
            [n, ...] The target values of the input data.
        M_test : int or np.ndarray
            The maximum ensemble size of the ECV estimate.
        M0 : int, optional
            The number of estimators to use for the OOB estimate. If None, M0 is set to the number of estimators in the BaggingRegressor model.
        return_df : bool, optional
            If True, returns the ECV estimate as a pandas.DataFrame object.
        n_jobs : int, optional
            The number of jobs to run in parallel. If -1, all CPUs are used.
        kwargs_est : dict
            Additional keyword arguments for the risk estimate.

        Returns
        --------
        risk_ecv : np.ndarray or pandas.DataFrame
            [M_test, ] The ECV estimate for each ensemble size in M_test.
        '''
        if M_test is None:
            M_test = self.n_estimators
        if M0 is None:
            M0 = self.n_estimators
        if M0<2:
            raise ValueError('The ensemble size M or M0 must be at least 2.')
        if np.isscalar(M_test):
            M_test = np.arange(1,M_test+1)
        else:
            M_test = np.array(M_test)
        ids_list = self.estimators_samples_[:M0]
        Y_hat = self.predict_individual(X_train, M0, n_jobs)
        Y_train = Y_train.reshape((*Y_hat.shape[:-1],1))
        dev_eval = Y_hat - Y_train
        err_eval = dev_eval**2
        
        with Parallel(n_jobs=n_jobs, max_nbytes=None, verbose=verbose) as parallel:
            res_1 = parallel(
                delayed(lambda j:risk_estimate(
                    np.delete(err_eval[...,j], ids_list[j], axis=0).reshape(-1,1), **kwargs_est)
                       )(j)
                for j in np.arange(M0)
            )
            res_2 = parallel(
                delayed(lambda i,j:risk_estimate(
                    (np.mean(
                        np.delete(dev_eval[...,[i,j]], 
                                  np.union1d(ids_list[i], ids_list[j]), axis=0), axis=-1)**2).reshape(-1,1),
                    **kwargs_est
                ))(i,j)
                for i,j in itertools.combinations(np.arange(M0), 2)
            )
                    
        risk_ecv_1 = np.nanmean(res_1)
        risk_ecv_2 = np.nanmean(res_2)
        risk_ecv = - (1-2/M_test) * risk_ecv_1 + 2*(1-1/M_test) * risk_ecv_2
        if return_df:
            df = pd.DataFrame({'M':M_test, 'estimate':risk_ecv})
            return df
        else:
            return risk_ecv
        
    
    def extrapolate(self, risk, M_test=None):
        if M_test is None:
            M_test = self.n_estimators
        M0 = risk.shape[0]        
        if np.isscalar(M_test):
            M_test = np.arange(1,M_test+1)
        else:
            M_test = np.array(M_test)
        if M0 <2 or np.max(M_test) < M0:
            raise ValueError('The ensemble size M or M0 must be at least 2.')
        
        risk_1 = risk[0]
        risk_inf = np.sum(risk - risk_1/np.arange(1, M0+1)) / np.sum(1 - 1/np.arange(1, M0+1))
        
        risk_ecv = (1/M_test) * risk_1 + (1-1/M_test) * risk_inf
        risk_ecv[:M0] = risk
        return risk_ecv
        

    def compute_gcv_estimate(self, X_train, Y_train, M=None, type='full', return_df=False, n_jobs=-1, verbose=0, **kwargs_est):
        '''
        Computes the naive GCV estimate for the given input data using the provided BaggingRegressor model.

        Parameters
        ----------
        X_train : np.ndarray
            [n, p] The input covariates.
        Y_train : np.ndarray
            [n, ] The target values of the input data.
        type : str, optional
            The type of GCV estimate to compute. Can be either 'full' (the naive GCV using full observations) or 
            'union' (the naive GCV using training observations).
        return_df : bool, optional
            If True, returns the GCV estimate as a pandas.DataFrame object.
        n_jobs : int, optional
            The number of jobs to run in parallel. If -1, all CPUs are used.
        kwargs_est : dict
            Additional keyword arguments for the risk estimate.

        Returns
        --------
        risk_gcv : np.ndarray or pandas.DataFrame
            [M_test, ] The GCV estimate for each ensemble size in M_test.
        '''
        if self.estimator_.__class__.__name__ not in ['Ridge', 'Lasso', 'ElasticNet']:
            raise ValueError('GCV is only implemented for Ridge, Lasso, and ElasticNet regression.')
        if Y_train.ndim==1:
            Y_train = Y_train[:,None]
        elif Y_train.ndim>2:
            raise NotImplementedError('CGCV for multi-task regression not implemented yet.')
        if M is None:
            M = self.n_estimators        
        M_arr = np.arange(M)
        ids_list = self.estimators_samples_
        Y_hat = self.predict_individual(X_train, M, n_jobs)        
        n = X_train.shape[0]

        with Parallel(n_jobs=n_jobs, max_nbytes=None, verbose=verbose) as parallel:
            dof = parallel(
                delayed(lambda j:degree_of_freedom(self.estimators_[j], X_train[ids_list[j]])
                       )(j)
                for j in M_arr
            )
            dof = np.mean(dof)

        if type=='full':
            err_eval = avg_sq_err(Y_hat - Y_train)
            err_train = np.mean(err_eval, axis=0)
            deno = (1 - dof/n)**2
            risk_gcv = err_train / deno
        else:
            Y_hat = np.cumsum(Y_hat, axis=1) / (M_arr+1)
            err_eval = (Y_hat - Y_train)**2
            ids_union_list = [reduce(np.union1d, ids_list[:j+1]) for j in M_arr]
            n_ids = np.array([len(ids) for ids in ids_union_list])
            err_train = np.array([np.mean(err_eval[ids_union_list[j],j]) for j in M_arr])
            deno = (1 - dof/n_ids)**2
            risk_gcv = err_train / deno
        
        if return_df:
            df = pd.DataFrame({'M':M_arr+1, 'estimate':risk_gcv, 'err_train':err_train, 'deno':deno})
            return df
        else:
            return risk_gcv
        
    

    def compute_cgcv_estimate(self, X_train, Y_train, M=None, type='full', return_df=False, n_jobs=-1, verbose=0, **kwargs_est):
        '''
        Computes the corrected GCV estimate for the given input data using the provided BaggingRegressor model.

        Parameters
        ----------
        X_train : np.ndarray
            [n, p] The input covariates.
        Y_train : np.ndarray
            [n, ] The target values of the input data.
        type : str, optional
            The type of CGCV estimate to compute. Can be either 'full' (using full observations) or 
            'ovlp' (using overlapping observations).
        return_df : bool, optional
            If True, returns the GCV estimate as a pandas.DataFrame object.
        n_jobs : int, optional
            The number of jobs to run in parallel. If -1, all CPUs are used.
        kwargs_est : dict
            Additional keyword arguments for the risk estimate.

        Returns
        --------
        risk_gcv : np.ndarray or pandas.DataFrame
            [M_test, ] The CGCV estimate for each ensemble size in M_test.
        '''
        if self.estimator_.__class__.__name__ not in ['Ridge', 'Lasso', 'ElasticNet']:
            # raise ValueError('GCV is only implemented for Ridge, Lasso, and ElasticNet regression.')
            return self._compute_cgcv_estimate_general(X_train, Y_train, M, type, return_df, n_jobs, verbose, **kwargs_est)
            
        if Y_train.ndim==1:
            Y_train = Y_train[:,None]
        if M is None:
            M = self.n_estimators            
        M_arr = np.arange(M)
        ids_list = self.estimators_samples_
        Y_hat = self.predict_individual(X_train, M, n_jobs)

        n, p = X_train.shape
        if hasattr(self.estimators_[0], 'fit_intercept') and self.estimators_[0].fit_intercept:
            p += 1
        phi = p/n
        k = self.max_samples if isinstance(self.max_samples, int) else int(n * self.max_samples)
        psi = p/k

        with Parallel(n_jobs=n_jobs, max_nbytes=None, verbose=verbose) as parallel:
            dof = parallel(
                delayed(lambda j:degree_of_freedom(self.estimators_[j], X_train[ids_list[j]]))(j)
                for j in M_arr
            )
            dof = np.mean(dof)

        err_train = (Y_hat-Y_train)**2
        if type=='full':
            correct_term = np.mean(err_train) / (1 - 2*dof/n + dof**2/(k*n))
        elif type=='ovlp':
            correct_term = np.mean([np.mean(err_train[ids_list[j],j]) for j in M_arr]) / (1 - dof/k)**2
        else:
            raise ValueError('The type must be either "full" or "ovlp".')
        
        err_eval = avg_sq_err(Y_hat - Y_train)
        err_train = np.mean(err_eval, axis=0)
        deno = (1 - dof/n)**2
        C = 1/ (n/dof - 1)**2 / (M_arr+1) * (psi/phi - 1) * correct_term
        risk_cgcv = err_train / deno - C
        
        if return_df:
            df = pd.DataFrame({'M':M_arr+1, 'estimate':risk_cgcv, 'err_train':err_train, 'deno':deno, 'C':C})
            return df
        else:
            return risk_cgcv



    def _compute_cgcv_estimate_general(
            self, X_train, Y_train, M=None, type='full',
            return_df=False, n_jobs=-1, verbose=0, **kwargs_est):
        '''
        Computes the corrected GCV estimate for the given input data using the provided BaggingRegressor model.

        Parameters
        ----------
        X_train : np.ndarray
            [n, p] The input covariates.
        Y_train : np.ndarray
            [n, ] The target values of the input data.
        type : str, optional
            The type of CGCV estimate to compute. Can be either 'full' (using full observations) or 
            'ovlp' (using overlapping observations).
        return_df : bool, optional
            If True, returns the GCV estimate as a pandas.DataFrame object.
        n_jobs : int, optional
            The number of jobs to run in parallel. If -1, all CPUs are used.
        kwargs_est : dict
            Additional keyword arguments for the risk estimate.

        Returns
        --------
        risk_gcv : np.ndarray or pandas.DataFrame
            [M_test, ] The CGCV estimate for each ensemble size in M_test.
        '''
        if not hasattr(self.estimator_.__class__, 'get_gcv_input'):
            raise ValueError(
                "Explicit GCV calculation is only implemented for Ridge, Lasso, and ElasticNet regression.\n"
                "For estimator class '{estimator_class.__name__}', it needs to have 'get_gcv_input' "
                "method to compute the GCV input. The function takes training samples (X,y) in [n,p] by [n,1] "
                "as input and return a tuple of:\n"
                "(1) [n, ] residuals;\n"
                "(2) [n, ] derivative of loss evaluated at residuals;\n"
                "(3) [1, ] degrees of freedom;\n"
                "(4) [1, ] the trace of generalized smoothing matrix tr_V.")

        if Y_train.ndim==1:
            Y_train = Y_train[:,None]
        elif Y_train.ndim>2:
            raise NotImplementedError('CGCV for multi-task regression not implemented yet.')
        if M is None:
            M = self.n_estimators            
        M_arr = np.arange(M)
        ids_list = [np.sort(ids) for ids in self.estimators_samples_]
        Y_hat = self.predict_individual(X_train, M, n_jobs)

        n, p = X_train.shape
        if hasattr(self.estimators_[0], 'fit_intercept') and self.estimators_[0].fit_intercept:
            p += 1
        with Parallel(n_jobs=n_jobs, max_nbytes=None, verbose=verbose) as parallel:
            res = parallel(
                delayed(
                    lambda j:self.estimators_[j].get_gcv_input(X_train[ids_list[j]], Y_train[ids_list[j]])
                        )(j) for j in M_arr
            )
            r, loss_p, dof, tr_V = list(zip(*res))
            r = np.array(r).T
            loss_p = np.array(loss_p).T
            dof = np.array(dof)
            tr_V = np.array(tr_V)
        
        tmp = r + (dof / tr_V) [None,:] * loss_p
        
        if type=='full':
            r_full = Y_train - Y_hat            
            tmp_full = np.zeros_like(r_full)
            for j in range(tmp.shape[1]):
                tmp_full[ids_list[j],j] = tmp[:,j]

            del r, tmp

            def _get_est(i,j):
                return np.mean(np.where(np.isin(np.arange(n), ids_list[i]), tmp_full[:,i], r_full[:,i]) * np.where(np.isin(np.arange(n), ids_list[j]), tmp_full[:,j], r_full[:,j]))
        elif type=='ovlp':        
            def _get_est(i,j):
                ids = np.intersect1d(ids_list[i], ids_list[j])
                if len(ids)>0:
                    return np.mean(tmp[np.isin(ids_list[i],ids),i] * tmp[np.isin(ids_list[j],ids),j])
                else:
                    return np.nan                        
        else:
            raise ValueError('The type must be either "full" or "ovlp".')

        risk_M1 = np.mean(Parallel(n_jobs=n_jobs-2, verbose=verbose)(delayed(_get_est)(i=i, j=i) for i in M_arr))
        if M>1:
            risk_Minf = np.nanmean(Parallel(n_jobs=n_jobs-2, verbose=verbose)(delayed(_get_est)(i=i, j=j) for j in M_arr for i in M_arr if j<i))
            risk_cgcv = (1/(1+M_arr)) * risk_M1 + (1-1/(1+M_arr)) * risk_Minf
        else:
            risk_cgcv = np.append([risk_M1], np.ones(M-1)*np.nan)

        if return_df:
            err_eval = avg_sq_err(Y_train - Y_hat)
            err_train = np.mean(err_eval, axis=0)
            df = pd.DataFrame({'M':M_arr+1, 'estimate':risk_cgcv, 'err_train':err_train})
            return df
        else:
            return risk_cgcv            
    

Classes

class Ensemble (**kwargs)

Ensemble class is built on top of sklearn.ensemble.BaggingRegressor. It provides additional methods for computing ECV estimates.

Expand source code
class Ensemble(BaggingRegressor):
    '''
    Ensemble class is built on top of sklearn.ensemble.BaggingRegressor.
    It provides additional methods for computing ECV estimates.
    '''
    def __init__(self, **kwargs):
        super(BaggingRegressor, self).__init__(**kwargs)

    # def get_coef(self, M=-1):
    #     if M < 0:
    #         M = self.n_estimators
    #     coef_ = np.mean(np.array([self.estimators_[i].coef_ for i in range(M)]), axis=0)
    #     return coef_

    def predict_individual(self: BaggingRegressor, X: np.ndarray, M: int=-1, n_jobs: int=-1, verbose: bool=0) -> np.ndarray:
        '''
        Predicts the target values for the given input data using the provided BaggingRegressor model.

        Parameters
        ----------
            regr : BaggingRegressor
                The BaggingRegressor model to use for prediction.
            X : np.ndarray
                [n, p] The input data to predict target values for.

        Returns
        ----------
            Y_hat : np.ndarray
                [n, M] The predicted target values of all $M$ estimators for the input data.
        '''
        if M < 0:
            M = self.n_estimators
        with Parallel(n_jobs=n_jobs, max_nbytes=None, verbose=verbose) as parallel:
            Y_hat = parallel(
                delayed(lambda reg,X: reg.predict(X[:,features]))(reg, X)
                for reg,features in zip(self.estimators_[:M],self.estimators_features_[:M])
            )
        Y_hat = np.stack(Y_hat, axis=-1)
        return Y_hat
    

    def compute_risk(
            self, X, Y, M_test=None, return_df=False, 
            avg=True, n_jobs=-1, verbose=0, **kwargs_est):
        if M_test is None:
            M_test = self.n_estimators
        if M_test>self.n_estimators:
            raise ValueError('The ensemble size M must be less than or equal to the number of estimators in the BaggingRegressor model.')        

        Y_hat = self.predict_individual(X, M_test, n_jobs, verbose)
        if Y_hat.ndim>2:
            Y_hat = Y_hat.reshape((-1,Y_hat.shape[-1]))
        Y = Y.reshape((*Y_hat.shape[:-1],1))

        if avg:
            err_eval = avg_sq_err(Y_hat - Y)
        else:
            Y_hat = np.cumsum(Y_hat, axis=1) / np.arange(1, M_test+1)
            err_eval = (Y_hat - Y)**2
        risk = risk_estimate(err_eval, axis=0, **kwargs_est)

        if return_df:
            df = pd.DataFrame({'M':np.arange(1,M_test+1), 'risk':risk})
            return df
        else:
            return risk
    
    
    def compute_ecv_estimate(self, X_train, Y_train, M_test=None, M0=None, return_df=False, n_jobs=-1, verbose=0, **kwargs_est):
        '''
        Computes the ECV estimate for the given input data using the provided BaggingRegressor model.

        Parameters
        ----------
        X_train : np.ndarray
            [n, p] The input covariates.
        Y_train : np.ndarray
            [n, ...] The target values of the input data.
        M_test : int or np.ndarray
            The maximum ensemble size of the ECV estimate.
        M0 : int, optional
            The number of estimators to use for the OOB estimate. If None, M0 is set to the number of estimators in the BaggingRegressor model.
        return_df : bool, optional
            If True, returns the ECV estimate as a pandas.DataFrame object.
        n_jobs : int, optional
            The number of jobs to run in parallel. If -1, all CPUs are used.
        kwargs_est : dict
            Additional keyword arguments for the risk estimate.

        Returns
        --------
        risk_ecv : np.ndarray or pandas.DataFrame
            [M_test, ] The ECV estimate for each ensemble size in M_test.
        '''
        if M_test is None:
            M_test = self.n_estimators
        if M0 is None:
            M0 = self.n_estimators
        if M0<2:
            raise ValueError('The ensemble size M or M0 must be at least 2.')
        if np.isscalar(M_test):
            M_test = np.arange(1,M_test+1)
        else:
            M_test = np.array(M_test)
        ids_list = self.estimators_samples_[:M0]
        Y_hat = self.predict_individual(X_train, M0, n_jobs)
        Y_train = Y_train.reshape((*Y_hat.shape[:-1],1))
        dev_eval = Y_hat - Y_train
        err_eval = dev_eval**2
        
        with Parallel(n_jobs=n_jobs, max_nbytes=None, verbose=verbose) as parallel:
            res_1 = parallel(
                delayed(lambda j:risk_estimate(
                    np.delete(err_eval[...,j], ids_list[j], axis=0).reshape(-1,1), **kwargs_est)
                       )(j)
                for j in np.arange(M0)
            )
            res_2 = parallel(
                delayed(lambda i,j:risk_estimate(
                    (np.mean(
                        np.delete(dev_eval[...,[i,j]], 
                                  np.union1d(ids_list[i], ids_list[j]), axis=0), axis=-1)**2).reshape(-1,1),
                    **kwargs_est
                ))(i,j)
                for i,j in itertools.combinations(np.arange(M0), 2)
            )
                    
        risk_ecv_1 = np.nanmean(res_1)
        risk_ecv_2 = np.nanmean(res_2)
        risk_ecv = - (1-2/M_test) * risk_ecv_1 + 2*(1-1/M_test) * risk_ecv_2
        if return_df:
            df = pd.DataFrame({'M':M_test, 'estimate':risk_ecv})
            return df
        else:
            return risk_ecv
        
    
    def extrapolate(self, risk, M_test=None):
        if M_test is None:
            M_test = self.n_estimators
        M0 = risk.shape[0]        
        if np.isscalar(M_test):
            M_test = np.arange(1,M_test+1)
        else:
            M_test = np.array(M_test)
        if M0 <2 or np.max(M_test) < M0:
            raise ValueError('The ensemble size M or M0 must be at least 2.')
        
        risk_1 = risk[0]
        risk_inf = np.sum(risk - risk_1/np.arange(1, M0+1)) / np.sum(1 - 1/np.arange(1, M0+1))
        
        risk_ecv = (1/M_test) * risk_1 + (1-1/M_test) * risk_inf
        risk_ecv[:M0] = risk
        return risk_ecv
        

    def compute_gcv_estimate(self, X_train, Y_train, M=None, type='full', return_df=False, n_jobs=-1, verbose=0, **kwargs_est):
        '''
        Computes the naive GCV estimate for the given input data using the provided BaggingRegressor model.

        Parameters
        ----------
        X_train : np.ndarray
            [n, p] The input covariates.
        Y_train : np.ndarray
            [n, ] The target values of the input data.
        type : str, optional
            The type of GCV estimate to compute. Can be either 'full' (the naive GCV using full observations) or 
            'union' (the naive GCV using training observations).
        return_df : bool, optional
            If True, returns the GCV estimate as a pandas.DataFrame object.
        n_jobs : int, optional
            The number of jobs to run in parallel. If -1, all CPUs are used.
        kwargs_est : dict
            Additional keyword arguments for the risk estimate.

        Returns
        --------
        risk_gcv : np.ndarray or pandas.DataFrame
            [M_test, ] The GCV estimate for each ensemble size in M_test.
        '''
        if self.estimator_.__class__.__name__ not in ['Ridge', 'Lasso', 'ElasticNet']:
            raise ValueError('GCV is only implemented for Ridge, Lasso, and ElasticNet regression.')
        if Y_train.ndim==1:
            Y_train = Y_train[:,None]
        elif Y_train.ndim>2:
            raise NotImplementedError('CGCV for multi-task regression not implemented yet.')
        if M is None:
            M = self.n_estimators        
        M_arr = np.arange(M)
        ids_list = self.estimators_samples_
        Y_hat = self.predict_individual(X_train, M, n_jobs)        
        n = X_train.shape[0]

        with Parallel(n_jobs=n_jobs, max_nbytes=None, verbose=verbose) as parallel:
            dof = parallel(
                delayed(lambda j:degree_of_freedom(self.estimators_[j], X_train[ids_list[j]])
                       )(j)
                for j in M_arr
            )
            dof = np.mean(dof)

        if type=='full':
            err_eval = avg_sq_err(Y_hat - Y_train)
            err_train = np.mean(err_eval, axis=0)
            deno = (1 - dof/n)**2
            risk_gcv = err_train / deno
        else:
            Y_hat = np.cumsum(Y_hat, axis=1) / (M_arr+1)
            err_eval = (Y_hat - Y_train)**2
            ids_union_list = [reduce(np.union1d, ids_list[:j+1]) for j in M_arr]
            n_ids = np.array([len(ids) for ids in ids_union_list])
            err_train = np.array([np.mean(err_eval[ids_union_list[j],j]) for j in M_arr])
            deno = (1 - dof/n_ids)**2
            risk_gcv = err_train / deno
        
        if return_df:
            df = pd.DataFrame({'M':M_arr+1, 'estimate':risk_gcv, 'err_train':err_train, 'deno':deno})
            return df
        else:
            return risk_gcv
        
    

    def compute_cgcv_estimate(self, X_train, Y_train, M=None, type='full', return_df=False, n_jobs=-1, verbose=0, **kwargs_est):
        '''
        Computes the corrected GCV estimate for the given input data using the provided BaggingRegressor model.

        Parameters
        ----------
        X_train : np.ndarray
            [n, p] The input covariates.
        Y_train : np.ndarray
            [n, ] The target values of the input data.
        type : str, optional
            The type of CGCV estimate to compute. Can be either 'full' (using full observations) or 
            'ovlp' (using overlapping observations).
        return_df : bool, optional
            If True, returns the GCV estimate as a pandas.DataFrame object.
        n_jobs : int, optional
            The number of jobs to run in parallel. If -1, all CPUs are used.
        kwargs_est : dict
            Additional keyword arguments for the risk estimate.

        Returns
        --------
        risk_gcv : np.ndarray or pandas.DataFrame
            [M_test, ] The CGCV estimate for each ensemble size in M_test.
        '''
        if self.estimator_.__class__.__name__ not in ['Ridge', 'Lasso', 'ElasticNet']:
            # raise ValueError('GCV is only implemented for Ridge, Lasso, and ElasticNet regression.')
            return self._compute_cgcv_estimate_general(X_train, Y_train, M, type, return_df, n_jobs, verbose, **kwargs_est)
            
        if Y_train.ndim==1:
            Y_train = Y_train[:,None]
        if M is None:
            M = self.n_estimators            
        M_arr = np.arange(M)
        ids_list = self.estimators_samples_
        Y_hat = self.predict_individual(X_train, M, n_jobs)

        n, p = X_train.shape
        if hasattr(self.estimators_[0], 'fit_intercept') and self.estimators_[0].fit_intercept:
            p += 1
        phi = p/n
        k = self.max_samples if isinstance(self.max_samples, int) else int(n * self.max_samples)
        psi = p/k

        with Parallel(n_jobs=n_jobs, max_nbytes=None, verbose=verbose) as parallel:
            dof = parallel(
                delayed(lambda j:degree_of_freedom(self.estimators_[j], X_train[ids_list[j]]))(j)
                for j in M_arr
            )
            dof = np.mean(dof)

        err_train = (Y_hat-Y_train)**2
        if type=='full':
            correct_term = np.mean(err_train) / (1 - 2*dof/n + dof**2/(k*n))
        elif type=='ovlp':
            correct_term = np.mean([np.mean(err_train[ids_list[j],j]) for j in M_arr]) / (1 - dof/k)**2
        else:
            raise ValueError('The type must be either "full" or "ovlp".')
        
        err_eval = avg_sq_err(Y_hat - Y_train)
        err_train = np.mean(err_eval, axis=0)
        deno = (1 - dof/n)**2
        C = 1/ (n/dof - 1)**2 / (M_arr+1) * (psi/phi - 1) * correct_term
        risk_cgcv = err_train / deno - C
        
        if return_df:
            df = pd.DataFrame({'M':M_arr+1, 'estimate':risk_cgcv, 'err_train':err_train, 'deno':deno, 'C':C})
            return df
        else:
            return risk_cgcv



    def _compute_cgcv_estimate_general(
            self, X_train, Y_train, M=None, type='full',
            return_df=False, n_jobs=-1, verbose=0, **kwargs_est):
        '''
        Computes the corrected GCV estimate for the given input data using the provided BaggingRegressor model.

        Parameters
        ----------
        X_train : np.ndarray
            [n, p] The input covariates.
        Y_train : np.ndarray
            [n, ] The target values of the input data.
        type : str, optional
            The type of CGCV estimate to compute. Can be either 'full' (using full observations) or 
            'ovlp' (using overlapping observations).
        return_df : bool, optional
            If True, returns the GCV estimate as a pandas.DataFrame object.
        n_jobs : int, optional
            The number of jobs to run in parallel. If -1, all CPUs are used.
        kwargs_est : dict
            Additional keyword arguments for the risk estimate.

        Returns
        --------
        risk_gcv : np.ndarray or pandas.DataFrame
            [M_test, ] The CGCV estimate for each ensemble size in M_test.
        '''
        if not hasattr(self.estimator_.__class__, 'get_gcv_input'):
            raise ValueError(
                "Explicit GCV calculation is only implemented for Ridge, Lasso, and ElasticNet regression.\n"
                "For estimator class '{estimator_class.__name__}', it needs to have 'get_gcv_input' "
                "method to compute the GCV input. The function takes training samples (X,y) in [n,p] by [n,1] "
                "as input and return a tuple of:\n"
                "(1) [n, ] residuals;\n"
                "(2) [n, ] derivative of loss evaluated at residuals;\n"
                "(3) [1, ] degrees of freedom;\n"
                "(4) [1, ] the trace of generalized smoothing matrix tr_V.")

        if Y_train.ndim==1:
            Y_train = Y_train[:,None]
        elif Y_train.ndim>2:
            raise NotImplementedError('CGCV for multi-task regression not implemented yet.')
        if M is None:
            M = self.n_estimators            
        M_arr = np.arange(M)
        ids_list = [np.sort(ids) for ids in self.estimators_samples_]
        Y_hat = self.predict_individual(X_train, M, n_jobs)

        n, p = X_train.shape
        if hasattr(self.estimators_[0], 'fit_intercept') and self.estimators_[0].fit_intercept:
            p += 1
        with Parallel(n_jobs=n_jobs, max_nbytes=None, verbose=verbose) as parallel:
            res = parallel(
                delayed(
                    lambda j:self.estimators_[j].get_gcv_input(X_train[ids_list[j]], Y_train[ids_list[j]])
                        )(j) for j in M_arr
            )
            r, loss_p, dof, tr_V = list(zip(*res))
            r = np.array(r).T
            loss_p = np.array(loss_p).T
            dof = np.array(dof)
            tr_V = np.array(tr_V)
        
        tmp = r + (dof / tr_V) [None,:] * loss_p
        
        if type=='full':
            r_full = Y_train - Y_hat            
            tmp_full = np.zeros_like(r_full)
            for j in range(tmp.shape[1]):
                tmp_full[ids_list[j],j] = tmp[:,j]

            del r, tmp

            def _get_est(i,j):
                return np.mean(np.where(np.isin(np.arange(n), ids_list[i]), tmp_full[:,i], r_full[:,i]) * np.where(np.isin(np.arange(n), ids_list[j]), tmp_full[:,j], r_full[:,j]))
        elif type=='ovlp':        
            def _get_est(i,j):
                ids = np.intersect1d(ids_list[i], ids_list[j])
                if len(ids)>0:
                    return np.mean(tmp[np.isin(ids_list[i],ids),i] * tmp[np.isin(ids_list[j],ids),j])
                else:
                    return np.nan                        
        else:
            raise ValueError('The type must be either "full" or "ovlp".')

        risk_M1 = np.mean(Parallel(n_jobs=n_jobs-2, verbose=verbose)(delayed(_get_est)(i=i, j=i) for i in M_arr))
        if M>1:
            risk_Minf = np.nanmean(Parallel(n_jobs=n_jobs-2, verbose=verbose)(delayed(_get_est)(i=i, j=j) for j in M_arr for i in M_arr if j<i))
            risk_cgcv = (1/(1+M_arr)) * risk_M1 + (1-1/(1+M_arr)) * risk_Minf
        else:
            risk_cgcv = np.append([risk_M1], np.ones(M-1)*np.nan)

        if return_df:
            err_eval = avg_sq_err(Y_train - Y_hat)
            err_train = np.mean(err_eval, axis=0)
            df = pd.DataFrame({'M':M_arr+1, 'estimate':risk_cgcv, 'err_train':err_train})
            return df
        else:
            return risk_cgcv            

Ancestors

  • sklearn.ensemble._bagging.BaggingRegressor
  • sklearn.base.RegressorMixin
  • sklearn.ensemble._bagging.BaseBagging
  • sklearn.ensemble._base.BaseEnsemble
  • sklearn.base.MetaEstimatorMixin
  • sklearn.base.BaseEstimator
  • sklearn.utils._metadata_requests._MetadataRequester

Methods

def predict_individual(self: sklearn.ensemble._bagging.BaggingRegressor, X: numpy.ndarray, M: int = -1, n_jobs: int = -1, verbose: bool = 0) ‑> numpy.ndarray

Predicts the target values for the given input data using the provided BaggingRegressor model.

Parameters

regr : BaggingRegressor
    The BaggingRegressor model to use for prediction.
X : np.ndarray
    [n, p] The input data to predict target values for.

Returns

Y_hat : np.ndarray
    [n, M] The predicted target values of all $M$ estimators for the input data.
Expand source code
def predict_individual(self: BaggingRegressor, X: np.ndarray, M: int=-1, n_jobs: int=-1, verbose: bool=0) -> np.ndarray:
    '''
    Predicts the target values for the given input data using the provided BaggingRegressor model.

    Parameters
    ----------
        regr : BaggingRegressor
            The BaggingRegressor model to use for prediction.
        X : np.ndarray
            [n, p] The input data to predict target values for.

    Returns
    ----------
        Y_hat : np.ndarray
            [n, M] The predicted target values of all $M$ estimators for the input data.
    '''
    if M < 0:
        M = self.n_estimators
    with Parallel(n_jobs=n_jobs, max_nbytes=None, verbose=verbose) as parallel:
        Y_hat = parallel(
            delayed(lambda reg,X: reg.predict(X[:,features]))(reg, X)
            for reg,features in zip(self.estimators_[:M],self.estimators_features_[:M])
        )
    Y_hat = np.stack(Y_hat, axis=-1)
    return Y_hat
def compute_risk(self, X, Y, M_test=None, return_df=False, avg=True, n_jobs=-1, verbose=0, **kwargs_est)
Expand source code
def compute_risk(
        self, X, Y, M_test=None, return_df=False, 
        avg=True, n_jobs=-1, verbose=0, **kwargs_est):
    if M_test is None:
        M_test = self.n_estimators
    if M_test>self.n_estimators:
        raise ValueError('The ensemble size M must be less than or equal to the number of estimators in the BaggingRegressor model.')        

    Y_hat = self.predict_individual(X, M_test, n_jobs, verbose)
    if Y_hat.ndim>2:
        Y_hat = Y_hat.reshape((-1,Y_hat.shape[-1]))
    Y = Y.reshape((*Y_hat.shape[:-1],1))

    if avg:
        err_eval = avg_sq_err(Y_hat - Y)
    else:
        Y_hat = np.cumsum(Y_hat, axis=1) / np.arange(1, M_test+1)
        err_eval = (Y_hat - Y)**2
    risk = risk_estimate(err_eval, axis=0, **kwargs_est)

    if return_df:
        df = pd.DataFrame({'M':np.arange(1,M_test+1), 'risk':risk})
        return df
    else:
        return risk
def compute_ecv_estimate(self, X_train, Y_train, M_test=None, M0=None, return_df=False, n_jobs=-1, verbose=0, **kwargs_est)

Computes the ECV estimate for the given input data using the provided BaggingRegressor model.

Parameters

X_train : np.ndarray
[n, p] The input covariates.
Y_train : np.ndarray
[n, …] The target values of the input data.
M_test : int or np.ndarray
The maximum ensemble size of the ECV estimate.
M0 : int, optional
The number of estimators to use for the OOB estimate. If None, M0 is set to the number of estimators in the BaggingRegressor model.
return_df : bool, optional
If True, returns the ECV estimate as a pandas.DataFrame object.
n_jobs : int, optional
The number of jobs to run in parallel. If -1, all CPUs are used.
kwargs_est : dict
Additional keyword arguments for the risk estimate.

Returns

risk_ecv : np.ndarray or pandas.DataFrame
[M_test, ] The ECV estimate for each ensemble size in M_test.
Expand source code
def compute_ecv_estimate(self, X_train, Y_train, M_test=None, M0=None, return_df=False, n_jobs=-1, verbose=0, **kwargs_est):
    '''
    Computes the ECV estimate for the given input data using the provided BaggingRegressor model.

    Parameters
    ----------
    X_train : np.ndarray
        [n, p] The input covariates.
    Y_train : np.ndarray
        [n, ...] The target values of the input data.
    M_test : int or np.ndarray
        The maximum ensemble size of the ECV estimate.
    M0 : int, optional
        The number of estimators to use for the OOB estimate. If None, M0 is set to the number of estimators in the BaggingRegressor model.
    return_df : bool, optional
        If True, returns the ECV estimate as a pandas.DataFrame object.
    n_jobs : int, optional
        The number of jobs to run in parallel. If -1, all CPUs are used.
    kwargs_est : dict
        Additional keyword arguments for the risk estimate.

    Returns
    --------
    risk_ecv : np.ndarray or pandas.DataFrame
        [M_test, ] The ECV estimate for each ensemble size in M_test.
    '''
    if M_test is None:
        M_test = self.n_estimators
    if M0 is None:
        M0 = self.n_estimators
    if M0<2:
        raise ValueError('The ensemble size M or M0 must be at least 2.')
    if np.isscalar(M_test):
        M_test = np.arange(1,M_test+1)
    else:
        M_test = np.array(M_test)
    ids_list = self.estimators_samples_[:M0]
    Y_hat = self.predict_individual(X_train, M0, n_jobs)
    Y_train = Y_train.reshape((*Y_hat.shape[:-1],1))
    dev_eval = Y_hat - Y_train
    err_eval = dev_eval**2
    
    with Parallel(n_jobs=n_jobs, max_nbytes=None, verbose=verbose) as parallel:
        res_1 = parallel(
            delayed(lambda j:risk_estimate(
                np.delete(err_eval[...,j], ids_list[j], axis=0).reshape(-1,1), **kwargs_est)
                   )(j)
            for j in np.arange(M0)
        )
        res_2 = parallel(
            delayed(lambda i,j:risk_estimate(
                (np.mean(
                    np.delete(dev_eval[...,[i,j]], 
                              np.union1d(ids_list[i], ids_list[j]), axis=0), axis=-1)**2).reshape(-1,1),
                **kwargs_est
            ))(i,j)
            for i,j in itertools.combinations(np.arange(M0), 2)
        )
                
    risk_ecv_1 = np.nanmean(res_1)
    risk_ecv_2 = np.nanmean(res_2)
    risk_ecv = - (1-2/M_test) * risk_ecv_1 + 2*(1-1/M_test) * risk_ecv_2
    if return_df:
        df = pd.DataFrame({'M':M_test, 'estimate':risk_ecv})
        return df
    else:
        return risk_ecv
def extrapolate(self, risk, M_test=None)
Expand source code
def extrapolate(self, risk, M_test=None):
    if M_test is None:
        M_test = self.n_estimators
    M0 = risk.shape[0]        
    if np.isscalar(M_test):
        M_test = np.arange(1,M_test+1)
    else:
        M_test = np.array(M_test)
    if M0 <2 or np.max(M_test) < M0:
        raise ValueError('The ensemble size M or M0 must be at least 2.')
    
    risk_1 = risk[0]
    risk_inf = np.sum(risk - risk_1/np.arange(1, M0+1)) / np.sum(1 - 1/np.arange(1, M0+1))
    
    risk_ecv = (1/M_test) * risk_1 + (1-1/M_test) * risk_inf
    risk_ecv[:M0] = risk
    return risk_ecv
def compute_gcv_estimate(self, X_train, Y_train, M=None, type='full', return_df=False, n_jobs=-1, verbose=0, **kwargs_est)

Computes the naive GCV estimate for the given input data using the provided BaggingRegressor model.

Parameters

X_train : np.ndarray
[n, p] The input covariates.
Y_train : np.ndarray
[n, ] The target values of the input data.
type : str, optional
The type of GCV estimate to compute. Can be either 'full' (the naive GCV using full observations) or 'union' (the naive GCV using training observations).
return_df : bool, optional
If True, returns the GCV estimate as a pandas.DataFrame object.
n_jobs : int, optional
The number of jobs to run in parallel. If -1, all CPUs are used.
kwargs_est : dict
Additional keyword arguments for the risk estimate.

Returns

risk_gcv : np.ndarray or pandas.DataFrame
[M_test, ] The GCV estimate for each ensemble size in M_test.
Expand source code
def compute_gcv_estimate(self, X_train, Y_train, M=None, type='full', return_df=False, n_jobs=-1, verbose=0, **kwargs_est):
    '''
    Computes the naive GCV estimate for the given input data using the provided BaggingRegressor model.

    Parameters
    ----------
    X_train : np.ndarray
        [n, p] The input covariates.
    Y_train : np.ndarray
        [n, ] The target values of the input data.
    type : str, optional
        The type of GCV estimate to compute. Can be either 'full' (the naive GCV using full observations) or 
        'union' (the naive GCV using training observations).
    return_df : bool, optional
        If True, returns the GCV estimate as a pandas.DataFrame object.
    n_jobs : int, optional
        The number of jobs to run in parallel. If -1, all CPUs are used.
    kwargs_est : dict
        Additional keyword arguments for the risk estimate.

    Returns
    --------
    risk_gcv : np.ndarray or pandas.DataFrame
        [M_test, ] The GCV estimate for each ensemble size in M_test.
    '''
    if self.estimator_.__class__.__name__ not in ['Ridge', 'Lasso', 'ElasticNet']:
        raise ValueError('GCV is only implemented for Ridge, Lasso, and ElasticNet regression.')
    if Y_train.ndim==1:
        Y_train = Y_train[:,None]
    elif Y_train.ndim>2:
        raise NotImplementedError('CGCV for multi-task regression not implemented yet.')
    if M is None:
        M = self.n_estimators        
    M_arr = np.arange(M)
    ids_list = self.estimators_samples_
    Y_hat = self.predict_individual(X_train, M, n_jobs)        
    n = X_train.shape[0]

    with Parallel(n_jobs=n_jobs, max_nbytes=None, verbose=verbose) as parallel:
        dof = parallel(
            delayed(lambda j:degree_of_freedom(self.estimators_[j], X_train[ids_list[j]])
                   )(j)
            for j in M_arr
        )
        dof = np.mean(dof)

    if type=='full':
        err_eval = avg_sq_err(Y_hat - Y_train)
        err_train = np.mean(err_eval, axis=0)
        deno = (1 - dof/n)**2
        risk_gcv = err_train / deno
    else:
        Y_hat = np.cumsum(Y_hat, axis=1) / (M_arr+1)
        err_eval = (Y_hat - Y_train)**2
        ids_union_list = [reduce(np.union1d, ids_list[:j+1]) for j in M_arr]
        n_ids = np.array([len(ids) for ids in ids_union_list])
        err_train = np.array([np.mean(err_eval[ids_union_list[j],j]) for j in M_arr])
        deno = (1 - dof/n_ids)**2
        risk_gcv = err_train / deno
    
    if return_df:
        df = pd.DataFrame({'M':M_arr+1, 'estimate':risk_gcv, 'err_train':err_train, 'deno':deno})
        return df
    else:
        return risk_gcv
def compute_cgcv_estimate(self, X_train, Y_train, M=None, type='full', return_df=False, n_jobs=-1, verbose=0, **kwargs_est)

Computes the corrected GCV estimate for the given input data using the provided BaggingRegressor model.

Parameters

X_train : np.ndarray
[n, p] The input covariates.
Y_train : np.ndarray
[n, ] The target values of the input data.
type : str, optional
The type of CGCV estimate to compute. Can be either 'full' (using full observations) or 'ovlp' (using overlapping observations).
return_df : bool, optional
If True, returns the GCV estimate as a pandas.DataFrame object.
n_jobs : int, optional
The number of jobs to run in parallel. If -1, all CPUs are used.
kwargs_est : dict
Additional keyword arguments for the risk estimate.

Returns

risk_gcv : np.ndarray or pandas.DataFrame
[M_test, ] The CGCV estimate for each ensemble size in M_test.
Expand source code
def compute_cgcv_estimate(self, X_train, Y_train, M=None, type='full', return_df=False, n_jobs=-1, verbose=0, **kwargs_est):
    '''
    Computes the corrected GCV estimate for the given input data using the provided BaggingRegressor model.

    Parameters
    ----------
    X_train : np.ndarray
        [n, p] The input covariates.
    Y_train : np.ndarray
        [n, ] The target values of the input data.
    type : str, optional
        The type of CGCV estimate to compute. Can be either 'full' (using full observations) or 
        'ovlp' (using overlapping observations).
    return_df : bool, optional
        If True, returns the GCV estimate as a pandas.DataFrame object.
    n_jobs : int, optional
        The number of jobs to run in parallel. If -1, all CPUs are used.
    kwargs_est : dict
        Additional keyword arguments for the risk estimate.

    Returns
    --------
    risk_gcv : np.ndarray or pandas.DataFrame
        [M_test, ] The CGCV estimate for each ensemble size in M_test.
    '''
    if self.estimator_.__class__.__name__ not in ['Ridge', 'Lasso', 'ElasticNet']:
        # raise ValueError('GCV is only implemented for Ridge, Lasso, and ElasticNet regression.')
        return self._compute_cgcv_estimate_general(X_train, Y_train, M, type, return_df, n_jobs, verbose, **kwargs_est)
        
    if Y_train.ndim==1:
        Y_train = Y_train[:,None]
    if M is None:
        M = self.n_estimators            
    M_arr = np.arange(M)
    ids_list = self.estimators_samples_
    Y_hat = self.predict_individual(X_train, M, n_jobs)

    n, p = X_train.shape
    if hasattr(self.estimators_[0], 'fit_intercept') and self.estimators_[0].fit_intercept:
        p += 1
    phi = p/n
    k = self.max_samples if isinstance(self.max_samples, int) else int(n * self.max_samples)
    psi = p/k

    with Parallel(n_jobs=n_jobs, max_nbytes=None, verbose=verbose) as parallel:
        dof = parallel(
            delayed(lambda j:degree_of_freedom(self.estimators_[j], X_train[ids_list[j]]))(j)
            for j in M_arr
        )
        dof = np.mean(dof)

    err_train = (Y_hat-Y_train)**2
    if type=='full':
        correct_term = np.mean(err_train) / (1 - 2*dof/n + dof**2/(k*n))
    elif type=='ovlp':
        correct_term = np.mean([np.mean(err_train[ids_list[j],j]) for j in M_arr]) / (1 - dof/k)**2
    else:
        raise ValueError('The type must be either "full" or "ovlp".')
    
    err_eval = avg_sq_err(Y_hat - Y_train)
    err_train = np.mean(err_eval, axis=0)
    deno = (1 - dof/n)**2
    C = 1/ (n/dof - 1)**2 / (M_arr+1) * (psi/phi - 1) * correct_term
    risk_cgcv = err_train / deno - C
    
    if return_df:
        df = pd.DataFrame({'M':M_arr+1, 'estimate':risk_cgcv, 'err_train':err_train, 'deno':deno, 'C':C})
        return df
    else:
        return risk_cgcv
def set_fit_request(self: Ensemble, *, sample_weight: Union[bool, NoneType, str] = '$UNCHANGED$') ‑> Ensemble

Request metadata passed to the fit method.

Note that this method is only relevant if enable_metadata_routing=True (see :func:sklearn.set_config). Please see :ref:User Guide <metadata_routing> on how the routing mechanism works.

The options for each parameter are:

  • True: metadata is requested, and passed to fit if provided. The request is ignored if metadata is not provided.

  • False: metadata is not requested and the meta-estimator will not pass it to fit.

  • None: metadata is not requested, and the meta-estimator will raise an error if the user provides it.

  • str: metadata should be passed to the meta-estimator with this given alias instead of the original name.

The default (sklearn.utils.metadata_routing.UNCHANGED) retains the existing request. This allows you to change the request for some parameters and not others.

Added in version: 1.3

Note

This method is only relevant if this estimator is used as a sub-estimator of a meta-estimator, e.g. used inside a :class:~sklearn.pipeline.Pipeline. Otherwise it has no effect.

Parameters

sample_weight : str, True, False, or None, default=sklearn.utils.metadata_routing.UNCHANGED
Metadata routing for sample_weight parameter in fit.

Returns

self : object
The updated object.
Expand source code
def func(**kw):
    """Updates the request for provided parameters

    This docstring is overwritten below.
    See REQUESTER_DOC for expected functionality
    """
    if not _routing_enabled():
        raise RuntimeError(
            "This method is only available when metadata routing is enabled."
            " You can enable it using"
            " sklearn.set_config(enable_metadata_routing=True)."
        )

    if self.validate_keys and (set(kw) - set(self.keys)):
        raise TypeError(
            f"Unexpected args: {set(kw) - set(self.keys)}. Accepted arguments"
            f" are: {set(self.keys)}"
        )

    requests = instance._get_metadata_request()
    method_metadata_request = getattr(requests, self.name)

    for prop, alias in kw.items():
        if alias is not UNCHANGED:
            method_metadata_request.add_request(param=prop, alias=alias)
    instance._metadata_request = requests

    return instance
def set_score_request(self: Ensemble, *, sample_weight: Union[bool, NoneType, str] = '$UNCHANGED$') ‑> Ensemble

Request metadata passed to the score method.

Note that this method is only relevant if enable_metadata_routing=True (see :func:sklearn.set_config). Please see :ref:User Guide <metadata_routing> on how the routing mechanism works.

The options for each parameter are:

  • True: metadata is requested, and passed to score if provided. The request is ignored if metadata is not provided.

  • False: metadata is not requested and the meta-estimator will not pass it to score.

  • None: metadata is not requested, and the meta-estimator will raise an error if the user provides it.

  • str: metadata should be passed to the meta-estimator with this given alias instead of the original name.

The default (sklearn.utils.metadata_routing.UNCHANGED) retains the existing request. This allows you to change the request for some parameters and not others.

Added in version: 1.3

Note

This method is only relevant if this estimator is used as a sub-estimator of a meta-estimator, e.g. used inside a :class:~sklearn.pipeline.Pipeline. Otherwise it has no effect.

Parameters

sample_weight : str, True, False, or None, default=sklearn.utils.metadata_routing.UNCHANGED
Metadata routing for sample_weight parameter in score.

Returns

self : object
The updated object.
Expand source code
def func(**kw):
    """Updates the request for provided parameters

    This docstring is overwritten below.
    See REQUESTER_DOC for expected functionality
    """
    if not _routing_enabled():
        raise RuntimeError(
            "This method is only available when metadata routing is enabled."
            " You can enable it using"
            " sklearn.set_config(enable_metadata_routing=True)."
        )

    if self.validate_keys and (set(kw) - set(self.keys)):
        raise TypeError(
            f"Unexpected args: {set(kw) - set(self.keys)}. Accepted arguments"
            f" are: {set(self.keys)}"
        )

    requests = instance._get_metadata_request()
    method_metadata_request = getattr(requests, self.name)

    for prop, alias in kw.items():
        if alias is not UNCHANGED:
            method_metadata_request.add_request(param=prop, alias=alias)
    instance._metadata_request = requests

    return instance