tessl/pypi-dcm2niix

tessl install tessl/pypi-dcm2niix@1.0.5

Command-line application that converts medical imaging data from DICOM format to NIfTI format with BIDS support

docs/examples/real-world-scenarios.md

Real-World Scenarios

Comprehensive examples for common dcm2niix usage patterns in production environments.

Error Handling Patterns

Comprehensive Error Handler

from dcm2niix import bin  # path to the bundled dcm2niix executable
import subprocess
from pathlib import Path

def convert_with_error_handling(input_dir, output_dir):
    """
    Robust conversion with comprehensive error handling.
    
    Returns:
        tuple: (success: bool, message: str, exit_code: int)
    """
    # Validate input
    input_path = Path(input_dir)
    if not input_path.exists():
        return False, f"Input directory does not exist: {input_dir}", 5
    
    # Create output directory
    output_path = Path(output_dir)
    try:
        output_path.mkdir(parents=True, exist_ok=True)
    except PermissionError:
        return False, f"Cannot create output directory: {output_dir}", 6
    
    # Run conversion with a timeout by invoking the dcm2niix binary
    # directly via subprocess (which supports timeout and output capture)
    try:
        result = subprocess.run([
            bin,
            "-z", "y",              # Compress output
            "-b", "y",              # Generate BIDS JSON
            "-ba", "y",             # Anonymize BIDS metadata
            "-o", str(output_path),
            str(input_path)
        ], capture_output=True, text=True, timeout=300)
        exit_code = result.returncode
        
        # Interpret exit code
        if exit_code == 0:
            return True, "Conversion successful", exit_code
        elif exit_code == 2:
            return False, "No valid DICOM files found", exit_code
        elif exit_code == 4:
            return False, "Corrupt DICOM file detected", exit_code
        elif exit_code == 8:
            return True, "Partial success: some files converted", exit_code
        elif exit_code == 10:
            return True, "Conversion completed with incomplete volume warning", exit_code
        else:
            return False, f"Conversion failed with exit code {exit_code}", exit_code
            
    except subprocess.TimeoutExpired:
        return False, "Conversion timed out after 300 seconds", -1
    except Exception as e:
        return False, f"Unexpected error: {str(e)}", -1

# Usage
success, message, code = convert_with_error_handling("/data/dicom", "/data/nifti")
print(f"{'✓' if success else '✗'} {message} (exit code: {code})")

Retry Logic with Exponential Backoff

from dcm2niix import main
import time

def convert_with_retry(input_dir, output_dir, max_retries=3):
    """Convert with automatic retry on transient failures."""
    
    exit_code = -1  # fallback if max_retries < 1
    for attempt in range(max_retries):
        exit_code = main([
            "-z", "y",
            "-o", output_dir,
            input_dir
        ])
        
        # Success or partial success
        if exit_code in [0, 8]:
            return True, exit_code
        
        # Permanent failures - don't retry
        if exit_code in [2, 5, 6]:
            return False, exit_code
        
        # Transient failures - retry with backoff
        if attempt < max_retries - 1:
            wait_time = 2 ** attempt  # 1s, 2s, 4s, ...
            print(f"Attempt {attempt + 1} failed, retrying in {wait_time}s...")
            time.sleep(wait_time)
    
    return False, exit_code

# Usage
success, code = convert_with_retry("/data/dicom", "/data/nifti")

Batch Processing Patterns

Multi-Subject Processing

from dcm2niix import main
from pathlib import Path
from typing import Dict, List

def batch_convert_subjects(
    input_root: str,
    output_root: str,
    subject_ids: List[str],
    compress: bool = True,
    bids: bool = True
) -> Dict[str, dict]:
    """
    Convert multiple subjects with consistent settings.
    
    Args:
        input_root: Root directory containing subject folders
        output_root: Root directory for outputs
        subject_ids: List of subject identifiers
        compress: Enable gzip compression
        bids: Generate BIDS JSON sidecars
        
    Returns:
        Dictionary mapping subject_id to result dict
    """
    results = {}
    
    for subject_id in subject_ids:
        input_dir = Path(input_root) / subject_id
        output_dir = Path(output_root) / subject_id
        
        # Skip if input doesn't exist
        if not input_dir.exists():
            results[subject_id] = {
                "success": False,
                "exit_code": 5,
                "message": "Input directory not found"
            }
            continue
        
        # Create output directory
        output_dir.mkdir(parents=True, exist_ok=True)
        
        # Build arguments
        args = []
        if compress:
            args.extend(["-z", "y"])
        if bids:
            args.extend(["-b", "y", "-ba", "y"])
        args.extend([
            "-f", f"sub-{subject_id}_%p_%s",
            "-o", str(output_dir),
            str(input_dir)
        ])
        
        # Run conversion
        exit_code = main(args)
        
        results[subject_id] = {
            "success": exit_code in [0, 8],
            "exit_code": exit_code,
            "output_dir": str(output_dir)
        }
    
    return results

# Usage
subjects = ["001", "002", "003", "004", "005"]
results = batch_convert_subjects("/data/dicom", "/data/nifti", subjects)

# Report results
successful = sum(1 for r in results.values() if r["success"])
print(f"Converted {successful}/{len(subjects)} subjects successfully")

for subject_id, result in results.items():
    status = "✓" if result["success"] else "✗"
    print(f"{status} {subject_id}: {result['exit_code']}")

Parallel Processing

from dcm2niix import main
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from typing import List, Dict

def convert_single_subject(subject_dir: str, output_root: str) -> tuple:
    """Convert single subject (for parallel execution)."""
    subject_path = Path(subject_dir)
    subject_name = subject_path.name
    output_dir = Path(output_root) / subject_name
    output_dir.mkdir(parents=True, exist_ok=True)
    
    exit_code = main([
        "-z", "y",
        "-b", "y",
        "-ba", "y",
        "-f", f"sub-{subject_name}_%p_%s",
        "-o", str(output_dir),
        str(subject_path)
    ])
    
    return subject_name, exit_code

def parallel_batch_convert(
    subject_dirs: List[str],
    output_root: str,
    max_workers: int = 4
) -> Dict[str, int]:
    """
    Convert multiple subjects in parallel.
    
    Args:
        subject_dirs: List of subject directory paths
        output_root: Root directory for outputs
        max_workers: Maximum parallel processes
        
    Returns:
        Dictionary mapping subject name to exit code
    """
    results = {}
    
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Submit all jobs
        futures = {
            executor.submit(convert_single_subject, subj_dir, output_root): subj_dir
            for subj_dir in subject_dirs
        }
        
        # Collect results as they complete
        for future in as_completed(futures):
            try:
                subject_name, exit_code = future.result()
                results[subject_name] = exit_code
                status = "✓" if exit_code in [0, 8] else "✗"
                print(f"{status} {subject_name}: exit code {exit_code}")
            except Exception as e:
                subject_dir = futures[future]
                print(f"✗ {Path(subject_dir).name}: {e}")
                results[Path(subject_dir).name] = -1
    
    return results

# Usage (on platforms that spawn worker processes, such as Windows and
# macOS, run this under an `if __name__ == "__main__":` guard)
subject_directories = [
    "/data/subjects/sub-001",
    "/data/subjects/sub-002",
    "/data/subjects/sub-003",
    "/data/subjects/sub-004"
]

results = parallel_batch_convert(subject_directories, "/data/nifti", max_workers=4)

# Summary
successful = sum(1 for code in results.values() if code in [0, 8])
print(f"\nCompleted: {successful}/{len(results)} successful")

Safe Conversion with Rollback

from dcm2niix import main
from pathlib import Path
import shutil
import tempfile

def safe_convert(input_dir: str, output_dir: str, **options) -> tuple:
    """
    Convert DICOM with automatic rollback on failure.
    
    Converts to temporary directory first, then moves to final location
    only on success. This prevents partial/corrupted outputs.
    
    Args:
        input_dir: Input DICOM directory
        output_dir: Final output directory
        **options: Additional dcm2niix options (e.g., z="y", b="y")
        
    Returns:
        tuple: (success: bool, message: str)
    """
    # Create temporary output directory
    with tempfile.TemporaryDirectory() as temp_dir:
        # Build arguments: flags first, input directory last
        # (dcm2niix expects the input folder as the final argument)
        args = ["-o", temp_dir]
        
        for key, value in options.items():
            if value is True:
                args.extend([f"-{key}", "y"])
            elif value is False:
                args.extend([f"-{key}", "n"])
            elif value is not None:
                args.extend([f"-{key}", str(value)])
        
        args.append(input_dir)
        
        # Run conversion
        exit_code = main(args)
        
        # Check for success
        if exit_code in [0, 8]:
            # Move outputs to final destination
            output_path = Path(output_dir)
            output_path.mkdir(parents=True, exist_ok=True)
            
            # Move all files
            for file in Path(temp_dir).iterdir():
                dest = output_path / file.name
                shutil.move(str(file), str(dest))
            
            return True, f"Conversion successful (exit code {exit_code})"
        else:
            # Temporary directory automatically cleaned up
            return False, f"Conversion failed with exit code {exit_code}"

# Usage
success, message = safe_convert(
    "/data/dicom/sub-001",
    "/data/nifti/sub-001",
    z="y",      # Compression
    b="y",      # BIDS JSON
    ba="y"      # Anonymize
)
print(message)
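
One caveat on the move step: shutil.move falls back to copy-and-delete when the temporary directory and the destination sit on different filesystems, so a crash mid-move could still leave a partial file. If you can keep both on the same filesystem, os.replace gives per-file atomic moves. A minimal sketch under that assumption (move_outputs_atomically is a hypothetical helper):

import os
from pathlib import Path

def move_outputs_atomically(temp_dir: str, output_dir: str) -> None:
    """Move files with per-file atomicity; requires temp_dir and
    output_dir to be on the same filesystem (os.replace raises
    OSError with EXDEV otherwise)."""
    out = Path(output_dir)
    out.mkdir(parents=True, exist_ok=True)
    for file in Path(temp_dir).iterdir():
        os.replace(file, out / file.name)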

Series Selection Workflows

Interactive Series Selection

from dcm2niix import bin
import subprocess
import re

def get_available_series(dicom_dir: str) -> list:
    """
    List available series with CRC values for selection.
    
    Returns:
        List of dicts with 'name', 'crc', and 'file_count' keys
    """
    # NOTE: the listing flag and report format here are assumptions --
    # check `dcm2niix -h` for your installed version and adjust the
    # command and the regex below to match its actual output.
    result = subprocess.run(
        [bin, "-q", "l", dicom_dir],
        capture_output=True,
        text=True
    )
    
    series_info = []
    for line in result.stdout.split('\n'):
        # Parse: "Series 001: T1_MPRAGE [52301] - 176 files"
        match = re.search(r'Series \d+: (.+?) \[(\d+)\] - (\d+) files', line)
        if match:
            series_info.append({
                'name': match.group(1),
                'crc': match.group(2),
                'file_count': int(match.group(3))
            })
    
    return series_info

def convert_selected_series(dicom_dir: str, output_dir: str, series_crcs: list) -> bool:
    """Convert only specified series by CRC."""
    from dcm2niix import main
    
    args = ["-z", "y", "-b", "y", "-o", output_dir]
    for crc in series_crcs:
        args.extend(["-n", str(crc)])
    args.append(dicom_dir)
    
    exit_code = main(args)
    return exit_code in [0, 8]

# Usage
dicom_folder = "/data/exam/patient001"

# Step 1: List available series
series_list = get_available_series(dicom_folder)
print("Available series:")
for i, series in enumerate(series_list, 1):
    print(f"{i}. {series['name']} (CRC: {series['crc']}, {series['file_count']} files)")

# Step 2: Select desired series (e.g., T1 and T2)
selected_crcs = [s['crc'] for s in series_list if 'T1' in s['name'] or 'T2' in s['name']]

# Step 3: Convert selected series
success = convert_selected_series(dicom_folder, "/data/nifti/patient001", selected_crcs)
print(f"Conversion {'successful' if success else 'failed'}")

Filter Out Derived Images

from dcm2niix import main

def convert_primary_only(input_dir: str, output_dir: str, ignore_derived: bool = True):
    """Convert while filtering out derived and localizer images."""
    
    exit_code = main([
        "-i", "y" if ignore_derived else "n",  # Ignore derived/localizer
        "-z", "y",                              # Compress
        "-b", "y",                              # BIDS JSON
        "-o", output_dir,
        input_dir
    ])
    
    return exit_code in [0, 8]

# Usage - only primary acquisitions
success = convert_primary_only("/data/mixed/exam", "/data/clean/exam", ignore_derived=True)

Performance Optimization

Fast Batch Processing

from dcm2niix import main
from pathlib import Path

def fast_batch_convert(input_root: str, output_root: str, subjects: list):
    """
    Optimized batch conversion for organized PACS data.
    
    Uses:
    - Adjacent DICOMs mode (assumes series in same folder)
    - Shallow search depth
    - Skip existing files
    - Minimal output
    """
    results = {}
    
    for subject_id in subjects:
        input_dir = Path(input_root) / subject_id
        output_dir = Path(output_root) / subject_id
        output_dir.mkdir(parents=True, exist_ok=True)
        
        exit_code = main([
            "-z", "y",              # Compress with pigz
            "-a", "y",              # Adjacent DICOMs (faster)
            "-d", "1",              # Shallow search
            "-i", "y",              # Ignore derived
            "-w", "0",              # Skip existing files
            "-v", "0",              # Minimal output
            "-o", str(output_dir),
            str(input_dir)
        ])
        
        results[subject_id] = exit_code in [0, 8]
    
    return results

# Usage
subjects = [f"sub-{i:03d}" for i in range(1, 101)]  # 100 subjects
results = fast_batch_convert("/pacs/export", "/data/nifti", subjects)
print(f"Processed {sum(results.values())}/{len(results)} subjects")

Chunked Batch Processing

from dcm2niix import main
from pathlib import Path

def chunked_batch_process(all_subjects: list, input_root: str, output_root: str, chunk_size: int = 10):
    """
    Process large batches in chunks to manage memory.
    
    Useful for very large datasets where processing all subjects
    at once might cause memory issues.
    """
    total_subjects = len(all_subjects)
    results = {}
    
    for i in range(0, total_subjects, chunk_size):
        chunk = all_subjects[i:i+chunk_size]
        chunk_num = i // chunk_size + 1
        total_chunks = (total_subjects + chunk_size - 1) // chunk_size
        
        print(f"Processing chunk {chunk_num}/{total_chunks} ({len(chunk)} subjects)...")
        
        for subject_id in chunk:
            input_dir = Path(input_root) / subject_id
            output_dir = Path(output_root) / subject_id
            output_dir.mkdir(parents=True, exist_ok=True)
            
            exit_code = main([
                "-z", "y",
                "-b", "y",
                "-o", str(output_dir),
                str(input_dir)
            ])
            
            results[subject_id] = exit_code in [0, 8]
        
        # Progress update
        completed = i + len(chunk)
        print(f"Completed {completed}/{total_subjects} subjects")
    
    return results

# Usage for 1000 subjects
all_subjects = [f"sub-{i:04d}" for i in range(1, 1001)]
results = chunked_batch_process(all_subjects, "/data/dicom", "/data/nifti", chunk_size=50)

Production Pipeline Integration

Complete Production Workflow

from dcm2niix import bin
import subprocess
from pathlib import Path
import logging
import json
from datetime import datetime

class DicomConverter:
    """Production-grade DICOM to NIfTI converter."""
    
    def __init__(self, input_root: str, output_root: str, log_file: str = None):
        self.input_root = Path(input_root)
        self.output_root = Path(output_root)
        
        # Setup logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file) if log_file else logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def validate_input(self, subject_id: str) -> bool:
        """Validate input directory exists and contains files."""
        input_dir = self.input_root / subject_id
        
        if not input_dir.exists():
            self.logger.error(f"Input directory not found: {input_dir}")
            return False
        
        if not any(input_dir.iterdir()):
            self.logger.error(f"Input directory is empty: {input_dir}")
            return False
        
        return True
    
    def convert_subject(self, subject_id: str, **options) -> dict:
        """
        Convert single subject with full error handling and logging.
        
        Returns:
            dict: Result with success, exit_code, message, output_dir
        """
        start_time = datetime.now()
        
        # Validate input
        if not self.validate_input(subject_id):
            return {
                "success": False,
                "exit_code": 5,
                "message": "Input validation failed",
                "duration": 0
            }
        
        # Setup paths
        input_dir = self.input_root / subject_id
        output_dir = self.output_root / subject_id
        output_dir.mkdir(parents=True, exist_ok=True)
        
        # Build arguments: flags first, input directory last
        args = [
            "-z", "y",
            "-b", "y",
            "-ba", "y",
            "-f", f"sub-{subject_id}_%p_%s",
            "-o", str(output_dir),
        ]
        
        # Add custom options
        for key, value in options.items():
            args.extend([f"-{key}", str(value)])
        
        args.append(str(input_dir))
        
        # Run conversion
        self.logger.info(f"Converting {subject_id}...")
        
        try:
            # Invoke the binary via subprocess so a hard timeout applies
            result = subprocess.run([bin] + args, timeout=600)
            exit_code = result.returncode
            duration = (datetime.now() - start_time).total_seconds()
            
            if exit_code in [0, 8]:
                self.logger.info(f"✓ {subject_id} completed in {duration:.1f}s")
                return {
                    "success": True,
                    "exit_code": exit_code,
                    "message": "Conversion successful",
                    "output_dir": str(output_dir),
                    "duration": duration
                }
            else:
                self.logger.error(f"✗ {subject_id} failed with exit code {exit_code}")
                return {
                    "success": False,
                    "exit_code": exit_code,
                    "message": f"Conversion failed (exit code {exit_code})",
                    "duration": duration
                }
                
        except Exception as e:
            duration = (datetime.now() - start_time).total_seconds()
            self.logger.exception(f"✗ {subject_id} exception: {e}")
            return {
                "success": False,
                "exit_code": -1,
                "message": f"Exception: {str(e)}",
                "duration": duration
            }
    
    def batch_convert(self, subject_ids: list, report_file: str = None) -> dict:
        """
        Convert multiple subjects and generate report.
        
        Args:
            subject_ids: List of subject identifiers
            report_file: Optional JSON report output path
            
        Returns:
            dict: Summary statistics and results
        """
        results = {}
        
        for i, subject_id in enumerate(subject_ids, 1):
            self.logger.info(f"Processing {i}/{len(subject_ids)}: {subject_id}")
            results[subject_id] = self.convert_subject(subject_id)
        
        # Generate summary
        successful = sum(1 for r in results.values() if r["success"])
        total_duration = sum(r["duration"] for r in results.values())
        
        summary = {
            "total_subjects": len(subject_ids),
            "successful": successful,
            "failed": len(subject_ids) - successful,
            "success_rate": successful / len(subject_ids) * 100,
            "total_duration": total_duration,
            "average_duration": total_duration / len(subject_ids),
            "results": results
        }
        
        # Save report
        if report_file:
            with open(report_file, 'w') as f:
                json.dump(summary, f, indent=2)
            self.logger.info(f"Report saved to {report_file}")
        
        # Log summary
        self.logger.info(f"Batch complete: {successful}/{len(subject_ids)} successful ({summary['success_rate']:.1f}%)")
        self.logger.info(f"Total time: {total_duration:.1f}s, Average: {summary['average_duration']:.1f}s per subject")
        
        return summary

# Usage
converter = DicomConverter(
    input_root="/data/dicom",
    output_root="/data/nifti",
    log_file="/logs/conversion.log"
)

subjects = [f"sub-{i:03d}" for i in range(1, 51)]
summary = converter.batch_convert(subjects, report_file="/logs/conversion_report.json")

print(f"Conversion complete: {summary['successful']}/{summary['total_subjects']} successful")

Next Steps

  • Edge Cases and Troubleshooting
  • Python API Reference
  • CLI Options Reference