Insight Toolkit (ITK) for N-dimensional image processing, segmentation, and registration in medical and scientific applications.

Complete, production-ready examples for common image processing tasks.
import itk
import numpy as np
def process_dicom_series(dicom_dir, output_file):
    """Read a DICOM series, smooth it, resample to isotropic spacing, and save.

    Parameters
    ----------
    dicom_dir : str
        Directory containing the DICOM series (itk.imread reads a
        directory as a series).
    output_file : str
        Destination path; format is inferred from the extension
        (e.g. '.nii.gz').

    Returns
    -------
    The resampled ITK image, or None if ITK raised a RuntimeError.
    """
    try:
        # Read DICOM series
        print(f"Reading DICOM series from {dicom_dir}")
        image = itk.imread(dicom_dir, itk.SS)  # Signed short for CT/MRI
        print(f"Image size: {itk.size(image)}")
        print(f"Spacing: {itk.spacing(image)}")

        print("Applying preprocessing...")
        # 1. Gaussian smoothing to reduce noise.
        smoothed = itk.discrete_gaussian_image_filter(
            image,
            variance=1.0
        )

        # 2. Resample to isotropic 1.0 mm spacing; the new size is chosen
        # so the physical extent of the volume is preserved.
        original_spacing = itk.spacing(smoothed)
        target_spacing = [1.0, 1.0, 1.0]
        original_size = itk.size(smoothed)
        new_size = [
            int(original_size[i] * original_spacing[i] / target_spacing[i])
            for i in range(3)
        ]
        resampled = itk.resample_image_filter(
            smoothed,
            size=new_size,
            output_spacing=target_spacing
        )

        # Save result
        print(f"Saving to {output_file}")
        itk.imwrite(resampled, output_file, compression=True)
        print("Processing complete!")
        return resampled
    except RuntimeError as e:
        # ITK surfaces I/O and pipeline failures as RuntimeError.
        print(f"Error processing DICOM: {e}")
        return None
# Usage
result = process_dicom_series('/path/to/dicom/', 'processed.nii.gz')
def segment_organ(image_file, seed_points, output_file):
    """Segment an organ with confidence-connected region growing.

    Parameters
    ----------
    image_file : str
        Path to the input volume.
    seed_points : iterable of (x, y, z) int tuples
        Voxel indices expected to lie inside the target organ.
    output_file : str
        Destination path for the binary segmentation.

    Returns
    -------
    The hole-filled binary segmentation image (foreground value 255).
    """
    # Load image
    image = itk.imread(image_file, itk.F)

    # Edge-preserving smoothing before region growing.
    smoothed = itk.curvature_anisotropic_diffusion_image_filter(
        image,
        number_of_iterations=5,
        time_step=0.0625,  # conservative step for 3D diffusion stability
        conductance_parameter=3.0
    )

    # Region growing segmentation: float input -> unsigned-char label image.
    ImageType = itk.Image[itk.F, 3]
    OutputType = itk.Image[itk.UC, 3]
    segmenter = itk.ConfidenceConnectedImageFilter[ImageType, OutputType].New()
    segmenter.SetInput(smoothed)

    # Seed the region from every provided point.
    for x, y, z in seed_points:
        seed = itk.Index[3]()
        seed[0], seed[1], seed[2] = x, y, z
        segmenter.AddSeed(seed)

    segmenter.SetMultiplier(2.5)       # accept intensities within mean +/- 2.5*stddev
    segmenter.SetNumberOfIterations(5)
    segmenter.SetReplaceValue(255)
    segmenter.Update()
    segmented = segmenter.GetOutput()

    # Post-processing: morphological closing to fill small holes.
    closed = itk.binary_morphological_closing_image_filter(
        segmented,
        radius=3,
        foreground_value=255
    )

    # Save
    itk.imwrite(closed, output_file, compression=True)
    return closed
# Usage
seeds = [(120, 150, 80), (125, 155, 82)]  # Multiple seed points
result = segment_organ('ct_scan.nii.gz', seeds, 'liver_segmentation.nii.gz')
def register_multimodal(fixed_file, moving_file, output_file):
    """Rigidly register two images from different modalities.

    Uses Mattes mutual information (robust across modalities) with a
    VersorRigid3D transform and a 3-level multi-resolution schedule.

    Parameters
    ----------
    fixed_file, moving_file : str
        Paths to the fixed and moving volumes.
    output_file : str
        Path for the resampled moving image; the transform is written
        alongside it with a '_transform.h5' suffix.

    Returns
    -------
    (resampled_image, final_transform)
    """
    # Load images
    fixed = itk.imread(fixed_file, itk.F)
    moving = itk.imread(moving_file, itk.F)
    print(f"Fixed image size: {itk.size(fixed)}")
    print(f"Moving image size: {itk.size(moving)}")

    # Setup registration
    ImageType = itk.Image[itk.F, 3]
    TransformType = itk.VersorRigid3DTransform[itk.D]
    registration = itk.ImageRegistrationMethodv4[ImageType, ImageType].New()
    registration.SetFixedImage(fixed)
    registration.SetMovingImage(moving)

    # Initialize the transform by aligning the images' geometric centers.
    transform = TransformType.New()
    initializer = itk.CenteredTransformInitializer[
        TransformType, ImageType, ImageType
    ].New()
    initializer.SetTransform(transform)
    initializer.SetFixedImage(fixed)
    initializer.SetMovingImage(moving)
    initializer.GeometryOn()
    initializer.InitializeTransform()
    registration.SetInitialTransform(transform)

    # Mutual information handles the different intensity distributions
    # of multi-modal pairs (e.g. CT vs. MRI).
    metric = itk.MattesMutualInformationImageToImageMetricv4[
        ImageType, ImageType
    ].New()
    metric.SetNumberOfHistogramBins(50)
    registration.SetMetric(metric)

    # Setup optimizer
    optimizer = itk.RegularStepGradientDescentOptimizerv4Template[itk.D].New()
    optimizer.SetLearningRate(1.0)
    optimizer.SetMinimumStepLength(0.001)
    optimizer.SetNumberOfIterations(200)
    registration.SetOptimizer(optimizer)

    # Coarse-to-fine multi-resolution schedule.
    registration.SetNumberOfLevels(3)
    registration.SetSmoothingSigmasPerLevel([2.0, 1.0, 0.0])
    registration.SetShrinkFactorsPerLevel([4, 2, 1])

    # Run registration
    print("Running registration...")
    registration.Update()

    final_transform = registration.GetTransform()
    final_metric = registration.GetMetricValue()
    print(f"Final metric value: {final_metric}")
    print(f"Optimizer stop condition: {registration.GetOptimizerStopConditionDescription()}")

    # Resample the moving image onto the fixed image's grid.
    resampled = itk.resample_image_filter(
        moving,
        transform=final_transform,
        size=itk.size(fixed),
        output_spacing=itk.spacing(fixed),
        output_origin=itk.origin(fixed),
        output_direction=fixed.GetDirection()
    )

    # Save registered image
    itk.imwrite(resampled, output_file, compression=True)
    # NOTE(review): assumes output_file ends with '.nii.gz'; for other
    # extensions str.replace is a no-op and the transform path collides
    # with output_file — confirm callers always pass '.nii.gz'.
    itk.transformwrite(final_transform, output_file.replace('.nii.gz', '_transform.h5'))
    return resampled, final_transform
# Usage
registered, transform = register_multimodal(
    'mri_t1.nii.gz',
    'ct_scan.nii.gz',
    'ct_registered_to_mri.nii.gz'
)
def deformable_registration(fixed_file, moving_file, output_file):
    """Non-rigid registration using the diffeomorphic Demons algorithm.

    Parameters
    ----------
    fixed_file, moving_file : str
        Paths to the fixed and moving volumes.
    output_file : str
        Path for the warped moving image; the displacement field is
        also saved with a '_field.nii.gz' suffix.

    Returns
    -------
    (warped_image, displacement_field)
    """
    # Load images
    fixed = itk.imread(fixed_file, itk.F)
    moving = itk.imread(moving_file, itk.F)

    # Setup Demons registration: scalar images in, vector field out.
    ImageType = itk.Image[itk.F, 3]
    VectorType = itk.Vector[itk.F, 3]
    DisplacementFieldType = itk.Image[VectorType, 3]
    demons = itk.DiffeomorphicDemonsRegistrationFilter[
        ImageType, ImageType, DisplacementFieldType
    ].New()
    demons.SetFixedImage(fixed)
    demons.SetMovingImage(moving)
    demons.SetNumberOfIterations(50)
    demons.SetStandardDeviations(1.0)  # Gaussian regularization of the field

    print("Running deformable registration...")
    demons.Update()
    displacement_field = demons.GetDisplacementField()

    # Warp the moving image with the recovered displacement field.
    warped = itk.warp_image_filter(moving, displacement_field)

    # Save results
    itk.imwrite(warped, output_file, compression=True)
    itk.imwrite(
        displacement_field,
        output_file.replace('.nii.gz', '_field.nii.gz'),
        compression=True
    )
    return warped, displacement_field
# Usage
warped, field = deformable_registration(
    'fixed.nii.gz',
    'moving.nii.gz',
    'warped.nii.gz'
)
from pathlib import Path
def batch_process_images(input_dir, output_dir, operation='smooth'):
    """Apply one named operation to every image in a directory.

    Parameters
    ----------
    input_dir, output_dir : str
        Source directory and destination directory (created if missing).
    operation : {'smooth', 'edges', 'threshold'}
        Any other value copies images through unchanged.

    Failures on individual files are logged and skipped so one bad file
    does not abort the batch.
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Find all image files
    image_files = list(input_path.glob('*.png')) + \
        list(input_path.glob('*.jpg')) + \
        list(input_path.glob('*.nii.gz'))
    print(f"Found {len(image_files)} images to process")

    for i, image_file in enumerate(image_files, 1):
        print(f"Processing {i}/{len(image_files)}: {image_file.name}")
        try:
            # Load image
            image = itk.imread(str(image_file), itk.F)
            # Apply operation
            if operation == 'smooth':
                result = itk.median_image_filter(image, radius=2)
            elif operation == 'edges':
                result = itk.canny_edge_detection_image_filter(
                    image, variance=2.0
                )
            elif operation == 'threshold':
                result = itk.otsu_threshold_image_filter(image)
            else:
                result = image
            # Save under the same file name in the output directory.
            output_file = output_path / image_file.name
            itk.imwrite(result, str(output_file))
        except Exception as e:
            # Best-effort batch: log and continue with the remaining files.
            print(f" Error processing {image_file.name}: {e}")
            continue
    print("Batch processing complete!")
# Usage
batch_process_images('input_images/', 'output_images/', operation='smooth')
import numpy as np
from scipy import ndimage
def hybrid_processing(input_file, output_file):
    """Combine ITK and NumPy/SciPy processing.

    Smooths with ITK, normalizes/filters/thresholds/labels with
    NumPy/SciPy, then relabels components by size with ITK.

    Parameters
    ----------
    input_file : str
        Path to the input volume.
    output_file : str
        Destination path for the label image.

    Returns
    -------
    The relabeled component image.
    """
    # Load with ITK
    image = itk.imread(input_file, itk.F)

    # ITK preprocessing
    smoothed = itk.discrete_gaussian_image_filter(image, variance=1.0)

    # Convert to NumPy for custom processing
    array = itk.array_from_image(smoothed)

    # 1. Min-max normalize to [0, 1].
    array = (array - array.min()) / (array.max() - array.min())
    # 2. Additional Gaussian smoothing in SciPy.
    filtered = ndimage.gaussian_filter(array, sigma=2.0)
    # 3. Threshold at the midpoint of the normalized range.
    binary = filtered > 0.5
    # 4. Label connected components
    labeled, num_features = ndimage.label(binary)
    print(f"Found {num_features} connected components")

    # Convert back to ITK. uint16 caps labels at 65535 — assumed
    # sufficient here; confirm for very fragmented inputs.
    result = itk.image_from_array(labeled.astype(np.uint16))

    # Restore geometry from the input, since the NumPy round-trip
    # does not carry spacing/origin/direction.
    result.SetSpacing(image.GetSpacing())
    result.SetOrigin(image.GetOrigin())
    result.SetDirection(image.GetDirection())

    # Drop components below 100 voxels and renumber the rest by size.
    relabeled = itk.relabel_component_image_filter(
        result,
        minimum_object_size=100,
        sort_by_object_size=True
    )

    # Save
    itk.imwrite(relabeled, output_file)
    return relabeled
# Usage
result = hybrid_processing('input.nii.gz', 'labeled.nii.gz')
def process_large_image(input_file, output_file):
    """Process a large volume with multi-threading and a filter pipeline.

    Runs median -> gradient magnitude -> binary threshold and writes the
    result to output_file.

    Parameters
    ----------
    input_file, output_file : str
        Input and output image paths.
    """
    # Enable multi-threading (0 = use all available cores).
    itk.set_nthreads(0)
    # Enable per-filter progress reporting.
    itk.auto_progress(True)

    print("Loading large image...")
    image = itk.imread(input_file, itk.F)
    print(f"Image size: {itk.size(image)}")
    # float32 voxels -> 4 bytes each.
    print(f"Memory size: ~{np.prod(itk.size(image)) * 4 / 1e9:.2f} GB")

    print("Processing...")
    smoothed = itk.median_image_filter(image, radius=2)
    gradient = itk.gradient_magnitude_image_filter(smoothed)
    thresholded = itk.binary_threshold_image_filter(
        gradient,
        lower_threshold=10.0,
        upper_threshold=255.0
    )

    print("Writing result...")
    itk.imwrite(thresholded, output_file, compression=True)
    print("Processing complete!")
# Usage
process_large_image('large_volume.nii.gz', 'processed.nii.gz')
from multiprocessing import Pool
from pathlib import Path
def process_single_slice(args):
    """Median-smooth and edge-detect one 2D slice; return True on success.

    args is a (slice_file, output_dir) tuple — a single argument so the
    function can be used directly with multiprocessing.Pool.map.
    """
    slice_file, output_dir = args
    try:
        # Load slice
        image = itk.imread(str(slice_file), itk.F)
        # Process
        smoothed = itk.median_image_filter(image, radius=2)
        edges = itk.canny_edge_detection_image_filter(smoothed)
        # Save under the same name in the output directory.
        output_file = Path(output_dir) / slice_file.name
        itk.imwrite(edges, str(output_file))
        return True
    except Exception as e:
        # Log and signal failure; the caller tallies successes.
        print(f"Error processing {slice_file}: {e}")
        return False
def parallel_slice_processing(input_dir, output_dir, num_workers=4):
    """Process all 'slice_*.png' files in input_dir on a worker pool.

    Parameters
    ----------
    input_dir, output_dir : str
        Source and destination directories (destination is created).
    num_workers : int
        Number of multiprocessing workers.
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Get all slice files
    slice_files = list(input_path.glob('slice_*.png'))
    args = [(f, output_dir) for f in slice_files]
    print(f"Processing {len(slice_files)} slices with {num_workers} workers")

    # Fan the per-slice work out across the pool.
    with Pool(num_workers) as pool:
        results = pool.map(process_single_slice, args)

    # process_single_slice returns True/False, so sum() counts successes.
    success_count = sum(results)
    print(f"Successfully processed {success_count}/{len(slice_files)} slices")
# Usage
parallel_slice_processing('input_slices/', 'output_slices/', num_workers=8)
import numpy as np
def assess_image_quality(image_file):
    """Compute simple intensity-based quality metrics for an image.

    Parameters
    ----------
    image_file : str
        Path to the image to assess.

    Returns
    -------
    dict with mean/std/min/max/median, a naive SNR (mean/std, inf when
    std is 0), contrast (max - min), and the percentage of voxels
    outside the 1st-99th percentile range.
    """
    image = itk.imread(image_file, itk.F)
    array = itk.array_from_image(image)

    # Basic intensity statistics.
    stats = {
        'mean': float(np.mean(array)),
        'std': float(np.std(array)),
        'min': float(np.min(array)),
        'max': float(np.max(array)),
        'median': float(np.median(array)),
    }

    # Simplified SNR: global mean over global std.
    signal = np.mean(array)
    noise = np.std(array)
    stats['snr'] = float(signal / noise) if noise > 0 else float('inf')

    # Compute contrast
    stats['contrast'] = float(np.max(array) - np.min(array))

    # NOTE(review): the percentiles come from the same array, so this
    # reports roughly 2% by construction — a crude artifact indicator
    # at best, not an absolute outlier measure.
    q1, q99 = np.percentile(array, [1, 99])
    outliers = np.sum((array < q1) | (array > q99))
    stats['outlier_percentage'] = float(100 * outliers / array.size)

    return stats
# Usage
quality = assess_image_quality('scan.nii.gz')
print("Image Quality Metrics:")
for metric, value in quality.items():
    print(f" {metric}: {value:.2f}")
npx tessl i tessl/pypi-itk@5.4.1

docs
guides
reference