Toolbox for imbalanced dataset in machine learning
—
Advanced pipeline functionality that extends scikit-learn's Pipeline class to seamlessly integrate sampling algorithms with machine learning workflows. Ensures proper handling of resampling operations during model training while maintaining compatibility with cross-validation and model selection procedures.
The imbalanced-learn pipeline system addresses key challenges when combining sampling methods with machine learning pipelines:
fit_resample() methods in pipeline steps

The pipeline components extend scikit-learn's pipeline functionality while maintaining full API compatibility.
Extended pipeline class that supports both transformers and samplers in a unified workflow.
class Pipeline(pipeline.Pipeline):
def __init__(
self,
steps,
*,
transform_input=None,
memory=None,
verbose=False,
):
"""
Parameters
----------
steps : list of (str, transformer/sampler) tuples
List of (name, transform) tuples implementing fit/transform/fit_resample
that are chained in order, with the last object an estimator.
transform_input : list of str, default=None
Names of metadata parameters that should be transformed by the pipeline
before passing to the step consuming them. Enables transforming input
arguments to fit() other than X. Only available with metadata routing enabled.
memory : None, str or object with joblib.Memory interface, default=None
Used to cache fitted transformers of the pipeline. If string, path to
caching directory. Caching triggers cloning of transformers before fitting.
verbose : bool, default=False
If True, time elapsed while fitting each step will be printed.
"""
def fit(self, X, y=None, **params):
"""
Fit the model.
Fits all transforms/samplers sequentially and transform/sample the data,
then fits the final estimator on the transformed/sampled data.
Parameters
----------
X : iterable
Training data. Must fulfill input requirements of first pipeline step.
y : iterable, default=None
Training targets. Must fulfill label requirements for all pipeline steps.
**params : dict of str -> object
Parameters passed to fit method of each step. Parameter names prefixed
with step name and '__' separator (e.g., 'step__parameter').
With metadata routing, parameters are forwarded based on step requests.
Returns
-------
self : Pipeline
Fitted pipeline instance.
"""
def fit_transform(self, X, y=None, **params):
"""
Fit the model and transform with the final estimator.
Fits all transformers/samplers sequentially, then uses fit_transform
on transformed data with the final estimator.
Parameters
----------
X : iterable
Training data. Must fulfill input requirements of first pipeline step.
y : iterable, default=None
Training targets. Must fulfill label requirements for all pipeline steps.
**params : dict of str -> object
Parameters for fit method of each step using 'step__parameter' format.
Returns
-------
Xt : array-like of shape (n_samples, n_transformed_features)
Transformed samples from final estimator.
"""
def fit_resample(self, X, y=None, **params):
"""
Fit the model and resample with the final estimator.
Fits all transformers/samplers sequentially, then uses fit_resample
on transformed data with the final estimator.
Parameters
----------
X : iterable
Training data. Must fulfill input requirements of first pipeline step.
y : iterable, default=None
Training targets. Must fulfill label requirements for all pipeline steps.
**params : dict of str -> object
Parameters for fit method of each step using 'step__parameter' format.
Returns
-------
Xt : array-like of shape (n_samples_new, n_transformed_features)
Resampled and transformed samples.
yt : array-like of shape (n_samples_new,)
Resampled target labels.
"""
def predict(self, X, **params):
"""
Transform data and apply predict with final estimator.
Parameters
----------
X : iterable
Data to predict on. Must fulfill input requirements of first step.
**params : dict of str -> object
Parameters for predict method of final estimator.
Returns
-------
y_pred : ndarray
Predictions from final estimator.
"""
def predict_proba(self, X, **params):
"""
Transform data and apply predict_proba with final estimator.
Parameters
----------
X : iterable
Data to predict probabilities for.
**params : dict of str -> object
Parameters for predict_proba method of final estimator.
Returns
-------
y_proba : ndarray of shape (n_samples, n_classes)
Class probability predictions.
"""
def transform(self, X, **params):
"""
Transform data through all pipeline steps.
Parameters
----------
X : iterable
Data to transform through pipeline steps.
**params : dict of str -> object
Parameters for transform methods of pipeline steps.
Returns
-------
Xt : ndarray
Transformed data.
"""
def inverse_transform(self, Xt, **params):
"""
Apply inverse_transform for each step in reverse order.
Parameters
----------
Xt : array-like
Transformed data to inverse transform.
**params : dict of str -> object
Parameters for inverse_transform methods.
Returns
-------
X : ndarray
Data in original feature space.
"""

# Pipeline attributes after fitting
pipeline.named_steps # Bunch object for accessing steps by name
pipeline.classes_ # Class labels from final estimator
pipeline.n_features_in_ # Number of input features
pipeline.feature_names_in_ # Input feature names (if available)

Construct a Pipeline from estimators without explicit naming.
def make_pipeline(
*steps,
memory=None,
transform_input=None,
verbose=False,
):
"""
Construct Pipeline from given estimators.
Shorthand for Pipeline constructor that automatically names estimators
based on their class names in lowercase.
Parameters
----------
*steps : list of estimators
Sequence of estimators to chain in pipeline.
memory : None, str or object with joblib.Memory interface, default=None
Used to cache fitted transformers. If string, path to caching directory.
transform_input : list of str, default=None
Names of metadata parameters to transform through pipeline steps.
Only available with metadata routing enabled.
verbose : bool, default=False
If True, print time elapsed while fitting each step.
Returns
-------
p : Pipeline
Imbalanced-learn Pipeline instance that handles samplers.
"""

The imbalanced-learn Pipeline class extends scikit-learn's Pipeline with several important enhancements:
fit_resample() method

The pipeline breaks scikit-learn's usual contract where fit_transform(X, y) equals fit(X, y).transform(X):
from imblearn.pipeline import Pipeline
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import EditedNearestNeighbours
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
# Create pipeline with preprocessing, sampling, and classification
pipeline = Pipeline([
('scaler', StandardScaler()),
('sampling', SMOTE(random_state=42)),
('pca', PCA(n_components=10)),
('classifier', RandomForestClassifier(random_state=42))
])
# Fit pipeline - resampling happens during fit
pipeline.fit(X_train, y_train)
# Make predictions - no resampling during prediction
y_pred = pipeline.predict(X_test)

from imblearn.pipeline import Pipeline
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import EditedNearestNeighbours
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
# Combine over-sampling and under-sampling
pipeline = Pipeline([
('scaler', StandardScaler()),
('over_sampling', SMOTE(random_state=42)),
('under_sampling', EditedNearestNeighbours()),
('classifier', SVC(probability=True))
])
pipeline.fit(X_train, y_train)
probabilities = pipeline.predict_proba(X_test)

from imblearn.pipeline import make_pipeline
from imblearn.over_sampling import ADASYN
from sklearn.preprocessing import MinMaxScaler
from sklearn.linear_model import LogisticRegression
# Automatic step naming based on class names
pipeline = make_pipeline(
MinMaxScaler(),
ADASYN(random_state=42),
LogisticRegression(random_state=42),
verbose=True # Print timing information
)
pipeline.fit(X_train, y_train)
print(f"Pipeline steps: {list(pipeline.named_steps.keys())}")
# Output: ['minmaxscaler', 'adasyn', 'logisticregression']

from sklearn.model_selection import cross_val_score
from imblearn.pipeline import Pipeline
from imblearn.over_sampling import SMOTE
from sklearn.ensemble import RandomForestClassifier
# Create pipeline for cross-validation
pipeline = Pipeline([
('sampling', SMOTE(random_state=42)),
('classifier', RandomForestClassifier(random_state=42))
])
# Cross-validation applies sampling within each fold
scores = cross_val_score(pipeline, X, y, cv=5, scoring='f1')
print(f"Cross-validation F1 scores: {scores}")
print(f"Mean F1 score: {scores.mean():.3f} (+/- {scores.std() * 2:.3f})")

import joblib
from imblearn.pipeline import Pipeline
from imblearn.over_sampling import SMOTE
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
# Cache expensive transformations
cachedir = '/tmp/joblib_cache'
memory = joblib.Memory(cachedir, verbose=0)
pipeline = Pipeline([
('sampling', SMOTE(random_state=42)),
('pca', PCA(n_components=50)), # Expensive for large datasets
('classifier', RandomForestClassifier(random_state=42))
], memory=memory)
# First fit caches transformations
pipeline.fit(X_train, y_train)
# Subsequent fits with same parameters use cache
pipeline.set_params(classifier__n_estimators=200)
pipeline.fit(X_train, y_train) # Reuses cached SMOTE and PCA results

from sklearn.model_selection import GridSearchCV
from imblearn.pipeline import Pipeline
from imblearn.over_sampling import SMOTE
from sklearn.svm import SVC
pipeline = Pipeline([
('sampling', SMOTE()),
('classifier', SVC())
])
# Define parameter grid with step prefixes
param_grid = {
'sampling__k_neighbors': [3, 5, 7],
'sampling__random_state': [42],
'classifier__C': [0.1, 1, 10],
'classifier__kernel': ['rbf', 'linear']
}
# Grid search with cross-validation
grid_search = GridSearchCV(
pipeline,
param_grid,
cv=5,
scoring='f1',
n_jobs=-1
)
grid_search.fit(X_train, y_train)
print(f"Best parameters: {grid_search.best_params_}")
print(f"Best cross-validation score: {grid_search.best_score_:.3f}")

from sklearn import set_config
from imblearn.pipeline import Pipeline
from imblearn.over_sampling import SMOTE
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
# Enable metadata routing (sklearn >= 1.4)
set_config(enable_metadata_routing=True)
# Pipeline that transforms validation set through preprocessing
pipeline = Pipeline([
('scaler', StandardScaler()),
('sampling', SMOTE(random_state=42)),
('classifier', RandomForestClassifier())
], transform_input=['X_val'])
# Fit with validation set that gets transformed
pipeline.fit(X_train, y_train, X_val=X_val, y_val=y_val)

from imblearn.pipeline import Pipeline
from imblearn.over_sampling import SMOTE
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
pipeline = Pipeline([
('scaler', StandardScaler()),
('sampling', SMOTE(random_state=42)),
('classifier', RandomForestClassifier(random_state=42))
])
pipeline.fit(X_train, y_train)
# Access individual steps
scaler = pipeline.named_steps['scaler']
sampler = pipeline.named_steps['sampling']
classifier = pipeline.named_steps['classifier']
# Get feature importance from final estimator
feature_importance = pipeline.named_steps['classifier'].feature_importances_
# Get resampling information
print(f"Original samples: {len(y_train)}")
# Note: Cannot directly get resampled data as sampling only occurs during fit
# Access pipeline properties
print(f"Number of pipeline steps: {len(pipeline.steps)}")
print(f"Step names: {list(pipeline.named_steps.keys())}")
print(f"Classes: {pipeline.classes_}")

Always use the pipeline for cross-validation to prevent data leakage:
# Correct: Sampling happens within each CV fold
scores = cross_val_score(pipeline, X, y, cv=5)
# Incorrect: Sampling applied to entire dataset first
X_resampled, y_resampled = smote.fit_resample(X, y)
scores = cross_val_score(classifier, X_resampled, y_resampled, cv=5)

Use double underscore notation for step-specific parameters:
# Correct parameter naming
pipeline.set_params(
sampling__k_neighbors=7,
classifier__n_estimators=100
)
# Access parameters
params = pipeline.get_params()
print(params['sampling__random_state'])

Use caching for expensive operations in iterative workflows:
# Cache expensive transformations
pipeline = Pipeline([
('expensive_transform', ExpensiveTransformer()),
('sampling', SMOTE()),
('classifier', RandomForestClassifier())
], memory='/tmp/cache')

Use verbose mode and step inspection for debugging:
# Enable timing information
pipeline = Pipeline(steps, verbose=True)
# Inspect individual steps after fitting
for name, step in pipeline.named_steps.items():
print(f"Step {name}: {step}")

Install with Tessl CLI
npx tessl i tessl/pypi-imbalanced-learn