CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-google-cloud-documentai

Google Cloud Document AI client library for extracting structured information from documents using machine learning

Pending
Overview
Eval results
Files

docs/batch-operations.md

Batch Operations

This guide covers batch processing capabilities in Google Cloud Document AI, including asynchronous document processing, operation monitoring, and handling large-scale document workflows.

Batch Processing Overview

Batch processing allows you to process multiple documents asynchronously, making it ideal for:

  • High-volume document processing
  • Processing large document collections
  • Scheduled document processing workflows
  • Documents stored in Cloud Storage

Key benefits:

  • Scalability: Process hundreds or thousands of documents
  • Cost-effective: Optimized pricing for bulk operations
  • Asynchronous: Non-blocking operations with progress monitoring
  • Integration: Direct Cloud Storage input/output integration

Basic Batch Processing

BatchProcessRequest Configuration

from google.cloud.documentai import DocumentProcessorServiceClient
from google.cloud.documentai.types import (
    BatchProcessRequest,
    BatchDocumentsInputConfig,
    DocumentOutputConfig,
    GcsDocuments,
    GcsDocument,
    GcsPrefix
)

def create_batch_process_request(
    processor_name: str,
    gcs_input_uri: str,
    gcs_output_uri: str,
    input_mime_type: str = "application/pdf"
) -> BatchProcessRequest:
    """
    Create a batch processing request for documents in Cloud Storage.
    
    Args:
        processor_name: Full processor resource name
            (projects/{project}/locations/{location}/processors/{id})
        gcs_input_uri: Input Cloud Storage URI. A URI ending in '/' is
            treated as a prefix and every object under it is processed;
            otherwise it must point at a single document.
        gcs_output_uri: Output Cloud Storage URI
        input_mime_type: MIME type of the input document (used only for
            the single-document case; prefix inputs are type-detected by
            the service)
        
    Returns:
        BatchProcessRequest: Configured batch processing request
    """
    if gcs_input_uri.endswith('/'):
        # BUG FIX: GcsDocument.gcs_uri does not support wildcards such as
        # "prefix/*.pdf". A trailing-slash URI must be expressed as a
        # GcsPrefix, which tells the service to process every object
        # under that prefix.
        input_config = BatchDocumentsInputConfig(
            gcs_prefix=GcsPrefix(gcs_uri_prefix=gcs_input_uri)
        )
    else:
        # Single, explicitly-named document with its MIME type.
        input_config = BatchDocumentsInputConfig(
            gcs_documents=GcsDocuments(
                documents=[
                    GcsDocument(
                        gcs_uri=gcs_input_uri,
                        mime_type=input_mime_type
                    )
                ]
            )
        )
    
    # Configure output destination; proto-plus accepts a plain dict for
    # the nested GcsOutputConfig message.
    output_config = DocumentOutputConfig(
        gcs_output_config={"gcs_uri": gcs_output_uri}
    )
    
    return BatchProcessRequest(
        name=processor_name,
        input_documents=input_config,
        document_output_config=output_config
    )

Execute Batch Processing

from google.cloud.documentai import DocumentProcessorServiceClient
from google.api_core import operation
import time

def execute_batch_processing(
    project_id: str,
    location: str,
    processor_id: str,
    gcs_input_uri: str,
    gcs_output_uri: str,
    wait_for_completion: bool = False
) -> operation.Operation:
    """
    Execute batch processing operation.
    
    Args:
        project_id: Google Cloud project ID
        location: Processor location
        processor_id: Processor ID
        gcs_input_uri: Input Cloud Storage URI
        gcs_output_uri: Output Cloud Storage URI
        wait_for_completion: Whether to block until the operation finishes
        
    Returns:
        The long-running operation object; when wait_for_completion is
        True, the operation's final BatchProcessResponse result instead.
    """
    client = DocumentProcessorServiceClient()
    
    # Build the fully-qualified processor resource name.
    processor_name = client.processor_path(project_id, location, processor_id)
    
    request = create_batch_process_request(
        processor_name=processor_name,
        gcs_input_uri=gcs_input_uri,
        gcs_output_uri=gcs_output_uri
    )
    
    print("Starting batch processing...")
    print(f"Input: {gcs_input_uri}")
    print(f"Output: {gcs_output_uri}")
    
    # Start batch processing. Named batch_op so it does not shadow the
    # imported google.api_core.operation module (the old local `operation`
    # did exactly that).
    batch_op = client.batch_process_documents(request=request)
    
    print(f"Operation name: {batch_op.operation.name}")
    
    if wait_for_completion:
        print("Waiting for operation to complete...")
        # .result() blocks and raises if the operation failed.
        result = batch_op.result()
        print("Batch processing completed successfully!")
        return result
    
    print("Batch processing started. Use operation name to check progress.")
    return batch_op

# Example usage
# NOTE: this runs at import time and performs real GCP calls — it requires
# valid application-default credentials and an existing processor/bucket.
operation = execute_batch_processing(
    project_id="my-project",
    location="us",
    processor_id="abc123",
    gcs_input_uri="gs://my-bucket/input-docs/",
    gcs_output_uri="gs://my-bucket/output/"
)

Advanced Batch Configuration

Multiple Document Sources

from google.cloud.documentai.types import (
    BatchProcessRequest,
    BatchDocumentsInputConfig,
    GcsDocuments,
    GcsDocument
)

def create_multi_source_batch_request(
    processor_name: str,
    document_sources: list[dict],
    gcs_output_uri: str
) -> BatchProcessRequest:
    """
    Build a batch request that combines documents from several sources.
    
    Args:
        processor_name: Full processor resource name
        document_sources: Source configs. Each dict carries a "type" key:
            "gcs_prefix" sources are expanded by listing the bucket, while
            "gcs_list" sources name explicit URIs. Other types are skipped.
        gcs_output_uri: Output Cloud Storage URI
        
    Returns:
        BatchProcessRequest: Multi-source batch request
    """
    collected: list = []
    
    for src in document_sources:
        src_type = src["type"]
        
        if src_type == "gcs_prefix":
            # Expand the prefix into concrete documents via a bucket listing.
            collected.extend(
                list_documents_from_gcs_prefix(
                    src["gcs_prefix"],
                    src.get("mime_types", ["application/pdf"]),
                )
            )
        elif src_type == "gcs_list":
            # Explicit per-document URIs, all sharing a single MIME type.
            shared_mime = src.get("mime_type", "application/pdf")
            collected.extend(
                GcsDocument(gcs_uri=uri, mime_type=shared_mime)
                for uri in src["gcs_uris"]
            )
    
    input_config = BatchDocumentsInputConfig(
        gcs_documents=GcsDocuments(documents=collected)
    )
    output_config = DocumentOutputConfig(
        gcs_output_config={"gcs_uri": gcs_output_uri}
    )
    
    return BatchProcessRequest(
        name=processor_name,
        input_documents=input_config,
        document_output_config=output_config
    )

def list_documents_from_gcs_prefix(
    gcs_prefix: str, 
    mime_types: list[str]
) -> list[GcsDocument]:
    """
    Enumerate documents under a GCS prefix, filtered by file extension.
    
    Args:
        gcs_prefix: Cloud Storage prefix (gs://bucket/path/)
        mime_types: Allowed MIME types; blobs are matched by extension
        
    Returns:
        list[GcsDocument]: One entry per matching blob
    """
    from google.cloud import storage
    
    if not gcs_prefix.startswith("gs://"):
        raise ValueError("Invalid GCS URI")
    
    # Split "bucket/rest" — an absent "/" yields an empty prefix.
    bucket_name, _, prefix = gcs_prefix[5:].partition("/")
    
    # Recognized extensions for each supported MIME type.
    extensions_for = {
        "application/pdf": [".pdf"],
        "image/jpeg": [".jpg", ".jpeg"],
        "image/png": [".png"],
        "image/tiff": [".tiff", ".tif"]
    }
    
    bucket = storage.Client().bucket(bucket_name)
    
    matched: list = []
    for blob in bucket.list_blobs(prefix=prefix):
        lowered = blob.name.lower()
        # First allowed MIME type whose extensions match wins; callers'
        # ordering of mime_types is therefore significant.
        for candidate in mime_types:
            allowed_exts = extensions_for.get(candidate, [])
            if any(lowered.endswith(ext) for ext in allowed_exts):
                matched.append(
                    GcsDocument(
                        gcs_uri=f"gs://{bucket_name}/{blob.name}",
                        mime_type=candidate
                    )
                )
                break
    
    return matched

# Example usage
# NOTE: runs at import time; the "gcs_prefix" sources trigger real bucket
# listings when create_multi_source_batch_request executes.
document_sources = [
    {
        "type": "gcs_prefix",
        "gcs_prefix": "gs://my-bucket/invoices/",
        "mime_types": ["application/pdf"]
    },
    {
        "type": "gcs_prefix", 
        "gcs_prefix": "gs://my-bucket/receipts/",
        "mime_types": ["image/jpeg", "image/png"]
    },
    {
        "type": "gcs_list",
        "gcs_uris": [
            "gs://another-bucket/doc1.pdf",
            "gs://another-bucket/doc2.pdf"
        ],
        "mime_type": "application/pdf"
    }
]

request = create_multi_source_batch_request(
    processor_name="projects/my-project/locations/us/processors/abc123",
    document_sources=document_sources,
    gcs_output_uri="gs://my-bucket/batch-output/"
)

Field Mask and Processing Options

from google.cloud.documentai.types import (
    BatchProcessRequest,
    ProcessOptions,
    OcrConfig
)
from google.protobuf.field_mask_pb2 import FieldMask

def create_batch_request_with_options(
    processor_name: str,
    gcs_input_uri: str,
    gcs_output_uri: str,
    field_mask_paths: list[str] = None,
    ocr_config: dict = None
) -> BatchProcessRequest:
    """
    Create batch request with processing options and field mask.
    
    Args:
        processor_name: Full processor resource name
        gcs_input_uri: Input Cloud Storage URI
        gcs_output_uri: Output Cloud Storage URI  
        field_mask_paths: Document field paths to keep in the output JSON
            (e.g. "text", "entities"); None keeps all fields
        ocr_config: OCR configuration options (keys:
            enable_native_pdf_parsing, enable_image_quality_scores,
            enable_symbol)
        
    Returns:
        BatchProcessRequest: Batch request with options
    """
    # Basic request setup
    request = create_batch_process_request(
        processor_name, gcs_input_uri, gcs_output_uri
    )
    
    # Add field mask if specified.
    # BUG FIX: BatchProcessRequest has no top-level `field_mask` field
    # (only the synchronous ProcessRequest does). For batch processing the
    # mask belongs on DocumentOutputConfig.GcsOutputConfig, where it
    # restricts which Document fields are written to the output files.
    if field_mask_paths:
        request.document_output_config.gcs_output_config.field_mask = (
            FieldMask(paths=field_mask_paths)
        )
    
    # Add processing options if specified
    if ocr_config:
        ocr_options = OcrConfig(
            enable_native_pdf_parsing=ocr_config.get("enable_native_pdf_parsing", True),
            enable_image_quality_scores=ocr_config.get("enable_image_quality_scores", False),
            enable_symbol=ocr_config.get("enable_symbol", False)
        )
        
        request.process_options = ProcessOptions(ocr_config=ocr_options)
    
    return request

# Example usage with optimized processing
# NOTE: builds (but does not execute) a batch request; the field mask
# limits which Document fields appear in the output JSON.
request = create_batch_request_with_options(
    processor_name="projects/my-project/locations/us/processors/abc123",
    gcs_input_uri="gs://my-bucket/documents/",
    gcs_output_uri="gs://my-bucket/results/",
    field_mask_paths=[
        "text",
        "entities.type_",
        "entities.mention_text", 
        "entities.confidence",
        "pages.tables"
    ],
    ocr_config={
        "enable_native_pdf_parsing": True,
        "enable_image_quality_scores": False
    }
)

Operation Management

Monitor Operation Progress

from google.api_core import operation
from google.cloud.documentai.types import BatchProcessMetadata
import time

def monitor_batch_operation(
    operation_obj: operation.Operation,
    check_interval: int = 30
) -> "BatchProcessResponse":
    """
    Monitor a batch processing operation until completion.
    
    Polls operation_obj.done() every check_interval seconds, printing
    best-effort per-document progress parsed from the operation metadata.
    Blocks the calling thread until the operation finishes.
    
    Args:
        operation_obj: Long-running operation object
        check_interval: Seconds between progress checks
        
    Returns:
        BatchProcessResponse: Final operation result
    
    Raises:
        Exception: Re-raises the operation's failure exception, if any.
    """
    print(f"Monitoring operation: {operation_obj.operation.name}")
    
    while not operation_obj.done():
        print("Operation in progress...")
        
        # Get operation metadata for progress information
        if operation_obj.metadata:
            try:
                # NOTE(review): BatchProcessMetadata.pb() assumes the
                # metadata is already a proto-plus BatchProcessMetadata
                # wrapper — confirm; if it is some other shape, the
                # except below swallows the failure.
                metadata = BatchProcessMetadata.pb(operation_obj.metadata)
                if hasattr(metadata, 'individual_process_statuses'):
                    total_docs = len(metadata.individual_process_statuses)
                    completed_docs = sum(
                        1 for status in metadata.individual_process_statuses
                        if status.status.code == 0  # OK status
                    )
                    print(f"Progress: {completed_docs}/{total_docs} documents processed")
            except Exception as e:
                # Progress reporting is best-effort; never abort monitoring
                # just because the metadata could not be decoded.
                print(f"Could not parse metadata: {e}")
        
        time.sleep(check_interval)
    
    # Operation completed
    if operation_obj.exception():
        raise operation_obj.exception()
    
    print("Operation completed successfully!")
    return operation_obj.result()

def get_operation_status(operation_name: str) -> dict:
    """
    Get the current status of a batch operation.
    
    Args:
        operation_name: Fully-qualified name of the operation
        
    Returns:
        dict: Operation status information with keys "name", "done",
        "progress" (per-document counts when metadata is parseable),
        "error", and "result".
    """
    from google.api_core import operations_v1
    from google.auth import default
    
    credentials, project = default()
    operations_client = operations_v1.OperationsClient(credentials=credentials)
    
    # Get operation
    # NOTE(review): this returns a raw google.longrunning Operation proto,
    # so `done` and `error` below are message fields, not methods (unlike
    # the polling wrapper used elsewhere in this file).
    operation_obj = operations_client.get_operation(name=operation_name)
    
    status = {
        "name": operation_obj.name,
        "done": operation_obj.done,
        "progress": {},
        "error": None,
        "result": None
    }
    
    # Parse metadata for progress
    if operation_obj.metadata:
        try:
            # NOTE(review): metadata on a raw Operation is presumably an
            # Any proto; BatchProcessMetadata.pb() may not unpack it
            # directly — failures are caught and reported below.
            metadata = BatchProcessMetadata.pb(operation_obj.metadata)
            if hasattr(metadata, 'individual_process_statuses'):
                statuses = metadata.individual_process_statuses
                status["progress"] = {
                    "total_documents": len(statuses),
                    "completed_documents": sum(1 for s in statuses if s.status.code == 0),
                    "failed_documents": sum(1 for s in statuses if s.status.code != 0),
                    "state": str(metadata.state) if hasattr(metadata, 'state') else "RUNNING"
                }
        except Exception as e:
            status["progress"]["error"] = f"Could not parse progress: {e}"
    
    # Handle completion
    # NOTE(review): `error` is a google.rpc.Status message; presumably
    # falsy when unset — verify against the raw proto's truthiness rules.
    if operation_obj.done:
        if operation_obj.error:
            status["error"] = {
                "code": operation_obj.error.code,
                "message": operation_obj.error.message
            }
        else:
            status["result"] = "Operation completed successfully"
    
    return status

# Example usage
def batch_with_monitoring():
    """Example of batch processing with monitoring."""
    
    # Kick off the asynchronous batch run without blocking.
    batch_op = execute_batch_processing(
        project_id="my-project",
        location="us", 
        processor_id="abc123",
        gcs_input_uri="gs://my-bucket/input/",
        gcs_output_uri="gs://my-bucket/output/",
        wait_for_completion=False
    )
    
    # Poll once a minute until the operation resolves, then hand back
    # its final result.
    return monitor_batch_operation(batch_op, check_interval=60)

Cancel Operations

from google.api_core import operations_v1
from google.auth import default

def cancel_batch_operation(operation_name: str) -> bool:
    """
    Cancel a running batch processing operation.
    
    Issues a best-effort cancel request and then re-reads the operation to
    report whether cancellation has already taken effect.
    
    Args:
        operation_name: Name of the operation to cancel
        
    Returns:
        bool: True if the cancel request was accepted (whether or not it
        has completed yet); False if the request itself failed.
    """
    credentials, project = default()
    operations_client = operations_v1.OperationsClient(credentials=credentials)
    
    try:
        # Cancel the operation
        operations_client.cancel_operation(name=operation_name)
        print(f"Cancellation requested for operation: {operation_name}")
        
        # Verify cancellation
        # NOTE(review): `operation.error` is a google.rpc.Status message on
        # the raw proto; presumably falsy only when unset — confirm. gRPC
        # status code 1 is CANCELLED.
        operation = operations_client.get_operation(name=operation_name)
        if operation.done and operation.error and operation.error.code == 1:  # CANCELLED
            print("Operation cancelled successfully")
            return True
        else:
            # Cancellation is asynchronous; the request was accepted even
            # though the operation has not reported CANCELLED yet.
            print("Operation cancellation in progress")
            return True
            
    except Exception as e:
        print(f"Failed to cancel operation: {e}")
        return False

def list_operations(project_id: str, location: str) -> list[dict]:
    """
    List all operations for a project and location.
    
    Args:
        project_id: Google Cloud project ID
        location: Location identifier
        
    Returns:
        list[dict]: List of operation information with keys "name",
        "done", "error", and "metadata".
    """
    from google.api_core import operations_v1
    from google.auth import default
    
    credentials, project = default()
    operations_client = operations_v1.OperationsClient(credentials=credentials)
    
    # Restrict the listing to operations under this location.
    filter_str = f"name:projects/{project_id}/locations/{location}/operations/*"
    
    operations = []
    # BUG FIX: OperationsClient.list_operations names its filter parameter
    # `filter_` (trailing underscore, avoiding the builtin); the old call
    # passed `filter=` and raised TypeError. Both arguments are passed
    # positionally here.
    for op in operations_client.list_operations(
        f"projects/{project_id}/locations/{location}",
        filter_str,
    ):
        op_info = {
            "name": op.name,
            "done": op.done,
            # NOTE(review): op.error is a google.rpc.Status message;
            # presumably falsy when unset — confirm proto truthiness.
            "error": op.error.message if op.error else None,
            "metadata": op.metadata
        }
        operations.append(op_info)
    
    return operations

Process Batch Results

Read Batch Output

from google.cloud import storage
from google.cloud.documentai.types import Document
import json

def read_batch_results(
    gcs_output_uri: str,
    operation_name: str = None
) -> list[Document]:
    """
    Read processed documents from batch operation output.
    
    Args:
        gcs_output_uri: Output Cloud Storage URI (gs://bucket/prefix/)
        operation_name: Accepted for backward compatibility; currently
            unused (no per-operation filtering is applied)
        
    Returns:
        list[Document]: List of processed documents
    
    Raises:
        ValueError: If gcs_output_uri is not a gs:// URI.
    """
    # Parse GCS URI
    if not gcs_output_uri.startswith("gs://"):
        raise ValueError("Invalid GCS output URI")
    
    parts = gcs_output_uri[5:].split("/", 1)
    bucket_name = parts[0]
    prefix = parts[1] if len(parts) > 1 else ""
    
    # List output files
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    
    documents = []
    for blob in bucket.list_blobs(prefix=prefix):
        if blob.name.endswith('.json'):
            # Feed the raw JSON text straight to the proto-plus parser.
            # The previous json.loads -> json.dumps round-trip produced an
            # identical string and was a pointless extra parse/serialize.
            documents.append(Document.from_json(blob.download_as_text()))
    
    return documents

def process_batch_results(
    gcs_output_uri: str,
    output_format: str = "json"
) -> dict:
    """
    Process and summarize batch operation results.
    
    Args:
        gcs_output_uri: Output Cloud Storage URI
        output_format: One of 'json' (default), 'summary', 'entities';
            unrecognized values fall back to 'json'
        
    Returns:
        dict: Results in the requested representation
    """
    docs = read_batch_results(gcs_output_uri)
    
    # Dispatch on the requested representation; anything unrecognized
    # gets the full per-document dump.
    if output_format == "summary":
        return create_batch_summary(docs)
    if output_format == "entities":
        return extract_batch_entities(docs)
    return {"documents": [d.to_dict() for d in docs]}

def create_batch_summary(documents: list[Document]) -> dict:
    """
    Aggregate summary statistics over a batch of processed documents.
    
    Args:
        documents: List of processed documents
        
    Returns:
        dict: Counts of pages, entities, entity types, table/form
        documents, and processing errors across the batch
    """
    type_counts: dict = {}
    total_pages = 0
    total_entities = 0
    with_tables = 0
    with_forms = 0
    failed = 0
    
    for doc in documents:
        total_pages += len(doc.pages)
        total_entities += len(doc.entities)
        
        # Tally occurrences of each entity type label.
        for entity in doc.entities:
            type_counts[entity.type_] = type_counts.get(entity.type_, 0) + 1
        
        # A document counts once regardless of how many pages match.
        if any(page.tables for page in doc.pages):
            with_tables += 1
        if any(page.form_fields for page in doc.pages):
            with_forms += 1
        
        # A non-OK status code on the document marks a processing failure.
        if doc.error and doc.error.code != 0:
            failed += 1
    
    return {
        "total_documents": len(documents),
        "total_pages": total_pages,
        "total_entities": total_entities,
        "entity_types": type_counts,
        "documents_with_tables": with_tables,
        "documents_with_forms": with_forms,
        "processing_errors": failed,
    }

def extract_batch_entities(documents: list[Document]) -> dict:
    """
    Collect entities from every document, grouped by document and type.
    
    Args:
        documents: List of processed documents
        
    Returns:
        dict: {"document_<i>": {entity_type: [entity_info, ...]}}
    """
    results: dict = {}
    
    for idx, doc in enumerate(documents):
        per_type: dict = {}
        
        for entity in doc.entities:
            info = {
                "text": entity.mention_text,
                "confidence": entity.confidence
            }
            
            # Attach a structured normalized value when the processor
            # produced one; money and date are the kinds handled here.
            normalized = entity.normalized_value
            if normalized:
                if normalized.money_value:
                    money = normalized.money_value
                    info["normalized_value"] = {
                        "type": "money",
                        "currency": money.currency_code,
                        "amount": float(money.units)
                    }
                elif normalized.date_value:
                    date = normalized.date_value
                    info["normalized_value"] = {
                        "type": "date",
                        "year": date.year,
                        "month": date.month,
                        "day": date.day
                    }
            
            per_type.setdefault(entity.type_, []).append(info)
        
        results[f"document_{idx}"] = per_type
    
    return results

Batch Processing Patterns

Scheduled Batch Processing

import schedule
import time
from datetime import datetime

class BatchProcessor:
    """Scheduled batch processing manager."""
    
    def __init__(
        self,
        project_id: str,
        location: str,
        processor_id: str,
        input_bucket: str,
        output_bucket: str
    ):
        # Identity of the processor plus the buckets it reads and writes.
        self.project_id = project_id
        self.location = location
        self.processor_id = processor_id
        self.input_bucket = input_bucket
        self.output_bucket = output_bucket
        self.client = DocumentProcessorServiceClient()
    
    def process_daily_documents(self):
        """Kick off a batch run over today's dated input folder."""
        stamp = datetime.now().strftime("%Y%m%d")
        
        source_uri = f"gs://{self.input_bucket}/daily/{stamp}/"
        target_uri = f"gs://{self.output_bucket}/processed/{stamp}/"
        
        try:
            batch_op = execute_batch_processing(
                project_id=self.project_id,
                location=self.location,
                processor_id=self.processor_id,
                gcs_input_uri=source_uri,
                gcs_output_uri=target_uri,
                wait_for_completion=False
            )
        except Exception as e:
            # Log and swallow so the scheduler keeps running tomorrow.
            print(f"Failed to start daily processing: {e}")
        else:
            print(f"Started daily processing for {stamp}")
            print(f"Operation: {batch_op.operation.name}")
    
    def start_scheduler(self):
        """Register the daily 02:00 job and poll the scheduler forever."""
        schedule.every().day.at("02:00").do(self.process_daily_documents)
        
        print("Batch processor scheduler started")
        # Wake once a minute to let `schedule` fire any due jobs.
        while True:
            schedule.run_pending()
            time.sleep(60)

# Usage
# NOTE: constructing BatchProcessor creates a DocumentProcessorServiceClient,
# which requires application-default credentials at import time.
processor = BatchProcessor(
    project_id="my-project",
    location="us",
    processor_id="abc123",
    input_bucket="documents-input",
    output_bucket="documents-output"
)

# Start scheduled processing
# processor.start_scheduler()

Error Handling and Retry Logic

import time
import random
from typing import Optional

def robust_batch_processing(
    project_id: str,
    location: str,
    processor_id: str,
    gcs_input_uri: str,
    gcs_output_uri: str,
    max_retries: int = 3,
    base_delay: int = 60
) -> Optional["BatchProcessResponse"]:
    """
    Execute batch processing with error handling and retries.
    
    Rate-limit/availability errors retry with exponential backoff plus
    jitter; timeout/internal errors retry with plain exponential backoff;
    any other exception aborts immediately.
    
    Args:
        project_id: Google Cloud project ID
        location: Processor location
        processor_id: Processor ID
        gcs_input_uri: Input Cloud Storage URI
        gcs_output_uri: Output Cloud Storage URI
        max_retries: Maximum number of retry attempts
        base_delay: Base delay in seconds for exponential backoff
        
    Returns:
        BatchProcessResponse: Processing result or None if failed
    """
    from google.api_core.exceptions import (
        ResourceExhausted,
        DeadlineExceeded,
        InternalServerError,
        ServiceUnavailable
    )
    
    attempt = 0
    while attempt <= max_retries:
        try:
            batch_op = execute_batch_processing(
                project_id=project_id,
                location=location,
                processor_id=processor_id,
                gcs_input_uri=gcs_input_uri,
                gcs_output_uri=gcs_output_uri,
                wait_for_completion=False
            )
            
            # Block (up to 24 hours) until the operation resolves.
            return monitor_batch_operation_with_timeout(
                batch_op, timeout_hours=24
            )
            
        except (ResourceExhausted, ServiceUnavailable) as e:
            if attempt >= max_retries:
                print(f"Failed after {max_retries} retries due to rate limiting: {e}")
                return None
            # Exponential backoff with jitter to spread contending clients.
            delay = base_delay * (2 ** attempt) + random.randint(0, 30)
            print(f"Rate limit/service error (attempt {attempt + 1}), retrying in {delay}s: {e}")
            time.sleep(delay)
            
        except (DeadlineExceeded, InternalServerError) as e:
            if attempt >= max_retries:
                print(f"Failed after {max_retries} retries due to timeout: {e}")
                return None
            delay = base_delay * (2 ** attempt)
            print(f"Timeout/internal error (attempt {attempt + 1}), retrying in {delay}s: {e}")
            time.sleep(delay)
            
        except Exception as e:
            print(f"Unexpected error (non-retryable): {e}")
            return None
        
        attempt += 1
    
    return None

def monitor_batch_operation_with_timeout(
    operation_obj: operation.Operation,
    timeout_hours: int = 24,
    check_interval: int = 300  # 5 minutes
) -> Optional["BatchProcessResponse"]:
    """
    Poll a batch operation until done, or give up after a deadline.
    
    Args:
        operation_obj: Long-running operation object
        timeout_hours: Maximum hours to wait before cancelling
        check_interval: Seconds between polls
        
    Returns:
        BatchProcessResponse: Result, or None on timeout or failure
    """
    started = time.time()
    limit = timeout_hours * 3600
    
    while not operation_obj.done():
        elapsed = time.time() - started
        
        if elapsed > limit:
            print(f"Operation timed out after {timeout_hours} hours")
            # Best-effort cancel so the backend stops work on it.
            try:
                cancel_batch_operation(operation_obj.operation.name)
            except Exception as e:
                print(f"Failed to cancel timed-out operation: {e}")
            return None
        
        print(f"Operation running for {elapsed/3600:.1f} hours...")
        time.sleep(check_interval)
    
    # Completed: surface failure as None rather than raising.
    if operation_obj.exception():
        print(f"Operation failed: {operation_obj.exception()}")
        return None
    
    return operation_obj.result()

Complete Batch Processing Example

def complete_batch_processing_workflow():
    """
    Complete example of batch processing workflow.
    
    Runs end-to-end: builds a multi-source batch request, executes it,
    monitors the operation to completion, summarizes the output, and
    uploads a JSON summary next to the processed documents.
    
    NOTE: performs real GCP calls; requires valid credentials, an existing
    processor, and the referenced buckets.
    
    Returns:
        dict: Summary record describing the run (operation name, sources,
        output URI, aggregate stats, and extracted entities).
    """
    # Configuration
    project_id = "my-project"
    location = "us" 
    processor_id = "invoice-processor-123"
    
    input_bucket = "company-invoices"
    output_bucket = "processed-invoices"
    
    # Setup batch processing
    print("=== BATCH PROCESSING WORKFLOW ===")
    
    # 1. Prepare batch request with multiple sources
    document_sources = [
        {
            "type": "gcs_prefix",
            "gcs_prefix": f"gs://{input_bucket}/2024/invoices/",
            "mime_types": ["application/pdf"]
        },
        {
            "type": "gcs_prefix", 
            "gcs_prefix": f"gs://{input_bucket}/2024/receipts/",
            "mime_types": ["image/jpeg", "image/png"]
        }
    ]
    
    processor_name = f"projects/{project_id}/locations/{location}/processors/{processor_id}"
    # Timestamped prefix keeps each run's output separate.
    gcs_output_uri = f"gs://{output_bucket}/batch-{int(time.time())}/"
    
    # 2. Create and execute batch request
    request = create_multi_source_batch_request(
        processor_name=processor_name,
        document_sources=document_sources,
        gcs_output_uri=gcs_output_uri
    )
    
    client = DocumentProcessorServiceClient()
    operation = client.batch_process_documents(request=request)
    
    print(f"Started batch operation: {operation.operation.name}")
    
    # 3. Monitor progress (blocks until the operation finishes)
    result = monitor_batch_operation(operation, check_interval=60)
    
    # 4. Process results
    print("\n=== PROCESSING RESULTS ===")
    
    # Get summary
    summary = process_batch_results(gcs_output_uri, output_format="summary")
    print(f"Processed {summary['total_documents']} documents")
    print(f"Total pages: {summary['total_pages']}")
    print(f"Total entities: {summary['total_entities']}")
    print(f"Documents with tables: {summary['documents_with_tables']}")
    print(f"Processing errors: {summary['processing_errors']}")
    
    # Get entity breakdown
    print(f"\nEntity types found:")
    for entity_type, count in summary['entity_types'].items():
        print(f"  {entity_type}: {count}")
    
    # 5. Export results to structured format
    # NOTE: this re-downloads and re-parses the batch output a second time.
    entities = process_batch_results(gcs_output_uri, output_format="entities")
    
    # Save results summary
    results_summary = {
        "operation_name": operation.operation.name,
        "processing_time": time.time(),
        "input_sources": document_sources,
        "output_uri": gcs_output_uri,
        "summary": summary,
        "entities": entities
    }
    
    # Upload summary to Cloud Storage
    summary_uri = f"{gcs_output_uri}processing_summary.json"
    upload_json_to_gcs(results_summary, summary_uri)
    
    print(f"\nBatch processing completed!")
    print(f"Results available at: {gcs_output_uri}")
    print(f"Summary saved to: {summary_uri}")
    
    return results_summary

def upload_json_to_gcs(data: dict, gcs_uri: str) -> None:
    """
    Serialize `data` as pretty-printed JSON and upload it to Cloud Storage.
    
    Args:
        data: JSON-serializable payload
        gcs_uri: Destination object URI (gs://bucket/path/to/file.json)
    
    Raises:
        ValueError: If gcs_uri is not a gs:// URI or names no object path.
    """
    import json
    
    # Validate before touching GCS, matching the readers in this file.
    # The old code blindly stripped five characters, so a non-gs:// URI
    # was silently mangled and a bare bucket URI raised IndexError.
    if not gcs_uri.startswith("gs://"):
        raise ValueError("Invalid GCS URI")
    bucket_name, _, blob_name = gcs_uri[5:].partition("/")
    if not bucket_name or not blob_name:
        raise ValueError("GCS URI must include a bucket and object path")
    
    from google.cloud import storage
    
    # Upload to Cloud Storage
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    
    blob.upload_from_string(
        json.dumps(data, indent=2),
        content_type="application/json"
    )

# Script entry point: run the end-to-end workflow demo (requires real
# GCP credentials and resources).
if __name__ == "__main__":
    complete_batch_processing_workflow()

This comprehensive guide covers all aspects of batch processing in Google Cloud Document AI, from basic operations to advanced workflows with error handling, monitoring, and result processing.

Install with Tessl CLI

npx tessl i tessl/pypi-google-cloud-documentai

docs

batch-operations.md

beta-features.md

document-processing.md

document-types.md

index.md

processor-management.md

tile.json