tessl install tessl/pypi-dcm2niix@1.0.5

Command-line application that converts medical imaging data from DICOM format to NIfTI format, with BIDS support.
Comprehensive examples for common dcm2niix usage patterns in production environments.
from dcm2niix import main
import subprocess
from pathlib import Path
def convert_with_error_handling(input_dir, output_dir):
    """
    Robust conversion with comprehensive error handling.

    Args:
        input_dir: Directory containing DICOM files.
        output_dir: Destination directory for NIfTI output.

    Returns:
        tuple: (success: bool, message: str, exit_code: int)
    """
    # Validate input
    input_path = Path(input_dir)
    if not input_path.exists():
        return False, f"Input directory does not exist: {input_dir}", 5

    # Create output directory
    output_path = Path(output_dir)
    try:
        output_path.mkdir(parents=True, exist_ok=True)
    except PermissionError:
        return False, f"Cannot create output directory: {output_dir}", 6

    # Run conversion. NOTE: dcm2niix.main() takes only the argument list;
    # the subprocess-style keywords used previously (capture_output, text,
    # timeout) are not part of its signature and would raise TypeError,
    # and the subprocess.TimeoutExpired handler could never fire.
    try:
        exit_code = main([
            "-z", "y",   # Compress output
            "-b", "y",   # Generate BIDS JSON
            "-ba", "y",  # Anonymize
            "-o", str(output_path),
            str(input_path)
        ])
        # Interpret exit code (see the dcm2niix exit-code conventions used
        # throughout this file)
        if exit_code == 0:
            return True, "Conversion successful", exit_code
        elif exit_code == 2:
            return False, "No valid DICOM files found", exit_code
        elif exit_code == 4:
            return False, "Corrupt DICOM file detected", exit_code
        elif exit_code == 8:
            return True, "Partial success: some files converted", exit_code
        elif exit_code == 10:
            return True, "Conversion completed with incomplete volume warning", exit_code
        else:
            return False, f"Conversion failed with exit code {exit_code}", exit_code
    except Exception as e:
        return False, f"Unexpected error: {str(e)}", -1
# Usage
success, message, code = convert_with_error_handling("/data/dicom", "/data/nifti")
print(f"{'✓' if success else '✗'} {message} (exit code: {code})")

# --- Next example: retry on transient failures ---
from dcm2niix import main
import time
def convert_with_retry(input_dir, output_dir, max_retries=3):
    """Convert with automatic retry on transient failures.

    Args:
        input_dir: Directory containing DICOM files.
        output_dir: Destination directory for NIfTI output.
        max_retries: Maximum number of conversion attempts.

    Returns:
        tuple: (success: bool, exit_code: int)
    """
    exit_code = -1  # defined even if max_retries <= 0
    for attempt in range(max_retries):
        exit_code = main([
            "-z", "y",
            "-o", output_dir,
            input_dir
        ])
        # Success or partial success
        if exit_code in [0, 8]:
            return True, exit_code
        # Permanent failures - don't retry. Includes 4 (corrupt DICOM):
        # retrying cannot repair a corrupt input file.
        if exit_code in [2, 4, 5, 6]:
            return False, exit_code
        # Transient failures - retry with exponential backoff
        if attempt < max_retries - 1:
            wait_time = 2 ** attempt  # 1s, 2s, doubling each attempt
            print(f"Attempt {attempt + 1} failed, retrying in {wait_time}s...")
            time.sleep(wait_time)
    return False, exit_code
# Usage
success, code = convert_with_retry("/data/dicom", "/data/nifti")

# --- Next example: batch conversion of multiple subjects ---
from dcm2niix import main
from pathlib import Path
from typing import Dict, List
def batch_convert_subjects(
    input_root: str,
    output_root: str,
    subject_ids: List[str],
    compress: bool = True,
    bids: bool = True
) -> Dict[str, dict]:
    """
    Convert multiple subjects with consistent settings.

    Args:
        input_root: Root directory containing subject folders
        output_root: Root directory for outputs
        subject_ids: List of subject identifiers
        compress: Enable gzip compression
        bids: Generate BIDS JSON sidecars

    Returns:
        Dictionary mapping subject_id to result dict
    """
    results: Dict[str, dict] = {}
    src_root = Path(input_root)
    dst_root = Path(output_root)
    for sid in subject_ids:
        src = src_root / sid
        dst = dst_root / sid
        # Record a failure and skip subjects whose input folder is missing
        if not src.exists():
            results[sid] = {
                "success": False,
                "exit_code": 5,
                "message": "Input directory not found"
            }
            continue
        dst.mkdir(parents=True, exist_ok=True)
        # Assemble dcm2niix arguments from the requested options
        flags = []
        if compress:
            flags += ["-z", "y"]
        if bids:
            flags += ["-b", "y", "-ba", "y"]
        flags += [
            "-f", f"sub-{sid}_%p_%s",
            "-o", str(dst),
            str(src)
        ]
        code = main(flags)
        results[sid] = {
            "success": code in (0, 8),
            "exit_code": code,
            "output_dir": str(dst)
        }
    return results
# Usage
subjects = ["001", "002", "003", "004", "005"]
results = batch_convert_subjects("/data/dicom", "/data/nifti", subjects)

# Report results
successful = sum(1 for r in results.values() if r["success"])
print(f"Converted {successful}/{len(subjects)} subjects successfully")
for subject_id, result in results.items():
    status = "✓" if result["success"] else "✗"
    print(f"{status} {subject_id}: {result['exit_code']}")

# --- Next example: parallel batch conversion ---
from dcm2niix import main
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from typing import List, Dict
def convert_single_subject(subject_dir: str, output_root: str) -> tuple:
    """Convert one subject directory (designed to run in a worker process).

    Returns:
        tuple: (subject_name: str, exit_code: int)
    """
    src = Path(subject_dir)
    name = src.name
    dest = Path(output_root) / name
    dest.mkdir(parents=True, exist_ok=True)
    code = main([
        "-z", "y",
        "-b", "y",
        "-ba", "y",
        "-f", f"sub-{name}_%p_%s",
        "-o", str(dest),
        str(src)
    ])
    return name, code
def parallel_batch_convert(
    subject_dirs: List[str],
    output_root: str,
    max_workers: int = 4
) -> Dict[str, int]:
    """
    Convert multiple subjects in parallel worker processes.

    Args:
        subject_dirs: List of subject directory paths
        output_root: Root directory for outputs
        max_workers: Maximum parallel processes

    Returns:
        Dictionary mapping subject name to exit code
    """
    results: Dict[str, int] = {}
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        # One future per subject directory
        pending = {
            pool.submit(convert_single_subject, d, output_root): d
            for d in subject_dirs
        }
        # Gather results in completion order
        for fut in as_completed(pending):
            src_dir = pending[fut]
            try:
                name, code = fut.result()
            except Exception as exc:
                # Worker raised: record a sentinel exit code of -1
                print(f"✗ {Path(src_dir).name}: {exc}")
                results[Path(src_dir).name] = -1
            else:
                results[name] = code
                marker = "✓" if code in (0, 8) else "✗"
                print(f"{marker} {name}: exit code {code}")
    return results
# Usage
subject_directories = [
    "/data/subjects/sub-001",
    "/data/subjects/sub-002",
    "/data/subjects/sub-003",
    "/data/subjects/sub-004"
]
results = parallel_batch_convert(subject_directories, "/data/nifti", max_workers=4)

# Summary
successful = sum(1 for code in results.values() if code in [0, 8])
print(f"\nCompleted: {successful}/{len(results)} successful")

# --- Next example: safe conversion with rollback ---
from dcm2niix import main
from pathlib import Path
import shutil
import tempfile
def safe_convert(input_dir: str, output_dir: str, **options) -> tuple:
    """
    Convert DICOM with automatic rollback on failure.

    Writes into a temporary staging directory first and moves files to the
    final location only on success, so a failed run never leaves partial
    or corrupted outputs behind.

    Args:
        input_dir: Input DICOM directory
        output_dir: Final output directory
        **options: Additional dcm2niix options (e.g., z="y", b="y")

    Returns:
        tuple: (success: bool, message: str)
    """
    with tempfile.TemporaryDirectory() as staging:
        # Translate keyword options into dcm2niix flags
        cmd = ["-o", staging, input_dir]
        for flag, setting in options.items():
            if setting is True:
                cmd += [f"-{flag}", "y"]
            elif setting is False:
                cmd += [f"-{flag}", "n"]
            elif setting is not None:
                cmd += [f"-{flag}", str(setting)]

        code = main(cmd)

        if code not in (0, 8):
            # Staging directory is discarded automatically on exit
            return False, f"Conversion failed with exit code {code}"

        # Promote staged outputs to the final destination
        final = Path(output_dir)
        final.mkdir(parents=True, exist_ok=True)
        for produced in Path(staging).iterdir():
            shutil.move(str(produced), str(final / produced.name))
        return True, f"Conversion successful (exit code {code})"
# Usage
success, message = safe_convert(
    "/data/dicom/sub-001",
    "/data/nifti/sub-001",
    z="y",   # Compression
    b="y",   # BIDS JSON
    ba="y"   # Anonymize
)
print(message)

# --- Next example: series selection via the dcm2niix binary ---
from dcm2niix import bin
import subprocess
import re
def get_available_series(dicom_dir: str) -> list:
    """
    List available series with CRC values for selection.

    Returns:
        List of dicts with 'name', 'crc', and 'file_count' keys
    """
    listing = subprocess.run(
        [bin, "-q", "l", dicom_dir],
        capture_output=True,
        text=True
    )
    # Expected line format: "Series 001: T1_MPRAGE [52301] - 176 files"
    pattern = re.compile(r'Series \d+: (.+?) \[(\d+)\] - (\d+) files')
    found = []
    for line in listing.stdout.split('\n'):
        hit = pattern.search(line)
        if hit:
            found.append({
                'name': hit.group(1),
                'crc': hit.group(2),
                'file_count': int(hit.group(3))
            })
    return found
def convert_selected_series(dicom_dir: str, output_dir: str, series_crcs: list) -> bool:
    """Convert only the series identified by the given CRC values."""
    from dcm2niix import main
    cmd = ["-z", "y", "-b", "y", "-o", output_dir]
    # One -n flag per selected series CRC
    for series_crc in series_crcs:
        cmd += ["-n", str(series_crc)]
    cmd.append(dicom_dir)
    return main(cmd) in (0, 8)
# Usage
dicom_folder = "/data/exam/patient001"

# Step 1: List available series
series_list = get_available_series(dicom_folder)
print("Available series:")
for i, series in enumerate(series_list, 1):
    print(f"{i}. {series['name']} (CRC: {series['crc']}, {series['file_count']} files)")

# Step 2: Select desired series (e.g., T1 and T2)
selected_crcs = [s['crc'] for s in series_list if 'T1' in s['name'] or 'T2' in s['name']]

# Step 3: Convert selected series
success = convert_selected_series(dicom_folder, "/data/nifti/patient001", selected_crcs)
print(f"Conversion {'successful' if success else 'failed'}")

# --- Next example: modality filtering ---
from dcm2niix import main
def convert_by_modality(input_dir: str, output_dir: str, ignore_derived: bool = True):
    """Convert with filtering for clean primary acquisitions."""
    derived_flag = "y" if ignore_derived else "n"
    code = main([
        "-i", derived_flag,  # Ignore derived/localizer images
        "-z", "y",           # Compress
        "-b", "y",           # BIDS JSON
        "-o", output_dir,
        input_dir
    ])
    return code in (0, 8)
# Usage - only primary acquisitions
success = convert_by_modality("/data/mixed/exam", "/data/clean/exam", ignore_derived=True)

# --- Next example: fast batch conversion ---
from dcm2niix import main
from pathlib import Path
def fast_batch_convert(input_root: str, output_root: str, subjects: list):
    """
    Optimized batch conversion for organized PACS data.

    Speed-oriented flag set: adjacent-DICOM mode (assumes each series sits
    in its own folder), shallow search depth, skip-existing outputs, and
    minimal console output.
    """
    outcome = {}
    src_root = Path(input_root)
    dst_root = Path(output_root)
    for sid in subjects:
        dst = dst_root / sid
        dst.mkdir(parents=True, exist_ok=True)
        code = main([
            "-z", "y",  # Compress with pigz
            "-a", "y",  # Adjacent DICOMs (faster)
            "-d", "1",  # Shallow search
            "-i", "y",  # Ignore derived
            "-w", "0",  # Skip existing files
            "-v", "0",  # Minimal output
            "-o", str(dst),
            str(src_root / sid)
        ])
        outcome[sid] = code in (0, 8)
    return outcome
# Usage
subjects = [f"sub-{i:03d}" for i in range(1, 101)]  # 100 subjects
results = fast_batch_convert("/pacs/export", "/data/nifti", subjects)
print(f"Processed {sum(results.values())}/{len(results)} subjects")

# --- Next example: chunked batch processing ---
from dcm2niix import main
from pathlib import Path
def chunked_batch_process(all_subjects: list, input_root: str, output_root: str, chunk_size: int = 10):
    """
    Process large batches in chunks to manage memory.

    Useful for very large datasets where processing all subjects
    at once might cause memory issues.
    """
    total = len(all_subjects)
    total_chunks = (total + chunk_size - 1) // chunk_size  # ceiling division
    outcome = {}
    for start in range(0, total, chunk_size):
        chunk = all_subjects[start:start + chunk_size]
        chunk_num = start // chunk_size + 1
        print(f"Processing chunk {chunk_num}/{total_chunks} ({len(chunk)} subjects)...")
        for sid in chunk:
            dst = Path(output_root) / sid
            dst.mkdir(parents=True, exist_ok=True)
            code = main([
                "-z", "y",
                "-b", "y",
                "-o", str(dst),
                str(Path(input_root) / sid)
            ])
            outcome[sid] = code in (0, 8)
        # Progress update after each chunk
        print(f"Completed {start + len(chunk)}/{total} subjects")
    return outcome
# Usage for 1000 subjects
all_subjects = [f"sub-{i:04d}" for i in range(1, 1001)]
results = chunked_batch_process(all_subjects, "/data/dicom", "/data/nifti", chunk_size=50)

# --- Next example: production-grade converter class ---
from dcm2niix import main
from pathlib import Path
import logging
import json
from datetime import datetime
class DicomConverter:
    """Production-grade DICOM to NIfTI converter with logging and reporting."""

    def __init__(self, input_root: str, output_root: str, log_file: str = None):
        """
        Args:
            input_root: Root directory containing per-subject DICOM folders.
            output_root: Root directory for converted outputs.
            log_file: Optional log file path; logs go to the console if omitted.
        """
        self.input_root = Path(input_root)
        self.output_root = Path(output_root)
        # Setup logging: a file handler when a path is given, console otherwise
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file) if log_file else logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def validate_input(self, subject_id: str) -> bool:
        """Validate input directory exists and contains files."""
        input_dir = self.input_root / subject_id
        if not input_dir.exists():
            self.logger.error(f"Input directory not found: {input_dir}")
            return False
        if not any(input_dir.iterdir()):
            self.logger.error(f"Input directory is empty: {input_dir}")
            return False
        return True

    def convert_subject(self, subject_id: str, **options) -> dict:
        """
        Convert single subject with full error handling and logging.

        Args:
            subject_id: Subject identifier (folder name under input_root).
            **options: Extra dcm2niix flags as key/value pairs.

        Returns:
            dict: Result with success, exit_code, message, output_dir, duration
        """
        start_time = datetime.now()
        # Validate input
        if not self.validate_input(subject_id):
            return {
                "success": False,
                "exit_code": 5,
                "message": "Input validation failed",
                "duration": 0
            }
        # Setup paths
        input_dir = self.input_root / subject_id
        output_dir = self.output_root / subject_id
        output_dir.mkdir(parents=True, exist_ok=True)
        # Build arguments
        args = [
            "-z", "y",
            "-b", "y",
            "-ba", "y",
            "-f", f"sub-{subject_id}_%p_%s",
            "-o", str(output_dir),
            str(input_dir)
        ]
        # Add custom options
        for key, value in options.items():
            args.extend([f"-{key}", str(value)])
        # Run conversion
        self.logger.info(f"Converting {subject_id}...")
        try:
            # NOTE: dcm2niix.main() accepts only the argument list; the
            # previous `timeout=600` keyword is not part of its signature
            # and would raise TypeError before any conversion ran.
            exit_code = main(args)
            duration = (datetime.now() - start_time).total_seconds()
            if exit_code in [0, 8]:
                self.logger.info(f"✓ {subject_id} completed in {duration:.1f}s")
                return {
                    "success": True,
                    "exit_code": exit_code,
                    "message": "Conversion successful",
                    "output_dir": str(output_dir),
                    "duration": duration
                }
            else:
                self.logger.error(f"✗ {subject_id} failed with exit code {exit_code}")
                return {
                    "success": False,
                    "exit_code": exit_code,
                    "message": f"Conversion failed (exit code {exit_code})",
                    "duration": duration
                }
        except Exception as e:
            duration = (datetime.now() - start_time).total_seconds()
            self.logger.exception(f"✗ {subject_id} exception: {e}")
            return {
                "success": False,
                "exit_code": -1,
                "message": f"Exception: {str(e)}",
                "duration": duration
            }

    def batch_convert(self, subject_ids: list, report_file: str = None) -> dict:
        """
        Convert multiple subjects and generate report.

        Args:
            subject_ids: List of subject identifiers
            report_file: Optional JSON report output path

        Returns:
            dict: Summary statistics and results
        """
        results = {}
        for i, subject_id in enumerate(subject_ids, 1):
            self.logger.info(f"Processing {i}/{len(subject_ids)}: {subject_id}")
            results[subject_id] = self.convert_subject(subject_id)
        # Generate summary; guard divisions so an empty subject list does
        # not raise ZeroDivisionError
        count = len(subject_ids)
        successful = sum(1 for r in results.values() if r["success"])
        total_duration = sum(r["duration"] for r in results.values())
        summary = {
            "total_subjects": count,
            "successful": successful,
            "failed": count - successful,
            "success_rate": (successful / count * 100) if count else 0.0,
            "total_duration": total_duration,
            "average_duration": (total_duration / count) if count else 0.0,
            "results": results
        }
        # Save report
        if report_file:
            with open(report_file, 'w') as f:
                json.dump(summary, f, indent=2)
            self.logger.info(f"Report saved to {report_file}")
        # Log summary
        self.logger.info(f"Batch complete: {successful}/{count} successful ({summary['success_rate']:.1f}%)")
        self.logger.info(f"Total time: {total_duration:.1f}s, Average: {summary['average_duration']:.1f}s per subject")
        return summary
# Usage: convert 50 subjects and write a JSON report alongside the log
converter = DicomConverter(
    input_root="/data/dicom",
    output_root="/data/nifti",
    log_file="/logs/conversion.log"
)
subjects = [f"sub-{n:03d}" for n in range(1, 51)]
summary = converter.batch_convert(subjects, report_file="/logs/conversion_report.json")
print(f"Conversion complete: {summary['successful']}/{summary['total_subjects']} successful")