# Batch Processing

The `BatchAnalyzerEngine` provides high-performance analysis capabilities for processing large datasets, including iterables, dictionaries, and structured data, with multiprocessing support.

## Capabilities

### BatchAnalyzerEngine

An efficient batch-processing engine that handles large-scale PII detection with configurable parallelization and memory optimization.

```python { .api }
class BatchAnalyzerEngine:
    """
    Batch analysis engine for processing large datasets efficiently.

    Args:
        analyzer_engine: AnalyzerEngine instance (creates default if None)
    """
    def __init__(self, analyzer_engine: Optional[AnalyzerEngine] = None): ...

    def analyze_iterator(
        self,
        texts: Iterable[Union[str, bool, float, int]],
        language: str,
        batch_size: int = 1,
        n_process: int = 1,
        **kwargs
    ) -> List[List[RecognizerResult]]:
        """
        Analyze an iterable of texts with batch processing and multiprocessing support.

        Args:
            texts: Iterable of texts to analyze (non-string values are converted to strings)
            language: Language code for analysis
            batch_size: Number of texts to process in each batch
            n_process: Number of parallel processes (1 = single process)
            **kwargs: Additional arguments passed to AnalyzerEngine.analyze()

        Returns:
            List of RecognizerResult lists, one per input text (same order as input)
        """

    def analyze_dict(
        self,
        input_dict: Dict[str, Union[Any, Iterable[Any]]],
        language: str,
        keys_to_skip: Optional[List[str]] = None,
        batch_size: int = 1,
        n_process: int = 1,
        **kwargs
    ) -> Iterator[DictAnalyzerResult]:
        """
        Analyze dictionary values with support for nested structures and iterables.

        Args:
            input_dict: Dictionary with string keys and values of various types
            language: Language code for analysis
            keys_to_skip: Dictionary keys to exclude from analysis
            batch_size: Number of values to process in each batch
            n_process: Number of parallel processes
            **kwargs: Additional arguments passed to AnalyzerEngine.analyze()

        Returns:
            Iterator of DictAnalyzerResult objects, one per analyzed key-value pair
        """

    # Property
    analyzer_engine: AnalyzerEngine  # Underlying analyzer engine instance
```

### DictAnalyzerResult

A result container for dictionary analysis operations, handling various value types and nested structures.

```python { .api }
class DictAnalyzerResult:
    """
    Result container for dictionary analysis operations.

    Properties:
        key: Dictionary key that was analyzed
        value: Original value (string, list, dict, or other type)
        recognizer_results: Detection results, typed by value:
            - List[RecognizerResult] for string values
            - List[List[RecognizerResult]] for list values
            - Iterator[DictAnalyzerResult] for nested dictionaries
    """
    key: str
    value: Union[str, List[str], dict]
    recognizer_results: Union[
        List[RecognizerResult],
        List[List[RecognizerResult]],
        Iterator[DictAnalyzerResult],
    ]
```

## Usage Examples

### Basic Iterator Processing

```python
from presidio_analyzer import BatchAnalyzerEngine

# Initialize the batch engine (wraps a default AnalyzerEngine)
batch_engine = BatchAnalyzerEngine()

# Process a list of texts
texts = [
    "Contact John at john@email.com",
    "Call support: 555-123-4567",
    "SSN: 123-45-6789",
    "Visit https://example.com",
]

results = batch_engine.analyze_iterator(
    texts=texts,
    language="en",
    batch_size=2,  # Process 2 texts per batch
)

# Process results (same order as input)
for i, text_results in enumerate(results):
    print(f"Text {i + 1}: '{texts[i]}'")
    for result in text_results:
        detected = texts[i][result.start:result.end]
        print(f"  Found {result.entity_type}: '{detected}'")
```

### Multiprocess Analysis

```python
from presidio_analyzer import BatchAnalyzerEngine

batch_engine = BatchAnalyzerEngine()

# Sample large dataset
texts = [f"User email: user{i}@company.com" for i in range(1000)]

# Process with multiple cores
results = batch_engine.analyze_iterator(
    texts=texts,
    language="en",
    batch_size=50,        # Process 50 texts per batch
    n_process=4,          # Use 4 parallel processes
    score_threshold=0.7,  # Passed through to the underlying analyzer
)

print(f"Processed {len(texts)} texts with {sum(len(r) for r in results)} total detections")
```

### Dictionary Analysis

```python
from presidio_analyzer import BatchAnalyzerEngine

batch_engine = BatchAnalyzerEngine()

# Sample user data dictionary
user_data = {
    "name": "John Smith",
    "email": "john.smith@company.com",
    "phone": "555-123-4567",
    "address": "123 Main St, Boston, MA",
    "notes": ["Called on Monday", "Prefers email contact"],
    "metadata": {
        "created": "2023-01-15",
        "last_login": "user.login@system.com",
    },
    "user_id": 12345,  # Non-string value
    "active": True,    # Non-string value
}

# Analyze the dictionary
results = batch_engine.analyze_dict(
    input_dict=user_data,
    language="en",
    keys_to_skip=["user_id", "active"],  # Skip non-PII fields
    score_threshold=0.6,
)

# Process results
for dict_result in results:
    print(f"\nKey: '{dict_result.key}'")
    print(f"Value: {dict_result.value}")

    if isinstance(dict_result.recognizer_results, list):
        if dict_result.recognizer_results and isinstance(dict_result.recognizer_results[0], list):
            # List of strings - each element has its own results
            for i, element_results in enumerate(dict_result.recognizer_results):
                if element_results:
                    print(f"  Element {i}: {len(element_results)} detections")
        else:
            # Single string - direct results
            if dict_result.recognizer_results:
                print(f"  Detections: {len(dict_result.recognizer_results)}")
                for result in dict_result.recognizer_results:
                    print(f"    {result.entity_type}: score {result.score:.2f}")
    else:
        # Nested dictionary - recursive results
        print("  Nested dictionary analysis:")
        for nested_result in dict_result.recognizer_results:
            print(f"    {nested_result.key}: {nested_result.value}")
```

### Pandas DataFrame Integration

```python
from presidio_analyzer import BatchAnalyzerEngine
import pandas as pd

batch_engine = BatchAnalyzerEngine()

# Sample DataFrame
df = pd.DataFrame({
    "customer_id": [1, 2, 3],
    "name": ["John Doe", "Jane Smith", "Bob Johnson"],
    "email": ["john@email.com", "jane.smith@company.org", "bob.j@service.net"],
    "phone": ["555-0123", "555-0456", "555-0789"],
    "notes": ["VIP customer", "Prefers phone calls", "Email only"],
})

# Analyze specific columns
email_results = batch_engine.analyze_iterator(
    texts=df["email"].tolist(),
    language="en",
    batch_size=10,
    entities=["EMAIL_ADDRESS"],
)

phone_results = batch_engine.analyze_iterator(
    texts=df["phone"].tolist(),
    language="en",
    batch_size=10,
    entities=["PHONE_NUMBER"],
)

# Add detection flags to the DataFrame
df["email_detected"] = [len(results) > 0 for results in email_results]
df["phone_detected"] = [len(results) > 0 for results in phone_results]

print("Detection Summary:")
print(f"Emails detected: {df['email_detected'].sum()}/{len(df)}")
print(f"Phones detected: {df['phone_detected'].sum()}/{len(df)}")
```

### File Processing

```python
from presidio_analyzer import BatchAnalyzerEngine

batch_engine = BatchAnalyzerEngine()

def process_log_file(file_path):
    """Batch-analyze each non-empty line of a log file for PII."""
    texts = []
    with open(file_path, "r") as f:
        for line in f:
            if line.strip():  # Skip empty lines
                texts.append(line.strip())

    # Batch process all log entries
    results = batch_engine.analyze_iterator(
        texts=texts,
        language="en",
        batch_size=100,
        n_process=2,
        entities=["EMAIL_ADDRESS", "PHONE_NUMBER", "IP_ADDRESS"],
    )

    # Find entries containing PII
    pii_entries = []
    for i, text_results in enumerate(results):
        if text_results:  # Has detections
            pii_entries.append({
                "line_number": i + 1,
                "text": texts[i],
                "detections": [
                    {
                        "entity_type": r.entity_type,
                        "text": texts[i][r.start:r.end],
                        "score": r.score,
                    }
                    for r in text_results
                ],
            })

    return pii_entries

# Usage
# pii_findings = process_log_file('/path/to/logfile.txt')
# print(f"Found PII in {len(pii_findings)} log entries")
```

### Configuration-based Batch Processing

```python
from presidio_analyzer import BatchAnalyzerEngine, AnalyzerEngineProvider

# Use a configuration file for consistent batch processing
provider = AnalyzerEngineProvider(
    analyzer_engine_conf_file="config/analyzer.yaml"
)
analyzer = provider.create_engine()
batch_engine = BatchAnalyzerEngine(analyzer_engine=analyzer)

# Shared batch configuration
batch_config = {
    "language": "en",
    "batch_size": 50,
    "n_process": 3,
    "score_threshold": 0.8,
    "entities": ["PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "US_SSN"],
}

# Process with consistent configuration
texts = ["Sample text 1", "Sample text 2", "..."]
results = batch_engine.analyze_iterator(texts=texts, **batch_config)
```

### Memory-Efficient Processing

```python
from presidio_analyzer import BatchAnalyzerEngine

batch_engine = BatchAnalyzerEngine()

def process_large_dataset(data_generator, batch_size=100):
    """Process large datasets using a generator to minimize memory usage."""
    batch = []
    all_results = []

    for text in data_generator:
        batch.append(text)

        if len(batch) >= batch_size:
            # Process the current batch
            batch_results = batch_engine.analyze_iterator(
                texts=batch,
                language="en",
                batch_size=batch_size,
                score_threshold=0.7,
            )
            all_results.extend(batch_results)
            batch = []  # Clear the batch to free memory

    # Process any remaining items
    if batch:
        batch_results = batch_engine.analyze_iterator(
            texts=batch,
            language="en",
            batch_size=len(batch),
            score_threshold=0.7,
        )
        all_results.extend(batch_results)

    return all_results

# Example generator function
def text_generator():
    for i in range(10000):
        yield f"Generated text {i} with email user{i}@domain.com"

# Process without loading all data into memory
results = process_large_dataset(text_generator())
print(f"Processed texts with {sum(len(r) for r in results)} total detections")
```

## Performance Considerations

### Batch Size Optimization

- **Small batches (1-10)**: Better for memory-constrained environments
- **Medium batches (50-100)**: A good balance for most scenarios
- **Large batches (500+)**: Higher throughput on high-memory systems

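These tiers can be expressed as a simple heuristic. The sketch below is illustrative only: the function name, memory thresholds, and the idea of sizing by available memory are assumptions for this example, not part of the `presidio_analyzer` API.

```python
def suggest_batch_size(available_memory_mb: int) -> int:
    """Pick a batch_size tier from rough available-memory thresholds (illustrative)."""
    if available_memory_mb < 512:
        return 10   # small batches for memory-constrained environments
    if available_memory_mb < 4096:
        return 100  # medium batches: a good balance for most scenarios
    return 500      # large batches for high-memory systems

print(suggest_batch_size(256))   # small-memory tier -> 10
print(suggest_batch_size(2048))  # medium tier -> 100
print(suggest_batch_size(8192))  # high-memory tier -> 500
```

The exact cutoffs matter less than the shape: start small, and grow the batch size only while memory headroom and throughput both improve.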
### Multiprocessing Guidelines

- **n_process = 1**: Single process (best for small datasets or tight memory budgets)
- **n_process = CPU cores**: A good starting point for parallel processing
- **n_process > CPU cores**: May help with I/O-bound workloads but can add overhead

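A minimal sketch of choosing `n_process` from these guidelines, assuming a CPU-bound workload. The function name and the small-dataset cutoff are illustrative assumptions; `os.cpu_count()` can return `None`, hence the fallback.

```python
import os

def suggest_n_process(num_texts: int, small_dataset_cutoff: int = 100) -> int:
    """Pick n_process per the guidelines above: 1 for small inputs, else one per core."""
    if num_texts <= small_dataset_cutoff:
        return 1  # multiprocessing overhead outweighs the gain on small datasets
    return os.cpu_count() or 1  # one process per CPU core as a starting point

print(suggest_n_process(50))      # small dataset -> 1
print(suggest_n_process(10_000))  # larger dataset -> number of cores
```

Going beyond the core count is worth benchmarking only when per-text work is dominated by I/O rather than model inference.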
### Memory Management

- Use generators for very large datasets
- Process results in chunks rather than accumulating them all
- Consider smaller batch sizes with more processes for better memory distribution

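The second point, processing results in chunks, can be sketched with plain standard-library tools. The helper names here (`iter_chunks`, `summarize_chunk`) are hypothetical stand-ins for whatever per-chunk work (e.g. writing findings to disk) your pipeline performs.

```python
from itertools import islice

def iter_chunks(iterable, chunk_size):
    """Yield successive fixed-size chunks instead of materializing everything."""
    it = iter(iterable)
    while chunk := list(islice(it, chunk_size)):
        yield chunk

def summarize_chunk(chunk):
    return len(chunk)  # placeholder: count items per chunk

# Each chunk is handled and discarded before the next one is built.
sizes = [summarize_chunk(c) for c in iter_chunks(range(10), 4)]
print(sizes)  # chunk sizes: [4, 4, 2]
```

Feeding analysis results through a consumer like this keeps peak memory proportional to the chunk size, not the dataset size.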