CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-itk

Insight Toolkit for N-dimensional image processing, segmentation, and registration in medical and scientific applications

Pending
Overview
Eval results
Files

real-world-scenarios.mddocs/examples/

Real-World ITK Scenarios

Complete, production-ready examples for common image processing tasks.

Medical Image Analysis

DICOM Series Processing

import itk
import numpy as np

def process_dicom_series(dicom_dir, output_file):
    """
    Read DICOM series, apply preprocessing, and save result.
    """
    try:
        # Read DICOM series
        print(f"Reading DICOM series from {dicom_dir}")
        image = itk.imread(dicom_dir, itk.SS)  # Signed short for CT/MRI
        
        print(f"Image size: {itk.size(image)}")
        print(f"Spacing: {itk.spacing(image)}")
        
        # Preprocessing pipeline
        print("Applying preprocessing...")
        
        # 1. Smoothing to reduce noise
        smoothed = itk.discrete_gaussian_image_filter(
            image,
            variance=1.0
        )
        
        # 2. Resample to isotropic spacing
        original_spacing = itk.spacing(smoothed)
        target_spacing = [1.0, 1.0, 1.0]
        
        original_size = itk.size(smoothed)
        new_size = [
            int(original_size[i] * original_spacing[i] / target_spacing[i])
            for i in range(3)
        ]
        
        resampled = itk.resample_image_filter(
            smoothed,
            size=new_size,
            output_spacing=target_spacing
        )
        
        # Save result
        print(f"Saving to {output_file}")
        itk.imwrite(resampled, output_file, compression=True)
        
        print("Processing complete!")
        return resampled
        
    except RuntimeError as e:
        print(f"Error processing DICOM: {e}")
        return None

# Usage
result = process_dicom_series('/path/to/dicom/', 'processed.nii.gz')

Organ Segmentation Pipeline

import itk

def segment_organ(image_file, seed_points, output_file):
    """
    Segment organ using region growing with preprocessing.
    """
    # Load image
    image = itk.imread(image_file, itk.F)
    
    # Preprocessing
    smoothed = itk.curvature_anisotropic_diffusion_image_filter(
        image,
        number_of_iterations=5,
        time_step=0.0625,
        conductance_parameter=3.0
    )
    
    # Region growing segmentation
    ImageType = itk.Image[itk.F, 3]
    OutputType = itk.Image[itk.UC, 3]
    
    segmenter = itk.ConfidenceConnectedImageFilter[ImageType, OutputType].New()
    segmenter.SetInput(smoothed)
    
    # Add seed points
    for x, y, z in seed_points:
        seed = itk.Index[3]()
        seed[0], seed[1], seed[2] = x, y, z
        segmenter.AddSeed(seed)
    
    segmenter.SetMultiplier(2.5)
    segmenter.SetNumberOfIterations(5)
    segmenter.SetReplaceValue(255)
    
    segmenter.Update()
    segmented = segmenter.GetOutput()
    
    # Post-processing: morphological closing to fill holes
    closed = itk.binary_morphological_closing_image_filter(
        segmented,
        radius=3,
        foreground_value=255
    )
    
    # Save
    itk.imwrite(closed, output_file, compression=True)
    
    return closed

# Usage
seeds = [(120, 150, 80), (125, 155, 82)]  # Multiple seed points
result = segment_organ('ct_scan.nii.gz', seeds, 'liver_segmentation.nii.gz')

Image Registration

Multi-Modal Registration (MRI to CT)

import itk

def register_multimodal(fixed_file, moving_file, output_file):
    """
    Register two images from different modalities using mutual information.
    """
    # Load images
    fixed = itk.imread(fixed_file, itk.F)
    moving = itk.imread(moving_file, itk.F)
    
    print(f"Fixed image size: {itk.size(fixed)}")
    print(f"Moving image size: {itk.size(moving)}")
    
    # Setup registration
    ImageType = itk.Image[itk.F, 3]
    TransformType = itk.VersorRigid3DTransform[itk.D]
    
    registration = itk.ImageRegistrationMethodv4[ImageType, ImageType].New()
    registration.SetFixedImage(fixed)
    registration.SetMovingImage(moving)
    
    # Initialize transform with image centers
    transform = TransformType.New()
    initializer = itk.CenteredTransformInitializer[
        TransformType, ImageType, ImageType
    ].New()
    initializer.SetTransform(transform)
    initializer.SetFixedImage(fixed)
    initializer.SetMovingImage(moving)
    initializer.GeometryOn()
    initializer.InitializeTransform()
    
    registration.SetInitialTransform(transform)
    
    # Setup metric (mutual information for multi-modal)
    metric = itk.MattesMutualInformationImageToImageMetricv4[
        ImageType, ImageType
    ].New()
    metric.SetNumberOfHistogramBins(50)
    registration.SetMetric(metric)
    
    # Setup optimizer
    optimizer = itk.RegularStepGradientDescentOptimizerv4Template[itk.D].New()
    optimizer.SetLearningRate(1.0)
    optimizer.SetMinimumStepLength(0.001)
    optimizer.SetNumberOfIterations(200)
    registration.SetOptimizer(optimizer)
    
    # Multi-resolution
    registration.SetNumberOfLevels(3)
    registration.SetSmoothingSigmasPerLevel([2.0, 1.0, 0.0])
    registration.SetShrinkFactorsPerLevel([4, 2, 1])
    
    # Run registration
    print("Running registration...")
    registration.Update()
    
    final_transform = registration.GetTransform()
    final_metric = registration.GetMetricValue()
    
    print(f"Final metric value: {final_metric}")
    print(f"Optimizer stop condition: {registration.GetOptimizerStopConditionDescription()}")
    
    # Apply transform to moving image
    resampled = itk.resample_image_filter(
        moving,
        transform=final_transform,
        size=itk.size(fixed),
        output_spacing=itk.spacing(fixed),
        output_origin=itk.origin(fixed),
        output_direction=fixed.GetDirection()
    )
    
    # Save registered image
    itk.imwrite(resampled, output_file, compression=True)
    
    # Save transform
    itk.transformwrite(final_transform, output_file.replace('.nii.gz', '_transform.h5'))
    
    return resampled, final_transform

# Usage
registered, transform = register_multimodal(
    'mri_t1.nii.gz',
    'ct_scan.nii.gz',
    'ct_registered_to_mri.nii.gz'
)

Deformable Registration

import itk

def deformable_registration(fixed_file, moving_file, output_file):
    """
    Non-rigid registration using Demons algorithm.
    """
    # Load images
    fixed = itk.imread(fixed_file, itk.F)
    moving = itk.imread(moving_file, itk.F)
    
    # Setup Demons registration
    ImageType = itk.Image[itk.F, 3]
    VectorType = itk.Vector[itk.F, 3]
    DisplacementFieldType = itk.Image[VectorType, 3]
    
    demons = itk.DiffeomorphicDemonsRegistrationFilter[
        ImageType, ImageType, DisplacementFieldType
    ].New()
    
    demons.SetFixedImage(fixed)
    demons.SetMovingImage(moving)
    demons.SetNumberOfIterations(50)
    demons.SetStandardDeviations(1.0)
    
    print("Running deformable registration...")
    demons.Update()
    
    displacement_field = demons.GetDisplacementField()
    
    # Warp moving image
    warped = itk.warp_image_filter(moving, displacement_field)
    
    # Save results
    itk.imwrite(warped, output_file, compression=True)
    itk.imwrite(
        displacement_field,
        output_file.replace('.nii.gz', '_field.nii.gz'),
        compression=True
    )
    
    return warped, displacement_field

# Usage
warped, field = deformable_registration(
    'fixed.nii.gz',
    'moving.nii.gz',
    'warped.nii.gz'
)

Batch Processing

Process Multiple Files

import itk
from pathlib import Path

def batch_process_images(input_dir, output_dir, operation='smooth'):
    """
    Process all images in a directory.
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)
    
    # Find all image files
    image_files = list(input_path.glob('*.png')) + \
                  list(input_path.glob('*.jpg')) + \
                  list(input_path.glob('*.nii.gz'))
    
    print(f"Found {len(image_files)} images to process")
    
    for i, image_file in enumerate(image_files, 1):
        print(f"Processing {i}/{len(image_files)}: {image_file.name}")
        
        try:
            # Load image
            image = itk.imread(str(image_file), itk.F)
            
            # Apply operation
            if operation == 'smooth':
                result = itk.median_image_filter(image, radius=2)
            elif operation == 'edges':
                result = itk.canny_edge_detection_image_filter(
                    image, variance=2.0
                )
            elif operation == 'threshold':
                result = itk.otsu_threshold_image_filter(image)
            else:
                result = image
            
            # Save result
            output_file = output_path / image_file.name
            itk.imwrite(result, str(output_file))
            
        except Exception as e:
            print(f"  Error processing {image_file.name}: {e}")
            continue
    
    print("Batch processing complete!")

# Usage
batch_process_images('input_images/', 'output_images/', operation='smooth')

Integration with NumPy/SciPy

Custom Processing Pipeline

import itk
import numpy as np
from scipy import ndimage

def hybrid_processing(input_file, output_file):
    """
    Combine ITK and NumPy/SciPy processing.
    """
    # Load with ITK
    image = itk.imread(input_file, itk.F)
    
    # ITK preprocessing
    smoothed = itk.discrete_gaussian_image_filter(image, variance=1.0)
    
    # Convert to NumPy for custom processing
    array = itk.array_from_image(smoothed)
    
    # NumPy/SciPy operations
    # 1. Normalize
    array = (array - array.min()) / (array.max() - array.min())
    
    # 2. Apply custom filter
    filtered = ndimage.gaussian_filter(array, sigma=2.0)
    
    # 3. Threshold
    binary = filtered > 0.5
    
    # 4. Label connected components
    labeled, num_features = ndimage.label(binary)
    
    print(f"Found {num_features} connected components")
    
    # Convert back to ITK
    result = itk.image_from_array(labeled.astype(np.uint16))
    
    # Preserve metadata
    result.SetSpacing(image.GetSpacing())
    result.SetOrigin(image.GetOrigin())
    result.SetDirection(image.GetDirection())
    
    # ITK post-processing
    # Relabel by size
    relabeled = itk.relabel_component_image_filter(
        result,
        minimum_object_size=100,
        sort_by_object_size=True
    )
    
    # Save
    itk.imwrite(relabeled, output_file)
    
    return relabeled

# Usage
result = hybrid_processing('input.nii.gz', 'labeled.nii.gz')

Performance Optimization

Large Image Processing

import itk

def process_large_image(input_file, output_file):
    """
    Efficiently process large images using views and streaming.
    """
    # Enable multi-threading
    itk.set_nthreads(0)  # Use all cores
    
    # Enable progress reporting
    itk.auto_progress(True)
    
    print("Loading large image...")
    image = itk.imread(input_file, itk.F)
    
    print(f"Image size: {itk.size(image)}")
    print(f"Memory size: ~{np.prod(itk.size(image)) * 4 / 1e9:.2f} GB")
    
    # Use pipeline to avoid intermediate copies
    print("Processing...")
    
    # Build pipeline (no execution yet)
    smoothed = itk.median_image_filter(image, radius=2)
    gradient = itk.gradient_magnitude_image_filter(smoothed)
    thresholded = itk.binary_threshold_image_filter(
        gradient,
        lower_threshold=10.0,
        upper_threshold=255.0
    )
    
    # Execute entire pipeline and write directly
    print("Writing result...")
    itk.imwrite(thresholded, output_file, compression=True)
    
    print("Processing complete!")

# Usage
process_large_image('large_volume.nii.gz', 'processed.nii.gz')

Parallel Processing

import itk
from multiprocessing import Pool
from pathlib import Path

def process_single_slice(args):
    """Process a single 2D slice."""
    slice_file, output_dir = args
    
    try:
        # Load slice
        image = itk.imread(str(slice_file), itk.F)
        
        # Process
        smoothed = itk.median_image_filter(image, radius=2)
        edges = itk.canny_edge_detection_image_filter(smoothed)
        
        # Save
        output_file = Path(output_dir) / slice_file.name
        itk.imwrite(edges, str(output_file))
        
        return True
    except Exception as e:
        print(f"Error processing {slice_file}: {e}")
        return False

def parallel_slice_processing(input_dir, output_dir, num_workers=4):
    """
    Process slices in parallel using multiprocessing.
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)
    
    # Get all slice files
    slice_files = list(input_path.glob('slice_*.png'))
    args = [(f, output_dir) for f in slice_files]
    
    print(f"Processing {len(slice_files)} slices with {num_workers} workers")
    
    # Process in parallel
    with Pool(num_workers) as pool:
        results = pool.map(process_single_slice, args)
    
    success_count = sum(results)
    print(f"Successfully processed {success_count}/{len(slice_files)} slices")

# Usage
parallel_slice_processing('input_slices/', 'output_slices/', num_workers=8)

Quality Control

Image Quality Assessment

import itk
import numpy as np

def assess_image_quality(image_file):
    """
    Compute quality metrics for an image.
    """
    image = itk.imread(image_file, itk.F)
    array = itk.array_from_image(image)
    
    # Compute statistics
    stats = {
        'mean': float(np.mean(array)),
        'std': float(np.std(array)),
        'min': float(np.min(array)),
        'max': float(np.max(array)),
        'median': float(np.median(array)),
    }
    
    # Compute SNR (simplified)
    signal = np.mean(array)
    noise = np.std(array)
    stats['snr'] = float(signal / noise) if noise > 0 else float('inf')
    
    # Compute contrast
    stats['contrast'] = float(np.max(array) - np.min(array))
    
    # Check for artifacts (very simple check)
    # Look for extreme outliers
    q1, q99 = np.percentile(array, [1, 99])
    outliers = np.sum((array < q1) | (array > q99))
    stats['outlier_percentage'] = float(100 * outliers / array.size)
    
    return stats

# Usage
quality = assess_image_quality('scan.nii.gz')
print("Image Quality Metrics:")
for metric, value in quality.items():
    print(f"  {metric}: {value:.2f}")

Next Steps

  • See Edge Cases Guide for handling unusual scenarios
  • Review Reference Documentation for detailed API information
  • Check Quick Start Guide for basics

Install with Tessl CLI

npx tessl i tessl/pypi-itk

docs

examples

edge-cases.md

real-world-scenarios.md

index.md

tile.json