
# Batch Operations

This guide covers batch processing capabilities in Google Cloud Document AI, including asynchronous document processing, operation monitoring, and handling large-scale document workflows.

## Batch Processing Overview

Batch processing allows you to process multiple documents asynchronously, making it ideal for:

- High-volume document processing
- Processing large document collections
- Scheduled document processing workflows
- Documents stored in Cloud Storage

Key benefits:

- **Scalability**: Process hundreds or thousands of documents
- **Cost-effective**: Optimized pricing for bulk operations
- **Asynchronous**: Non-blocking operations with progress monitoring
- **Integration**: Direct Cloud Storage input/output integration

A minimal end-to-end call is sketched below; the sections that follow break each step into reusable helpers.
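
For orientation, here is a minimal sketch of the whole flow in a single `batch_process_documents` call. It assumes the current `google-cloud-documentai` client (the exact import path can vary slightly between library versions) and placeholder project, processor, and bucket names:

```python { .api }
from google.cloud import documentai

client = documentai.DocumentProcessorServiceClient()

request = documentai.BatchProcessRequest(
    # projects/{project-id}/locations/{location}/processors/{processor-id}
    name=client.processor_path("my-project", "us", "abc123"),
    input_documents=documentai.BatchDocumentsInputConfig(
        gcs_prefix=documentai.GcsPrefix(gcs_uri_prefix="gs://my-bucket/input-docs/")
    ),
    document_output_config=documentai.DocumentOutputConfig(
        gcs_output_config=documentai.DocumentOutputConfig.GcsOutputConfig(
            gcs_uri="gs://my-bucket/output/"
        )
    ),
)

# Returns a long-running operation; result() blocks until every document is processed.
operation = client.batch_process_documents(request=request)
operation.result(timeout=1800)
```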

## Basic Batch Processing

### BatchProcessRequest Configuration

```python { .api }
from google.cloud.documentai.types import (
    BatchProcessRequest,
    BatchDocumentsInputConfig,
    DocumentOutputConfig,
    GcsDocuments,
    GcsDocument,
    GcsPrefix
)

def create_batch_process_request(
    processor_name: str,
    gcs_input_uri: str,
    gcs_output_uri: str,
    input_mime_type: str = "application/pdf"
) -> BatchProcessRequest:
    """
    Create a batch processing request for documents in Cloud Storage.

    Args:
        processor_name: Full processor resource name
        gcs_input_uri: Input Cloud Storage URI or prefix
        gcs_output_uri: Output Cloud Storage URI
        input_mime_type: MIME type of input documents

    Returns:
        BatchProcessRequest: Configured batch processing request
    """
    # Configure input documents
    if gcs_input_uri.endswith('/'):
        # Process every document under the prefix. Wildcards are not supported
        # in GcsDocument URIs, so a prefix input uses GcsPrefix instead.
        input_config = BatchDocumentsInputConfig(
            gcs_prefix=GcsPrefix(gcs_uri_prefix=gcs_input_uri)
        )
    else:
        # Process a specific document
        input_config = BatchDocumentsInputConfig(
            gcs_documents=GcsDocuments(
                documents=[
                    GcsDocument(
                        gcs_uri=gcs_input_uri,
                        mime_type=input_mime_type
                    )
                ]
            )
        )

    # Configure output
    output_config = DocumentOutputConfig(
        gcs_output_config=DocumentOutputConfig.GcsOutputConfig(
            gcs_uri=gcs_output_uri
        )
    )

    # Create batch request
    request = BatchProcessRequest(
        name=processor_name,
        input_documents=input_config,
        document_output_config=output_config
    )

    return request
```
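
For reference, here is a quick usage sketch of the helper above, showing both a prefix input (trailing slash) and a single-file input; the bucket and processor names are placeholders:

```python { .api }
processor_name = "projects/my-project/locations/us/processors/abc123"

# All documents under a prefix (note the trailing slash)
prefix_request = create_batch_process_request(
    processor_name=processor_name,
    gcs_input_uri="gs://my-bucket/input-docs/",
    gcs_output_uri="gs://my-bucket/output/"
)

# A single document
single_request = create_batch_process_request(
    processor_name=processor_name,
    gcs_input_uri="gs://my-bucket/input-docs/invoice-001.pdf",
    gcs_output_uri="gs://my-bucket/output/",
    input_mime_type="application/pdf"
)
```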

### Execute Batch Processing

```python { .api }
from google.cloud.documentai import DocumentProcessorServiceClient
from google.api_core import operation

def execute_batch_processing(
    project_id: str,
    location: str,
    processor_id: str,
    gcs_input_uri: str,
    gcs_output_uri: str,
    wait_for_completion: bool = False
) -> operation.Operation:
    """
    Execute batch processing operation.

    Args:
        project_id: Google Cloud project ID
        location: Processor location
        processor_id: Processor ID
        gcs_input_uri: Input Cloud Storage URI
        gcs_output_uri: Output Cloud Storage URI
        wait_for_completion: Whether to wait for operation completion

    Returns:
        Operation: Long-running operation object
    """
    client = DocumentProcessorServiceClient()

    # Build processor name
    processor_name = client.processor_path(project_id, location, processor_id)

    # Create batch request
    request = create_batch_process_request(
        processor_name=processor_name,
        gcs_input_uri=gcs_input_uri,
        gcs_output_uri=gcs_output_uri
    )

    print("Starting batch processing...")
    print(f"Input: {gcs_input_uri}")
    print(f"Output: {gcs_output_uri}")

    # Start batch processing
    batch_operation = client.batch_process_documents(request=request)

    print(f"Operation name: {batch_operation.operation.name}")

    if wait_for_completion:
        print("Waiting for operation to complete...")
        result = batch_operation.result()
        print("Batch processing completed successfully!")
        return result
    else:
        print("Batch processing started. Use operation name to check progress.")
        return batch_operation

# Example usage
operation = execute_batch_processing(
    project_id="my-project",
    location="us",
    processor_id="abc123",
    gcs_input_uri="gs://my-bucket/input-docs/",
    gcs_output_uri="gs://my-bucket/output/"
)
```
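
If you start the operation without waiting, you can still block on it later with an explicit timeout. A small sketch (the one-hour timeout is an arbitrary example):

```python { .api }
import concurrent.futures
from google.api_core.exceptions import GoogleAPICallError

try:
    # Block for up to an hour; raises if the service reports a failure.
    operation.result(timeout=3600)
    print("Batch processing finished.")
except concurrent.futures.TimeoutError:
    print("Still running; keep the operation name and poll again later.")
except GoogleAPICallError as e:
    print(f"Batch processing failed: {e}")
```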

## Advanced Batch Configuration

### Multiple Document Sources

```python { .api }
from google.cloud.documentai.types import (
    BatchProcessRequest,
    BatchDocumentsInputConfig,
    DocumentOutputConfig,
    GcsDocuments,
    GcsDocument
)

def create_multi_source_batch_request(
    processor_name: str,
    document_sources: list[dict],
    gcs_output_uri: str
) -> BatchProcessRequest:
    """
    Create batch request with multiple document sources.

    Args:
        processor_name: Full processor resource name
        document_sources: List of document source configs
        gcs_output_uri: Output Cloud Storage URI

    Returns:
        BatchProcessRequest: Multi-source batch request
    """
    # Collect all documents from different sources
    all_documents = []

    for source in document_sources:
        if source["type"] == "gcs_prefix":
            # Add documents found under a GCS prefix
            prefix_documents = list_documents_from_gcs_prefix(
                source["gcs_prefix"],
                source.get("mime_types", ["application/pdf"])
            )
            all_documents.extend(prefix_documents)

        elif source["type"] == "gcs_list":
            # Add specific documents
            for doc_uri in source["gcs_uris"]:
                all_documents.append(
                    GcsDocument(
                        gcs_uri=doc_uri,
                        mime_type=source.get("mime_type", "application/pdf")
                    )
                )

    # Configure input
    gcs_documents = GcsDocuments(documents=all_documents)
    input_config = BatchDocumentsInputConfig(gcs_documents=gcs_documents)

    # Configure output
    output_config = DocumentOutputConfig(
        gcs_output_config={"gcs_uri": gcs_output_uri}
    )

    return BatchProcessRequest(
        name=processor_name,
        input_documents=input_config,
        document_output_config=output_config
    )

def list_documents_from_gcs_prefix(
    gcs_prefix: str,
    mime_types: list[str]
) -> list[GcsDocument]:
    """
    List documents from a GCS prefix.

    Args:
        gcs_prefix: Cloud Storage prefix
        mime_types: Allowed MIME types

    Returns:
        list[GcsDocument]: List of GCS documents
    """
    from google.cloud import storage

    # Parse GCS URI
    if not gcs_prefix.startswith("gs://"):
        raise ValueError("Invalid GCS URI")

    parts = gcs_prefix[5:].split("/", 1)
    bucket_name = parts[0]
    prefix = parts[1] if len(parts) > 1 else ""

    # List documents in bucket
    client = storage.Client()
    bucket = client.bucket(bucket_name)

    documents = []
    mime_type_extensions = {
        "application/pdf": [".pdf"],
        "image/jpeg": [".jpg", ".jpeg"],
        "image/png": [".png"],
        "image/tiff": [".tiff", ".tif"]
    }

    for blob in bucket.list_blobs(prefix=prefix):
        # Check whether the file extension matches an allowed MIME type
        for mime_type in mime_types:
            extensions = mime_type_extensions.get(mime_type, [])
            if any(blob.name.lower().endswith(ext) for ext in extensions):
                documents.append(
                    GcsDocument(
                        gcs_uri=f"gs://{bucket_name}/{blob.name}",
                        mime_type=mime_type
                    )
                )
                break

    return documents

# Example usage
document_sources = [
    {
        "type": "gcs_prefix",
        "gcs_prefix": "gs://my-bucket/invoices/",
        "mime_types": ["application/pdf"]
    },
    {
        "type": "gcs_prefix",
        "gcs_prefix": "gs://my-bucket/receipts/",
        "mime_types": ["image/jpeg", "image/png"]
    },
    {
        "type": "gcs_list",
        "gcs_uris": [
            "gs://another-bucket/doc1.pdf",
            "gs://another-bucket/doc2.pdf"
        ],
        "mime_type": "application/pdf"
    }
]

request = create_multi_source_batch_request(
    processor_name="projects/my-project/locations/us/processors/abc123",
    document_sources=document_sources,
    gcs_output_uri="gs://my-bucket/batch-output/"
)
```

### Field Mask and Processing Options

```python { .api }
from google.cloud.documentai.types import (
    BatchProcessRequest,
    ProcessOptions,
    OcrConfig
)
from google.protobuf.field_mask_pb2 import FieldMask

def create_batch_request_with_options(
    processor_name: str,
    gcs_input_uri: str,
    gcs_output_uri: str,
    field_mask_paths: list[str] = None,
    ocr_config: dict = None
) -> BatchProcessRequest:
    """
    Create batch request with processing options and field mask.

    Args:
        processor_name: Full processor resource name
        gcs_input_uri: Input Cloud Storage URI
        gcs_output_uri: Output Cloud Storage URI
        field_mask_paths: List of field paths to return
        ocr_config: OCR configuration options

    Returns:
        BatchProcessRequest: Batch request with options
    """
    # Basic request setup
    request = create_batch_process_request(
        processor_name, gcs_input_uri, gcs_output_uri
    )

    # Add field mask if specified. For batch requests the mask lives on the
    # Cloud Storage output config and limits which Document fields are written.
    if field_mask_paths:
        request.document_output_config.gcs_output_config.field_mask = FieldMask(
            paths=field_mask_paths
        )

    # Add processing options if specified
    if ocr_config:
        ocr_options = OcrConfig(
            enable_native_pdf_parsing=ocr_config.get("enable_native_pdf_parsing", True),
            enable_image_quality_scores=ocr_config.get("enable_image_quality_scores", False),
            enable_symbol=ocr_config.get("enable_symbol", False)
        )

        request.process_options = ProcessOptions(ocr_config=ocr_options)

    return request

# Example usage with optimized processing
request = create_batch_request_with_options(
    processor_name="projects/my-project/locations/us/processors/abc123",
    gcs_input_uri="gs://my-bucket/documents/",
    gcs_output_uri="gs://my-bucket/results/",
    field_mask_paths=[
        "text",
        "entities.type",
        "entities.mention_text",
        "entities.confidence",
        "pages.tables"
    ],
    ocr_config={
        "enable_native_pdf_parsing": True,
        "enable_image_quality_scores": False
    }
)
```

## Operation Management

### Monitor Operation Progress

```python { .api }
from google.api_core import operation
from google.cloud.documentai import DocumentProcessorServiceClient
from google.cloud.documentai.types import BatchProcessMetadata
from google.longrunning import operations_pb2
import time

def monitor_batch_operation(
    operation_obj: operation.Operation,
    check_interval: int = 30
) -> "BatchProcessResponse":
    """
    Monitor a batch processing operation until completion.

    Args:
        operation_obj: Long-running operation object
        check_interval: Seconds between progress checks

    Returns:
        BatchProcessResponse: Final operation result
    """
    print(f"Monitoring operation: {operation_obj.operation.name}")

    while not operation_obj.done():
        print("Operation in progress...")

        # Get operation metadata for progress information
        metadata = operation_obj.metadata
        if metadata is not None:
            try:
                statuses = metadata.individual_process_statuses
                total_docs = len(statuses)
                completed_docs = sum(
                    1 for status in statuses
                    if status.status.code == 0  # OK status
                )
                print(f"Progress: {completed_docs}/{total_docs} documents processed")
            except Exception as e:
                print(f"Could not parse metadata: {e}")

        time.sleep(check_interval)

    # Operation completed
    if operation_obj.exception():
        raise operation_obj.exception()

    print("Operation completed successfully!")
    return operation_obj.result()

def get_operation_status(operation_name: str) -> dict:
    """
    Get the current status of a batch operation.

    Args:
        operation_name: Name of the operation

    Returns:
        dict: Operation status information
    """
    # Recent Document AI clients expose the long-running operations mixins
    # (get_operation, list_operations, cancel_operation) directly.
    client = DocumentProcessorServiceClient()

    # Fetch the raw long-running operation
    operation_pb = client.get_operation(
        request=operations_pb2.GetOperationRequest(name=operation_name)
    )

    status = {
        "name": operation_pb.name,
        "done": operation_pb.done,
        "progress": {},
        "error": None,
        "result": None
    }

    # Parse metadata for progress
    if operation_pb.metadata.value:
        try:
            metadata = BatchProcessMetadata.deserialize(operation_pb.metadata.value)
            statuses = metadata.individual_process_statuses
            status["progress"] = {
                "total_documents": len(statuses),
                "completed_documents": sum(1 for s in statuses if s.status.code == 0),
                "failed_documents": sum(1 for s in statuses if s.status.code != 0),
                "state": metadata.state.name
            }
        except Exception as e:
            status["progress"]["error"] = f"Could not parse progress: {e}"

    # Handle completion
    if operation_pb.done:
        if operation_pb.HasField("error"):
            status["error"] = {
                "code": operation_pb.error.code,
                "message": operation_pb.error.message
            }
        else:
            status["result"] = "Operation completed successfully"

    return status

# Example usage
def batch_with_monitoring():
    """Example of batch processing with monitoring."""

    # Start batch operation
    operation = execute_batch_processing(
        project_id="my-project",
        location="us",
        processor_id="abc123",
        gcs_input_uri="gs://my-bucket/input/",
        gcs_output_uri="gs://my-bucket/output/",
        wait_for_completion=False
    )

    # Monitor progress
    result = monitor_batch_operation(operation, check_interval=60)

    return result
```

### Cancel Operations

```python { .api }
from google.cloud.documentai import DocumentProcessorServiceClient
from google.longrunning import operations_pb2

def cancel_batch_operation(operation_name: str) -> bool:
    """
    Cancel a running batch processing operation.

    Args:
        operation_name: Name of the operation to cancel

    Returns:
        bool: True if cancellation was requested successfully
    """
    client = DocumentProcessorServiceClient()

    try:
        # Request cancellation of the operation
        client.cancel_operation(
            request=operations_pb2.CancelOperationRequest(name=operation_name)
        )
        print(f"Cancellation requested for operation: {operation_name}")

        # Verify cancellation (gRPC code 1 == CANCELLED)
        operation = client.get_operation(
            request=operations_pb2.GetOperationRequest(name=operation_name)
        )
        if operation.done and operation.HasField("error") and operation.error.code == 1:
            print("Operation cancelled successfully")
        else:
            print("Operation cancellation in progress")
        return True

    except Exception as e:
        print(f"Failed to cancel operation: {e}")
        return False

def list_operations(project_id: str, location: str) -> list[dict]:
    """
    List batch operations for a project and location.

    Args:
        project_id: Google Cloud project ID
        location: Location identifier

    Returns:
        list[dict]: List of operation information
    """
    client = DocumentProcessorServiceClient()

    # List operations under the project/location, filtered to batch processing
    request = operations_pb2.ListOperationsRequest(
        name=f"projects/{project_id}/locations/{location}/operations",
        filter="TYPE=BATCH_PROCESS_DOCUMENTS"
    )

    operations = []
    for operation in client.list_operations(request=request):
        op_info = {
            "name": operation.name,
            "done": operation.done,
            "error": operation.error.message if operation.HasField("error") else None,
            "metadata": operation.metadata
        }
        operations.append(op_info)

    return operations
```

## Process Batch Results

### Read Batch Output

```python { .api }
from google.cloud import storage
from google.cloud.documentai.types import Document

def read_batch_results(
    gcs_output_uri: str,
    operation_name: str = None
) -> list[Document]:
    """
    Read processed documents from batch operation output.

    Args:
        gcs_output_uri: Output Cloud Storage URI
        operation_name: Optional operation name for filtering

    Returns:
        list[Document]: List of processed documents
    """
    # Parse GCS URI
    if not gcs_output_uri.startswith("gs://"):
        raise ValueError("Invalid GCS output URI")

    parts = gcs_output_uri[5:].split("/", 1)
    bucket_name = parts[0]
    prefix = parts[1] if len(parts) > 1 else ""

    # List output files
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    documents = []
    for blob in bucket.list_blobs(prefix=prefix):
        if blob.name.endswith('.json'):
            # Parse the document JSON directly into a Document object
            document = Document.from_json(
                blob.download_as_bytes(),
                ignore_unknown_fields=True
            )
            documents.append(document)

    return documents

def process_batch_results(
    gcs_output_uri: str,
    output_format: str = "json"
) -> dict:
    """
    Process and summarize batch operation results.

    Args:
        gcs_output_uri: Output Cloud Storage URI
        output_format: Output format ('json', 'summary', 'entities')

    Returns:
        dict: Processed results summary
    """
    documents = read_batch_results(gcs_output_uri)

    if output_format == "summary":
        return create_batch_summary(documents)
    elif output_format == "entities":
        return extract_batch_entities(documents)
    else:
        return {"documents": [Document.to_dict(doc) for doc in documents]}

def create_batch_summary(documents: list[Document]) -> dict:
    """
    Create summary statistics for batch results.

    Args:
        documents: List of processed documents

    Returns:
        dict: Batch processing summary
    """
    summary = {
        "total_documents": len(documents),
        "total_pages": 0,
        "total_entities": 0,
        "entity_types": {},
        "documents_with_tables": 0,
        "documents_with_forms": 0,
        "processing_errors": 0
    }

    for doc in documents:
        # Count pages
        summary["total_pages"] += len(doc.pages)

        # Count entities
        summary["total_entities"] += len(doc.entities)

        # Count entity types
        for entity in doc.entities:
            entity_type = entity.type_
            summary["entity_types"][entity_type] = \
                summary["entity_types"].get(entity_type, 0) + 1

        # Check for tables and forms
        has_tables = any(len(page.tables) > 0 for page in doc.pages)
        has_forms = any(len(page.form_fields) > 0 for page in doc.pages)

        if has_tables:
            summary["documents_with_tables"] += 1
        if has_forms:
            summary["documents_with_forms"] += 1

        # Check for processing errors
        if doc.error.code != 0:
            summary["processing_errors"] += 1

    return summary

def extract_batch_entities(documents: list[Document]) -> dict:
    """
    Extract and organize entities from all batch documents.

    Args:
        documents: List of processed documents

    Returns:
        dict: Organized entity data
    """
    entity_data = {}

    for doc_idx, doc in enumerate(documents):
        doc_entities = {}

        for entity in doc.entities:
            entity_type = entity.type_

            if entity_type not in doc_entities:
                doc_entities[entity_type] = []

            entity_info = {
                "text": entity.mention_text,
                "confidence": entity.confidence
            }

            # Add normalized value if available
            if entity.normalized_value:
                if entity.normalized_value.money_value:
                    entity_info["normalized_value"] = {
                        "type": "money",
                        "currency": entity.normalized_value.money_value.currency_code,
                        "amount": float(entity.normalized_value.money_value.units)
                    }
                elif entity.normalized_value.date_value:
                    entity_info["normalized_value"] = {
                        "type": "date",
                        "year": entity.normalized_value.date_value.year,
                        "month": entity.normalized_value.date_value.month,
                        "day": entity.normalized_value.date_value.day
                    }

            doc_entities[entity_type].append(entity_info)

        entity_data[f"document_{doc_idx}"] = doc_entities

    return entity_data
```
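
Batch output is written as sharded `Document` JSON files under the output URI, grouped per input document. Rather than listing every `.json` object as above, you can also read the per-document output locations from the completed operation's metadata. A sketch, assuming the operation has finished and its metadata is a `BatchProcessMetadata`:

```python { .api }
from google.cloud.documentai.types import BatchProcessMetadata

def get_output_destinations(metadata: BatchProcessMetadata) -> dict[str, str]:
    """Map each input document to the GCS folder holding its output shards."""
    destinations = {}
    for process_status in metadata.individual_process_statuses:
        # An empty destination usually means the document failed to process
        if process_status.output_gcs_destination:
            destinations[process_status.input_gcs_source] = (
                process_status.output_gcs_destination
            )
    return destinations

# Example: operation started earlier with execute_batch_processing(...)
# destinations = get_output_destinations(operation.metadata)
```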

## Batch Processing Patterns

### Scheduled Batch Processing

```python { .api }
import schedule
import time
from datetime import datetime

from google.cloud.documentai import DocumentProcessorServiceClient

class BatchProcessor:
    """Scheduled batch processing manager."""

    def __init__(
        self,
        project_id: str,
        location: str,
        processor_id: str,
        input_bucket: str,
        output_bucket: str
    ):
        self.project_id = project_id
        self.location = location
        self.processor_id = processor_id
        self.input_bucket = input_bucket
        self.output_bucket = output_bucket
        self.client = DocumentProcessorServiceClient()

    def process_daily_documents(self):
        """Process documents that arrive daily."""
        timestamp = datetime.now().strftime("%Y%m%d")

        gcs_input_uri = f"gs://{self.input_bucket}/daily/{timestamp}/"
        gcs_output_uri = f"gs://{self.output_bucket}/processed/{timestamp}/"

        try:
            operation = execute_batch_processing(
                project_id=self.project_id,
                location=self.location,
                processor_id=self.processor_id,
                gcs_input_uri=gcs_input_uri,
                gcs_output_uri=gcs_output_uri,
                wait_for_completion=False
            )

            print(f"Started daily processing for {timestamp}")
            print(f"Operation: {operation.operation.name}")

        except Exception as e:
            print(f"Failed to start daily processing: {e}")

    def start_scheduler(self):
        """Start scheduled processing."""
        # Schedule daily processing at 2 AM
        schedule.every().day.at("02:00").do(self.process_daily_documents)

        print("Batch processor scheduler started")
        while True:
            schedule.run_pending()
            time.sleep(60)

# Usage
processor = BatchProcessor(
    project_id="my-project",
    location="us",
    processor_id="abc123",
    input_bucket="documents-input",
    output_bucket="documents-output"
)

# Start scheduled processing
# processor.start_scheduler()
```

### Error Handling and Retry Logic

```python { .api }
import time
import random
from typing import Optional

from google.api_core import operation

def robust_batch_processing(
    project_id: str,
    location: str,
    processor_id: str,
    gcs_input_uri: str,
    gcs_output_uri: str,
    max_retries: int = 3,
    base_delay: int = 60
) -> Optional["BatchProcessResponse"]:
    """
    Execute batch processing with error handling and retries.

    Args:
        project_id: Google Cloud project ID
        location: Processor location
        processor_id: Processor ID
        gcs_input_uri: Input Cloud Storage URI
        gcs_output_uri: Output Cloud Storage URI
        max_retries: Maximum number of retry attempts
        base_delay: Base delay in seconds for exponential backoff

    Returns:
        BatchProcessResponse: Processing result or None if failed
    """
    from google.api_core.exceptions import (
        ResourceExhausted,
        DeadlineExceeded,
        InternalServerError,
        ServiceUnavailable
    )

    for attempt in range(max_retries + 1):
        try:
            # Execute batch processing
            batch_operation = execute_batch_processing(
                project_id=project_id,
                location=location,
                processor_id=processor_id,
                gcs_input_uri=gcs_input_uri,
                gcs_output_uri=gcs_output_uri,
                wait_for_completion=False
            )

            # Monitor operation with timeout
            return monitor_batch_operation_with_timeout(
                batch_operation, timeout_hours=24
            )

        except (ResourceExhausted, ServiceUnavailable) as e:
            if attempt < max_retries:
                # Exponential backoff with jitter
                delay = base_delay * (2 ** attempt) + random.randint(0, 30)
                print(f"Rate limit/service error (attempt {attempt + 1}), retrying in {delay}s: {e}")
                time.sleep(delay)
                continue
            else:
                print(f"Failed after {max_retries} retries due to rate limiting: {e}")
                return None

        except (DeadlineExceeded, InternalServerError) as e:
            if attempt < max_retries:
                delay = base_delay * (2 ** attempt)
                print(f"Timeout/internal error (attempt {attempt + 1}), retrying in {delay}s: {e}")
                time.sleep(delay)
                continue
            else:
                print(f"Failed after {max_retries} retries due to timeout: {e}")
                return None

        except Exception as e:
            print(f"Unexpected error (non-retryable): {e}")
            return None

    return None

def monitor_batch_operation_with_timeout(
    operation_obj: operation.Operation,
    timeout_hours: int = 24,
    check_interval: int = 300  # 5 minutes
) -> Optional["BatchProcessResponse"]:
    """
    Monitor batch operation with timeout.

    Args:
        operation_obj: Long-running operation object
        timeout_hours: Maximum hours to wait
        check_interval: Seconds between checks

    Returns:
        BatchProcessResponse: Result or None if timeout
    """
    timeout_seconds = timeout_hours * 3600
    start_time = time.time()

    while not operation_obj.done():
        elapsed = time.time() - start_time

        if elapsed > timeout_seconds:
            print(f"Operation timed out after {timeout_hours} hours")
            # Attempt to cancel the operation
            try:
                cancel_batch_operation(operation_obj.operation.name)
            except Exception as e:
                print(f"Failed to cancel timed-out operation: {e}")
            return None

        print(f"Operation running for {elapsed/3600:.1f} hours...")
        time.sleep(check_interval)

    # Operation completed
    if operation_obj.exception():
        print(f"Operation failed: {operation_obj.exception()}")
        return None

    return operation_obj.result()
```

## Complete Batch Processing Example

```python { .api }
import time

from google.cloud.documentai import DocumentProcessorServiceClient

def complete_batch_processing_workflow():
    """
    Complete example of a batch processing workflow, using the helper
    functions defined in the previous sections.
    """
    # Configuration
    project_id = "my-project"
    location = "us"
    processor_id = "invoice-processor-123"

    input_bucket = "company-invoices"
    output_bucket = "processed-invoices"

    # Setup batch processing
    print("=== BATCH PROCESSING WORKFLOW ===")

    # 1. Prepare batch request with multiple sources
    document_sources = [
        {
            "type": "gcs_prefix",
            "gcs_prefix": f"gs://{input_bucket}/2024/invoices/",
            "mime_types": ["application/pdf"]
        },
        {
            "type": "gcs_prefix",
            "gcs_prefix": f"gs://{input_bucket}/2024/receipts/",
            "mime_types": ["image/jpeg", "image/png"]
        }
    ]

    processor_name = f"projects/{project_id}/locations/{location}/processors/{processor_id}"
    gcs_output_uri = f"gs://{output_bucket}/batch-{int(time.time())}/"

    # 2. Create and execute batch request
    request = create_multi_source_batch_request(
        processor_name=processor_name,
        document_sources=document_sources,
        gcs_output_uri=gcs_output_uri
    )

    client = DocumentProcessorServiceClient()
    operation = client.batch_process_documents(request=request)

    print(f"Started batch operation: {operation.operation.name}")

    # 3. Monitor progress
    result = monitor_batch_operation(operation, check_interval=60)

    # 4. Process results
    print("\n=== PROCESSING RESULTS ===")

    # Get summary
    summary = process_batch_results(gcs_output_uri, output_format="summary")
    print(f"Processed {summary['total_documents']} documents")
    print(f"Total pages: {summary['total_pages']}")
    print(f"Total entities: {summary['total_entities']}")
    print(f"Documents with tables: {summary['documents_with_tables']}")
    print(f"Processing errors: {summary['processing_errors']}")

    # Get entity breakdown
    print("\nEntity types found:")
    for entity_type, count in summary['entity_types'].items():
        print(f"  {entity_type}: {count}")

    # 5. Export results to structured format
    entities = process_batch_results(gcs_output_uri, output_format="entities")

    # Save results summary
    results_summary = {
        "operation_name": operation.operation.name,
        "processing_time": time.time(),
        "input_sources": document_sources,
        "output_uri": gcs_output_uri,
        "summary": summary,
        "entities": entities
    }

    # Upload summary to Cloud Storage
    summary_uri = f"{gcs_output_uri}processing_summary.json"
    upload_json_to_gcs(results_summary, summary_uri)

    print("\nBatch processing completed!")
    print(f"Results available at: {gcs_output_uri}")
    print(f"Summary saved to: {summary_uri}")

    return results_summary

def upload_json_to_gcs(data: dict, gcs_uri: str) -> None:
    """Upload JSON data to Cloud Storage."""
    import json
    from google.cloud import storage

    # Parse GCS URI (strip the gs:// scheme)
    parts = gcs_uri[5:].split("/", 1)
    bucket_name = parts[0]
    blob_name = parts[1]

    # Upload to Cloud Storage
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(blob_name)

    blob.upload_from_string(
        json.dumps(data, indent=2),
        content_type="application/json"
    )

if __name__ == "__main__":
    complete_batch_processing_workflow()
```

This guide covers the core batch processing workflow in Google Cloud Document AI, from basic batch requests through advanced configuration, operation monitoring, error handling, and result processing.