# Batch Operations

This guide covers batch processing capabilities in Google Cloud Document AI, including asynchronous document processing, operation monitoring, and handling large-scale document workflows.

## Batch Processing Overview

Batch processing allows you to process multiple documents asynchronously, making it ideal for:
- High-volume document processing
- Processing large document collections
- Scheduled document processing workflows
- Documents stored in Cloud Storage

Key benefits:
- **Scalability**: Process hundreds or thousands of documents
- **Cost-effective**: Optimized pricing for bulk operations
- **Asynchronous**: Non-blocking operations with progress monitoring
- **Integration**: Direct Cloud Storage input/output integration (see the minimal sketch below)
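
At a high level, a batch job is a long-running operation: you point a processor at documents in Cloud Storage, start the operation, and collect JSON results from an output bucket. The minimal sketch below shows that flow end to end (project, processor ID, and bucket names are placeholders); the rest of this guide builds fuller helpers around each step.

```python
from google.cloud import documentai

client = documentai.DocumentProcessorServiceClient()

# Process every supported document under an input prefix and write
# JSON results to an output prefix.
request = documentai.BatchProcessRequest(
    name=client.processor_path("my-project", "us", "my-processor-id"),
    input_documents=documentai.BatchDocumentsInputConfig(
        gcs_prefix=documentai.GcsPrefix(gcs_uri_prefix="gs://my-bucket/input-docs/")
    ),
    document_output_config=documentai.DocumentOutputConfig(
        gcs_output_config=documentai.DocumentOutputConfig.GcsOutputConfig(
            gcs_uri="gs://my-bucket/output/"
        )
    ),
)

# Start the long-running operation and block until it finishes
operation = client.batch_process_documents(request=request)
operation.result(timeout=1800)
print("Batch complete:", operation.operation.name)
```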

## Basic Batch Processing

### BatchProcessRequest Configuration

```python { .api }
from google.cloud.documentai import (
    BatchDocumentsInputConfig,
    BatchProcessRequest,
    DocumentOutputConfig,
    GcsDocument,
    GcsDocuments,
    GcsPrefix,
)

def create_batch_process_request(
    processor_name: str,
    gcs_input_uri: str,
    gcs_output_uri: str,
    input_mime_type: str = "application/pdf"
) -> BatchProcessRequest:
    """
    Create a batch processing request for documents in Cloud Storage.

    Args:
        processor_name: Full processor resource name
        gcs_input_uri: Input Cloud Storage URI or prefix (trailing slash = prefix)
        gcs_output_uri: Output Cloud Storage URI
        input_mime_type: MIME type of the input document (single-document case)

    Returns:
        BatchProcessRequest: Configured batch processing request
    """
    # Configure input documents
    if gcs_input_uri.endswith("/"):
        # Process every supported document under the prefix
        input_config = BatchDocumentsInputConfig(
            gcs_prefix=GcsPrefix(gcs_uri_prefix=gcs_input_uri)
        )
    else:
        # Process a single, explicitly named document
        input_config = BatchDocumentsInputConfig(
            gcs_documents=GcsDocuments(
                documents=[
                    GcsDocument(gcs_uri=gcs_input_uri, mime_type=input_mime_type)
                ]
            )
        )

    # Configure output location
    output_config = DocumentOutputConfig(
        gcs_output_config=DocumentOutputConfig.GcsOutputConfig(gcs_uri=gcs_output_uri)
    )

    # Create batch request
    request = BatchProcessRequest(
        name=processor_name,
        input_documents=input_config,
        document_output_config=output_config
    )

    return request
```

### Execute Batch Processing

```python { .api }
from google.api_core.operation import Operation
from google.cloud.documentai import DocumentProcessorServiceClient

def execute_batch_processing(
    project_id: str,
    location: str,
    processor_id: str,
    gcs_input_uri: str,
    gcs_output_uri: str,
    wait_for_completion: bool = False
) -> Operation:
    """
    Execute a batch processing operation.

    Args:
        project_id: Google Cloud project ID
        location: Processor location
        processor_id: Processor ID
        gcs_input_uri: Input Cloud Storage URI
        gcs_output_uri: Output Cloud Storage URI
        wait_for_completion: Whether to wait for operation completion

    Returns:
        Operation: Long-running operation object
    """
    client = DocumentProcessorServiceClient()

    # Build processor name
    processor_name = client.processor_path(project_id, location, processor_id)

    # Create batch request
    request = create_batch_process_request(
        processor_name=processor_name,
        gcs_input_uri=gcs_input_uri,
        gcs_output_uri=gcs_output_uri
    )

    print("Starting batch processing...")
    print(f"Input: {gcs_input_uri}")
    print(f"Output: {gcs_output_uri}")

    # Start batch processing
    operation = client.batch_process_documents(request=request)

    print(f"Operation name: {operation.operation.name}")

    if wait_for_completion:
        print("Waiting for operation to complete...")
        operation.result()
        print("Batch processing completed successfully!")
    else:
        print("Batch processing started. Use the operation name to check progress.")

    return operation

# Example usage
operation = execute_batch_processing(
    project_id="my-project",
    location="us",
    processor_id="abc123",
    gcs_input_uri="gs://my-bucket/input-docs/",
    gcs_output_uri="gs://my-bucket/output/"
)
```

## Advanced Batch Configuration

### Multiple Document Sources

```python { .api }
from google.cloud.documentai import (
    BatchDocumentsInputConfig,
    BatchProcessRequest,
    DocumentOutputConfig,
    GcsDocument,
    GcsDocuments,
)

def create_multi_source_batch_request(
    processor_name: str,
    document_sources: list[dict],
    gcs_output_uri: str
) -> BatchProcessRequest:
    """
    Create a batch request with multiple document sources.

    Args:
        processor_name: Full processor resource name
        document_sources: List of document source configs
        gcs_output_uri: Output Cloud Storage URI

    Returns:
        BatchProcessRequest: Multi-source batch request
    """
    # Collect all documents from the different sources
    all_documents = []

    for source in document_sources:
        if source["type"] == "gcs_prefix":
            # Add documents found under a GCS prefix
            prefix_documents = list_documents_from_gcs_prefix(
                source["gcs_prefix"],
                source.get("mime_types", ["application/pdf"])
            )
            all_documents.extend(prefix_documents)

        elif source["type"] == "gcs_list":
            # Add specific documents
            for doc_uri in source["gcs_uris"]:
                all_documents.append(
                    GcsDocument(
                        gcs_uri=doc_uri,
                        mime_type=source.get("mime_type", "application/pdf")
                    )
                )

    # Configure input
    gcs_documents = GcsDocuments(documents=all_documents)
    input_config = BatchDocumentsInputConfig(gcs_documents=gcs_documents)

    # Configure output
    output_config = DocumentOutputConfig(
        gcs_output_config=DocumentOutputConfig.GcsOutputConfig(gcs_uri=gcs_output_uri)
    )

    return BatchProcessRequest(
        name=processor_name,
        input_documents=input_config,
        document_output_config=output_config
    )

def list_documents_from_gcs_prefix(
    gcs_prefix: str,
    mime_types: list[str]
) -> list[GcsDocument]:
    """
    List documents under a GCS prefix.

    Args:
        gcs_prefix: Cloud Storage prefix (gs://bucket/path/)
        mime_types: Allowed MIME types

    Returns:
        list[GcsDocument]: List of GCS documents
    """
    from google.cloud import storage

    # Parse GCS URI
    if not gcs_prefix.startswith("gs://"):
        raise ValueError("Invalid GCS URI")

    parts = gcs_prefix[5:].split("/", 1)
    bucket_name = parts[0]
    prefix = parts[1] if len(parts) > 1 else ""

    # List objects in the bucket
    client = storage.Client()
    bucket = client.bucket(bucket_name)

    documents = []
    mime_type_extensions = {
        "application/pdf": [".pdf"],
        "image/jpeg": [".jpg", ".jpeg"],
        "image/png": [".png"],
        "image/tiff": [".tiff", ".tif"]
    }

    for blob in bucket.list_blobs(prefix=prefix):
        # Match the file extension against the allowed MIME types
        for mime_type in mime_types:
            extensions = mime_type_extensions.get(mime_type, [])
            if any(blob.name.lower().endswith(ext) for ext in extensions):
                documents.append(
                    GcsDocument(
                        gcs_uri=f"gs://{bucket_name}/{blob.name}",
                        mime_type=mime_type
                    )
                )
                break

    return documents

# Example usage
document_sources = [
    {
        "type": "gcs_prefix",
        "gcs_prefix": "gs://my-bucket/invoices/",
        "mime_types": ["application/pdf"]
    },
    {
        "type": "gcs_prefix",
        "gcs_prefix": "gs://my-bucket/receipts/",
        "mime_types": ["image/jpeg", "image/png"]
    },
    {
        "type": "gcs_list",
        "gcs_uris": [
            "gs://another-bucket/doc1.pdf",
            "gs://another-bucket/doc2.pdf"
        ],
        "mime_type": "application/pdf"
    }
]

request = create_multi_source_batch_request(
    processor_name="projects/my-project/locations/us/processors/abc123",
    document_sources=document_sources,
    gcs_output_uri="gs://my-bucket/batch-output/"
)
```

### Field Mask and Processing Options

```python { .api }
from google.cloud.documentai import (
    BatchProcessRequest,
    OcrConfig,
    ProcessOptions,
)
from google.protobuf.field_mask_pb2 import FieldMask

def create_batch_request_with_options(
    processor_name: str,
    gcs_input_uri: str,
    gcs_output_uri: str,
    field_mask_paths: list[str] = None,
    ocr_config: dict = None
) -> BatchProcessRequest:
    """
    Create a batch request with processing options and a field mask.

    Args:
        processor_name: Full processor resource name
        gcs_input_uri: Input Cloud Storage URI
        gcs_output_uri: Output Cloud Storage URI
        field_mask_paths: Document field paths to include in the output
        ocr_config: OCR configuration options

    Returns:
        BatchProcessRequest: Batch request with options
    """
    # Basic request setup
    request = create_batch_process_request(
        processor_name, gcs_input_uri, gcs_output_uri
    )

    # Add a field mask if specified; for batch processing it is set on the
    # GCS output config and limits which Document fields are written to output
    if field_mask_paths:
        request.document_output_config.gcs_output_config.field_mask = FieldMask(
            paths=field_mask_paths
        )

    # Add processing options if specified
    if ocr_config:
        ocr_options = OcrConfig(
            enable_native_pdf_parsing=ocr_config.get("enable_native_pdf_parsing", True),
            enable_image_quality_scores=ocr_config.get("enable_image_quality_scores", False),
            enable_symbol=ocr_config.get("enable_symbol", False)
        )

        request.process_options = ProcessOptions(ocr_config=ocr_options)

    return request

# Example usage with optimized processing
request = create_batch_request_with_options(
    processor_name="projects/my-project/locations/us/processors/abc123",
    gcs_input_uri="gs://my-bucket/documents/",
    gcs_output_uri="gs://my-bucket/results/",
    field_mask_paths=[
        "text",
        "entities.type",
        "entities.mention_text",
        "entities.confidence",
        "pages.tables"
    ],
    ocr_config={
        "enable_native_pdf_parsing": True,
        "enable_image_quality_scores": False
    }
)
```

## Operation Management

### Monitor Operation Progress

```python { .api }
import time

from google.api_core.operation import Operation
from google.cloud.documentai import (
    BatchProcessMetadata,
    BatchProcessResponse,
    DocumentProcessorServiceClient,
)
from google.longrunning.operations_pb2 import GetOperationRequest

def monitor_batch_operation(
    operation_obj: Operation,
    check_interval: int = 30
) -> BatchProcessResponse:
    """
    Monitor a batch processing operation until completion.

    Args:
        operation_obj: Long-running operation object
        check_interval: Seconds between progress checks

    Returns:
        BatchProcessResponse: Final operation result
    """
    print(f"Monitoring operation: {operation_obj.operation.name}")

    while not operation_obj.done():
        print("Operation in progress...")

        # The client library deserializes operation metadata into BatchProcessMetadata
        if operation_obj.metadata:
            try:
                statuses = operation_obj.metadata.individual_process_statuses
                total_docs = len(statuses)
                completed_docs = sum(
                    1 for status in statuses
                    if status.status.code == 0  # OK status
                )
                print(f"Progress: {completed_docs}/{total_docs} documents processed")
            except Exception as e:
                print(f"Could not parse metadata: {e}")

        time.sleep(check_interval)

    # Operation completed
    if operation_obj.exception():
        raise operation_obj.exception()

    print("Operation completed successfully!")
    return operation_obj.result()

def get_operation_status(operation_name: str) -> dict:
    """
    Get the current status of a batch operation by name.

    Args:
        operation_name: Fully qualified operation name

    Returns:
        dict: Operation status information
    """
    # The Document AI client exposes the long-running operations API directly;
    # for a non-default location, pass matching client_options (api_endpoint).
    client = DocumentProcessorServiceClient()
    operation_obj = client.get_operation(
        request=GetOperationRequest(name=operation_name)
    )

    status = {
        "name": operation_obj.name,
        "done": operation_obj.done,
        "progress": {},
        "error": None,
        "result": None
    }

    # Parse metadata for progress; here the metadata field is a packed Any message
    if operation_obj.metadata.value:
        try:
            metadata = BatchProcessMetadata.deserialize(operation_obj.metadata.value)
            statuses = metadata.individual_process_statuses
            status["progress"] = {
                "total_documents": len(statuses),
                "completed_documents": sum(1 for s in statuses if s.status.code == 0),
                "failed_documents": sum(1 for s in statuses if s.status.code != 0),
                "state": metadata.state.name
            }
        except Exception as e:
            status["progress"]["error"] = f"Could not parse progress: {e}"

    # Handle completion
    if operation_obj.done:
        if operation_obj.error.code != 0:
            status["error"] = {
                "code": operation_obj.error.code,
                "message": operation_obj.error.message
            }
        else:
            status["result"] = "Operation completed successfully"

    return status

# Example usage
def batch_with_monitoring():
    """Example of batch processing with monitoring."""

    # Start the batch operation without blocking
    operation = execute_batch_processing(
        project_id="my-project",
        location="us",
        processor_id="abc123",
        gcs_input_uri="gs://my-bucket/input/",
        gcs_output_uri="gs://my-bucket/output/",
        wait_for_completion=False
    )

    # Monitor progress until the operation finishes
    result = monitor_batch_operation(operation, check_interval=60)

    return result
```
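
If only the operation name was retained (for example, from a log line), its status can be polled later without the original `Operation` object. A small usage sketch with a placeholder operation name:

```python
status = get_operation_status(
    "projects/my-project/locations/us/operations/1234567890123456789"
)
print(status["done"], status["progress"])
```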

### Cancel Operations

```python { .api }
from google.cloud.documentai import DocumentProcessorServiceClient
from google.longrunning.operations_pb2 import (
    CancelOperationRequest,
    GetOperationRequest,
    ListOperationsRequest,
)

def cancel_batch_operation(operation_name: str) -> bool:
    """
    Cancel a running batch processing operation.

    Args:
        operation_name: Name of the operation to cancel

    Returns:
        bool: True if the cancellation request was accepted
    """
    client = DocumentProcessorServiceClient()

    try:
        # Request cancellation of the long-running operation
        client.cancel_operation(
            request=CancelOperationRequest(name=operation_name)
        )
        print(f"Cancellation requested for operation: {operation_name}")

        # Verify cancellation
        operation_obj = client.get_operation(
            request=GetOperationRequest(name=operation_name)
        )
        if operation_obj.done and operation_obj.error.code == 1:  # CANCELLED
            print("Operation cancelled successfully")
        else:
            print("Operation cancellation in progress")
        return True

    except Exception as e:
        print(f"Failed to cancel operation: {e}")
        return False

def list_operations(project_id: str, location: str) -> list[dict]:
    """
    List batch processing operations for a project and location.

    Args:
        project_id: Google Cloud project ID
        location: Location identifier

    Returns:
        list[dict]: List of operation information
    """
    client = DocumentProcessorServiceClient()

    # Restrict the listing to batch processing operations in this location
    request = ListOperationsRequest(
        name=f"projects/{project_id}/locations/{location}/operations",
        filter="TYPE=BATCH_PROCESS_DOCUMENTS",
    )
    response = client.list_operations(request=request)

    operations = []
    # First page only; follow response.next_page_token for more results
    for operation_obj in response.operations:
        operations.append({
            "name": operation_obj.name,
            "done": operation_obj.done,
            "error": operation_obj.error.message if operation_obj.error.code != 0 else None,
            "metadata": operation_obj.metadata
        })

    return operations
```

## Process Batch Results

### Read Batch Output

```python { .api }
from google.cloud import storage
from google.cloud.documentai import Document

def read_batch_results(
    gcs_output_uri: str,
    operation_name: str = None
) -> list[Document]:
    """
    Read processed documents from batch operation output.

    Args:
        gcs_output_uri: Output Cloud Storage URI
        operation_name: Optional operation name used to filter output shards

    Returns:
        list[Document]: List of processed documents
    """
    # Parse GCS URI
    if not gcs_output_uri.startswith("gs://"):
        raise ValueError("Invalid GCS output URI")

    parts = gcs_output_uri[5:].split("/", 1)
    bucket_name = parts[0]
    prefix = parts[1] if len(parts) > 1 else ""

    # List output files
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    documents = []
    for blob in bucket.list_blobs(prefix=prefix):
        if not blob.name.endswith(".json"):
            continue

        # Batch output shards are typically written under a folder that includes
        # the operation ID; if an operation name was given, skip other operations.
        if operation_name and operation_name.split("/")[-1] not in blob.name:
            continue

        # Parse the Document JSON shard into a Document object
        document = Document.from_json(
            blob.download_as_text(), ignore_unknown_fields=True
        )
        documents.append(document)

    return documents

def process_batch_results(
    gcs_output_uri: str,
    output_format: str = "json"
) -> dict:
    """
    Process and summarize batch operation results.

    Args:
        gcs_output_uri: Output Cloud Storage URI
        output_format: Output format ('json', 'summary', 'entities')

    Returns:
        dict: Processed results summary
    """
    documents = read_batch_results(gcs_output_uri)

    if output_format == "summary":
        return create_batch_summary(documents)
    elif output_format == "entities":
        return extract_batch_entities(documents)
    else:
        return {"documents": [Document.to_dict(doc) for doc in documents]}

def create_batch_summary(documents: list[Document]) -> dict:
    """
    Create summary statistics for batch results.

    Args:
        documents: List of processed documents

    Returns:
        dict: Batch processing summary
    """
    summary = {
        "total_documents": len(documents),
        "total_pages": 0,
        "total_entities": 0,
        "entity_types": {},
        "documents_with_tables": 0,
        "documents_with_forms": 0,
        "processing_errors": 0
    }

    for doc in documents:
        # Count pages
        summary["total_pages"] += len(doc.pages)

        # Count entities
        summary["total_entities"] += len(doc.entities)

        # Count entity types
        for entity in doc.entities:
            entity_type = entity.type_
            summary["entity_types"][entity_type] = \
                summary["entity_types"].get(entity_type, 0) + 1

        # Check for tables and forms
        has_tables = any(len(page.tables) > 0 for page in doc.pages)
        has_forms = any(len(page.form_fields) > 0 for page in doc.pages)

        if has_tables:
            summary["documents_with_tables"] += 1
        if has_forms:
            summary["documents_with_forms"] += 1

        # Check for processing errors reported on the document
        if doc.error.code != 0:
            summary["processing_errors"] += 1

    return summary

def extract_batch_entities(documents: list[Document]) -> dict:
    """
    Extract and organize entities from all batch documents.

    Args:
        documents: List of processed documents

    Returns:
        dict: Organized entity data
    """
    entity_data = {}

    for doc_idx, doc in enumerate(documents):
        doc_entities = {}

        for entity in doc.entities:
            entity_type = entity.type_

            if entity_type not in doc_entities:
                doc_entities[entity_type] = []

            entity_info = {
                "text": entity.mention_text,
                "confidence": entity.confidence
            }

            # Add the normalized value if available
            if entity.normalized_value:
                if entity.normalized_value.money_value:
                    money = entity.normalized_value.money_value
                    entity_info["normalized_value"] = {
                        "type": "money",
                        "currency": money.currency_code,
                        "amount": money.units + money.nanos / 1e9
                    }
                elif entity.normalized_value.date_value:
                    date = entity.normalized_value.date_value
                    entity_info["normalized_value"] = {
                        "type": "date",
                        "year": date.year,
                        "month": date.month,
                        "day": date.day
                    }

            doc_entities[entity_type].append(entity_info)

        entity_data[f"document_{doc_idx}"] = doc_entities

    return entity_data
```
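
A brief usage sketch for the helpers above, assuming a batch job has already written its output to the given prefix (the bucket path is a placeholder):

```python
# Summarize a completed batch run
summary = process_batch_results("gs://my-bucket/output/", output_format="summary")
print(f"Documents processed: {summary['total_documents']}")
print(f"Entity types found: {list(summary['entity_types'])}")

# Or pull the per-document entity breakdown
entities = process_batch_results("gs://my-bucket/output/", output_format="entities")
```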

## Batch Processing Patterns

### Scheduled Batch Processing

```python { .api }
import time
from datetime import datetime

import schedule

from google.cloud.documentai import DocumentProcessorServiceClient

class BatchProcessor:
    """Scheduled batch processing manager."""

    def __init__(
        self,
        project_id: str,
        location: str,
        processor_id: str,
        input_bucket: str,
        output_bucket: str
    ):
        self.project_id = project_id
        self.location = location
        self.processor_id = processor_id
        self.input_bucket = input_bucket
        self.output_bucket = output_bucket
        self.client = DocumentProcessorServiceClient()

    def process_daily_documents(self):
        """Process documents that arrive daily."""
        timestamp = datetime.now().strftime("%Y%m%d")

        gcs_input_uri = f"gs://{self.input_bucket}/daily/{timestamp}/"
        gcs_output_uri = f"gs://{self.output_bucket}/processed/{timestamp}/"

        try:
            operation = execute_batch_processing(
                project_id=self.project_id,
                location=self.location,
                processor_id=self.processor_id,
                gcs_input_uri=gcs_input_uri,
                gcs_output_uri=gcs_output_uri,
                wait_for_completion=False
            )

            print(f"Started daily processing for {timestamp}")
            print(f"Operation: {operation.operation.name}")

        except Exception as e:
            print(f"Failed to start daily processing: {e}")

    def start_scheduler(self):
        """Start scheduled processing."""
        # Schedule daily processing at 2 AM
        schedule.every().day.at("02:00").do(self.process_daily_documents)

        print("Batch processor scheduler started")
        while True:
            schedule.run_pending()
            time.sleep(60)

# Usage
processor = BatchProcessor(
    project_id="my-project",
    location="us",
    processor_id="abc123",
    input_bucket="documents-input",
    output_bucket="documents-output"
)

# Start scheduled processing
# processor.start_scheduler()
```

### Error Handling and Retry Logic

```python { .api }
import random
import time
from typing import Optional

from google.api_core.operation import Operation
from google.cloud.documentai import BatchProcessResponse

def robust_batch_processing(
    project_id: str,
    location: str,
    processor_id: str,
    gcs_input_uri: str,
    gcs_output_uri: str,
    max_retries: int = 3,
    base_delay: int = 60
) -> Optional[BatchProcessResponse]:
    """
    Execute batch processing with error handling and retries.

    Args:
        project_id: Google Cloud project ID
        location: Processor location
        processor_id: Processor ID
        gcs_input_uri: Input Cloud Storage URI
        gcs_output_uri: Output Cloud Storage URI
        max_retries: Maximum number of retry attempts
        base_delay: Base delay in seconds for exponential backoff

    Returns:
        BatchProcessResponse: Processing result, or None if all attempts failed
    """
    from google.api_core.exceptions import (
        DeadlineExceeded,
        InternalServerError,
        ResourceExhausted,
        ServiceUnavailable,
    )

    for attempt in range(max_retries + 1):
        try:
            # Execute batch processing
            operation = execute_batch_processing(
                project_id=project_id,
                location=location,
                processor_id=processor_id,
                gcs_input_uri=gcs_input_uri,
                gcs_output_uri=gcs_output_uri,
                wait_for_completion=False
            )

            # Monitor the operation with a timeout
            return monitor_batch_operation_with_timeout(
                operation, timeout_hours=24
            )

        except (ResourceExhausted, ServiceUnavailable) as e:
            if attempt < max_retries:
                # Exponential backoff with jitter
                delay = base_delay * (2 ** attempt) + random.randint(0, 30)
                print(f"Rate limit/service error (attempt {attempt + 1}), retrying in {delay}s: {e}")
                time.sleep(delay)
                continue
            else:
                print(f"Failed after {max_retries} retries due to rate limiting: {e}")
                return None

        except (DeadlineExceeded, InternalServerError) as e:
            if attempt < max_retries:
                delay = base_delay * (2 ** attempt)
                print(f"Timeout/internal error (attempt {attempt + 1}), retrying in {delay}s: {e}")
                time.sleep(delay)
                continue
            else:
                print(f"Failed after {max_retries} retries due to timeout: {e}")
                return None

        except Exception as e:
            print(f"Unexpected error (non-retryable): {e}")
            return None

    return None

def monitor_batch_operation_with_timeout(
    operation_obj: Operation,
    timeout_hours: int = 24,
    check_interval: int = 300  # 5 minutes
) -> Optional[BatchProcessResponse]:
    """
    Monitor a batch operation with a timeout.

    Args:
        operation_obj: Long-running operation object
        timeout_hours: Maximum hours to wait
        check_interval: Seconds between checks

    Returns:
        BatchProcessResponse: Result, or None on timeout or failure
    """
    timeout_seconds = timeout_hours * 3600
    start_time = time.time()

    while not operation_obj.done():
        elapsed = time.time() - start_time

        if elapsed > timeout_seconds:
            print(f"Operation timed out after {timeout_hours} hours")
            # Attempt to cancel the operation
            try:
                cancel_batch_operation(operation_obj.operation.name)
            except Exception as e:
                print(f"Failed to cancel timed-out operation: {e}")
            return None

        print(f"Operation running for {elapsed/3600:.1f} hours...")
        time.sleep(check_interval)

    # Operation completed
    if operation_obj.exception():
        print(f"Operation failed: {operation_obj.exception()}")
        return None

    return operation_obj.result()
```

## Complete Batch Processing Example

```python { .api }
import json
import time

from google.cloud import storage
from google.cloud.documentai import DocumentProcessorServiceClient

def complete_batch_processing_workflow():
    """
    Complete example of a batch processing workflow, using the helper
    functions defined in the previous sections.
    """
    # Configuration
    project_id = "my-project"
    location = "us"
    processor_id = "invoice-processor-123"

    input_bucket = "company-invoices"
    output_bucket = "processed-invoices"

    # Set up batch processing
    print("=== BATCH PROCESSING WORKFLOW ===")

    # 1. Prepare a batch request with multiple sources
    document_sources = [
        {
            "type": "gcs_prefix",
            "gcs_prefix": f"gs://{input_bucket}/2024/invoices/",
            "mime_types": ["application/pdf"]
        },
        {
            "type": "gcs_prefix",
            "gcs_prefix": f"gs://{input_bucket}/2024/receipts/",
            "mime_types": ["image/jpeg", "image/png"]
        }
    ]

    processor_name = f"projects/{project_id}/locations/{location}/processors/{processor_id}"
    gcs_output_uri = f"gs://{output_bucket}/batch-{int(time.time())}/"

    # 2. Create and execute the batch request
    request = create_multi_source_batch_request(
        processor_name=processor_name,
        document_sources=document_sources,
        gcs_output_uri=gcs_output_uri
    )

    client = DocumentProcessorServiceClient()
    operation = client.batch_process_documents(request=request)

    print(f"Started batch operation: {operation.operation.name}")

    # 3. Monitor progress
    result = monitor_batch_operation(operation, check_interval=60)

    # 4. Process results
    print("\n=== PROCESSING RESULTS ===")

    # Get summary
    summary = process_batch_results(gcs_output_uri, output_format="summary")
    print(f"Processed {summary['total_documents']} documents")
    print(f"Total pages: {summary['total_pages']}")
    print(f"Total entities: {summary['total_entities']}")
    print(f"Documents with tables: {summary['documents_with_tables']}")
    print(f"Processing errors: {summary['processing_errors']}")

    # Get entity breakdown
    print("\nEntity types found:")
    for entity_type, count in summary['entity_types'].items():
        print(f"  {entity_type}: {count}")

    # 5. Export results to a structured format
    entities = process_batch_results(gcs_output_uri, output_format="entities")

    # Save a results summary
    results_summary = {
        "operation_name": operation.operation.name,
        "processing_time": time.time(),
        "input_sources": document_sources,
        "output_uri": gcs_output_uri,
        "summary": summary,
        "entities": entities
    }

    # Upload the summary to Cloud Storage
    summary_uri = f"{gcs_output_uri}processing_summary.json"
    upload_json_to_gcs(results_summary, summary_uri)

    print("\nBatch processing completed!")
    print(f"Results available at: {gcs_output_uri}")
    print(f"Summary saved to: {summary_uri}")

    return results_summary

def upload_json_to_gcs(data: dict, gcs_uri: str) -> None:
    """Upload JSON data to Cloud Storage."""
    # Parse GCS URI
    parts = gcs_uri[5:].split("/", 1)  # Remove the gs:// scheme
    bucket_name = parts[0]
    blob_name = parts[1]

    # Upload to Cloud Storage
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(blob_name)

    blob.upload_from_string(
        json.dumps(data, indent=2),
        content_type="application/json"
    )

if __name__ == "__main__":
    complete_batch_processing_workflow()
```

This guide has covered batch processing in Google Cloud Document AI, from basic batch requests to advanced workflows with error handling, operation monitoring, and result processing.