# Batch Processing

The `BatchAnalyzerEngine` provides high-performance analysis capabilities for processing large datasets, including iterables, dictionaries, and structured data, with multiprocessing support.

## Capabilities

### BatchAnalyzerEngine

Efficient batch processing engine that handles large-scale PII detection operations with configurable parallelization and memory optimization.

```python { .api }
class BatchAnalyzerEngine:
    """
    Batch analysis engine for processing large datasets efficiently.

    Args:
        analyzer_engine: AnalyzerEngine instance (creates a default if None)
    """
    def __init__(self, analyzer_engine: Optional[AnalyzerEngine] = None): ...

    def analyze_iterator(
        self,
        texts: Iterable[Union[str, bool, float, int]],
        language: str,
        batch_size: int = 1,
        n_process: int = 1,
        **kwargs
    ) -> List[List[RecognizerResult]]:
        """
        Analyze an iterable of texts with batch processing and multiprocessing support.

        Args:
            texts: Iterable of texts to analyze (non-string values are converted to strings)
            language: Language code for analysis
            batch_size: Number of texts to process in each batch
            n_process: Number of parallel processes (1 = single process)
            **kwargs: Additional arguments passed to AnalyzerEngine.analyze()

        Returns:
            List of RecognizerResult lists, one per input text (same order as input)
        """

    def analyze_dict(
        self,
        input_dict: Dict[str, Union[Any, Iterable[Any]]],
        language: str,
        keys_to_skip: Optional[List[str]] = None,
        batch_size: int = 1,
        n_process: int = 1,
        **kwargs
    ) -> Iterator[DictAnalyzerResult]:
        """
        Analyze dictionary values with support for nested structures and iterables.

        Args:
            input_dict: Dictionary with string keys and various value types
            language: Language code for analysis
            keys_to_skip: Dictionary keys to exclude from analysis
            batch_size: Number of values to process in each batch
            n_process: Number of parallel processes
            **kwargs: Additional arguments passed to AnalyzerEngine.analyze()

        Returns:
            Iterator of DictAnalyzerResult objects, one per analyzed key-value pair
        """

    # Property
    analyzer_engine: AnalyzerEngine  # Underlying analyzer engine instance
```

### DictAnalyzerResult

Result container for dictionary analysis operations, handling various value types and nested structures.

```python { .api }
class DictAnalyzerResult:
    """
    Result container for dictionary analysis operations.

    Properties:
        key: Dictionary key that was analyzed
        value: Original value (string, list, dict, or other type)
        recognizer_results: Detection results based on value type:
            - List[RecognizerResult] for string values
            - List[List[RecognizerResult]] for list values
            - Iterator[DictAnalyzerResult] for nested dictionaries
    """
    key: str
    value: Union[str, List[str], dict]
    recognizer_results: Union[
        List[RecognizerResult],
        List[List[RecognizerResult]],
        Iterator[DictAnalyzerResult]
    ]
```

## Usage Examples

### Basic Iterator Processing

```python
from presidio_analyzer import BatchAnalyzerEngine

# Initialize batch engine
batch_engine = BatchAnalyzerEngine()

# Process list of texts
texts = [
    "Contact John at john@email.com",
    "Call support: 555-123-4567",
    "SSN: 123-45-6789",
    "Visit https://example.com"
]

results = batch_engine.analyze_iterator(
    texts=texts,
    language="en",
    batch_size=2  # Process 2 texts per batch
)

# Process results (same order as input)
for i, text_results in enumerate(results):
    print(f"Text {i+1}: '{texts[i]}'")
    for result in text_results:
        detected = texts[i][result.start:result.end]
        print(f"  Found {result.entity_type}: '{detected}'")
```

### Multiprocess Analysis

```python
from presidio_analyzer import BatchAnalyzerEngine

# Large dataset example
batch_engine = BatchAnalyzerEngine()

# Sample large dataset
texts = [f"User email: user{i}@company.com" for i in range(1000)]

# Process with multiple cores
results = batch_engine.analyze_iterator(
    texts=texts,
    language="en",
    batch_size=50,       # Process 50 texts per batch
    n_process=4,         # Use 4 parallel processes
    score_threshold=0.7  # Passed to underlying analyzer
)

print(f"Processed {len(texts)} texts with {sum(len(r) for r in results)} total detections")
```

### Dictionary Analysis

```python
from presidio_analyzer import BatchAnalyzerEngine

batch_engine = BatchAnalyzerEngine()

# Sample user data dictionary
user_data = {
    "name": "John Smith",
    "email": "john.smith@company.com",
    "phone": "555-123-4567",
    "address": "123 Main St, Boston, MA",
    "notes": ["Called on Monday", "Prefers email contact"],
    "metadata": {
        "created": "2023-01-15",
        "last_login": "user.login@system.com"
    },
    "user_id": 12345,  # Non-string value
    "active": True     # Non-string value
}

# Analyze dictionary
results = batch_engine.analyze_dict(
    input_dict=user_data,
    language="en",
    keys_to_skip=["user_id", "active"],  # Skip non-PII fields
    score_threshold=0.6
)

# Process results
for dict_result in results:
    print(f"\nKey: '{dict_result.key}'")
    print(f"Value: {dict_result.value}")

    if isinstance(dict_result.recognizer_results, list):
        # String or list value results
        if dict_result.recognizer_results and isinstance(dict_result.recognizer_results[0], list):
            # List of strings - each element has its own results
            for i, element_results in enumerate(dict_result.recognizer_results):
                if element_results:
                    print(f"  Element {i}: {len(element_results)} detections")
        else:
            # Single string - direct results
            if dict_result.recognizer_results:
                print(f"  Detections: {len(dict_result.recognizer_results)}")
                for result in dict_result.recognizer_results:
                    print(f"    {result.entity_type}: score {result.score:.2f}")
    else:
        # Nested dictionary - recursive results
        print("  Nested dictionary analysis:")
        for nested_result in dict_result.recognizer_results:
            print(f"    {nested_result.key}: {nested_result.value}")
```

### Pandas DataFrame Integration

```python
from presidio_analyzer import BatchAnalyzerEngine
import pandas as pd

batch_engine = BatchAnalyzerEngine()

# Sample DataFrame
df = pd.DataFrame({
    'customer_id': [1, 2, 3],
    'name': ['John Doe', 'Jane Smith', 'Bob Johnson'],
    'email': ['john@email.com', 'jane.smith@company.org', 'bob.j@service.net'],
    'phone': ['555-0123', '555-0456', '555-0789'],
    'notes': ['VIP customer', 'Prefers phone calls', 'Email only']
})

# Analyze specific columns
email_results = batch_engine.analyze_iterator(
    texts=df['email'].tolist(),
    language="en",
    batch_size=10,
    entities=["EMAIL_ADDRESS"]
)

phone_results = batch_engine.analyze_iterator(
    texts=df['phone'].tolist(),
    language="en",
    batch_size=10,
    entities=["PHONE_NUMBER"]
)

# Add detection flags to DataFrame
df['email_detected'] = [len(results) > 0 for results in email_results]
df['phone_detected'] = [len(results) > 0 for results in phone_results]

print("Detection Summary:")
print(f"Emails detected: {df['email_detected'].sum()}/{len(df)}")
print(f"Phones detected: {df['phone_detected'].sum()}/{len(df)}")
```

### File Processing

```python
from presidio_analyzer import BatchAnalyzerEngine

batch_engine = BatchAnalyzerEngine()

# Process log file entries
def process_log_file(file_path):
    texts = []
    with open(file_path, 'r') as f:
        for line in f:
            if line.strip():  # Skip empty lines
                texts.append(line.strip())

    # Batch process all log entries
    results = batch_engine.analyze_iterator(
        texts=texts,
        language="en",
        batch_size=100,
        n_process=2,
        entities=["EMAIL_ADDRESS", "PHONE_NUMBER", "IP_ADDRESS"]
    )

    # Find entries with PII
    pii_entries = []
    for i, text_results in enumerate(results):
        if text_results:  # Has detections
            pii_entries.append({
                'line_number': i + 1,
                'text': texts[i],
                'detections': [
                    {
                        'entity_type': r.entity_type,
                        'text': texts[i][r.start:r.end],
                        'score': r.score
                    }
                    for r in text_results
                ]
            })

    return pii_entries

# Usage
# pii_findings = process_log_file('/path/to/logfile.txt')
# print(f"Found PII in {len(pii_findings)} log entries")
```

### Configuration-based Batch Processing

```python
from presidio_analyzer import BatchAnalyzerEngine, AnalyzerEngineProvider

# Use configuration for consistent batch processing
provider = AnalyzerEngineProvider(
    analyzer_engine_conf_file="config/analyzer.yaml"
)
analyzer = provider.create_engine()
batch_engine = BatchAnalyzerEngine(analyzer_engine=analyzer)

# Batch configuration
batch_config = {
    'language': 'en',
    'batch_size': 50,
    'n_process': 3,
    'score_threshold': 0.8,
    'entities': ['PERSON', 'EMAIL_ADDRESS', 'PHONE_NUMBER', 'US_SSN']
}

# Process with consistent configuration
texts = ["Sample text 1", "Sample text 2", "..."]
results = batch_engine.analyze_iterator(texts=texts, **batch_config)
```

### Memory-Efficient Processing

```python
from presidio_analyzer import BatchAnalyzerEngine

batch_engine = BatchAnalyzerEngine()

def process_large_dataset(data_generator, batch_size=100):
    """
    Process large datasets using generators to minimize memory usage.
    """
    batch = []
    all_results = []

    for text in data_generator:
        batch.append(text)

        if len(batch) >= batch_size:
            # Process current batch
            batch_results = batch_engine.analyze_iterator(
                texts=batch,
                language="en",
                batch_size=batch_size,
                score_threshold=0.7
            )
            all_results.extend(batch_results)
            batch = []  # Clear batch to free memory

    # Process remaining items
    if batch:
        batch_results = batch_engine.analyze_iterator(
            texts=batch,
            language="en",
            batch_size=len(batch),
            score_threshold=0.7
        )
        all_results.extend(batch_results)

    return all_results

# Example generator function
def text_generator():
    for i in range(10000):
        yield f"Generated text {i} with email user{i}@domain.com"

# Process without loading all data into memory
results = process_large_dataset(text_generator())
print(f"Processed texts with {sum(len(r) for r in results)} total detections")
```

## Performance Considerations

### Batch Size Optimization

- **Small batches (1-10)**: Better for memory-constrained environments
- **Medium batches (50-100)**: Good balance for most scenarios
- **Large batches (500+)**: Better throughput for high-memory systems

### Multiprocessing Guidelines

- **n_process = 1**: Single-threaded (best for small datasets or memory constraints)
- **n_process = CPU cores**: Good starting point for parallel processing
- **n_process > CPU cores**: May help with I/O-bound operations but can cause overhead
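
As a rough starting point, these guidelines can be folded into a small helper. This is an illustrative sketch only: `pick_n_process` and its `per_process_min` threshold are hypothetical names, not part of the Presidio API.

```python
import os

def pick_n_process(num_texts, per_process_min=100):
    """Pick a worker count: one per core, but never more workers than
    there are reasonably sized shares of work."""
    cores = os.cpu_count() or 1
    shares = max(1, num_texts // per_process_min)
    return max(1, min(cores, shares))

# Small jobs stay single-process; large jobs scale up to the core count.
print(pick_n_process(50))      # 1
print(pick_n_process(10_000))  # capped at cpu_count
```

The floor exists because each extra process pays start-up overhead (including loading the NLP model), which only amortizes over a large enough share of texts.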

### Memory Management

- Use generators for very large datasets
- Process results in chunks rather than accumulating all results
- Consider using smaller batch sizes with more processes for better memory distribution
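
The chunking pattern behind these guidelines can be sketched without Presidio at all; `chunked` here is an illustrative helper, not part of the library's API:

```python
from itertools import islice

def chunked(texts, size):
    """Yield successive lists of at most `size` items from any iterable."""
    it = iter(texts)
    while chunk := list(islice(it, size)):
        yield chunk

# Each chunk can be passed to analyze_iterator and its results written out
# (or summarized) immediately, so only one chunk is ever held in memory.
sizes = [len(batch) for batch in chunked((f"text {i}" for i in range(10)), 4)]
print(sizes)  # [4, 4, 2]
```

Unlike the `process_large_dataset` example above, writing each chunk's results out immediately keeps peak memory proportional to the chunk size rather than the whole dataset.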