tessl/pypi-presidio-analyzer

Python-based service for detecting PII entities in text using Named Entity Recognition, regular expressions, rule-based logic, and checksums


docs/batch-processing.md

Batch Processing

The BatchAnalyzerEngine runs PII analysis over large collections of text (iterables, dictionaries, and other structured data) with configurable batching and multiprocessing support.

Capabilities

BatchAnalyzerEngine

Batch processing engine that wraps an AnalyzerEngine and runs large-scale PII detection with configurable batch sizes and parallel worker processes.

class BatchAnalyzerEngine:
    """
    Batch analysis engine for processing large datasets efficiently.
    
    Args:
        analyzer_engine: AnalyzerEngine instance (creates default if None)
    """
    def __init__(self, analyzer_engine: Optional[AnalyzerEngine] = None): ...

    def analyze_iterator(
        self,
        texts: Iterable[Union[str, bool, float, int]],
        language: str,
        batch_size: int = 1,
        n_process: int = 1,
        **kwargs
    ) -> List[List[RecognizerResult]]:
        """
        Analyze an iterable of texts with batch processing and multiprocessing support.
        
        Args:
            texts: Iterable of text strings to analyze (non-string values converted to string)
            language: Language code for analysis
            batch_size: Number of texts to process in each batch
            n_process: Number of parallel processes (1 = single process)
            **kwargs: Additional arguments passed to AnalyzerEngine.analyze()
            
        Returns:
            List of RecognizerResult lists, one per input text (same order as input)
        """

    def analyze_dict(
        self,
        input_dict: Dict[str, Union[Any, Iterable[Any]]],
        language: str,
        keys_to_skip: Optional[List[str]] = None,
        batch_size: int = 1,
        n_process: int = 1,
        **kwargs
    ) -> Iterator[DictAnalyzerResult]:
        """
        Analyze dictionary values with support for nested structures and iterables.
        
        Args:
            input_dict: Dictionary with string keys and various value types
            language: Language code for analysis
            keys_to_skip: Dictionary keys to exclude from analysis
            batch_size: Number of values to process in each batch
            n_process: Number of parallel processes
            **kwargs: Additional arguments passed to AnalyzerEngine.analyze()
            
        Returns:
            Iterator of DictAnalyzerResult objects for each analyzed key-value pair
        """

    # Property
    analyzer_engine: AnalyzerEngine  # Underlying analyzer engine instance

DictAnalyzerResult

Result container for dictionary analysis operations, handling various value types and nested structures.

class DictAnalyzerResult:
    """
    Result container for dictionary analysis operations.
    
    Properties:
        key: Dictionary key that was analyzed
        value: Original value (string, list, dict, or other type)
        recognizer_results: Detection results based on value type:
            - List[RecognizerResult] for string values
            - List[List[RecognizerResult]] for list values  
            - Iterator[DictAnalyzerResult] for nested dictionaries
    """
    key: str
    value: Union[str, List[str], dict]
    recognizer_results: Union[
        List[RecognizerResult],
        List[List[RecognizerResult]], 
        Iterator[DictAnalyzerResult]
    ]

Usage Examples

Basic Iterator Processing

from presidio_analyzer import BatchAnalyzerEngine

# Initialize batch engine
batch_engine = BatchAnalyzerEngine()

# Process list of texts
texts = [
    "Contact John at john@email.com",
    "Call support: 555-123-4567", 
    "SSN: 123-45-6789",
    "Visit https://example.com"
]

results = batch_engine.analyze_iterator(
    texts=texts,
    language="en",
    batch_size=2  # Process 2 texts per batch
)

# Process results (same order as input)
for i, text_results in enumerate(results):
    print(f"Text {i+1}: '{texts[i]}'")
    for result in text_results:
        detected = texts[i][result.start:result.end]
        print(f"  Found {result.entity_type}: '{detected}'")

Multiprocess Analysis

from presidio_analyzer import BatchAnalyzerEngine

# Large dataset example
batch_engine = BatchAnalyzerEngine()

# Sample large dataset
texts = [f"User email: user{i}@company.com" for i in range(1000)]

# Process with multiple cores
results = batch_engine.analyze_iterator(
    texts=texts,
    language="en",
    batch_size=50,     # Process 50 texts per batch
    n_process=4,       # Use 4 parallel processes
    score_threshold=0.7  # Passed to underlying analyzer
)

print(f"Processed {len(texts)} texts with {sum(len(r) for r in results)} total detections")

Dictionary Analysis

from presidio_analyzer import BatchAnalyzerEngine

batch_engine = BatchAnalyzerEngine()

# Sample user data dictionary
user_data = {
    "name": "John Smith",
    "email": "john.smith@company.com", 
    "phone": "555-123-4567",
    "address": "123 Main St, Boston, MA",
    "notes": ["Called on Monday", "Prefers email contact"],
    "metadata": {
        "created": "2023-01-15",
        "last_login": "user.login@system.com"
    },
    "user_id": 12345,  # Non-string value
    "active": True     # Non-string value
}

# Analyze dictionary
results = batch_engine.analyze_dict(
    input_dict=user_data,
    language="en",
    keys_to_skip=["user_id", "active"],  # Skip non-PII fields
    score_threshold=0.6
)

# Process results
for dict_result in results:
    print(f"\nKey: '{dict_result.key}'")
    print(f"Value: {dict_result.value}")
    
    if isinstance(dict_result.recognizer_results, list):
        # String or list value results
        if dict_result.recognizer_results and isinstance(dict_result.recognizer_results[0], list):
            # List of strings - each element has its own results
            for i, element_results in enumerate(dict_result.recognizer_results):
                if element_results:
                    print(f"  Element {i}: {len(element_results)} detections")
        else:
            # Single string - direct results
            if dict_result.recognizer_results:
                print(f"  Detections: {len(dict_result.recognizer_results)}")
                for result in dict_result.recognizer_results:
                    print(f"    {result.entity_type}: score {result.score:.2f}")
    else:
        # Nested dictionary - recursive results
        print("  Nested dictionary analysis:")
        for nested_result in dict_result.recognizer_results:
            print(f"    {nested_result.key}: {nested_result.value}")

Pandas DataFrame Integration

from presidio_analyzer import BatchAnalyzerEngine
import pandas as pd

batch_engine = BatchAnalyzerEngine()

# Sample DataFrame
df = pd.DataFrame({
    'customer_id': [1, 2, 3],
    'name': ['John Doe', 'Jane Smith', 'Bob Johnson'],
    'email': ['john@email.com', 'jane.smith@company.org', 'bob.j@service.net'],
    'phone': ['555-0123', '555-0456', '555-0789'],
    'notes': ['VIP customer', 'Prefers phone calls', 'Email only']
})

# Analyze specific columns
email_results = batch_engine.analyze_iterator(
    texts=df['email'].tolist(),
    language="en",
    batch_size=10,
    entities=["EMAIL_ADDRESS"]
)

phone_results = batch_engine.analyze_iterator(
    texts=df['phone'].tolist(), 
    language="en",
    batch_size=10,
    entities=["PHONE_NUMBER"]
)

# Add detection flags to DataFrame
df['email_detected'] = [len(results) > 0 for results in email_results]
df['phone_detected'] = [len(results) > 0 for results in phone_results]

print("Detection Summary:")
print(f"Emails detected: {df['email_detected'].sum()}/{len(df)}")
print(f"Phones detected: {df['phone_detected'].sum()}/{len(df)}")

File Processing

from presidio_analyzer import BatchAnalyzerEngine

batch_engine = BatchAnalyzerEngine()

# Process log file entries
def process_log_file(file_path):
    texts = []
    with open(file_path, 'r') as f:
        for line in f:
            if line.strip():  # Skip empty lines
                texts.append(line.strip())
    
    # Batch process all log entries
    results = batch_engine.analyze_iterator(
        texts=texts,
        language="en", 
        batch_size=100,
        n_process=2,
        entities=["EMAIL_ADDRESS", "PHONE_NUMBER", "IP_ADDRESS"]
    )
    
    # Find entries with PII
    pii_entries = []
    for i, text_results in enumerate(results):
        if text_results:  # Has detections
            pii_entries.append({
                'line_number': i + 1,
                'text': texts[i],
                'detections': [
                    {
                        'entity_type': r.entity_type,
                        'text': texts[i][r.start:r.end],
                        'score': r.score
                    }
                    for r in text_results
                ]
            })
    
    return pii_entries

# Usage
# pii_findings = process_log_file('/path/to/logfile.txt')
# print(f"Found PII in {len(pii_findings)} log entries")

Configuration-based Batch Processing

from presidio_analyzer import BatchAnalyzerEngine, AnalyzerEngineProvider

# Use configuration for consistent batch processing
provider = AnalyzerEngineProvider(
    analyzer_engine_conf_file="config/analyzer.yaml"
)
analyzer = provider.create_engine()
batch_engine = BatchAnalyzerEngine(analyzer_engine=analyzer)

# Batch configuration
batch_config = {
    'language': 'en',
    'batch_size': 50,
    'n_process': 3,
    'score_threshold': 0.8,
    'entities': ['PERSON', 'EMAIL_ADDRESS', 'PHONE_NUMBER', 'US_SSN']
}

# Process with consistent configuration
texts = ["Sample text 1", "Sample text 2", "..."]
results = batch_engine.analyze_iterator(texts=texts, **batch_config)

Memory-Efficient Processing

from presidio_analyzer import BatchAnalyzerEngine

batch_engine = BatchAnalyzerEngine()

def process_large_dataset(data_generator, batch_size=100):
    """
    Stream a large dataset through the analyzer chunk by chunk,
    yielding one result list per text so the full result set never
    has to be held in memory.
    """
    batch = []
    
    for text in data_generator:
        batch.append(text)
        
        if len(batch) >= batch_size:
            # Analyze the current chunk and hand its results back immediately
            yield from batch_engine.analyze_iterator(
                texts=batch,
                language="en",
                batch_size=batch_size,
                score_threshold=0.7
            )
            batch = []  # Start a fresh chunk to free memory
    
    # Analyze any remaining items
    if batch:
        yield from batch_engine.analyze_iterator(
            texts=batch,
            language="en",
            batch_size=len(batch),
            score_threshold=0.7
        )

# Example generator function
def text_generator():
    for i in range(10000):
        yield f"Generated text {i} with email user{i}@domain.com"

# Process without loading all data into memory
results = process_large_dataset(text_generator())
print(f"Processed texts with {sum(len(r) for r in results)} total detections")

Performance Considerations

Batch Size Optimization

  • Small batches (1-10): Better for memory-constrained environments
  • Medium batches (50-100): Good balance for most scenarios
  • Large batches (500+): Better throughput for high-memory systems

Multiprocessing Guidelines

  • n_process = 1: Single-threaded (best for small datasets or memory constraints)
  • n_process = CPU cores: Good starting point for parallel processing
  • n_process > CPU cores: May help with I/O-bound operations but can cause overhead
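The guidelines above can be folded into a small heuristic for choosing n_process. The helper below is an illustrative sketch, not a presidio API; the size cutoff of 100 texts is an assumption you should tune for your workload:

```python
import os
from typing import Optional

def pick_n_process(n_texts: int, max_procs: Optional[int] = None) -> int:
    """Heuristic: single process for small jobs (process start-up overhead
    dominates), otherwise cap the worker count at the machine's core count."""
    cores = os.cpu_count() or 1
    if n_texts < 100:
        return 1
    return min(cores, max_procs or cores)
```

Usage: `n_process=pick_n_process(len(texts))` in the analyze_iterator call.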

Memory Management

  • Use generators for very large datasets
  • Process results in chunks rather than accumulating all results
  • Consider using smaller batch sizes with more processes for better memory distribution
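The chunking pattern behind these bullets can be isolated as a small generator. This is a plain-Python sketch (Python 3.12 ships an equivalent itertools.batched); the handle() sink in the comment is hypothetical:

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def batched(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most `size` items, holding only
    one chunk in memory at a time."""
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk

# Feed each chunk to analyze_iterator and consume results immediately,
# rather than accumulating every result list:
# for chunk in batched(text_generator(), 100):
#     for text, res in zip(chunk, batch_engine.analyze_iterator(
#             texts=chunk, language="en", batch_size=len(chunk))):
#         handle(text, res)  # hypothetical per-text sink
```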

Install with Tessl CLI

npx tessl i tessl/pypi-presidio-analyzer
