Document for sklearn_ensemble_cv

sklearn_ensemble_cv API documentation

Package sklearn_ensemble_cv

Expand source code
__all__ = ['ECV', 'GCV', 'splitCV', 'KFoldCV', 
           'comp_empirical_ecv', 'comp_empirical_gcv',
           'reset_random_seeds', 'Ensemble', 'generate_data']

from sklearn_ensemble_cv.cross_validation import *
from sklearn_ensemble_cv.ensemble import Ensemble
from sklearn_ensemble_cv.utils import reset_random_seeds
from sklearn_ensemble_cv.simu_data import generate_data


__license__ = "MIT"
__version__ = "0.2.4"
__author__ = "Jin-Hong Du"
__email__ = "jinhongd@andrew.cmu.edu"
__maintainer__ = "Jin-Hong Du"
__maintainer_email__ = "jinhongd@andrew.cmu.edu"
__description__ = ("Ensemble Cross-validation is a Python package for performing specialized " 
    "cross-validation on ensemble models, such as extrapolated cross-validation (ECV), "
    "generalized cross-validation (GCV), and etc. The implementation of ensemble models are "
    "based on scikit-learn."
    )

Sub-modules

sklearn_ensemble_cv.cross_validation
sklearn_ensemble_cv.ensemble
sklearn_ensemble_cv.simu_data
sklearn_ensemble_cv.utils

Functions

def ECV(X_train, Y_train, regr, grid_regr={}, grid_ensemble={}, kwargs_regr={}, kwargs_ensemble={}, M=20, M0=20, M_max=inf, delta=0.0, return_df=False, n_jobs=-1, X_test=None, Y_test=None, kwargs_est={}, **kwargs)

Cross-validation for ensemble models using the empirical ECV estimate.

Parameters

X_train, Y_train : numpy.array
The training samples.
grid : pandas.DataFrame
The grid of hyperparameters to search over.
regr : object
The base estimator to use for the ensemble model.
kwargs_regr : dict, optional
Additional keyword arguments for the base estimator.
kwargs_ensemble : dict, optional
Additional keyword arguments for the ensemble model.
M : int, optional
The ensemble size to build.
M0 : int, optional
The number of estimators to use for the ECV estimate.
M_max : int, optional
The maximum ensemble size to consider for the tuned ensemble.
delta : float, optional
The suboptimality parameter for the ensemble size tuning by ECV.
return_df : bool, optional
If True, returns the results as a pandas.DataFrame object.
n_jobs : int, optional
The number of jobs to run in parallel. If -1, all CPUs are used.
X_test, Y_test : numpy.array, optional
The validation samples. It may be useful to be used for comparing the performance of ECV with other cross-validation methods that requires sample-splitting.
kwargs_est : dict, optional
Additional keyword arguments for the risk estimate.
Expand source code
def ECV(
        X_train, Y_train, regr, grid_regr={}, grid_ensemble={}, kwargs_regr={}, kwargs_ensemble={},
        M=20, M0=20, M_max=np.inf, delta=0., return_df=False, n_jobs=-1, X_test=None, Y_test=None, 
        kwargs_est={}, **kwargs
        ):
    '''
    Cross-validation for ensemble models using the empirical ECV estimate.

    Parameters
    ----------
    X_train, Y_train : numpy.array
        The training samples.
    grid : pandas.DataFrame
        The grid of hyperparameters to search over.
    regr : object
        The base estimator to use for the ensemble model.
    kwargs_regr : dict, optional
        Additional keyword arguments for the base estimator.
    kwargs_ensemble : dict, optional
        Additional keyword arguments for the ensemble model.
    M : int, optional
        The ensemble size to build.
    M0 : int, optional
        The number of estimators to use for the ECV estimate.
    M_max : int, optional
        The maximum ensemble size to consider for the tuned ensemble.
    delta : float, optional
        The suboptimality parameter for the ensemble size tuning by ECV.
    return_df : bool, optional
        If True, returns the results as a pandas.DataFrame object.
    n_jobs : int, optional
        The number of jobs to run in parallel. If -1, all CPUs are used.
    X_test, Y_test : numpy.array, optional
        The validation samples. It may be useful to be used for comparing the 
        performance of ECV with other cross-validation methods that requires sample-splitting.
    kwargs_est : dict, optional
        Additional keyword arguments for the risk estimate.
    '''
    grid_regr, kwargs_regr, grid_ensemble, kwargs_ensemble, kwargs_est = process_grid(
        grid_regr, kwargs_regr, grid_ensemble, kwargs_ensemble, kwargs_est, M)

    if M0>M:
        raise ValueError('M0 must be less than or equal to M.')
    if np.isinf(M_max):
        M_max = np.append(np.arange(M)+1, np.inf)
    elif np.isscalar(M_max):
        M_max = np.arange(M_max)+1
    n_M_max = len(M_max)

    test = X_test is not None and Y_test is not None
    n_res = n_M_max+M if test else n_M_max
    n_grid = len(grid_regr)
    res_risk = np.full((n_grid, n_res), np.inf)

    for i in range(n_grid):
        params_ensemble = grid_ensemble[i]
        params_regr = grid_regr[i]

        _, res = comp_empirical_ecv(
            X_train, Y_train, regr, 
            {**kwargs_regr, **params_regr}, {**kwargs_ensemble, **params_ensemble},
            M, M0, M_max, n_jobs, X_test, Y_test, _check_input=False, **kwargs_est
        )
        res_risk[i, :] = np.r_[res]

    if return_df:
        cols = np.char.add(['risk_val-']*n_M_max, np.char.mod('%d', 1+np.arange(n_M_max)))
        if np.isinf(M_max[-1]):
            cols[-1] = 'risk_val-inf'
        if test:
            cols = np.append(cols, np.char.add(['risk_test-']*M, np.char.mod('%d', 1+np.arange(M))))
        res_ecv = pd.concat([pd.DataFrame(grid_regr), pd.DataFrame(grid_ensemble),
                             pd.DataFrame(res_risk, columns=cols)
                             ] ,axis=1)
    else:
        if test:            
            res_ecv = (res_risk[:,:n_M_max], res_risk[:,n_M_max:])
        else:
            res_ecv = res_risk

    j, M_best = np.unravel_index(np.nanargmin(res_risk[:,:M]), res_risk[:,:M].shape)
    M_best += 1

    if delta==0.:
        M_hat = np.inf
    else:
        M_hat = int(np.ceil(2 / (delta + 2/M_max[-1]*(res_risk[j,0] - res_risk[j,1])) * 
                            (res_risk[j,0] - res_risk[j,1])))
    M_best_ext = np.minimum(M_hat, M_max[-1])
    if not np.isinf(M_best_ext):
        M_best_ext = int(M_best_ext)
    
    info = {        
        'best_params_regr': {**kwargs_regr, **grid_regr[j]},
        'best_params_ensemble': {**kwargs_ensemble, **grid_ensemble[j]},
        'best_n_estimators': M_best,
        'best_params_index':j,
        'best_score':res_risk[j, M_best-1],

        'delta': delta,
        'M_max':M_max[-1],
        'best_n_estimators_extrapolate': M_best_ext,
        'best_score_extrapolate': res_risk[j,n_M_max-1] if np.isinf(M_best_ext) else res_risk[j, M_best_ext-1],        
    }
    return res_ecv, info
def GCV(X_train, Y_train, regr, grid_regr={}, grid_ensemble={}, kwargs_regr={}, kwargs_ensemble={}, M=20, M0=20, M_max=inf, corrected=True, type='full', return_df=False, n_jobs=-1, X_test=None, Y_test=None, kwargs_est={}, **kwargs)

Cross-validation for ensemble models using the empirical ECV estimate. Currently, only the GCV estimates for the Ridge, Lasso, and ElasticNet are implemented.

Parameters

X_train, Y_train : numpy.array
The training samples.
grid : pandas.DataFrame
The grid of hyperparameters to search over.
regr : object
The base estimator to use for the ensemble model.
kwargs_regr : dict, optional
Additional keyword arguments for the base estimator.
kwargs_ensemble : dict, optional
Additional keyword arguments for the ensemble model.
M : int, optional
The ensemble size to build.
corrected : bool, optional
If True, compute the corrected GCV estimate.
type : str, optional
The type of GCV or GCV estimate to compute. It can be either 'full' or 'union' for naive GCV, and 'full' or 'ovlp' for CGCV.
return_df : bool, optional
If True, returns the results as a pandas.DataFrame object.
n_jobs : int, optional
The number of jobs to run in parallel. If -1, all CPUs are used.
X_test, Y_test : numpy.array, optional
The validation samples. It may be useful to be used for comparing the performance of ECV with other cross-validation methods that requires sample-splitting.
kwargs_est : dict, optional
Additional keyword arguments for the risk estimate.
Expand source code
def GCV(
        X_train, Y_train, regr, grid_regr={}, grid_ensemble={}, kwargs_regr={}, kwargs_ensemble={},
        M=20, M0=20, M_max=np.inf, corrected=True, type='full', return_df=False, n_jobs=-1, X_test=None, Y_test=None, 
        kwargs_est={}, **kwargs
        ):
    '''
    Cross-validation for ensemble models using the empirical ECV estimate.
    Currently, only the GCV estimates for the Ridge, Lasso, and ElasticNet are implemented.

    Parameters
    ----------
    X_train, Y_train : numpy.array
        The training samples.
    grid : pandas.DataFrame
        The grid of hyperparameters to search over.
    regr : object
        The base estimator to use for the ensemble model.
    kwargs_regr : dict, optional
        Additional keyword arguments for the base estimator.
    kwargs_ensemble : dict, optional
        Additional keyword arguments for the ensemble model.
    M : int, optional
        The ensemble size to build.
    corrected : bool, optional
        If True, compute the corrected GCV estimate.
    type : str, optional
        The type of GCV or GCV estimate to compute. It can be either 'full' or 'union' for naive GCV,
        and 'full' or 'ovlp' for CGCV.
    return_df : bool, optional
        If True, returns the results as a pandas.DataFrame object.
    n_jobs : int, optional
        The number of jobs to run in parallel. If -1, all CPUs are used.
    X_test, Y_test : numpy.array, optional
        The validation samples. It may be useful to be used for comparing the 
        performance of ECV with other cross-validation methods that requires sample-splitting.
    kwargs_est : dict, optional
        Additional keyword arguments for the risk estimate.
    '''
    grid_regr, kwargs_regr, grid_ensemble, kwargs_ensemble, kwargs_est = process_grid(
        grid_regr, kwargs_regr, grid_ensemble, kwargs_ensemble, kwargs_est, M)

    if M0>M:
        raise ValueError('M0 must be less than or equal to M.')
    if np.isinf(M_max):
        M_max = np.append(np.arange(M)+1, np.inf)
    elif np.isscalar(M_max):
        M_max = np.arange(M_max)+1
    n_M_max = len(M_max)

    test = X_test is not None and Y_test is not None
    n_res = n_M_max+M if test else n_M_max
    n_grid = len(grid_regr)
    res_risk = np.full((n_grid, n_res), np.inf)

    for i in range(n_grid):
        params_ensemble = grid_ensemble[i]
        params_regr = grid_regr[i]
        
        _, res = comp_empirical_gcv(
            X_train, Y_train, regr, 
            {**kwargs_regr, **params_regr}, {**kwargs_ensemble, **params_ensemble},
            M, M0, M_max, corrected, type, n_jobs, X_test, Y_test,  _check_input=False, **kwargs_est
        )
        res_risk[i, :] = np.r_[res]

    if return_df:
        cols = np.char.add(['risk_val-']*n_M_max, np.char.mod('%d', 1+np.arange(n_M_max)))
        if np.isinf(M_max[-1]):
            cols[-1] = 'risk_val-inf'
        if test:
            cols = np.append(cols, np.char.add(['risk_test-']*M, np.char.mod('%d', 1+np.arange(M))))        
        res_gcv = pd.concat([pd.DataFrame(grid_regr), pd.DataFrame(grid_ensemble),
                             pd.DataFrame(res_risk, columns=cols)
                             ] ,axis=1)
    else:
        if test:            
            res_gcv = (res_risk[:,:M], res_risk[:,M:])
        else:
            res_gcv = res_risk

    j, M_best = np.unravel_index(np.nanargmin(res_risk[:,:M]), res_risk[:,:M].shape)
    M_best += 1

    info = {
        'best_params_regr': {**kwargs_regr, **grid_regr[j]},
        'best_params_ensemble': {**kwargs_ensemble, **grid_ensemble[j]},
        'best_n_estimators': M_best,
        'best_params_index':j,
        'best_score':res_risk[j, M_best-1],
    }

    return res_gcv, info
def splitCV(X_train, Y_train, regr, grid_regr={}, grid_ensemble={}, kwargs_regr={}, kwargs_ensemble={}, M=20, return_df=False, n_jobs=-1, X_test=None, Y_test=None, kwargs_est={}, **kwargs)

Sample-split cross-validation for ensemble models.

Parameters

X_train, Y_train : numpy.array
The training samples.
regr : object
The base estimator to use for the ensemble model.
grid_regr : pandas.DataFrame
The grid of hyperparameters for the base estimator.
grid_ensemble : pandas.DataFrame
The grid of hyperparameters for the ensemble model.
kwargs_regr : dict, optional
Additional keyword arguments for the base estimator.
kwargs_ensemble : dict, optional
Additional keyword arguments for the ensemble model.
M : int, optional
The ensemble size to build.
return_df : bool, optional
If True, returns the results as a pandas.DataFrame object.
n_jobs : int, optional
The number of jobs to run in parallel. If -1, all CPUs are used.
X_test, Y_test : numpy.array, optional
The test samples. It may be useful to be used for comparing the performance of different cross-validation methods.
kwargs_est : dict, optional
Additional keyword arguments for the risk estimate.
kwargs : dict, optional
Additional keyword arguments for ShuffleSplit; see https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.ShuffleSplit.html for more details.
Expand source code
def splitCV(
        X_train, Y_train, regr, grid_regr={}, grid_ensemble={}, kwargs_regr={}, kwargs_ensemble={},
        M=20, return_df=False, n_jobs=-1, X_test=None, Y_test=None, kwargs_est={}, **kwargs
        ):
    '''
    Sample-split cross-validation for ensemble models.

    Parameters
    ----------
    X_train, Y_train : numpy.array
        The training samples.
    regr : object
        The base estimator to use for the ensemble model.        
    grid_regr : pandas.DataFrame
        The grid of hyperparameters for the base estimator.
    grid_ensemble : pandas.DataFrame
        The grid of hyperparameters for the ensemble model.
    kwargs_regr : dict, optional
        Additional keyword arguments for the base estimator.
    kwargs_ensemble : dict, optional
        Additional keyword arguments for the ensemble model.
    M : int, optional
        The ensemble size to build.
    return_df : bool, optional
        If True, returns the results as a pandas.DataFrame object.
    n_jobs : int, optional
        The number of jobs to run in parallel. If -1, all CPUs are used.
    X_test, Y_test : numpy.array, optional
        The test samples. It may be useful to be used for comparing the
        performance of different cross-validation methods.
    kwargs_est : dict, optional
        Additional keyword arguments for the risk estimate.
    kwargs : dict, optional
        Additional keyword arguments for `ShuffleSplit`; see 
        https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.ShuffleSplit.html
        for more details.
    '''
    grid_regr, kwargs_regr, grid_ensemble, kwargs_ensemble, kwargs_est = process_grid(
        grid_regr, kwargs_regr, grid_ensemble, kwargs_ensemble, kwargs_est, M)
            
    test = X_test is not None and Y_test is not None
    n_res = 2*M if test else M
    n_grid = len(grid_regr)
    res_risk = np.full((n_grid,n_res), np.inf)

    rs = ShuffleSplit(1, **kwargs)
    id_train, id_val = next(rs.split(X_train, Y_train))
    _X_train, _X_val, _Y_train, _Y_val = X_train[id_train], X_train[id_val], Y_train[id_train], Y_train[id_val]
    
    for i in range(n_grid):
        params_ensemble = grid_ensemble[i]
        params_regr = grid_regr[i]
        
        _, res = comp_empirical_val(
            _X_train, _Y_train, _X_val, _Y_val, regr, 
            {**kwargs_regr, **params_regr}, {**kwargs_ensemble, **params_ensemble},
            M, n_jobs, X_test, Y_test,  _check_input=False, **kwargs_est
        )
        res_risk[i, :] = np.r_[res]

    if return_df:
        cols = np.char.add(['risk_val-']*M, np.char.mod('%d', 1+np.arange(M)))
        if test:
            cols = np.append(cols, np.char.add(['risk_test-']*M, np.char.mod('%d', 1+np.arange(M))))
        res_splitcv = pd.concat([pd.DataFrame(grid_regr), pd.DataFrame(grid_ensemble),
                             pd.DataFrame(res_risk, columns=cols)
                             ] ,axis=1)
    else:
        if test:            
            res_splitcv = (res_risk[:,:M], res_risk[:,M:])
        else:
            res_splitcv = res_risk

    j, M_best = np.unravel_index(np.nanargmin(res_risk[:,:M]), res_risk[:,:M].shape)
    M_best += 1

    info = {
        'best_params_regr': {**kwargs_regr, **grid_regr[j]},
        'best_params_ensemble': {**kwargs_ensemble, **grid_ensemble[j]},
        'best_n_estimators': M_best,
        'best_params_index':j,
        'best_score':res_risk[j, M_best-1],
        'split_params':{
            'index_train':id_train, 
            'index_val':id_val,
            'test_size':rs.test_size,
            'random_state':rs.random_state
        }
    }
    return res_splitcv, info
def KFoldCV(X_train, Y_train, regr, grid_regr={}, grid_ensemble={}, kwargs_regr={}, kwargs_ensemble={}, M=20, return_df=False, n_jobs=-1, X_test=None, Y_test=None, kwargs_est={}, **kwargs)

Sample-split cross-validation for ensemble models.

Parameters

X_train, Y_train : numpy.array
The training samples.
regr : object
The base estimator to use for the ensemble model.
grid_regr : pandas.DataFrame
The grid of hyperparameters for the base estimator.
grid_ensemble : pandas.DataFrame
The grid of hyperparameters for the ensemble model.
kwargs_regr : dict, optional
Additional keyword arguments for the base estimator.
kwargs_ensemble : dict, optional
Additional keyword arguments for the ensemble model.
M : int, optional
The ensemble size to build.
return_df : bool, optional
If True, returns the results as a pandas.DataFrame object.
n_jobs : int, optional
The number of jobs to run in parallel. If -1, all CPUs are used.
X_test, Y_test : numpy.array, optional
The test samples. It may be useful to be used for comparing the performance of different cross-validation methods.
kwargs_est : dict, optional
Additional keyword arguments for the risk estimate.
kwargs : dict, optional
Additional keyword arguments for KFold; see https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.KFold.html for more details.
Expand source code
def KFoldCV(
        X_train, Y_train, regr, grid_regr={}, grid_ensemble={}, kwargs_regr={}, kwargs_ensemble={},
        M=20, return_df=False, n_jobs=-1, X_test=None, Y_test=None, kwargs_est={}, **kwargs
        ):
    '''
    Sample-split cross-validation for ensemble models.

    Parameters
    ----------
    X_train, Y_train : numpy.array
        The training samples.
    regr : object
        The base estimator to use for the ensemble model.        
    grid_regr : pandas.DataFrame
        The grid of hyperparameters for the base estimator.
    grid_ensemble : pandas.DataFrame
        The grid of hyperparameters for the ensemble model.
    kwargs_regr : dict, optional
        Additional keyword arguments for the base estimator.
    kwargs_ensemble : dict, optional
        Additional keyword arguments for the ensemble model.
    M : int, optional
        The ensemble size to build.
    return_df : bool, optional
        If True, returns the results as a pandas.DataFrame object.
    n_jobs : int, optional
        The number of jobs to run in parallel. If -1, all CPUs are used.
    X_test, Y_test : numpy.array, optional
        The test samples. It may be useful to be used for comparing the
        performance of different cross-validation methods.
    kwargs_est : dict, optional
        Additional keyword arguments for the risk estimate.
    kwargs : dict, optional
        Additional keyword arguments for `KFold`; see 
        https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.KFold.html
        for more details.
    '''
    grid_regr, kwargs_regr, grid_ensemble, kwargs_ensemble, kwargs_est = process_grid(
        grid_regr, kwargs_regr, grid_ensemble, kwargs_ensemble, kwargs_est, M)
            
    test = X_test is not None and Y_test is not None
    n_res = 2*M if test else M
    n_grid = len(grid_regr)
    
    kf = KFold(**kwargs)
    n_splits = kf.get_n_splits(X_train)
    res_risk_all = np.full((n_grid,n_res,n_splits), np.inf)

    for fold, (id_train, id_val) in enumerate(kf.split(X_train)):

        _X_train, _X_val, _Y_train, _Y_val = X_train[id_train], X_train[id_val], Y_train[id_train], Y_train[id_val]
        
        for i in range(n_grid):
            params_ensemble = grid_ensemble[i]
            params_regr = grid_regr[i]
            
            _, res = comp_empirical_val(
                _X_train, _Y_train, _X_val, _Y_val, regr, 
                {**kwargs_regr, **params_regr}, {**kwargs_ensemble, **params_ensemble}, 
                M, n_jobs, X_test, Y_test,  _check_input=False, **kwargs_est
            )
            res_risk_all[i, :, fold] = np.r_[res]

    res_risk = np.mean(res_risk_all, axis=2)

    if return_df:
        cols = np.char.add(['risk_val-']*M, np.char.mod('%d', 1+np.arange(M)))
        if test:
            cols = np.append(cols, np.char.add(['risk_test-']*M, np.char.mod('%d', 1+np.arange(M))))
        res_splitcv = pd.concat([pd.DataFrame(grid_regr), pd.DataFrame(grid_ensemble),
                             pd.DataFrame(res_risk, columns=cols)
                             ] ,axis=1)
    else:
        if test:            
            res_splitcv = (res_risk[:,:M], res_risk[:,M:])
        else:
            res_splitcv = res_risk

    j, M_best = np.unravel_index(np.nanargmin(res_risk[:,:M]), res_risk[:,:M].shape)
    M_best += 1

    info = {
        'best_params_regr': {**kwargs_regr, **grid_regr[j]},
        'best_params_ensemble': {**kwargs_ensemble, **grid_ensemble[j]},
        'best_n_estimators': M_best,
        'best_params_index':j,
        'best_score':res_risk[j, M_best-1],

        'val_score':res_risk_all[:,:M],
        'test_score':None if not test else res_risk_all[:,M:],
        'split_params':{
            'n_splits':n_splits,
            'random_state':kf.random_state,
            'shuffle':kf.shuffle,            
        }
    }
    return res_splitcv, info  
def comp_empirical_ecv(X_train, Y_train, regr, kwargs_regr={}, kwargs_ensemble={}, M=20, M0=20, M_max=inf, n_jobs=-1, X_test=None, Y_test=None, **kwargs_est)

Compute the empirical ECV estimate for a given ensemble model.

Parameters

X_train, Y_train : numpy.array
The training samples.
regr : object
The base estimator to use for the ensemble model.
kwargs_regr : dict, optional
Additional keyword arguments for the base estimator.
kwargs_ensemble : dict, optional
Additional keyword arguments for the ensemble model.
M : int, optional
The maximum ensemble size to consider.
M0 : int, optional
The number of estimators to use for the ECV estimate.
M_max : int, optional
The maximum ensemble size to consider for the tuned ensemble.
n_jobs : int, optional
The number of jobs to run in parallel. If -1, all CPUs are used.
X_test, Y_test : numpy.array, optional
The test samples.
_check_input : bool, optional
If True, check the input arguments.
kwargs_est : dict, optional
Additional keyword arguments for the risk estimate.

Returns

risk_ecv : numpy.array
The empirical ECV estimate.
Expand source code
def comp_empirical_ecv(
        X_train, Y_train, regr, kwargs_regr={}, kwargs_ensemble={}, M=20, M0=20, M_max=np.inf,
        n_jobs=-1, X_test=None, Y_test=None, _check_input=True, **kwargs_est, 
        ):
    '''
    Compute the empirical ECV estimate for a given ensemble model.

    Parameters
    ----------
    X_train, Y_train : numpy.array
        The training samples.
    regr : object
        The base estimator to use for the ensemble model.
    kwargs_regr : dict, optional
        Additional keyword arguments for the base estimator.
    kwargs_ensemble : dict, optional
        Additional keyword arguments for the ensemble model.
    M : int, optional
        The maximum ensemble size to consider.
    M0 : int, optional
        The number of estimators to use for the ECV estimate.
    M_max : int, optional
        The maximum ensemble size to consider for the tuned ensemble.
    n_jobs : int, optional
        The number of jobs to run in parallel. If -1, all CPUs are used.
    X_test, Y_test : numpy.array, optional
        The test samples.
    _check_input : bool, optional
        If True, check the input arguments.        
    kwargs_est : dict, optional
        Additional keyword arguments for the risk estimate.
    
    Returns
    ----------
    risk_ecv : numpy.array
        The empirical ECV estimate.
    '''
    if _check_input:
        if M0>M:
            raise ValueError('M0 must be less than or equal to M.')
        if np.isinf(M_max):
            M_max = np.append(np.arange(M)+1, np.inf)
        elif np.isscalar(M_max):
            M_max = np.arange(M_max)+1

        kwargs_regr, kwargs_ensemble, kwargs_est = check_input(kwargs_regr, kwargs_ensemble, kwargs_est, M)
    regr = fit_ensemble(regr,kwargs_regr,kwargs_ensemble).fit(X_train, Y_train)
    risk_ecv = regr.compute_ecv_estimate(X_train, Y_train, M_max, M0=M0, n_jobs=n_jobs, **kwargs_est)

    if X_test is not None and Y_test is not None:
        risk_val = regr.compute_risk(X_test, Y_test, M, n_jobs=n_jobs, **kwargs_est)
        return regr, (risk_ecv, risk_val)
    else:
        return regr, risk_ecv
def comp_empirical_gcv(X_train, Y_train, regr, kwargs_regr={}, kwargs_ensemble={}, M=20, M0=20, M_max=inf, corrected=True, type='full', n_jobs=-1, X_test=None, Y_test=None, **kwargs_est)

Compute the empirical GCV or CGCV estimate for a given ensemble model.

Parameters

X_train, Y_train : numpy.array
The training samples.
regr : object
The base estimator to use for the ensemble model.
kwargs_regr : dict, optional
Additional keyword arguments for the base estimator.
kwargs_ensemble : dict, optional
Additional keyword arguments for the ensemble model.
M : int, optional
The maximum ensemble size to consider.
corrected : bool, optional
If True, compute the corrected GCV estimate.
type : str, optional
The type of GCV or GCV estimate to compute. It can be either 'full' or 'union' for naive GCV, and 'full' or 'ovlp' for CGCV.
n_jobs : int, optional
The number of jobs to run in parallel. If -1, all CPUs are used.
X_test, Y_test : numpy.array, optional
The test samples.
_check_input : bool, optional
If True, check the input arguments.
kwargs_est : dict, optional
Additional keyword arguments for the risk estimate.

Returns

risk_ecv : numpy.array
The empirical ECV estimate.
Expand source code
def comp_empirical_gcv(
        X_train, Y_train, regr, kwargs_regr={}, kwargs_ensemble={}, M=20, M0=20, M_max=np.inf, 
        corrected=True, type='full',
        n_jobs=-1, X_test=None, Y_test=None, _check_input=True, **kwargs_est, 
        ):
    '''
    Compute the empirical GCV or CGCV estimate for a given ensemble model.

    Parameters
    ----------
    X_train, Y_train : numpy.array
        The training samples.
    regr : object
        The base estimator to use for the ensemble model.
    kwargs_regr : dict, optional
        Additional keyword arguments for the base estimator.
    kwargs_ensemble : dict, optional
        Additional keyword arguments for the ensemble model.
    M : int, optional
        The maximum ensemble size to consider.
    corrected : bool, optional
        If True, compute the corrected GCV estimate.
    type : str, optional
        The type of GCV or GCV estimate to compute. It can be either 'full' or 'union' for naive GCV,
        and 'full' or 'ovlp' for CGCV.
    n_jobs : int, optional
        The number of jobs to run in parallel. If -1, all CPUs are used.
    X_test, Y_test : numpy.array, optional
        The test samples.
    _check_input : bool, optional
        If True, check the input arguments.        
    kwargs_est : dict, optional
        Additional keyword arguments for the risk estimate.
    
    Returns
    ----------
    risk_ecv : numpy.array
        The empirical ECV estimate.
    '''
    if _check_input:
        if M0>M:
            raise ValueError('M0 must be less than or equal to M.')
        if np.isinf(M_max):
            M_max = np.append(np.arange(M)+1, np.inf)
        elif np.isscalar(M_max):
            M_max = np.arange(M_max)+1
    
        kwargs_regr, kwargs_ensemble, kwargs_est = check_input(kwargs_regr, kwargs_ensemble, kwargs_est, M)

    regr = fit_ensemble(regr,kwargs_regr,kwargs_ensemble).fit(X_train, Y_train)
    if corrected:
        risk_gcv = regr.compute_cgcv_estimate(X_train, Y_train, M0, type, n_jobs=n_jobs, **kwargs_est)
    else:
        risk_gcv = regr.compute_gcv_estimate(X_train, Y_train, M0, type, n_jobs=n_jobs, **kwargs_est)

    risk_gcv = regr.extrapolate(risk_gcv, M_max)
    
    if X_test is not None and Y_test is not None:        
        risk_val = regr.compute_risk(X_test, Y_test, M, n_jobs=n_jobs, **kwargs_est)
        return regr, (risk_gcv, risk_val)
    else:
        return regr, risk_gcv
def reset_random_seeds(seed)
Expand source code
def reset_random_seeds(seed):
    os.environ['PYTHONHASHSEED']=str(seed)
    random.seed(seed)
    np.random.seed(seed)
def generate_data(n, p, coef='random', func='linear', rho_ar1=0.0, sigma=1, df=inf, n_test=1000, sigma_quad=1.0)

Parameters

n : int
sample size
p : int
number of features
coef : str or array-like
If 'sorted', the coefficients are sorted in descending order of absolute value. If 'random', the coefficients are randomly generated from a uniform distribution. If an array-like object, the coefficients are set to the given values.
func : str
The functional form of the relationship between the features and the response. If 'linear', the response is a linear combination of the features. If 'nonlinear', the response is a nonlinear function of the features.
rho_ar1 : float or array-like
If a scalar, the AR(1) correlation coefficient for the features. If an array-like object, the AR(1) correlation coefficients for each block of features.
sigma : float
The standard deviation of the noise.
df : float
The degrees of freedom for the t-distribution used to generate the noise. If np.inf, the noise is generated from a normal distribution.
n_test : int
The number of test samples to generate.
sigma_quad : float
The standard deviation of the quadratic term in the nonlinear function.

Returns

X_train : ndarray of shape (n, p)
The training features.
y_train : ndarray of shape (n,)
The training response.
X_test : ndarray of shape (n_test, p)
The test features.
y_test : ndarray of shape (n_test,)
The test response.
Expand source code
def generate_data(
    n, p, coef='random', func='linear',
    rho_ar1=0., sigma=1, df=np.inf, n_test=1000, sigma_quad=1.,
):
    """
    Parameters
    ----------
    n : int
        sample size
    p : int
        number of features
    coef : str or array-like
        If 'sorted', the coefficients are sorted in descending order of absolute value.
        If 'random', the coefficients are randomly generated from a uniform distribution.
        If an array-like object, the coefficients are set to the given values.
    func : str
        The functional form of the relationship between the features and the response.
        If 'linear', the response is a linear combination of the features.
        If 'nonlinear', the response is a nonlinear function of the features.
    rho_ar1 : float or array-like
        If a scalar, the AR(1) correlation coefficient for the features.
        If an array-like object, the AR(1) correlation coefficients for each block of features.
    sigma : float
        The standard deviation of the noise.
    df : float
        The degrees of freedom for the t-distribution used to generate the noise.
        If np.inf, the noise is generated from a normal distribution.
    n_test : int
        The number of test samples to generate.
    sigma_quad : float
        The standard deviation of the quadratic term in the nonlinear function.
    
    Returns
    -------
    X_train : ndarray of shape (n, p)
        The training features.
    y_train : ndarray of shape (n,)
        The training response.
    X_test : ndarray of shape (n_test, p)
        The test features.
    y_test : ndarray of shape (n_test,)
        The test response.
    """
        
    if np.isscalar(rho_ar1):
        Sigma = ar1_cov(rho_ar1, p)
    else:
        Sigma = block_ar1_cov(rho_ar1, p)

    if df==np.inf:
        Z = np.random.normal(size=(n,p))
        Z_test = np.random.normal(size=(n_test,p))
    else:
        Z = np.random.standard_t(df=df, size=(n,p)) / np.sqrt(df / (df - 2))
        Z_test = np.random.standard_t(df=df, size=(n_test,p)) / np.sqrt(df / (df - 2))
        
    Sigma_sqrt = sqrtm(Sigma)
    X = Z @ Sigma_sqrt
    X_test = Z_test @ Sigma_sqrt
    
    if sigma<np.inf:
        if coef.startswith('eig'):
            top_k = int(coef.split('-')[1])
            _, beta0 = eigsh(Sigma, k=top_k)
            
            beta0 = np.mean(beta0, axis=-1)
            if np.isscalar(rho_ar1):
                rho2 = (1-rho_ar1**2)/(1-rho_ar1)**2/top_k
            else:
                rho2 = np.linalg.norm(beta0)**2
        elif coef=='sorted':
            beta0 = np.random.normal(size=(p,))
            beta0 = beta0[np.argsort(-np.abs(beta0))]
            beta0 /= np.linalg.norm(beta0)
            rho2 = 1.        
        elif coef=='uniform':
            beta0 = np.ones(p,) / np.sqrt(p)
            rho2 = 1.
        elif coef=='random':
            beta0 = np.random.normal(size=(p,))
            beta0 /= np.linalg.norm(beta0)
            rho2 = 1.
        elif coef.startswith('sparse'):
            s = int(coef.split('-')[1])
            beta0 = np.zeros(p,)
            beta0[:s] = 1/np.sqrt(s)
            rho2 = 1.
            
    else:
        rho2 = 0.
        beta0 = np.zeros(p)

    Y = X@beta0[:,None]   
    Y_test = X_test@beta0[:,None]
    
    if func=='linear':
        pass
    elif func=='quad':
        Y += sigma_quad * (np.mean(X**2, axis=-1) - np.trace(Sigma)/p)[:, None]
        Y_test += sigma_quad * (np.mean(X_test**2, axis=-1) - np.trace(Sigma)/p)[:, None]
    elif func=='tanh':
        Y = np.tanh(Y)
        Y_test = np.tanh(Y_test)
    else:
        raise ValueError('Not implemented.')

    if sigma>0.:
        if df==np.inf:
            Y = Y + sigma * np.random.normal(size=(n,1))            
            Y_test += sigma * np.random.normal(size=(n_test,1))
        else:
            Y = Y + np.random.standard_t(df=df, size=(n,1))
            Y_test += np.random.standard_t(df=df, size=(n_test,1))
            sigma = np.sqrt(df / (df - 2))
    else:
        sigma = 0.

    return Sigma, beta0, X, Y, X_test, Y_test, rho2, sigma**2

Classes

class Ensemble (**kwargs)

Ensemble class is built on top of sklearn.ensemble.BaggingRegressor. It provides additional methods for computing ECV estimates.

Expand source code
class Ensemble(BaggingRegressor):
    '''
    Ensemble class is built on top of sklearn.ensemble.BaggingRegressor.
    It provides additional methods for computing ECV estimates.
    '''
    def __init__(self, **kwargs):
        super(BaggingRegressor, self).__init__(**kwargs)

    # def get_coef(self, M=-1):
    #     if M < 0:
    #         M = self.n_estimators
    #     coef_ = np.mean(np.array([self.estimators_[i].coef_ for i in range(M)]), axis=0)
    #     return coef_

    def predict_individual(self: BaggingRegressor, X: np.ndarray, M: int=-1, n_jobs: int=-1, verbose: bool=0) -> np.ndarray:
        '''
        Predicts the target values for the given input data using the provided BaggingRegressor model.

        Parameters
        ----------
            regr : BaggingRegressor
                The BaggingRegressor model to use for prediction.
            X : np.ndarray
                [n, p] The input data to predict target values for.

        Returns
        ----------
            Y_hat : np.ndarray
                [n, M] The predicted target values of all $M$ estimators for the input data.
        '''
        if M < 0:
            M = self.n_estimators
        with Parallel(n_jobs=n_jobs, max_nbytes=None, verbose=verbose) as parallel:
            Y_hat = parallel(
                delayed(lambda reg,X: reg.predict(X[:,features]))(reg, X)
                for reg,features in zip(self.estimators_[:M],self.estimators_features_[:M])
            )
        Y_hat = np.stack(Y_hat, axis=-1)
        return Y_hat
    

    def compute_risk(
            self, X, Y, M_test=None, return_df=False, 
            avg=True, n_jobs=-1, verbose=0, **kwargs_est):
        if M_test is None:
            M_test = self.n_estimators
        if M_test>self.n_estimators:
            raise ValueError('The ensemble size M must be less than or equal to the number of estimators in the BaggingRegressor model.')        

        Y_hat = self.predict_individual(X, M_test, n_jobs, verbose)
        if Y_hat.ndim>2:
            Y_hat = Y_hat.reshape((-1,Y_hat.shape[-1]))
        Y = Y.reshape((*Y_hat.shape[:-1],1))

        if avg:
            err_eval = avg_sq_err(Y_hat - Y)
        else:
            Y_hat = np.cumsum(Y_hat, axis=1) / np.arange(1, M_test+1)
            err_eval = (Y_hat - Y)**2
        risk = risk_estimate(err_eval, axis=0, **kwargs_est)

        if return_df:
            df = pd.DataFrame({'M':np.arange(1,M_test+1), 'risk':risk})
            return df
        else:
            return risk
    
    
    def compute_ecv_estimate(self, X_train, Y_train, M_test=None, M0=None, return_df=False, n_jobs=-1, verbose=0, **kwargs_est):
        '''
        Computes the ECV estimate for the given input data using the provided BaggingRegressor model.

        Parameters
        ----------
        X_train : np.ndarray
            [n, p] The input covariates.
        Y_train : np.ndarray
            [n, ...] The target values of the input data.
        M_test : int or np.ndarray
            The maximum ensemble size of the ECV estimate.
        M0 : int, optional
            The number of estimators to use for the OOB estimate. If None, M0 is set to the number of estimators in the BaggingRegressor model.
        return_df : bool, optional
            If True, returns the ECV estimate as a pandas.DataFrame object.
        n_jobs : int, optional
            The number of jobs to run in parallel. If -1, all CPUs are used.
        kwargs_est : dict
            Additional keyword arguments for the risk estimate.

        Returns
        --------
        risk_ecv : np.ndarray or pandas.DataFrame
            [M_test, ] The ECV estimate for each ensemble size in M_test.
        '''
        if M_test is None:
            M_test = self.n_estimators
        if M0 is None:
            M0 = self.n_estimators
        if M0<2:
            raise ValueError('The ensemble size M or M0 must be at least 2.')
        if np.isscalar(M_test):
            M_test = np.arange(1,M_test+1)
        else:
            M_test = np.array(M_test)
        ids_list = self.estimators_samples_[:M0]
        Y_hat = self.predict_individual(X_train, M0, n_jobs)
        Y_train = Y_train.reshape((*Y_hat.shape[:-1],1))
        dev_eval = Y_hat - Y_train
        err_eval = dev_eval**2
        
        with Parallel(n_jobs=n_jobs, max_nbytes=None, verbose=verbose) as parallel:
            res_1 = parallel(
                delayed(lambda j:risk_estimate(
                    np.delete(err_eval[...,j], ids_list[j], axis=0).reshape(-1,1), **kwargs_est)
                       )(j)
                for j in np.arange(M0)
            )
            res_2 = parallel(
                delayed(lambda i,j:risk_estimate(
                    (np.mean(
                        np.delete(dev_eval[...,[i,j]], 
                                  np.union1d(ids_list[i], ids_list[j]), axis=0), axis=-1)**2).reshape(-1,1),
                    **kwargs_est
                ))(i,j)
                for i,j in itertools.combinations(np.arange(M0), 2)
            )
                    
        risk_ecv_1 = np.nanmean(res_1)
        risk_ecv_2 = np.nanmean(res_2)
        risk_ecv = - (1-2/M_test) * risk_ecv_1 + 2*(1-1/M_test) * risk_ecv_2
        if return_df:
            df = pd.DataFrame({'M':M_test, 'estimate':risk_ecv})
            return df
        else:
            return risk_ecv
        
    
    def extrapolate(self, risk, M_test=None):
        if M_test is None:
            M_test = self.n_estimators
        M0 = risk.shape[0]        
        if np.isscalar(M_test):
            M_test = np.arange(1,M_test+1)
        else:
            M_test = np.array(M_test)
        if M0 <2 or np.max(M_test) < M0:
            raise ValueError('The ensemble size M or M0 must be at least 2.')
        
        risk_1 = risk[0]
        risk_inf = np.sum(risk - risk_1/np.arange(1, M0+1)) / np.sum(1 - 1/np.arange(1, M0+1))
        
        risk_ecv = (1/M_test) * risk_1 + (1-1/M_test) * risk_inf
        risk_ecv[:M0] = risk
        return risk_ecv
        

    def compute_gcv_estimate(self, X_train, Y_train, M=None, type='full', return_df=False, n_jobs=-1, verbose=0, **kwargs_est):
        '''
        Computes the naive GCV estimate for the given input data using the provided BaggingRegressor model.

        Parameters
        ----------
        X_train : np.ndarray
            [n, p] The input covariates.
        Y_train : np.ndarray
            [n, ] The target values of the input data.
        type : str, optional
            The type of GCV estimate to compute. Can be either 'full' (the naive GCV using full observations) or 
            'union' (the naive GCV using training observations).
        return_df : bool, optional
            If True, returns the GCV estimate as a pandas.DataFrame object.
        n_jobs : int, optional
            The number of jobs to run in parallel. If -1, all CPUs are used.
        kwargs_est : dict
            Additional keyword arguments for the risk estimate.

        Returns
        --------
        risk_gcv : np.ndarray or pandas.DataFrame
            [M_test, ] The GCV estimate for each ensemble size in M_test.
        '''
        if self.estimator_.__class__.__name__ not in ['Ridge', 'Lasso', 'ElasticNet']:
            raise ValueError('GCV is only implemented for Ridge, Lasso, and ElasticNet regression.')
        if Y_train.ndim==1:
            Y_train = Y_train[:,None]
        elif Y_train.ndim>2:
            raise NotImplementedError('CGCV for multi-task regression not implemented yet.')
        if M is None:
            M = self.n_estimators        
        M_arr = np.arange(M)
        ids_list = self.estimators_samples_
        Y_hat = self.predict_individual(X_train, M, n_jobs)        
        n = X_train.shape[0]

        with Parallel(n_jobs=n_jobs, max_nbytes=None, verbose=verbose) as parallel:
            dof = parallel(
                delayed(lambda j:degree_of_freedom(self.estimators_[j], X_train[ids_list[j]])
                       )(j)
                for j in M_arr
            )
            dof = np.mean(dof)

        if type=='full':
            err_eval = avg_sq_err(Y_hat - Y_train)
            err_train = np.mean(err_eval, axis=0)
            deno = (1 - dof/n)**2
            risk_gcv = err_train / deno
        else:
            Y_hat = np.cumsum(Y_hat, axis=1) / (M_arr+1)
            err_eval = (Y_hat - Y_train)**2
            ids_union_list = [reduce(np.union1d, ids_list[:j+1]) for j in M_arr]
            n_ids = np.array([len(ids) for ids in ids_union_list])
            err_train = np.array([np.mean(err_eval[ids_union_list[j],j]) for j in M_arr])
            deno = (1 - dof/n_ids)**2
            risk_gcv = err_train / deno
        
        if return_df:
            df = pd.DataFrame({'M':M_arr+1, 'estimate':risk_gcv, 'err_train':err_train, 'deno':deno})
            return df
        else:
            return risk_gcv
        
    

    def compute_cgcv_estimate(self, X_train, Y_train, M=None, type='full', return_df=False, n_jobs=-1, verbose=0, **kwargs_est):
        '''
        Computes the corrected GCV estimate for the given input data using the provided BaggingRegressor model.

        Parameters
        ----------
        X_train : np.ndarray
            [n, p] The input covariates.
        Y_train : np.ndarray
            [n, ] The target values of the input data.
        type : str, optional
            The type of CGCV estimate to compute. Can be either 'full' (using full observations) or 
            'ovlp' (using overlapping observations).
        return_df : bool, optional
            If True, returns the GCV estimate as a pandas.DataFrame object.
        n_jobs : int, optional
            The number of jobs to run in parallel. If -1, all CPUs are used.
        kwargs_est : dict
            Additional keyword arguments for the risk estimate.

        Returns
        --------
        risk_gcv : np.ndarray or pandas.DataFrame
            [M_test, ] The CGCV estimate for each ensemble size in M_test.
        '''
        if self.estimator_.__class__.__name__ not in ['Ridge', 'Lasso', 'ElasticNet']:
            # raise ValueError('GCV is only implemented for Ridge, Lasso, and ElasticNet regression.')
            return self._compute_cgcv_estimate_general(X_train, Y_train, M, type, return_df, n_jobs, verbose, **kwargs_est)
            
        if Y_train.ndim==1:
            Y_train = Y_train[:,None]
        if M is None:
            M = self.n_estimators            
        M_arr = np.arange(M)
        ids_list = self.estimators_samples_
        Y_hat = self.predict_individual(X_train, M, n_jobs)

        n, p = X_train.shape
        if hasattr(self.estimators_[0], 'fit_intercept') and self.estimators_[0].fit_intercept:
            p += 1
        phi = p/n
        k = self.max_samples if isinstance(self.max_samples, int) else int(n * self.max_samples)
        psi = p/k

        with Parallel(n_jobs=n_jobs, max_nbytes=None, verbose=verbose) as parallel:
            dof = parallel(
                delayed(lambda j:degree_of_freedom(self.estimators_[j], X_train[ids_list[j]]))(j)
                for j in M_arr
            )
            dof = np.mean(dof)

        err_train = (Y_hat-Y_train)**2
        if type=='full':
            correct_term = np.mean(err_train) / (1 - 2*dof/n + dof**2/(k*n))
        elif type=='ovlp':
            correct_term = np.mean([np.mean(err_train[ids_list[j],j]) for j in M_arr]) / (1 - dof/k)**2
        else:
            raise ValueError('The type must be either "full" or "ovlp".')
        
        err_eval = avg_sq_err(Y_hat - Y_train)
        err_train = np.mean(err_eval, axis=0)
        deno = (1 - dof/n)**2
        C = 1/ (n/dof - 1)**2 / (M_arr+1) * (psi/phi - 1) * correct_term
        risk_cgcv = err_train / deno - C
        
        if return_df:
            df = pd.DataFrame({'M':M_arr+1, 'estimate':risk_cgcv, 'err_train':err_train, 'deno':deno, 'C':C})
            return df
        else:
            return risk_cgcv



    def _compute_cgcv_estimate_general(
            self, X_train, Y_train, M=None, type='full',
            return_df=False, n_jobs=-1, verbose=0, **kwargs_est):
        '''
        Computes the corrected GCV estimate for the given input data using the provided BaggingRegressor model.

        Parameters
        ----------
        X_train : np.ndarray
            [n, p] The input covariates.
        Y_train : np.ndarray
            [n, ] The target values of the input data.
        type : str, optional
            The type of CGCV estimate to compute. Can be either 'full' (using full observations) or 
            'ovlp' (using overlapping observations).
        return_df : bool, optional
            If True, returns the GCV estimate as a pandas.DataFrame object.
        n_jobs : int, optional
            The number of jobs to run in parallel. If -1, all CPUs are used.
        kwargs_est : dict
            Additional keyword arguments for the risk estimate.

        Returns
        --------
        risk_gcv : np.ndarray or pandas.DataFrame
            [M_test, ] The CGCV estimate for each ensemble size in M_test.
        '''
        if not hasattr(self.estimator_.__class__, 'get_gcv_input'):
            raise ValueError(
                "Explicit GCV calculation is only implemented for Ridge, Lasso, and ElasticNet regression.\n"
                "For estimator class '{estimator_class.__name__}', it needs to have 'get_gcv_input' "
                "method to compute the GCV input. The function takes training samples (X,y) in [n,p] by [n,1] "
                "as input and return a tuple of:\n"
                "(1) [n, ] residuals;\n"
                "(2) [n, ] derivative of loss evaluated at residuals;\n"
                "(3) [1, ] degrees of freedom;\n"
                "(4) [1, ] the trace of generalized smoothing matrix tr_V.")

        if Y_train.ndim==1:
            Y_train = Y_train[:,None]
        elif Y_train.ndim>2:
            raise NotImplementedError('CGCV for multi-task regression not implemented yet.')
        if M is None:
            M = self.n_estimators            
        M_arr = np.arange(M)
        ids_list = [np.sort(ids) for ids in self.estimators_samples_]
        Y_hat = self.predict_individual(X_train, M, n_jobs)

        n, p = X_train.shape
        if hasattr(self.estimators_[0], 'fit_intercept') and self.estimators_[0].fit_intercept:
            p += 1
        with Parallel(n_jobs=n_jobs, max_nbytes=None, verbose=verbose) as parallel:
            res = parallel(
                delayed(
                    lambda j:self.estimators_[j].get_gcv_input(X_train[ids_list[j]], Y_train[ids_list[j]])
                        )(j) for j in M_arr
            )
            r, loss_p, dof, tr_V = list(zip(*res))
            r = np.array(r).T
            loss_p = np.array(loss_p).T
            dof = np.array(dof)
            tr_V = np.array(tr_V)
        
        tmp = r + (dof / tr_V) [None,:] * loss_p
        
        if type=='full':
            r_full = Y_train - Y_hat            
            tmp_full = np.zeros_like(r_full)
            for j in range(tmp.shape[1]):
                tmp_full[ids_list[j],j] = tmp[:,j]

            del r, tmp

            def _get_est(i,j):
                return np.mean(np.where(np.isin(np.arange(n), ids_list[i]), tmp_full[:,i], r_full[:,i]) * np.where(np.isin(np.arange(n), ids_list[j]), tmp_full[:,j], r_full[:,j]))
        elif type=='ovlp':        
            def _get_est(i,j):
                ids = np.intersect1d(ids_list[i], ids_list[j])
                if len(ids)>0:
                    return np.mean(tmp[np.isin(ids_list[i],ids),i] * tmp[np.isin(ids_list[j],ids),j])
                else:
                    return np.nan                        
        else:
            raise ValueError('The type must be either "full" or "ovlp".')

        risk_M1 = np.mean(Parallel(n_jobs=n_jobs-2, verbose=verbose)(delayed(_get_est)(i=i, j=i) for i in M_arr))
        if M>1:
            risk_Minf = np.nanmean(Parallel(n_jobs=n_jobs-2, verbose=verbose)(delayed(_get_est)(i=i, j=j) for j in M_arr for i in M_arr if j<i))
            risk_cgcv = (1/(1+M_arr)) * risk_M1 + (1-1/(1+M_arr)) * risk_Minf
        else:
            risk_cgcv = np.append([risk_M1], np.ones(M-1)*np.nan)

        if return_df:
            err_eval = avg_sq_err(Y_train - Y_hat)
            err_train = np.mean(err_eval, axis=0)
            df = pd.DataFrame({'M':M_arr+1, 'estimate':risk_cgcv, 'err_train':err_train})
            return df
        else:
            return risk_cgcv            

Ancestors

  • sklearn.ensemble._bagging.BaggingRegressor
  • sklearn.base.RegressorMixin
  • sklearn.ensemble._bagging.BaseBagging
  • sklearn.ensemble._base.BaseEnsemble
  • sklearn.base.MetaEstimatorMixin
  • sklearn.base.BaseEstimator
  • sklearn.utils._metadata_requests._MetadataRequester

Methods

def predict_individual(self: sklearn.ensemble._bagging.BaggingRegressor, X: numpy.ndarray, M: int = -1, n_jobs: int = -1, verbose: bool = 0) ‑> numpy.ndarray

Predicts the target values for the given input data using the provided BaggingRegressor model.

Parameters

regr : BaggingRegressor
    The BaggingRegressor model to use for prediction.
X : np.ndarray
    [n, p] The input data to predict target values for.

Returns

Y_hat : np.ndarray
    [n, M] The predicted target values of all $M$ estimators for the input data.
Expand source code
def predict_individual(self: BaggingRegressor, X: np.ndarray, M: int=-1, n_jobs: int=-1, verbose: bool=0) -> np.ndarray:
    '''
    Predicts the target values for the given input data using the provided BaggingRegressor model.

    Parameters
    ----------
        regr : BaggingRegressor
            The BaggingRegressor model to use for prediction.
        X : np.ndarray
            [n, p] The input data to predict target values for.

    Returns
    ----------
        Y_hat : np.ndarray
            [n, M] The predicted target values of all $M$ estimators for the input data.
    '''
    if M < 0:
        M = self.n_estimators
    with Parallel(n_jobs=n_jobs, max_nbytes=None, verbose=verbose) as parallel:
        Y_hat = parallel(
            delayed(lambda reg,X: reg.predict(X[:,features]))(reg, X)
            for reg,features in zip(self.estimators_[:M],self.estimators_features_[:M])
        )
    Y_hat = np.stack(Y_hat, axis=-1)
    return Y_hat
def compute_risk(self, X, Y, M_test=None, return_df=False, avg=True, n_jobs=-1, verbose=0, **kwargs_est)
Expand source code
def compute_risk(
        self, X, Y, M_test=None, return_df=False, 
        avg=True, n_jobs=-1, verbose=0, **kwargs_est):
    if M_test is None:
        M_test = self.n_estimators
    if M_test>self.n_estimators:
        raise ValueError('The ensemble size M must be less than or equal to the number of estimators in the BaggingRegressor model.')        

    Y_hat = self.predict_individual(X, M_test, n_jobs, verbose)
    if Y_hat.ndim>2:
        Y_hat = Y_hat.reshape((-1,Y_hat.shape[-1]))
    Y = Y.reshape((*Y_hat.shape[:-1],1))

    if avg:
        err_eval = avg_sq_err(Y_hat - Y)
    else:
        Y_hat = np.cumsum(Y_hat, axis=1) / np.arange(1, M_test+1)
        err_eval = (Y_hat - Y)**2
    risk = risk_estimate(err_eval, axis=0, **kwargs_est)

    if return_df:
        df = pd.DataFrame({'M':np.arange(1,M_test+1), 'risk':risk})
        return df
    else:
        return risk
def compute_ecv_estimate(self, X_train, Y_train, M_test=None, M0=None, return_df=False, n_jobs=-1, verbose=0, **kwargs_est)

Computes the ECV estimate for the given input data using the provided BaggingRegressor model.

Parameters

X_train : np.ndarray
[n, p] The input covariates.
Y_train : np.ndarray
[n, …] The target values of the input data.
M_test : int or np.ndarray
The maximum ensemble size of the ECV estimate.
M0 : int, optional
The number of estimators to use for the OOB estimate. If None, M0 is set to the number of estimators in the BaggingRegressor model.
return_df : bool, optional
If True, returns the ECV estimate as a pandas.DataFrame object.
n_jobs : int, optional
The number of jobs to run in parallel. If -1, all CPUs are used.
kwargs_est : dict
Additional keyword arguments for the risk estimate.

Returns

risk_ecv : np.ndarray or pandas.DataFrame
[M_test, ] The ECV estimate for each ensemble size in M_test.
Expand source code
def compute_ecv_estimate(self, X_train, Y_train, M_test=None, M0=None, return_df=False, n_jobs=-1, verbose=0, **kwargs_est):
    '''
    Computes the ECV estimate for the given input data using the provided BaggingRegressor model.

    Parameters
    ----------
    X_train : np.ndarray
        [n, p] The input covariates.
    Y_train : np.ndarray
        [n, ...] The target values of the input data.
    M_test : int or np.ndarray
        The maximum ensemble size of the ECV estimate.
    M0 : int, optional
        The number of estimators to use for the OOB estimate. If None, M0 is set to the number of estimators in the BaggingRegressor model.
    return_df : bool, optional
        If True, returns the ECV estimate as a pandas.DataFrame object.
    n_jobs : int, optional
        The number of jobs to run in parallel. If -1, all CPUs are used.
    kwargs_est : dict
        Additional keyword arguments for the risk estimate.

    Returns
    --------
    risk_ecv : np.ndarray or pandas.DataFrame
        [M_test, ] The ECV estimate for each ensemble size in M_test.
    '''
    if M_test is None:
        M_test = self.n_estimators
    if M0 is None:
        M0 = self.n_estimators
    if M0<2:
        raise ValueError('The ensemble size M or M0 must be at least 2.')
    if np.isscalar(M_test):
        M_test = np.arange(1,M_test+1)
    else:
        M_test = np.array(M_test)
    ids_list = self.estimators_samples_[:M0]
    Y_hat = self.predict_individual(X_train, M0, n_jobs)
    Y_train = Y_train.reshape((*Y_hat.shape[:-1],1))
    dev_eval = Y_hat - Y_train
    err_eval = dev_eval**2
    
    with Parallel(n_jobs=n_jobs, max_nbytes=None, verbose=verbose) as parallel:
        res_1 = parallel(
            delayed(lambda j:risk_estimate(
                np.delete(err_eval[...,j], ids_list[j], axis=0).reshape(-1,1), **kwargs_est)
                   )(j)
            for j in np.arange(M0)
        )
        res_2 = parallel(
            delayed(lambda i,j:risk_estimate(
                (np.mean(
                    np.delete(dev_eval[...,[i,j]], 
                              np.union1d(ids_list[i], ids_list[j]), axis=0), axis=-1)**2).reshape(-1,1),
                **kwargs_est
            ))(i,j)
            for i,j in itertools.combinations(np.arange(M0), 2)
        )
                
    risk_ecv_1 = np.nanmean(res_1)
    risk_ecv_2 = np.nanmean(res_2)
    risk_ecv = - (1-2/M_test) * risk_ecv_1 + 2*(1-1/M_test) * risk_ecv_2
    if return_df:
        df = pd.DataFrame({'M':M_test, 'estimate':risk_ecv})
        return df
    else:
        return risk_ecv
def extrapolate(self, risk, M_test=None)
Expand source code
def extrapolate(self, risk, M_test=None):
    if M_test is None:
        M_test = self.n_estimators
    M0 = risk.shape[0]        
    if np.isscalar(M_test):
        M_test = np.arange(1,M_test+1)
    else:
        M_test = np.array(M_test)
    if M0 <2 or np.max(M_test) < M0:
        raise ValueError('The ensemble size M or M0 must be at least 2.')
    
    risk_1 = risk[0]
    risk_inf = np.sum(risk - risk_1/np.arange(1, M0+1)) / np.sum(1 - 1/np.arange(1, M0+1))
    
    risk_ecv = (1/M_test) * risk_1 + (1-1/M_test) * risk_inf
    risk_ecv[:M0] = risk
    return risk_ecv
def compute_gcv_estimate(self, X_train, Y_train, M=None, type='full', return_df=False, n_jobs=-1, verbose=0, **kwargs_est)

Computes the naive GCV estimate for the given input data using the provided BaggingRegressor model.

Parameters

X_train : np.ndarray
[n, p] The input covariates.
Y_train : np.ndarray
[n, ] The target values of the input data.
type : str, optional
The type of GCV estimate to compute. Can be either 'full' (the naive GCV using full observations) or 'union' (the naive GCV using training observations).
return_df : bool, optional
If True, returns the GCV estimate as a pandas.DataFrame object.
n_jobs : int, optional
The number of jobs to run in parallel. If -1, all CPUs are used.
kwargs_est : dict
Additional keyword arguments for the risk estimate.

Returns

risk_gcv : np.ndarray or pandas.DataFrame
[M_test, ] The GCV estimate for each ensemble size in M_test.
Expand source code
def compute_gcv_estimate(self, X_train, Y_train, M=None, type='full', return_df=False, n_jobs=-1, verbose=0, **kwargs_est):
    '''
    Computes the naive GCV estimate for the given input data using the provided BaggingRegressor model.

    Parameters
    ----------
    X_train : np.ndarray
        [n, p] The input covariates.
    Y_train : np.ndarray
        [n, ] The target values of the input data.
    type : str, optional
        The type of GCV estimate to compute. Can be either 'full' (the naive GCV using full observations) or 
        'union' (the naive GCV using training observations).
    return_df : bool, optional
        If True, returns the GCV estimate as a pandas.DataFrame object.
    n_jobs : int, optional
        The number of jobs to run in parallel. If -1, all CPUs are used.
    kwargs_est : dict
        Additional keyword arguments for the risk estimate.

    Returns
    --------
    risk_gcv : np.ndarray or pandas.DataFrame
        [M_test, ] The GCV estimate for each ensemble size in M_test.
    '''
    if self.estimator_.__class__.__name__ not in ['Ridge', 'Lasso', 'ElasticNet']:
        raise ValueError('GCV is only implemented for Ridge, Lasso, and ElasticNet regression.')
    if Y_train.ndim==1:
        Y_train = Y_train[:,None]
    elif Y_train.ndim>2:
        raise NotImplementedError('CGCV for multi-task regression not implemented yet.')
    if M is None:
        M = self.n_estimators        
    M_arr = np.arange(M)
    ids_list = self.estimators_samples_
    Y_hat = self.predict_individual(X_train, M, n_jobs)        
    n = X_train.shape[0]

    with Parallel(n_jobs=n_jobs, max_nbytes=None, verbose=verbose) as parallel:
        dof = parallel(
            delayed(lambda j:degree_of_freedom(self.estimators_[j], X_train[ids_list[j]])
                   )(j)
            for j in M_arr
        )
        dof = np.mean(dof)

    if type=='full':
        err_eval = avg_sq_err(Y_hat - Y_train)
        err_train = np.mean(err_eval, axis=0)
        deno = (1 - dof/n)**2
        risk_gcv = err_train / deno
    else:
        Y_hat = np.cumsum(Y_hat, axis=1) / (M_arr+1)
        err_eval = (Y_hat - Y_train)**2
        ids_union_list = [reduce(np.union1d, ids_list[:j+1]) for j in M_arr]
        n_ids = np.array([len(ids) for ids in ids_union_list])
        err_train = np.array([np.mean(err_eval[ids_union_list[j],j]) for j in M_arr])
        deno = (1 - dof/n_ids)**2
        risk_gcv = err_train / deno
    
    if return_df:
        df = pd.DataFrame({'M':M_arr+1, 'estimate':risk_gcv, 'err_train':err_train, 'deno':deno})
        return df
    else:
        return risk_gcv
def compute_cgcv_estimate(self, X_train, Y_train, M=None, type='full', return_df=False, n_jobs=-1, verbose=0, **kwargs_est)

Computes the corrected GCV estimate for the given input data using the provided BaggingRegressor model.

Parameters

X_train : np.ndarray
[n, p] The input covariates.
Y_train : np.ndarray
[n, ] The target values of the input data.
type : str, optional
The type of CGCV estimate to compute. Can be either 'full' (using full observations) or 'ovlp' (using overlapping observations).
return_df : bool, optional
If True, returns the GCV estimate as a pandas.DataFrame object.
n_jobs : int, optional
The number of jobs to run in parallel. If -1, all CPUs are used.
kwargs_est : dict
Additional keyword arguments for the risk estimate.

Returns

risk_gcv : np.ndarray or pandas.DataFrame
[M_test, ] The CGCV estimate for each ensemble size in M_test.
Expand source code
def compute_cgcv_estimate(self, X_train, Y_train, M=None, type='full', return_df=False, n_jobs=-1, verbose=0, **kwargs_est):
    '''
    Computes the corrected GCV estimate for the given input data using the provided BaggingRegressor model.

    Parameters
    ----------
    X_train : np.ndarray
        [n, p] The input covariates.
    Y_train : np.ndarray
        [n, ] The target values of the input data.
    type : str, optional
        The type of CGCV estimate to compute. Can be either 'full' (using full observations) or 
        'ovlp' (using overlapping observations).
    return_df : bool, optional
        If True, returns the GCV estimate as a pandas.DataFrame object.
    n_jobs : int, optional
        The number of jobs to run in parallel. If -1, all CPUs are used.
    kwargs_est : dict
        Additional keyword arguments for the risk estimate.

    Returns
    --------
    risk_gcv : np.ndarray or pandas.DataFrame
        [M_test, ] The CGCV estimate for each ensemble size in M_test.
    '''
    if self.estimator_.__class__.__name__ not in ['Ridge', 'Lasso', 'ElasticNet']:
        # raise ValueError('GCV is only implemented for Ridge, Lasso, and ElasticNet regression.')
        return self._compute_cgcv_estimate_general(X_train, Y_train, M, type, return_df, n_jobs, verbose, **kwargs_est)
        
    if Y_train.ndim==1:
        Y_train = Y_train[:,None]
    if M is None:
        M = self.n_estimators            
    M_arr = np.arange(M)
    ids_list = self.estimators_samples_
    Y_hat = self.predict_individual(X_train, M, n_jobs)

    n, p = X_train.shape
    if hasattr(self.estimators_[0], 'fit_intercept') and self.estimators_[0].fit_intercept:
        p += 1
    phi = p/n
    k = self.max_samples if isinstance(self.max_samples, int) else int(n * self.max_samples)
    psi = p/k

    with Parallel(n_jobs=n_jobs, max_nbytes=None, verbose=verbose) as parallel:
        dof = parallel(
            delayed(lambda j:degree_of_freedom(self.estimators_[j], X_train[ids_list[j]]))(j)
            for j in M_arr
        )
        dof = np.mean(dof)

    err_train = (Y_hat-Y_train)**2
    if type=='full':
        correct_term = np.mean(err_train) / (1 - 2*dof/n + dof**2/(k*n))
    elif type=='ovlp':
        correct_term = np.mean([np.mean(err_train[ids_list[j],j]) for j in M_arr]) / (1 - dof/k)**2
    else:
        raise ValueError('The type must be either "full" or "ovlp".')
    
    err_eval = avg_sq_err(Y_hat - Y_train)
    err_train = np.mean(err_eval, axis=0)
    deno = (1 - dof/n)**2
    C = 1/ (n/dof - 1)**2 / (M_arr+1) * (psi/phi - 1) * correct_term
    risk_cgcv = err_train / deno - C
    
    if return_df:
        df = pd.DataFrame({'M':M_arr+1, 'estimate':risk_cgcv, 'err_train':err_train, 'deno':deno, 'C':C})
        return df
    else:
        return risk_cgcv
def set_fit_request(self: Ensemble, *, sample_weight: Union[bool, NoneType, str] = '$UNCHANGED$') ‑> Ensemble

Request metadata passed to the fit method.

Note that this method is only relevant if enable_metadata_routing=True (see :func:sklearn.set_config). Please see :ref:User Guide <metadata_routing> on how the routing mechanism works.

The options for each parameter are:

  • True: metadata is requested, and passed to fit if provided. The request is ignored if metadata is not provided.

  • False: metadata is not requested and the meta-estimator will not pass it to fit.

  • None: metadata is not requested, and the meta-estimator will raise an error if the user provides it.

  • str: metadata should be passed to the meta-estimator with this given alias instead of the original name.

The default (sklearn.utils.metadata_routing.UNCHANGED) retains the existing request. This allows you to change the request for some parameters and not others.

Added in version: 1.3

Note

This method is only relevant if this estimator is used as a sub-estimator of a meta-estimator, e.g. used inside a :class:~sklearn.pipeline.Pipeline. Otherwise it has no effect.

Parameters

sample_weight : str, True, False, or None, default=sklearn.utils.metadata_routing.UNCHANGED
Metadata routing for sample_weight parameter in fit.

Returns

self : object
The updated object.
Expand source code
def func(**kw):
    """Updates the request for provided parameters

    This docstring is overwritten below.
    See REQUESTER_DOC for expected functionality
    """
    if not _routing_enabled():
        raise RuntimeError(
            "This method is only available when metadata routing is enabled."
            " You can enable it using"
            " sklearn.set_config(enable_metadata_routing=True)."
        )

    if self.validate_keys and (set(kw) - set(self.keys)):
        raise TypeError(
            f"Unexpected args: {set(kw) - set(self.keys)}. Accepted arguments"
            f" are: {set(self.keys)}"
        )

    requests = instance._get_metadata_request()
    method_metadata_request = getattr(requests, self.name)

    for prop, alias in kw.items():
        if alias is not UNCHANGED:
            method_metadata_request.add_request(param=prop, alias=alias)
    instance._metadata_request = requests

    return instance
def set_score_request(self: Ensemble, *, sample_weight: Union[bool, NoneType, str] = '$UNCHANGED$') ‑> Ensemble

Request metadata passed to the score method.

Note that this method is only relevant if enable_metadata_routing=True (see :func:sklearn.set_config). Please see :ref:User Guide <metadata_routing> on how the routing mechanism works.

The options for each parameter are:

  • True: metadata is requested, and passed to score if provided. The request is ignored if metadata is not provided.

  • False: metadata is not requested and the meta-estimator will not pass it to score.

  • None: metadata is not requested, and the meta-estimator will raise an error if the user provides it.

  • str: metadata should be passed to the meta-estimator with this given alias instead of the original name.

The default (sklearn.utils.metadata_routing.UNCHANGED) retains the existing request. This allows you to change the request for some parameters and not others.

Added in version: 1.3

Note

This method is only relevant if this estimator is used as a sub-estimator of a meta-estimator, e.g. used inside a :class:~sklearn.pipeline.Pipeline. Otherwise it has no effect.

Parameters

sample_weight : str, True, False, or None, default=sklearn.utils.metadata_routing.UNCHANGED
Metadata routing for sample_weight parameter in score.

Returns

self : object
The updated object.
Expand source code
def func(**kw):
    """Updates the request for provided parameters

    This docstring is overwritten below.
    See REQUESTER_DOC for expected functionality
    """
    if not _routing_enabled():
        raise RuntimeError(
            "This method is only available when metadata routing is enabled."
            " You can enable it using"
            " sklearn.set_config(enable_metadata_routing=True)."
        )

    if self.validate_keys and (set(kw) - set(self.keys)):
        raise TypeError(
            f"Unexpected args: {set(kw) - set(self.keys)}. Accepted arguments"
            f" are: {set(self.keys)}"
        )

    requests = instance._get_metadata_request()
    method_metadata_request = getattr(requests, self.name)

    for prop, alias in kw.items():
        if alias is not UNCHANGED:
            method_metadata_request.add_request(param=prop, alias=alias)
    instance._metadata_request = requests

    return instance