CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-imbalanced-learn

Toolbox for imbalanced dataset in machine learning

Pending
Overview
Eval results
Files

pipeline.mddocs/

Pipeline Integration

Advanced pipeline functionality that extends scikit-learn's Pipeline class to seamlessly integrate sampling algorithms with machine learning workflows. Ensures proper handling of resampling operations during model training while maintaining compatibility with cross-validation and model selection procedures.

Overview

The imbalanced-learn pipeline system addresses key challenges when combining sampling methods with machine learning pipelines:

  • Resampling Integration: Native support for fit_resample() methods in pipeline steps
  • Cross-validation Safety: Prevents data leakage by applying sampling only during training phases
  • Memory Management: Optional caching of expensive transformations and sampling operations
  • Parameter Routing: Advanced metadata routing for passing parameters to specific pipeline steps
  • Transform Input: Ability to transform input parameters through pipeline stages

The pipeline components extend scikit-learn's pipeline functionality while maintaining full API compatibility.

Pipeline Class

Pipeline

Extended pipeline class that supports both transformers and samplers in a unified workflow.

class Pipeline(pipeline.Pipeline):
    def __init__(
        self,
        steps,
        *,
        transform_input=None,
        memory=None,
        verbose=False,
    ):
        """
        Parameters
        ----------
        steps : list of (str, transformer/sampler) tuples
            List of (name, transform) tuples implementing fit/transform/fit_resample
            that are chained in order, with the last object an estimator.
            
        transform_input : list of str, default=None
            Names of metadata parameters that should be transformed by the pipeline
            before passing to the step consuming them. Enables transforming input
            arguments to fit() other than X. Only available with metadata routing enabled.
            
        memory : None, str or object with joblib.Memory interface, default=None
            Used to cache fitted transformers of the pipeline. If string, path to
            caching directory. Caching triggers cloning of transformers before fitting.
            
        verbose : bool, default=False
            If True, time elapsed while fitting each step will be printed.
        """

    def fit(self, X, y=None, **params):
        """
        Fit the model.
        
        Fits all transforms/samplers sequentially and transform/sample the data,
        then fits the final estimator on the transformed/sampled data.
        
        Parameters
        ----------
        X : iterable
            Training data. Must fulfill input requirements of first pipeline step.
            
        y : iterable, default=None
            Training targets. Must fulfill label requirements for all pipeline steps.
            
        **params : dict of str -> object
            Parameters passed to fit method of each step. Parameter names prefixed
            with step name and '__' separator (e.g., 'step__parameter').
            With metadata routing, parameters are forwarded based on step requests.
            
        Returns
        -------
        self : Pipeline
            Fitted pipeline instance.
        """

    def fit_transform(self, X, y=None, **params):
        """
        Fit the model and transform with the final estimator.
        
        Fits all transformers/samplers sequentially, then uses fit_transform
        on transformed data with the final estimator.
        
        Parameters
        ----------
        X : iterable
            Training data. Must fulfill input requirements of first pipeline step.
            
        y : iterable, default=None
            Training targets. Must fulfill label requirements for all pipeline steps.
            
        **params : dict of str -> object
            Parameters for fit method of each step using 'step__parameter' format.
            
        Returns
        -------
        Xt : array-like of shape (n_samples, n_transformed_features)
            Transformed samples from final estimator.
        """

    def fit_resample(self, X, y=None, **params):
        """
        Fit the model and resample with the final estimator.
        
        Fits all transformers/samplers sequentially, then uses fit_resample
        on transformed data with the final estimator.
        
        Parameters
        ----------
        X : iterable
            Training data. Must fulfill input requirements of first pipeline step.
            
        y : iterable, default=None
            Training targets. Must fulfill label requirements for all pipeline steps.
            
        **params : dict of str -> object
            Parameters for fit method of each step using 'step__parameter' format.
            
        Returns
        -------
        Xt : array-like of shape (n_samples_new, n_transformed_features)
            Resampled and transformed samples.
            
        yt : array-like of shape (n_samples_new,)
            Resampled target labels.
        """

    def predict(self, X, **params):
        """
        Transform data and apply predict with final estimator.
        
        Parameters
        ----------
        X : iterable
            Data to predict on. Must fulfill input requirements of first step.
            
        **params : dict of str -> object
            Parameters for predict method of final estimator.
            
        Returns
        -------
        y_pred : ndarray
            Predictions from final estimator.
        """

    def predict_proba(self, X, **params):
        """
        Transform data and apply predict_proba with final estimator.
        
        Parameters
        ----------
        X : iterable
            Data to predict probabilities for.
            
        **params : dict of str -> object
            Parameters for predict_proba method of final estimator.
            
        Returns
        -------
        y_proba : ndarray of shape (n_samples, n_classes)
            Class probability predictions.
        """

    def transform(self, X, **params):
        """
        Transform data through all pipeline steps.
        
        Parameters
        ----------
        X : iterable
            Data to transform through pipeline steps.
            
        **params : dict of str -> object
            Parameters for transform methods of pipeline steps.
            
        Returns
        -------
        Xt : ndarray
            Transformed data.
        """

    def inverse_transform(self, Xt, **params):
        """
        Apply inverse_transform for each step in reverse order.
        
        Parameters
        ----------
        Xt : array-like
            Transformed data to inverse transform.
            
        **params : dict of str -> object
            Parameters for inverse_transform methods.
            
        Returns
        -------
        X : ndarray
            Data in original feature space.
        """

Attributes

# Pipeline attributes after fitting
pipeline.named_steps  # Bunch object for accessing steps by name
pipeline.classes_     # Class labels from final estimator  
pipeline.n_features_in_     # Number of input features
pipeline.feature_names_in_  # Input feature names (if available)

Helper Functions

make_pipeline

Construct a Pipeline from estimators without explicit naming.

def make_pipeline(
    *steps,
    memory=None,
    transform_input=None,
    verbose=False,
):
    """
    Construct Pipeline from given estimators.
    
    Shorthand for Pipeline constructor that automatically names estimators
    based on their class names in lowercase.
    
    Parameters
    ----------
    *steps : list of estimators
        Sequence of estimators to chain in pipeline.
        
    memory : None, str or object with joblib.Memory interface, default=None
        Used to cache fitted transformers. If string, path to caching directory.
        
    transform_input : list of str, default=None
        Names of metadata parameters to transform through pipeline steps.
        Only available with metadata routing enabled.
        
    verbose : bool, default=False
        If True, print time elapsed while fitting each step.
        
    Returns
    -------
    p : Pipeline
        Imbalanced-learn Pipeline instance that handles samplers.
    """

Key Differences from sklearn.pipeline.Pipeline

The imbalanced-learn Pipeline class extends scikit-learn's Pipeline with several important enhancements:

1. Sampler Support

  • fit_resample() Integration: Native support for samplers that implement fit_resample() method
  • Resampling During Fit: Samplers are applied only during fit stages, not during transform/predict
  • Mixed Steps: Can combine transformers (fit/transform) and samplers (fit_resample) in same pipeline

2. Enhanced Validation

  • Step Validation: Ensures intermediate steps implement either transform or fit_resample, but not both
  • Pipeline Nesting: Prevents nesting of Pipeline objects within steps to avoid complexity
  • Passthrough Support: Supports 'passthrough' and None values for skipping steps

3. Fit/Transform Behavior Warning

The pipeline breaks scikit-learn's usual contract where fit_transform(X, y) equals fit(X, y).transform(X):

  • fit_transform(): Applies resampling during the process
  • fit().transform(): No resampling applied during transform phase
  • This ensures proper cross-validation behavior but can be surprising

Usage Examples

Basic Pipeline Creation

from imblearn.pipeline import Pipeline
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import EditedNearestNeighbours
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier

# Create pipeline with preprocessing, sampling, and classification
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('sampling', SMOTE(random_state=42)),
    ('pca', PCA(n_components=10)),
    ('classifier', RandomForestClassifier(random_state=42))
])

# Fit pipeline - resampling happens during fit
pipeline.fit(X_train, y_train)

# Make predictions - no resampling during prediction
y_pred = pipeline.predict(X_test)

Pipeline with Multiple Sampling Steps

from imblearn.pipeline import Pipeline
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import EditedNearestNeighbours
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

# Combine over-sampling and under-sampling
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('over_sampling', SMOTE(random_state=42)),
    ('under_sampling', EditedNearestNeighbours()),
    ('classifier', SVC(probability=True))
])

pipeline.fit(X_train, y_train)
probabilities = pipeline.predict_proba(X_test)

Using make_pipeline

from imblearn.pipeline import make_pipeline
from imblearn.over_sampling import ADASYN
from sklearn.preprocessing import MinMaxScaler
from sklearn.linear_model import LogisticRegression

# Automatic step naming based on class names
pipeline = make_pipeline(
    MinMaxScaler(),
    ADASYN(random_state=42),
    LogisticRegression(random_state=42),
    verbose=True  # Print timing information
)

pipeline.fit(X_train, y_train)
print(f"Pipeline steps: {list(pipeline.named_steps.keys())}")
# Output: ['minmaxscaler', 'adasyn', 'logisticregression']

Cross-validation with Pipeline

from sklearn.model_selection import cross_val_score
from imblearn.pipeline import Pipeline
from imblearn.over_sampling import SMOTE
from sklearn.ensemble import RandomForestClassifier

# Create pipeline for cross-validation
pipeline = Pipeline([
    ('sampling', SMOTE(random_state=42)),
    ('classifier', RandomForestClassifier(random_state=42))
])

# Cross-validation applies sampling within each fold
scores = cross_val_score(pipeline, X, y, cv=5, scoring='f1')
print(f"Cross-validation F1 scores: {scores}")
print(f"Mean F1 score: {scores.mean():.3f} (+/- {scores.std() * 2:.3f})")

Memory Caching for Expensive Operations

from sklearn.externals import joblib
from imblearn.pipeline import Pipeline
from imblearn.over_sampling import SMOTE
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier

# Cache expensive transformations
cachedir = '/tmp/joblib_cache'
memory = joblib.Memory(cachedir, verbose=0)

pipeline = Pipeline([
    ('sampling', SMOTE(random_state=42)),
    ('pca', PCA(n_components=50)),  # Expensive for large datasets
    ('classifier', RandomForestClassifier(random_state=42))
], memory=memory)

# First fit caches transformations
pipeline.fit(X_train, y_train)

# Subsequent fits with same parameters use cache
pipeline.set_params(classifier__n_estimators=200)
pipeline.fit(X_train, y_train)  # Reuses cached SMOTE and PCA results

Parameter Grid Search

from sklearn.model_selection import GridSearchCV
from imblearn.pipeline import Pipeline
from imblearn.over_sampling import SMOTE
from sklearn.svm import SVC

pipeline = Pipeline([
    ('sampling', SMOTE()),
    ('classifier', SVC())
])

# Define parameter grid with step prefixes
param_grid = {
    'sampling__k_neighbors': [3, 5, 7],
    'sampling__random_state': [42],
    'classifier__C': [0.1, 1, 10],
    'classifier__kernel': ['rbf', 'linear']
}

# Grid search with cross-validation
grid_search = GridSearchCV(
    pipeline, 
    param_grid, 
    cv=5, 
    scoring='f1',
    n_jobs=-1
)

grid_search.fit(X_train, y_train)
print(f"Best parameters: {grid_search.best_params_}")
print(f"Best cross-validation score: {grid_search.best_score_:.3f}")

Advanced: Transform Input Parameters

from sklearn.set_config import set_config
from imblearn.pipeline import Pipeline
from imblearn.over_sampling import SMOTE
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier

# Enable metadata routing (sklearn >= 1.4)
set_config(enable_metadata_routing=True)

# Pipeline that transforms validation set through preprocessing
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('sampling', SMOTE(random_state=42)),
    ('classifier', RandomForestClassifier())
], transform_input=['X_val'])

# Fit with validation set that gets transformed
pipeline.fit(X_train, y_train, X_val=X_val, y_val=y_val)

Custom Step Access and Inspection

from imblearn.pipeline import Pipeline
from imblearn.over_sampling import SMOTE
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier

pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('sampling', SMOTE(random_state=42)),
    ('classifier', RandomForestClassifier(random_state=42))
])

pipeline.fit(X_train, y_train)

# Access individual steps
scaler = pipeline.named_steps['scaler']
sampler = pipeline.named_steps['sampling']
classifier = pipeline.named_steps['classifier']

# Get feature importance from final estimator
feature_importance = pipeline.named_steps['classifier'].feature_importances_

# Get resampling information
print(f"Original samples: {len(y_train)}")
# Note: Cannot directly get resampled data as sampling only occurs during fit

# Access pipeline properties
print(f"Number of pipeline steps: {len(pipeline.steps)}")
print(f"Step names: {list(pipeline.named_steps.keys())}")
print(f"Classes: {pipeline.classes_}")

Best Practices

1. Cross-validation Safety

Always use the pipeline for cross-validation to prevent data leakage:

# Correct: Sampling happens within each CV fold
scores = cross_val_score(pipeline, X, y, cv=5)

# Incorrect: Sampling applied to entire dataset first
X_resampled, y_resampled = smote.fit_resample(X, y)
scores = cross_val_score(classifier, X_resampled, y_resampled, cv=5)

2. Parameter Naming

Use double underscore notation for step-specific parameters:

# Correct parameter naming
pipeline.set_params(
    sampling__k_neighbors=7,
    classifier__n_estimators=100
)

# Access parameters
params = pipeline.get_params()
print(params['sampling__random_state'])

3. Memory Management

Use caching for expensive operations in iterative workflows:

# Cache expensive transformations
pipeline = Pipeline([
    ('expensive_transform', ExpensiveTransformer()),
    ('sampling', SMOTE()),
    ('classifier', RandomForestClassifier())
], memory='/tmp/cache')

4. Debugging and Monitoring

Use verbose mode and step inspection for debugging:

# Enable timing information
pipeline = Pipeline(steps, verbose=True)

# Inspect individual steps after fitting
for name, step in pipeline.named_steps.items():
    print(f"Step {name}: {step}")

Install with Tessl CLI

npx tessl i tessl/pypi-imbalanced-learn

docs

combination.md

datasets.md

deep-learning.md

ensemble.md

index.md

metrics.md

model-selection.md

over-sampling.md

pipeline.md

under-sampling.md

utilities.md

tile.json