CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-google-cloud-visionai

Google Cloud Vision AI API client library for building and deploying Vertex AI Vision applications

Pending
Overview
Eval results
Files

live-video-analytics.mddocs/

Live Video Analytics

Real-time video analysis capabilities with custom operators, analysis definitions, and batch processing workflows. The LiveVideoAnalytics service provides the foundation for building sophisticated video understanding applications with ML-powered processing pipelines.

Capabilities

Analysis Management

Create and manage video analysis definitions that specify processing workflows, operators, and output configurations.

def list_analyses(self, parent: str, page_size: int = None, page_token: str = None, filter: str = None, order_by: str = None) -> ListAnalysesResponse:
    """
    Lists analyses in a project and location.
    
    Args:
        parent (str): Required. Parent resource path "projects/{project}/locations/{location}"
        page_size (int): Maximum number of analyses to return
        page_token (str): Token for pagination
        filter (str): Filter expression for analyses
        order_by (str): Sort order for results
        
    Returns:
        ListAnalysesResponse: Response with analyses and pagination
    """

def get_analysis(self, name: str) -> Analysis:
    """
    Retrieves a specific analysis configuration.
    
    Args:
        name (str): Required. Analysis resource path
                   "projects/{project}/locations/{location}/analyses/{analysis}"
    
    Returns:
        Analysis: The analysis resource with configuration
    """

def create_analysis(self, parent: str, analysis: Analysis, analysis_id: str) -> Operation:
    """
    Creates a new video analysis definition.
    
    Args:
        parent (str): Required. Parent resource path "projects/{project}/locations/{location}"
        analysis (Analysis): Required. Analysis configuration
        analysis_id (str): Required. ID for the new analysis
    
    Returns:
        Operation: Long-running operation for analysis creation
    """

def update_analysis(self, analysis: Analysis, update_mask: FieldMask = None) -> Operation:
    """
    Updates an existing analysis configuration.
    
    Args:
        analysis (Analysis): Required. Updated analysis configuration
        update_mask (FieldMask): Fields to update
    
    Returns:
        Operation: Long-running operation for analysis update
    """

def delete_analysis(self, name: str) -> Operation:
    """
    Deletes a video analysis definition.
    
    Args:
        name (str): Required. Analysis resource path to delete
    
    Returns:
        Operation: Long-running operation for deletion
    """

Operator Management

Manage custom and public operators that perform specific video analysis tasks within processing pipelines.

def list_operators(self, parent: str, page_size: int = None, page_token: str = None, filter: str = None, order_by: str = None) -> ListOperatorsResponse:
    """
    Lists user-created operators.
    
    Args:
        parent (str): Required. Parent resource path "projects/{project}/locations/{location}"
        page_size (int): Maximum number of operators to return
        page_token (str): Token for pagination
        filter (str): Filter expression for operators
        order_by (str): Sort order for results
        
    Returns:
        ListOperatorsResponse: Response with operators and pagination
    """

def list_public_operators(self, parent: str, page_size: int = None, page_token: str = None, filter: str = None, order_by: str = None) -> ListPublicOperatorsResponse:
    """
    Lists public operators available for use.
    
    Args:
        parent (str): Required. Parent resource path "projects/{project}/locations/{location}"
        page_size (int): Maximum number to return
        page_token (str): Token for pagination
        filter (str): Filter expression
        order_by (str): Sort order for results
        
    Returns:
        ListPublicOperatorsResponse: Response with public operators
    """

def resolve_operator_info(self, parent: str, queries: List[OperatorQuery]) -> ResolveOperatorInfoResponse:
    """
    Resolves detailed information about operators.
    
    Args:
        parent (str): Required. Parent resource path "projects/{project}/locations/{location}"
        queries (List[OperatorQuery]): Operator queries to resolve
    
    Returns:
        ResolveOperatorInfoResponse: Resolved operator information
    """

def get_operator(self, name: str) -> Operator:
    """
    Retrieves operator details and configuration.
    
    Args:
        name (str): Required. Operator resource path
                   "projects/{project}/locations/{location}/operators/{operator}"
    
    Returns:
        Operator: The operator resource with configuration
    """

def create_operator(self, parent: str, operator: Operator, operator_id: str) -> Operation:
    """
    Creates a custom operator for video analysis.
    
    Args:
        parent (str): Required. Parent resource path "projects/{project}/locations/{location}"
        operator (Operator): Required. Operator configuration
        operator_id (str): Required. ID for the new operator
    
    Returns:
        Operation: Long-running operation for operator creation
    """

def update_operator(self, operator: Operator, update_mask: FieldMask = None) -> Operation:
    """
    Updates operator configuration.
    
    Args:
        operator (Operator): Required. Updated operator configuration
        update_mask (FieldMask): Fields to update
    
    Returns:
        Operation: Long-running operation for operator update
    """

def delete_operator(self, name: str) -> Operation:
    """
    Deletes a custom operator.
    
    Args:
        name (str): Required. Operator resource path to delete
    
    Returns:
        Operation: Long-running operation for operator deletion
    """

Process Management

Manage analysis processes that execute video analysis workflows on specific data sources with batch and streaming modes.

def list_processes(self, parent: str, page_size: int = None, page_token: str = None, filter: str = None, order_by: str = None) -> ListProcessesResponse:
    """
    Lists processes in a project and location.
    
    Args:
        parent (str): Required. Parent resource path "projects/{project}/locations/{location}"
        page_size (int): Maximum number of processes to return
        page_token (str): Token for pagination
        filter (str): Filter expression for processes
        order_by (str): Sort order for results
        
    Returns:
        ListProcessesResponse: Response with processes and pagination
    """

def get_process(self, name: str) -> Process:
    """
    Retrieves process details and status.
    
    Args:
        name (str): Required. Process resource path
                   "projects/{project}/locations/{location}/processes/{process}"
    
    Returns:
        Process: The process resource with configuration and status
    """

def create_process(self, parent: str, process: Process, process_id: str) -> Operation:
    """
    Creates a new analysis process.
    
    Args:
        parent (str): Required. Parent resource path "projects/{project}/locations/{location}"
        process (Process): Required. Process configuration
        process_id (str): Required. ID for the new process
    
    Returns:
        Operation: Long-running operation for process creation
    """

def update_process(self, process: Process, update_mask: FieldMask = None) -> Operation:
    """
    Updates process configuration.
    
    Args:
        process (Process): Required. Updated process configuration
        update_mask (FieldMask): Fields to update
    
    Returns:
        Operation: Long-running operation for process update
    """

def delete_process(self, name: str) -> Operation:
    """
    Deletes an analysis process.
    
    Args:
        name (str): Required. Process resource path to delete
    
    Returns:
        Operation: Long-running operation for process deletion
    """

def batch_run_process(self, parent: str, requests: List[CreateProcessRequest]) -> Operation:
    """
    Runs multiple processes in batch mode.
    
    Args:
        parent (str): Required. Parent resource path "projects/{project}/locations/{location}"
        requests (List[CreateProcessRequest]): Process creation requests to execute
    
    Returns:
        Operation: Long-running operation for batch process execution
    """

Types

Analysis Resources

class Analysis:
    """Video analysis definition and configuration."""
    name: str  # Resource name
    analysis_definition: AnalysisDefinition  # Analysis workflow definition
    input_streams_mapping: Dict[str, str]  # Input stream mappings
    output_streams_mapping: Dict[str, str]  # Output stream mappings
    disable_event_watch: bool  # Disable event watching
    create_time: Timestamp  # Creation timestamp
    update_time: Timestamp  # Last update timestamp
    labels: Dict[str, str]  # Resource labels

class AnalysisDefinition:
    """Definition of analysis workflow and operators."""
    analyzers: List[AnalyzerDefinition]  # Analyzer configurations
    
class AnalyzerDefinition:
    """Individual analyzer configuration within analysis."""
    analyzer: str  # Analyzer identifier
    operator: str  # Operator resource name
    inputs: List[AnalyzerInput]  # Input configurations
    attrs: Dict[str, AttributeValue]  # Analyzer attributes
    debug_options: AnalyzerDebugOptions  # Debug configuration

class AnalyzerInput:
    """Input configuration for analyzer."""
    input: str  # Input identifier
    
class AttributeValue:
    """Attribute value for analyzer configuration."""
    # Union field oneof value:
    i: int  # Integer value
    f: float  # Float value  
    b: bool  # Boolean value
    s: bytes  # String/bytes value

class AnalyzerDebugOptions:
    """Debug options for analyzer."""
    environment_variables: Dict[str, str]  # Environment variables

Operator Resources

class Operator:
    """Custom operator for video analysis."""
    name: str  # Resource name
    operator_definition: OperatorDefinition  # Operator implementation
    docker_image: str  # Docker image for operator
    create_time: Timestamp  # Creation timestamp
    update_time: Timestamp  # Last update timestamp
    labels: Dict[str, str]  # Resource labels

class OperatorDefinition:
    """Definition of operator implementation."""
    operator: str  # Operator identifier
    input_args: List[OperatorInputArg]  # Input argument definitions
    output_args: List[OperatorOutputArg]  # Output argument definitions
    attrs: List[OperatorAttribute]  # Operator attributes
    resources: ResourceSpecification  # Resource requirements
    short_description: str  # Brief description
    description: str  # Detailed description

class OperatorInputArg:
    """Input argument definition for operator.""" 
    argument: str  # Argument name
    type: str  # Argument type
    
class OperatorOutputArg:
    """Output argument definition for operator."""
    argument: str  # Argument name  
    type: str  # Argument type

class OperatorAttribute:
    """Attribute definition for operator."""
    attribute: str  # Attribute name
    type: str  # Attribute type
    default_value: AttributeValue  # Default value

class OperatorQuery:
    """Query for resolving operator information."""
    operator: str  # Operator identifier to query
    tag: str  # Operator version tag
    registry: Registry  # Registry containing operator

class Registry:
    """Registry configuration for operators."""
    # Union field oneof registry:
    public_registry: PublicRegistry  # Public registry
    private_registry: PrivateRegistry  # Private registry

class PublicRegistry:
    """Public operator registry configuration."""
    pass

class PrivateRegistry:
    """Private operator registry configuration."""  
    pass

Process Resources

class Process:
    """Analysis process configuration and status."""
    name: str  # Resource name
    analysis: str  # Analysis resource name
    attribute_overrides: List[str]  # Attribute overrides
    run_status: RunStatus  # Current run status
    run_mode: RunMode  # Execution mode
    event_id: str  # Associated event ID
    batch_id: str  # Batch execution ID  
    retry_count: int  # Number of retries
    create_time: Timestamp  # Creation timestamp
    update_time: Timestamp  # Last update timestamp
    labels: Dict[str, str]  # Resource labels

class CreateProcessRequest:
    """Request for creating a process."""
    parent: str  # Parent resource path
    process_id: str  # Process identifier
    process: Process  # Process configuration

class ResourceSpecification:
    """Resource requirements specification."""
    resource_type: ResourceSpecificationType  # Type of resource
    # Union field oneof resource:
    cpu: ResourceSpecificationCPU  # CPU resources
    gpu: ResourceSpecificationGPU  # GPU resources
    
class ResourceSpecificationCPU:
    """CPU resource specification."""
    cpu_limit: float  # CPU limit
    memory_limit: int  # Memory limit in bytes

class ResourceSpecificationGPU:
    """GPU resource specification."""
    gpu_limit: int  # Number of GPUs
    gpu_type: str  # GPU type specification

Enums

class RunStatus(Enum):
    """Process execution status."""
    RUN_STATUS_UNSPECIFIED = 0
    INITIALIZING = 1  # Process initializing
    RUNNING = 2  # Process running
    COMPLETED = 3  # Process completed successfully
    FAILED = 4  # Process failed
    PENDING = 5  # Process pending execution
    CANCELLING = 6  # Process being cancelled
    CANCELLED = 7  # Process cancelled

class RunMode(Enum):
    """Process execution mode."""
    RUN_MODE_UNSPECIFIED = 0  
    LIVE = 1  # Live/streaming mode
    OFFLINE = 2  # Batch/offline mode

class ResourceSpecificationType(Enum):
    """Resource specification types."""
    RESOURCE_SPECIFICATION_TYPE_UNSPECIFIED = 0
    CPU = 1  # CPU resources
    GPU = 2  # GPU resources
    CUSTOM = 3  # Custom resource type

Usage Examples

Creating Video Analysis Pipeline

from google.cloud import visionai_v1

# Create client
client = visionai_v1.LiveVideoAnalyticsClient()

# Define paths
parent = "projects/my-project/locations/us-central1"

# Create analysis definition
analysis_def = visionai_v1.AnalysisDefinition(
    analyzers=[
        visionai_v1.AnalyzerDefinition(
            analyzer="object-detector",
            operator="google/object_detection:latest",
            inputs=[
                visionai_v1.AnalyzerInput(input="video-stream")
            ],
            attrs={
                "confidence_threshold": visionai_v1.AttributeValue(f=0.7),
                "max_detections": visionai_v1.AttributeValue(i=100)
            }
        ),
        visionai_v1.AnalyzerDefinition(
            analyzer="person-blur",
            operator="google/person_blur:latest", 
            inputs=[
                visionai_v1.AnalyzerInput(input="video-stream")
            ]
        )
    ]
)

# Create analysis
analysis = visionai_v1.Analysis(
    analysis_definition=analysis_def,
    input_streams_mapping={
        "video-stream": "projects/my-project/locations/us-central1/clusters/my-cluster/streams/camera-1"
    },
    output_streams_mapping={
        "analyzed-stream": "projects/my-project/locations/us-central1/clusters/my-cluster/streams/output"
    }
)

create_op = client.create_analysis(
    parent=parent,
    analysis=analysis,
    analysis_id="security-analysis"
)

created_analysis = create_op.result()

Custom Operator Creation

# Define custom operator
operator_def = visionai_v1.OperatorDefinition(
    operator="my-custom-detector",
    input_args=[
        visionai_v1.OperatorInputArg(
            argument="video",
            type="VIDEO_STREAM"
        )
    ],
    output_args=[
        visionai_v1.OperatorOutputArg(
            argument="detections",
            type="OBJECT_DETECTION_PREDICTION"
        )
    ],
    attrs=[
        visionai_v1.OperatorAttribute(
            attribute="model_path",
            type="STRING",
            default_value=visionai_v1.AttributeValue(s=b"/models/detector.pb")
        )
    ],
    resources=visionai_v1.ResourceSpecification(
        resource_type=visionai_v1.ResourceSpecificationType.GPU,
        gpu=visionai_v1.ResourceSpecificationGPU(
            gpu_limit=1,
            gpu_type="nvidia-tesla-t4"
        )
    ),
    short_description="Custom object detector",
    description="Custom trained object detection model"
)

# Create operator
operator = visionai_v1.Operator(
    operator_definition=operator_def,
    docker_image="gcr.io/my-project/custom-detector:v1.0"
)

create_op = client.create_operator(
    parent=parent,
    operator=operator,
    operator_id="custom-detector"
)

created_operator = create_op.result()

Batch Process Execution

# Create multiple processes for batch execution
process_requests = [
    visionai_v1.CreateProcessRequest(
        parent=parent,
        process_id=f"batch-process-{i}",
        process=visionai_v1.Process(
            analysis=f"{parent}/analyses/security-analysis",
            run_mode=visionai_v1.RunMode.OFFLINE,
            batch_id="batch-001"
        )
    ) 
    for i in range(5)
]

# Execute batch
batch_op = client.batch_run_process(
    parent=parent,
    requests=process_requests
)

batch_result = batch_op.result()

# Monitor process status
for request in process_requests:
    process_name = f"{parent}/processes/{request.process_id}"
    process = client.get_process(name=process_name)
    print(f"Process {request.process_id}: {process.run_status}")

Install with Tessl CLI

npx tessl i tessl/pypi-google-cloud-visionai

docs

app-platform.md

health-check.md

index.md

live-video-analytics.md

streaming.md

streams-management.md

types.md

warehouse.md

tile.json