Google Cloud Document AI client library for extracting structured information from documents using machine learning
—
This guide covers batch processing capabilities in Google Cloud Document AI, including asynchronous document processing, operation monitoring, and handling large-scale document workflows.
Batch processing allows you to process multiple documents asynchronously, making it ideal for large document collections, scheduled ingestion jobs, and high-volume pipelines.
Key benefits: non-blocking execution, many input documents per request, and results written directly to Cloud Storage.
from google.cloud.documentai import DocumentProcessorServiceClient
from google.cloud.documentai.types import (
BatchProcessRequest,
BatchDocumentsInputConfig,
DocumentOutputConfig,
GcsDocuments,
GcsDocument,
GcsPrefix
)
def create_batch_process_request(
    processor_name: str,
    gcs_input_uri: str,
    gcs_output_uri: str,
    input_mime_type: str = "application/pdf"
) -> BatchProcessRequest:
    """
    Create a batch processing request for documents in Cloud Storage.

    Args:
        processor_name: Full processor resource name
        gcs_input_uri: Input Cloud Storage URI; a value ending with '/'
            is treated as a prefix covering all documents under it
        gcs_output_uri: Output Cloud Storage URI
        input_mime_type: MIME type of the input document (single-document
            inputs only; prefix inputs infer MIME types per object)

    Returns:
        BatchProcessRequest: Configured batch processing request
    """
    if gcs_input_uri.endswith('/'):
        # Process all documents under the prefix. Document AI does not
        # support wildcard characters in GcsDocument URIs, so prefix
        # inputs must be expressed with GcsPrefix.
        input_config = BatchDocumentsInputConfig(
            gcs_prefix=GcsPrefix(gcs_uri_prefix=gcs_input_uri)
        )
    else:
        # Process a single, explicitly named document
        gcs_documents = GcsDocuments(
            documents=[
                GcsDocument(
                    gcs_uri=gcs_input_uri,
                    mime_type=input_mime_type
                )
            ]
        )
        input_config = BatchDocumentsInputConfig(gcs_documents=gcs_documents)
    # Configure where processed results are written
    output_config = DocumentOutputConfig(
        gcs_output_config={"gcs_uri": gcs_output_uri}
    )
    # Assemble the batch request
    return BatchProcessRequest(
        name=processor_name,
        input_documents=input_config,
        document_output_config=output_config
    )
from google.cloud.documentai import DocumentProcessorServiceClient
from google.api_core import operation
import time
def execute_batch_processing(
    project_id: str,
    location: str,
    processor_id: str,
    gcs_input_uri: str,
    gcs_output_uri: str,
    wait_for_completion: bool = False
) -> operation.Operation:
    """
    Execute a batch processing operation.

    Args:
        project_id: Google Cloud project ID
        location: Processor location
        processor_id: Processor ID
        gcs_input_uri: Input Cloud Storage URI
        gcs_output_uri: Output Cloud Storage URI
        wait_for_completion: Whether to block until the operation finishes

    Returns:
        Operation: the long-running operation object, or — when
        wait_for_completion is True — the operation's final result.
    """
    client = DocumentProcessorServiceClient()
    # Build the fully-qualified processor resource name
    processor_name = client.processor_path(project_id, location, processor_id)
    # Build the batch request from the input/output URIs
    request = create_batch_process_request(
        processor_name=processor_name,
        gcs_input_uri=gcs_input_uri,
        gcs_output_uri=gcs_output_uri
    )
    # Plain string: no placeholders, so no f-string needed
    print("Starting batch processing...")
    print(f"Input: {gcs_input_uri}")
    print(f"Output: {gcs_output_uri}")
    # Start the long-running batch operation. Named `batch_operation` to
    # avoid shadowing the imported `operation` module.
    batch_operation = client.batch_process_documents(request=request)
    print(f"Operation name: {batch_operation.operation.name}")
    if wait_for_completion:
        print("Waiting for operation to complete...")
        result = batch_operation.result()
        print("Batch processing completed successfully!")
        return result
    print("Batch processing started. Use operation name to check progress.")
    return batch_operation
# Example usage
operation = execute_batch_processing(
    project_id="my-project",
    location="us",
    processor_id="abc123",
    gcs_input_uri="gs://my-bucket/input-docs/",
    gcs_output_uri="gs://my-bucket/output/"
)

from google.cloud.documentai.types import (
    BatchProcessRequest,
    BatchDocumentsInputConfig,
    GcsDocuments,
    GcsDocument
)
def create_multi_source_batch_request(
    processor_name: str,
    document_sources: list[dict],
    gcs_output_uri: str
) -> BatchProcessRequest:
    """
    Create batch request with multiple document sources.

    Args:
        processor_name: Full processor resource name
        document_sources: List of document source configs; each dict has a
            "type" of either "gcs_prefix" (with "gcs_prefix" and optional
            "mime_types") or "gcs_list" (with "gcs_uris" and optional
            "mime_type")
        gcs_output_uri: Output Cloud Storage URI

    Returns:
        BatchProcessRequest: Multi-source batch request

    Raises:
        ValueError: if a source config has an unrecognized "type"
    """
    # Collect all documents from the configured sources
    all_documents = []
    for source in document_sources:
        source_type = source["type"]
        if source_type == "gcs_prefix":
            # Expand the prefix into concrete document URIs
            all_documents.extend(
                list_documents_from_gcs_prefix(
                    source["gcs_prefix"],
                    source.get("mime_types", ["application/pdf"])
                )
            )
        elif source_type == "gcs_list":
            # Add each explicitly listed document
            for doc_uri in source["gcs_uris"]:
                all_documents.append(
                    GcsDocument(
                        gcs_uri=doc_uri,
                        mime_type=source.get("mime_type", "application/pdf")
                    )
                )
        else:
            # Fail loudly on misconfigured sources instead of silently
            # skipping them
            raise ValueError(f"Unknown document source type: {source_type!r}")
    # Configure input
    input_config = BatchDocumentsInputConfig(
        gcs_documents=GcsDocuments(documents=all_documents)
    )
    # Configure output
    output_config = DocumentOutputConfig(
        gcs_output_config={"gcs_uri": gcs_output_uri}
    )
    return BatchProcessRequest(
        name=processor_name,
        input_documents=input_config,
        document_output_config=output_config
    )
def list_documents_from_gcs_prefix(
    gcs_prefix: str,
    mime_types: list[str]
) -> list[GcsDocument]:
    """
    List documents under a GCS prefix, filtered by file extension.

    Args:
        gcs_prefix: Cloud Storage prefix (gs://bucket/path/)
        mime_types: Allowed MIME types

    Returns:
        list[GcsDocument]: Matching documents tagged with their MIME type
    """
    from google.cloud import storage

    # Split "gs://bucket/path" into bucket name and object prefix
    if not gcs_prefix.startswith("gs://"):
        raise ValueError("Invalid GCS URI")
    bucket_name, _, prefix = gcs_prefix[5:].partition("/")

    client = storage.Client()
    bucket = client.bucket(bucket_name)

    # File extensions that identify each supported MIME type
    mime_type_extensions = {
        "application/pdf": [".pdf"],
        "image/jpeg": [".jpg", ".jpeg"],
        "image/png": [".png"],
        "image/tiff": [".tiff", ".tif"]
    }

    documents = []
    for blob in bucket.list_blobs(prefix=prefix):
        lowered = blob.name.lower()
        # First allowed MIME type whose extensions match wins
        for mime_type in mime_types:
            extensions = tuple(mime_type_extensions.get(mime_type, []))
            if extensions and lowered.endswith(extensions):
                documents.append(
                    GcsDocument(
                        gcs_uri=f"gs://{bucket_name}/{blob.name}",
                        mime_type=mime_type
                    )
                )
                break
    return documents
# Example usage
document_sources = [
    {
        "type": "gcs_prefix",
        "gcs_prefix": "gs://my-bucket/invoices/",
        "mime_types": ["application/pdf"]
    },
    {
        "type": "gcs_prefix",
        "gcs_prefix": "gs://my-bucket/receipts/",
        "mime_types": ["image/jpeg", "image/png"]
    },
    {
        "type": "gcs_list",
        "gcs_uris": [
            "gs://another-bucket/doc1.pdf",
            "gs://another-bucket/doc2.pdf"
        ],
        "mime_type": "application/pdf"
    }
]
request = create_multi_source_batch_request(
    processor_name="projects/my-project/locations/us/processors/abc123",
    document_sources=document_sources,
    gcs_output_uri="gs://my-bucket/batch-output/"
)

from google.cloud.documentai.types import (
    BatchProcessRequest,
    ProcessOptions,
    OcrConfig
)
from google.protobuf.field_mask_pb2 import FieldMask
def create_batch_request_with_options(
    processor_name: str,
    gcs_input_uri: str,
    gcs_output_uri: str,
    field_mask_paths: "list[str] | None" = None,
    ocr_config: "dict | None" = None
) -> BatchProcessRequest:
    """
    Create batch request with processing options and field mask.

    Args:
        processor_name: Full processor resource name
        gcs_input_uri: Input Cloud Storage URI
        gcs_output_uri: Output Cloud Storage URI
        field_mask_paths: List of Document field paths to return; None
            returns the full document
        ocr_config: OCR configuration options; recognized keys are
            enable_native_pdf_parsing, enable_image_quality_scores,
            and enable_symbol

    Returns:
        BatchProcessRequest: Batch request with options
    """
    # Basic request setup
    request = create_batch_process_request(
        processor_name, gcs_input_uri, gcs_output_uri
    )
    # Restrict the returned Document fields if requested
    if field_mask_paths:
        request.field_mask = FieldMask(paths=field_mask_paths)
    # Attach OCR processing options if requested
    if ocr_config:
        ocr_options = OcrConfig(
            enable_native_pdf_parsing=ocr_config.get("enable_native_pdf_parsing", True),
            enable_image_quality_scores=ocr_config.get("enable_image_quality_scores", False),
            enable_symbol=ocr_config.get("enable_symbol", False)
        )
        request.process_options = ProcessOptions(ocr_config=ocr_options)
    return request
# Example usage with optimized processing
request = create_batch_request_with_options(
    processor_name="projects/my-project/locations/us/processors/abc123",
    gcs_input_uri="gs://my-bucket/documents/",
    gcs_output_uri="gs://my-bucket/results/",
    field_mask_paths=[
        "text",
        "entities.type_",
        "entities.mention_text",
        "entities.confidence",
        "pages.tables"
    ],
    ocr_config={
        "enable_native_pdf_parsing": True,
        "enable_image_quality_scores": False
    }
)

from google.api_core import operation
from google.cloud.documentai.types import BatchProcessMetadata
import time
def monitor_batch_operation(
    operation_obj: operation.Operation,
    check_interval: int = 30
) -> "BatchProcessResponse":
    """
    Monitor a batch processing operation until completion.

    Args:
        operation_obj: Long-running operation object
        check_interval: Seconds between progress checks

    Returns:
        BatchProcessResponse: Final operation result

    Raises:
        Exception: the operation's error, if the operation failed
    """
    print(f"Monitoring operation: {operation_obj.operation.name}")
    while not operation_obj.done():
        print("Operation in progress...")
        # Surface per-document progress from the operation metadata
        if operation_obj.metadata:
            try:
                metadata = BatchProcessMetadata.pb(operation_obj.metadata)
                if hasattr(metadata, 'individual_process_statuses'):
                    total_docs = len(metadata.individual_process_statuses)
                    completed_docs = sum(
                        1 for status in metadata.individual_process_statuses
                        if status.status.code == 0  # OK status
                    )
                    print(f"Progress: {completed_docs}/{total_docs} documents processed")
            except Exception as e:
                # Progress reporting is best-effort: never abort monitoring
                # because metadata could not be decoded
                print(f"Could not parse metadata: {e}")
        time.sleep(check_interval)
    # Fetch the exception once instead of calling exception() twice
    error = operation_obj.exception()
    if error:
        raise error
    print("Operation completed successfully!")
    return operation_obj.result()
def get_operation_status(operation_name: str) -> dict:
    """
    Get the current status of a batch operation.

    Args:
        operation_name: Name of the operation

    Returns:
        dict: Operation status information (name, done flag, progress
        counters, error details, and a result message when finished)
    """
    from google.api_core import operations_v1
    from google.auth import default

    credentials, project = default()
    ops_client = operations_v1.OperationsClient(credentials=credentials)

    # Fetch the raw operation from the Operations API
    op = ops_client.get_operation(name=operation_name)

    status = {
        "name": op.name,
        "done": op.done,
        "progress": {},
        "error": None,
        "result": None
    }

    # Best-effort decode of per-document progress from metadata
    if op.metadata:
        try:
            metadata = BatchProcessMetadata.pb(op.metadata)
            if hasattr(metadata, 'individual_process_statuses'):
                statuses = metadata.individual_process_statuses
                ok_count = sum(1 for s in statuses if s.status.code == 0)
                bad_count = sum(1 for s in statuses if s.status.code != 0)
                status["progress"] = {
                    "total_documents": len(statuses),
                    "completed_documents": ok_count,
                    "failed_documents": bad_count,
                    "state": str(metadata.state) if hasattr(metadata, 'state') else "RUNNING"
                }
        except Exception as e:
            status["progress"]["error"] = f"Could not parse progress: {e}"

    # Record terminal state once the operation has finished
    if op.done:
        if op.error:
            status["error"] = {
                "code": op.error.code,
                "message": op.error.message
            }
        else:
            status["result"] = "Operation completed successfully"
    return status
# Example usage
def batch_with_monitoring():
    """Example of batch processing with monitoring."""
    # Start the long-running batch operation without blocking
    operation = execute_batch_processing(
        project_id="my-project",
        location="us",
        processor_id="abc123",
        gcs_input_uri="gs://my-bucket/input/",
        gcs_output_uri="gs://my-bucket/output/",
        wait_for_completion=False
    )
    # Poll progress every 60 seconds until the operation completes
    result = monitor_batch_operation(operation, check_interval=60)
    return result

from google.api_core import operations_v1
from google.auth import default
def cancel_batch_operation(operation_name: str) -> bool:
    """
    Cancel a running batch processing operation.

    Args:
        operation_name: Name of the operation to cancel

    Returns:
        bool: True if the cancellation request was accepted (the
        cancellation itself may still be in progress); False on error
    """
    credentials, project = default()
    ops_client = operations_v1.OperationsClient(credentials=credentials)
    try:
        # Request cancellation
        ops_client.cancel_operation(name=operation_name)
        print(f"Cancellation requested for operation: {operation_name}")
        # Re-fetch to report whether the cancellation has already landed
        op = ops_client.get_operation(name=operation_name)
        if op.done and op.error and op.error.code == 1:  # CANCELLED
            print("Operation cancelled successfully")
        else:
            print("Operation cancellation in progress")
        return True
    except Exception as e:
        print(f"Failed to cancel operation: {e}")
        return False
def list_operations(project_id: str, location: str) -> list[dict]:
    """
    List all operations for a project and location.

    Args:
        project_id: Google Cloud project ID
        location: Location identifier

    Returns:
        list[dict]: List of operation information
    """
    from google.api_core import operations_v1
    from google.auth import default

    credentials, project = default()
    operations_client = operations_v1.OperationsClient(credentials=credentials)
    # Restrict results to operations under this project/location
    filter_str = f"name:projects/{project_id}/locations/{location}/operations/*"
    operations = []
    # Loop variable named `op` to avoid shadowing the imported
    # `operation` module
    for op in operations_client.list_operations(
        name=f"projects/{project_id}/locations/{location}",
        filter=filter_str
    ):
        operations.append({
            "name": op.name,
            "done": op.done,
            "error": op.error.message if op.error else None,
            "metadata": op.metadata
        })
    return operations

from google.cloud import storage
from google.cloud.documentai.types import Document
import json
def read_batch_results(
    gcs_output_uri: str,
    operation_name: "str | None" = None
) -> list[Document]:
    """
    Read processed documents from batch operation output.

    Args:
        gcs_output_uri: Output Cloud Storage URI
        operation_name: Optional operation name for filtering
            (currently unused; reserved for future filtering)

    Returns:
        list[Document]: List of processed documents

    Raises:
        ValueError: if gcs_output_uri is not a gs:// URI
    """
    # Parse GCS URI into bucket and object prefix
    if not gcs_output_uri.startswith("gs://"):
        raise ValueError("Invalid GCS output URI")
    parts = gcs_output_uri[5:].split("/", 1)
    bucket_name = parts[0]
    prefix = parts[1] if len(parts) > 1 else ""
    # List output files under the prefix
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    documents = []
    for blob in bucket.list_blobs(prefix=prefix):
        if blob.name.endswith('.json'):
            # Document.from_json accepts the raw JSON text directly; the
            # previous json.loads -> json.dumps round-trip was wasted work
            documents.append(Document.from_json(blob.download_as_text()))
    return documents
def process_batch_results(
    gcs_output_uri: str,
    output_format: str = "json"
) -> dict:
    """
    Process and summarize batch operation results.

    Args:
        gcs_output_uri: Output Cloud Storage URI
        output_format: Output format ('json', 'summary', 'entities')

    Returns:
        dict: Processed results summary
    """
    documents = read_batch_results(gcs_output_uri)
    # Dispatch on the requested format; anything unrecognized falls back
    # to the full JSON dump of every document
    formatters = {
        "summary": create_batch_summary,
        "entities": extract_batch_entities,
    }
    formatter = formatters.get(output_format)
    if formatter is not None:
        return formatter(documents)
    return {"documents": [doc.to_dict() for doc in documents]}
def create_batch_summary(documents: list[Document]) -> dict:
    """
    Create summary statistics for batch results.

    Args:
        documents: List of processed documents

    Returns:
        dict: Batch processing summary (page/entity totals, per-type
        entity counts, table/form presence, and error count)
    """
    summary = {
        "total_documents": len(documents),
        "total_pages": 0,
        "total_entities": 0,
        "entity_types": {},
        "documents_with_tables": 0,
        "documents_with_forms": 0,
        "processing_errors": 0
    }
    type_counts = summary["entity_types"]
    for doc in documents:
        # Page and entity totals
        summary["total_pages"] += len(doc.pages)
        summary["total_entities"] += len(doc.entities)
        # Tally entities by type
        for entity in doc.entities:
            key = entity.type_
            type_counts[key] = type_counts.get(key, 0) + 1
        # Flag documents containing any tables or form fields
        if any(len(page.tables) > 0 for page in doc.pages):
            summary["documents_with_tables"] += 1
        if any(len(page.form_fields) > 0 for page in doc.pages):
            summary["documents_with_forms"] += 1
        # Per-document processing errors reported by the service
        if doc.error and doc.error.code != 0:
            summary["processing_errors"] += 1
    return summary
def extract_batch_entities(documents: list[Document]) -> dict:
    """
    Extract and organize entities from all batch documents.

    Args:
        documents: List of processed documents

    Returns:
        dict: Entity data keyed by "document_{index}", then by entity
        type; each entity carries its text, confidence, and (when
        present) a normalized money or date value
    """
    entity_data = {}
    for doc_idx, doc in enumerate(documents):
        doc_entities = {}
        for entity in doc.entities:
            entity_type = entity.type_
            if entity_type not in doc_entities:
                doc_entities[entity_type] = []
            entity_info = {
                "text": entity.mention_text,
                "confidence": entity.confidence
            }
            # Add normalized value if available
            if entity.normalized_value:
                if entity.normalized_value.money_value:
                    money = entity.normalized_value.money_value
                    entity_info["normalized_value"] = {
                        "type": "money",
                        "currency": money.currency_code,
                        # google.type.Money splits an amount into whole
                        # units plus nanos (1e-9 units); using units alone
                        # silently dropped the fractional part
                        "amount": float(money.units) + money.nanos / 1e9
                    }
                elif entity.normalized_value.date_value:
                    date = entity.normalized_value.date_value
                    entity_info["normalized_value"] = {
                        "type": "date",
                        "year": date.year,
                        "month": date.month,
                        "day": date.day
                    }
            doc_entities[entity_type].append(entity_info)
        entity_data[f"document_{doc_idx}"] = doc_entities
    return entity_data

import schedule
import time
from datetime import datetime
class BatchProcessor:
    """Scheduled batch processing manager.

    Runs Document AI batch jobs over dated input prefixes on a fixed
    daily schedule.
    """

    def __init__(
        self,
        project_id: str,
        location: str,
        processor_id: str,
        input_bucket: str,
        output_bucket: str
    ):
        self.project_id = project_id
        self.location = location
        self.processor_id = processor_id
        self.input_bucket = input_bucket
        self.output_bucket = output_bucket
        # NOTE(review): this client is stored but unused by the methods
        # below — execute_batch_processing builds its own; confirm intent
        self.client = DocumentProcessorServiceClient()

    def process_daily_documents(self):
        """Process documents that arrive daily."""
        day_stamp = datetime.now().strftime("%Y%m%d")
        source_uri = f"gs://{self.input_bucket}/daily/{day_stamp}/"
        destination_uri = f"gs://{self.output_bucket}/processed/{day_stamp}/"
        try:
            batch_op = execute_batch_processing(
                project_id=self.project_id,
                location=self.location,
                processor_id=self.processor_id,
                gcs_input_uri=source_uri,
                gcs_output_uri=destination_uri,
                wait_for_completion=False
            )
            print(f"Started daily processing for {day_stamp}")
            print(f"Operation: {batch_op.operation.name}")
        except Exception as e:
            print(f"Failed to start daily processing: {e}")

    def start_scheduler(self):
        """Start scheduled processing."""
        # Kick off the daily job at 2 AM, then poll the schedule forever
        schedule.every().day.at("02:00").do(self.process_daily_documents)
        print("Batch processor scheduler started")
        while True:
            schedule.run_pending()
            time.sleep(60)
# Usage
processor = BatchProcessor(
    project_id="my-project",
    location="us",
    processor_id="abc123",
    input_bucket="documents-input",
    output_bucket="documents-output"
)

# Start scheduled processing
# processor.start_scheduler()

import time
import random
from typing import Optional
def robust_batch_processing(
    project_id: str,
    location: str,
    processor_id: str,
    gcs_input_uri: str,
    gcs_output_uri: str,
    max_retries: int = 3,
    base_delay: int = 60
) -> Optional["BatchProcessResponse"]:
    """
    Execute batch processing with error handling and retries.

    Transient API errors are retried with exponential backoff
    (rate-limit/availability errors additionally get random jitter);
    any other exception aborts immediately.

    Args:
        project_id: Google Cloud project ID
        location: Processor location
        processor_id: Processor ID
        gcs_input_uri: Input Cloud Storage URI
        gcs_output_uri: Output Cloud Storage URI
        max_retries: Maximum number of retry attempts
        base_delay: Base delay in seconds for exponential backoff

    Returns:
        BatchProcessResponse: Processing result or None if failed
    """
    from google.api_core.exceptions import (
        ResourceExhausted,
        DeadlineExceeded,
        InternalServerError,
        ServiceUnavailable
    )

    attempt = 0
    while attempt <= max_retries:
        try:
            # Kick off the batch job, then watch it until done (or 24h)
            batch_op = execute_batch_processing(
                project_id=project_id,
                location=location,
                processor_id=processor_id,
                gcs_input_uri=gcs_input_uri,
                gcs_output_uri=gcs_output_uri,
                wait_for_completion=False
            )
            return monitor_batch_operation_with_timeout(
                batch_op, timeout_hours=24
            )
        except (ResourceExhausted, ServiceUnavailable) as e:
            if attempt >= max_retries:
                print(f"Failed after {max_retries} retries due to rate limiting: {e}")
                return None
            # Exponential backoff plus jitter to de-correlate retries
            delay = base_delay * (2 ** attempt) + random.randint(0, 30)
            print(f"Rate limit/service error (attempt {attempt + 1}), retrying in {delay}s: {e}")
            time.sleep(delay)
        except (DeadlineExceeded, InternalServerError) as e:
            if attempt >= max_retries:
                print(f"Failed after {max_retries} retries due to timeout: {e}")
                return None
            delay = base_delay * (2 ** attempt)
            print(f"Timeout/internal error (attempt {attempt + 1}), retrying in {delay}s: {e}")
            time.sleep(delay)
        except Exception as e:
            print(f"Unexpected error (non-retryable): {e}")
            return None
        attempt += 1
    return None
def monitor_batch_operation_with_timeout(
    operation_obj: operation.Operation,
    timeout_hours: int = 24,
    check_interval: int = 300  # 5 minutes
) -> Optional["BatchProcessResponse"]:
    """
    Monitor batch operation with timeout.

    Args:
        operation_obj: Long-running operation object
        timeout_hours: Maximum hours to wait
        check_interval: Seconds between checks

    Returns:
        BatchProcessResponse: Result, or None on timeout or failure
    """
    timeout_seconds = timeout_hours * 3600
    start_time = time.time()
    while not operation_obj.done():
        elapsed = time.time() - start_time
        if elapsed > timeout_seconds:
            print(f"Operation timed out after {timeout_hours} hours")
            # Best-effort cancellation of the abandoned operation
            try:
                cancel_batch_operation(operation_obj.operation.name)
            except Exception as e:
                print(f"Failed to cancel timed-out operation: {e}")
            return None
        print(f"Operation running for {elapsed/3600:.1f} hours...")
        time.sleep(check_interval)
    # Operation completed; fetch the exception once instead of twice and
    # report failure as None rather than raising
    error = operation_obj.exception()
    if error:
        print(f"Operation failed: {error}")
        return None
    return operation_obj.result()

def complete_batch_processing_workflow():
    """
    Complete example of a batch processing workflow: build a multi-source
    batch request, run it, monitor progress, summarize the results, and
    persist a JSON summary back to Cloud Storage.
    """
    # Configuration
    project_id = "my-project"
    location = "us"
    processor_id = "invoice-processor-123"
    input_bucket = "company-invoices"
    output_bucket = "processed-invoices"
    print("=== BATCH PROCESSING WORKFLOW ===")
    # 1. Prepare batch request with multiple sources
    document_sources = [
        {
            "type": "gcs_prefix",
            "gcs_prefix": f"gs://{input_bucket}/2024/invoices/",
            "mime_types": ["application/pdf"]
        },
        {
            "type": "gcs_prefix",
            "gcs_prefix": f"gs://{input_bucket}/2024/receipts/",
            "mime_types": ["image/jpeg", "image/png"]
        }
    ]
    processor_name = f"projects/{project_id}/locations/{location}/processors/{processor_id}"
    # Timestamped output prefix keeps each run's results separate
    gcs_output_uri = f"gs://{output_bucket}/batch-{int(time.time())}/"
    # 2. Create and execute batch request
    request = create_multi_source_batch_request(
        processor_name=processor_name,
        document_sources=document_sources,
        gcs_output_uri=gcs_output_uri
    )
    client = DocumentProcessorServiceClient()
    # Named to avoid shadowing the imported `operation` module
    batch_operation = client.batch_process_documents(request=request)
    print(f"Started batch operation: {batch_operation.operation.name}")
    # 3. Monitor progress
    result = monitor_batch_operation(batch_operation, check_interval=60)
    # 4. Process results
    print("\n=== PROCESSING RESULTS ===")
    summary = process_batch_results(gcs_output_uri, output_format="summary")
    print(f"Processed {summary['total_documents']} documents")
    print(f"Total pages: {summary['total_pages']}")
    print(f"Total entities: {summary['total_entities']}")
    print(f"Documents with tables: {summary['documents_with_tables']}")
    print(f"Processing errors: {summary['processing_errors']}")
    # Entity breakdown (plain strings: no placeholders, no f-string)
    print("\nEntity types found:")
    for entity_type, count in summary['entity_types'].items():
        print(f"  {entity_type}: {count}")
    # 5. Export results to structured format
    entities = process_batch_results(gcs_output_uri, output_format="entities")
    results_summary = {
        "operation_name": batch_operation.operation.name,
        "processing_time": time.time(),
        "input_sources": document_sources,
        "output_uri": gcs_output_uri,
        "summary": summary,
        "entities": entities
    }
    # Upload summary to Cloud Storage alongside the batch output
    summary_uri = f"{gcs_output_uri}processing_summary.json"
    upload_json_to_gcs(results_summary, summary_uri)
    print("\nBatch processing completed!")
    print(f"Results available at: {gcs_output_uri}")
    print(f"Summary saved to: {summary_uri}")
    return results_summary
def upload_json_to_gcs(data: dict, gcs_uri: str) -> None:
    """Upload JSON data to Cloud Storage.

    Args:
        data: JSON-serializable payload to upload
        gcs_uri: Destination URI in the form gs://bucket/path/to/object

    Raises:
        ValueError: if gcs_uri is not a gs:// URI or lacks an object name
    """
    import json

    # Validate and parse the URI before touching any network clients
    # (consistent with read_batch_results; the old code sliced blindly
    # and raised IndexError on malformed URIs)
    if not gcs_uri.startswith("gs://"):
        raise ValueError(f"Invalid GCS URI: {gcs_uri}")
    parts = gcs_uri[5:].split("/", 1)
    if len(parts) < 2 or not parts[1]:
        raise ValueError(f"GCS URI must include an object name: {gcs_uri}")
    bucket_name, blob_name = parts

    from google.cloud import storage

    # Upload to Cloud Storage
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    blob.upload_from_string(
        json.dumps(data, indent=2),
        content_type="application/json"
    )
if __name__ == "__main__":
    complete_batch_processing_workflow()

This comprehensive guide covers all aspects of batch processing in Google Cloud Document AI, from basic operations to advanced workflows with error handling, monitoring, and result processing.
Install with Tessl CLI
npx tessl i tessl/pypi-google-cloud-documentai