Google Cloud Vision AI API client library for building and deploying Vertex AI Vision applications
—
Real-time video analysis capabilities with custom operators, analysis definitions, and batch processing workflows. The LiveVideoAnalytics service provides the foundation for building sophisticated video understanding applications with ML-powered processing pipelines.
Create and manage video analysis definitions that specify processing workflows, operators, and output configurations.
def list_analyses(self, parent: str, page_size: int = None, page_token: str = None, filter: str = None, order_by: str = None) -> ListAnalysesResponse:
    """Lists analyses in a project and location.

    Args:
        parent (str): Required. Parent resource path
            "projects/{project}/locations/{location}".
        page_size (int): Maximum number of analyses to return.
        page_token (str): Token for pagination.
        filter (str): Filter expression for analyses.
        order_by (str): Sort order for results.

    Returns:
        ListAnalysesResponse: Response with analyses and pagination.
    """
def get_analysis(self, name: str) -> Analysis:
    """Retrieves a specific analysis configuration.

    Args:
        name (str): Required. Analysis resource path
            "projects/{project}/locations/{location}/analyses/{analysis}".

    Returns:
        Analysis: The analysis resource with configuration.
    """
def create_analysis(self, parent: str, analysis: Analysis, analysis_id: str) -> Operation:
    """Creates a new video analysis definition.

    Args:
        parent (str): Required. Parent resource path
            "projects/{project}/locations/{location}".
        analysis (Analysis): Required. Analysis configuration.
        analysis_id (str): Required. ID for the new analysis.

    Returns:
        Operation: Long-running operation for analysis creation.
    """
def update_analysis(self, analysis: Analysis, update_mask: FieldMask = None) -> Operation:
    """Updates an existing analysis configuration.

    Args:
        analysis (Analysis): Required. Updated analysis configuration.
        update_mask (FieldMask): Fields to update.

    Returns:
        Operation: Long-running operation for analysis update.
    """
def delete_analysis(self, name: str) -> Operation:
"""
Deletes a video analysis definition.
Args:
name (str): Required. Analysis resource path to delete
Returns:
Operation: Long-running operation for deletion
"""Manage custom and public operators that perform specific video analysis tasks within processing pipelines.
def list_operators(self, parent: str, page_size: int = None, page_token: str = None, filter: str = None, order_by: str = None) -> ListOperatorsResponse:
    """Lists user-created operators.

    Args:
        parent (str): Required. Parent resource path
            "projects/{project}/locations/{location}".
        page_size (int): Maximum number of operators to return.
        page_token (str): Token for pagination.
        filter (str): Filter expression for operators.
        order_by (str): Sort order for results.

    Returns:
        ListOperatorsResponse: Response with operators and pagination.
    """
def list_public_operators(self, parent: str, page_size: int = None, page_token: str = None, filter: str = None, order_by: str = None) -> ListPublicOperatorsResponse:
    """Lists public operators available for use.

    Args:
        parent (str): Required. Parent resource path
            "projects/{project}/locations/{location}".
        page_size (int): Maximum number to return.
        page_token (str): Token for pagination.
        filter (str): Filter expression.
        order_by (str): Sort order for results.

    Returns:
        ListPublicOperatorsResponse: Response with public operators.
    """
def resolve_operator_info(self, parent: str, queries: List[OperatorQuery]) -> ResolveOperatorInfoResponse:
    """Resolves detailed information about operators.

    Args:
        parent (str): Required. Parent resource path
            "projects/{project}/locations/{location}".
        queries (List[OperatorQuery]): Operator queries to resolve.

    Returns:
        ResolveOperatorInfoResponse: Resolved operator information.
    """
def get_operator(self, name: str) -> Operator:
    """Retrieves operator details and configuration.

    Args:
        name (str): Required. Operator resource path
            "projects/{project}/locations/{location}/operators/{operator}".

    Returns:
        Operator: The operator resource with configuration.
    """
def create_operator(self, parent: str, operator: Operator, operator_id: str) -> Operation:
    """Creates a custom operator for video analysis.

    Args:
        parent (str): Required. Parent resource path
            "projects/{project}/locations/{location}".
        operator (Operator): Required. Operator configuration.
        operator_id (str): Required. ID for the new operator.

    Returns:
        Operation: Long-running operation for operator creation.
    """
def update_operator(self, operator: Operator, update_mask: FieldMask = None) -> Operation:
    """Updates operator configuration.

    Args:
        operator (Operator): Required. Updated operator configuration.
        update_mask (FieldMask): Fields to update.

    Returns:
        Operation: Long-running operation for operator update.
    """
def delete_operator(self, name: str) -> Operation:
"""
Deletes a custom operator.
Args:
name (str): Required. Operator resource path to delete
Returns:
Operation: Long-running operation for operator deletion
"""Manage analysis processes that execute video analysis workflows on specific data sources with batch and streaming modes.
def list_processes(self, parent: str, page_size: int = None, page_token: str = None, filter: str = None, order_by: str = None) -> ListProcessesResponse:
    """Lists processes in a project and location.

    Args:
        parent (str): Required. Parent resource path
            "projects/{project}/locations/{location}".
        page_size (int): Maximum number of processes to return.
        page_token (str): Token for pagination.
        filter (str): Filter expression for processes.
        order_by (str): Sort order for results.

    Returns:
        ListProcessesResponse: Response with processes and pagination.
    """
def get_process(self, name: str) -> Process:
    """Retrieves process details and status.

    Args:
        name (str): Required. Process resource path
            "projects/{project}/locations/{location}/processes/{process}".

    Returns:
        Process: The process resource with configuration and status.
    """
def create_process(self, parent: str, process: Process, process_id: str) -> Operation:
    """Creates a new analysis process.

    Args:
        parent (str): Required. Parent resource path
            "projects/{project}/locations/{location}".
        process (Process): Required. Process configuration.
        process_id (str): Required. ID for the new process.

    Returns:
        Operation: Long-running operation for process creation.
    """
def update_process(self, process: Process, update_mask: FieldMask = None) -> Operation:
    """Updates process configuration.

    Args:
        process (Process): Required. Updated process configuration.
        update_mask (FieldMask): Fields to update.

    Returns:
        Operation: Long-running operation for process update.
    """
def delete_process(self, name: str) -> Operation:
    """Deletes an analysis process.

    Args:
        name (str): Required. Process resource path to delete.

    Returns:
        Operation: Long-running operation for process deletion.
    """
def batch_run_process(self, parent: str, requests: List[CreateProcessRequest]) -> Operation:
    """Runs multiple processes in batch mode.

    Args:
        parent (str): Required. Parent resource path
            "projects/{project}/locations/{location}".
        requests (List[CreateProcessRequest]): Process creation requests
            to execute.

    Returns:
        Operation: Long-running operation for batch process execution.
    """


class Analysis:
    """Video analysis definition and configuration."""

    name: str  # Resource name
    analysis_definition: AnalysisDefinition  # Analysis workflow definition
    input_streams_mapping: Dict[str, str]  # Input stream mappings
    output_streams_mapping: Dict[str, str]  # Output stream mappings
    disable_event_watch: bool  # Disable event watching
    create_time: Timestamp  # Creation timestamp
    update_time: Timestamp  # Last update timestamp
    labels: Dict[str, str]  # Resource labels
class AnalysisDefinition:
    """Definition of analysis workflow and operators."""

    analyzers: List[AnalyzerDefinition]  # Analyzer configurations
class AnalyzerDefinition:
    """Individual analyzer configuration within analysis."""

    analyzer: str  # Analyzer identifier
    operator: str  # Operator resource name
    inputs: List[AnalyzerInput]  # Input configurations
    attrs: Dict[str, AttributeValue]  # Analyzer attributes
    debug_options: AnalyzerDebugOptions  # Debug configuration
class AnalyzerInput:
    """Input configuration for analyzer."""

    input: str  # Input identifier
class AttributeValue:
    """Attribute value for analyzer configuration."""

    # Union field oneof value:
    i: int  # Integer value
    f: float  # Float value
    b: bool  # Boolean value
    s: bytes  # String/bytes value
class AnalyzerDebugOptions:
    """Debug options for analyzer."""

    environment_variables: Dict[str, str]  # Environment variables


class Operator:
    """Custom operator for video analysis."""

    name: str  # Resource name
    operator_definition: OperatorDefinition  # Operator implementation
    docker_image: str  # Docker image for operator
    create_time: Timestamp  # Creation timestamp
    update_time: Timestamp  # Last update timestamp
    labels: Dict[str, str]  # Resource labels
class OperatorDefinition:
    """Definition of operator implementation."""

    operator: str  # Operator identifier
    input_args: List[OperatorInputArg]  # Input argument definitions
    output_args: List[OperatorOutputArg]  # Output argument definitions
    attrs: List[OperatorAttribute]  # Operator attributes
    resources: ResourceSpecification  # Resource requirements
    short_description: str  # Brief description
    description: str  # Detailed description
class OperatorInputArg:
    """Input argument definition for operator."""

    argument: str  # Argument name
    type: str  # Argument type
class OperatorOutputArg:
    """Output argument definition for operator."""

    argument: str  # Argument name
    type: str  # Argument type
class OperatorAttribute:
    """Attribute definition for operator."""

    attribute: str  # Attribute name
    type: str  # Attribute type
    default_value: AttributeValue  # Default value
class OperatorQuery:
    """Query for resolving operator information."""

    operator: str  # Operator identifier to query
    tag: str  # Operator version tag
    registry: Registry  # Registry containing operator
class Registry:
    """Registry configuration for operators."""

    # Union field oneof registry:
    public_registry: PublicRegistry  # Public registry
    private_registry: PrivateRegistry  # Private registry
class PublicRegistry:
    """Public operator registry configuration."""
class PrivateRegistry:
    """Private operator registry configuration."""


class Process:
    """Analysis process configuration and status."""

    name: str  # Resource name
    analysis: str  # Analysis resource name
    attribute_overrides: List[str]  # Attribute overrides
    run_status: RunStatus  # Current run status
    run_mode: RunMode  # Execution mode
    event_id: str  # Associated event ID
    batch_id: str  # Batch execution ID
    retry_count: int  # Number of retries
    create_time: Timestamp  # Creation timestamp
    update_time: Timestamp  # Last update timestamp
    labels: Dict[str, str]  # Resource labels
class CreateProcessRequest:
    """Request for creating a process."""

    parent: str  # Parent resource path
    process_id: str  # Process identifier
    process: Process  # Process configuration
class ResourceSpecification:
    """Resource requirements specification."""

    resource_type: ResourceSpecificationType  # Type of resource
    # Union field oneof resource:
    cpu: ResourceSpecificationCPU  # CPU resources
    gpu: ResourceSpecificationGPU  # GPU resources
class ResourceSpecificationCPU:
    """CPU resource specification."""

    cpu_limit: float  # CPU limit
    memory_limit: int  # Memory limit in bytes
class ResourceSpecificationGPU:
    """GPU resource specification."""

    gpu_limit: int  # Number of GPUs
    gpu_type: str  # GPU type specification


class RunStatus(Enum):
    """Process execution status."""

    RUN_STATUS_UNSPECIFIED = 0
    INITIALIZING = 1  # Process initializing
    RUNNING = 2  # Process running
    COMPLETED = 3  # Process completed successfully
    FAILED = 4  # Process failed
    PENDING = 5  # Process pending execution
    CANCELLING = 6  # Process being cancelled
    CANCELLED = 7  # Process cancelled
class RunMode(Enum):
    """Process execution mode."""

    RUN_MODE_UNSPECIFIED = 0
    LIVE = 1  # Live/streaming mode
    OFFLINE = 2  # Batch/offline mode
class ResourceSpecificationType(Enum):
    """Resource specification types."""

    RESOURCE_SPECIFICATION_TYPE_UNSPECIFIED = 0
    CPU = 1  # CPU resources
    GPU = 2  # GPU resources
    CUSTOM = 3  # Custom resource type


from google.cloud import visionai_v1
# Create client
client = visionai_v1.LiveVideoAnalyticsClient()

# Define paths
parent = "projects/my-project/locations/us-central1"

# Describe each analyzer separately, then assemble the analysis definition.
object_detector = visionai_v1.AnalyzerDefinition(
    analyzer="object-detector",
    operator="google/object_detection:latest",
    inputs=[visionai_v1.AnalyzerInput(input="video-stream")],
    attrs={
        "confidence_threshold": visionai_v1.AttributeValue(f=0.7),
        "max_detections": visionai_v1.AttributeValue(i=100),
    },
)
person_blur = visionai_v1.AnalyzerDefinition(
    analyzer="person-blur",
    operator="google/person_blur:latest",
    inputs=[visionai_v1.AnalyzerInput(input="video-stream")],
)
analysis_def = visionai_v1.AnalysisDefinition(
    analyzers=[object_detector, person_blur],
)

# Create analysis: bind the definition to its input and output streams.
analysis = visionai_v1.Analysis(
    analysis_definition=analysis_def,
    input_streams_mapping={
        "video-stream": "projects/my-project/locations/us-central1/clusters/my-cluster/streams/camera-1",
    },
    output_streams_mapping={
        "analyzed-stream": "projects/my-project/locations/us-central1/clusters/my-cluster/streams/output",
    },
)

create_op = client.create_analysis(
    parent=parent,
    analysis=analysis,
    analysis_id="security-analysis",
)
# Wait on the operation returned by create_analysis and keep its result.
created_analysis = create_op.result()

# Define custom operator
# The operator requests a single GPU of the given type.
gpu_resources = visionai_v1.ResourceSpecification(
    resource_type=visionai_v1.ResourceSpecificationType.GPU,
    gpu=visionai_v1.ResourceSpecificationGPU(
        gpu_limit=1,
        gpu_type="nvidia-tesla-t4",
    ),
)

operator_def = visionai_v1.OperatorDefinition(
    operator="my-custom-detector",
    input_args=[
        visionai_v1.OperatorInputArg(argument="video", type="VIDEO_STREAM"),
    ],
    output_args=[
        visionai_v1.OperatorOutputArg(
            argument="detections",
            type="OBJECT_DETECTION_PREDICTION",
        ),
    ],
    attrs=[
        visionai_v1.OperatorAttribute(
            attribute="model_path",
            type="STRING",
            default_value=visionai_v1.AttributeValue(s=b"/models/detector.pb"),
        ),
    ],
    resources=gpu_resources,
    short_description="Custom object detector",
    description="Custom trained object detection model",
)

# Create operator
operator = visionai_v1.Operator(
    operator_definition=operator_def,
    docker_image="gcr.io/my-project/custom-detector:v1.0",
)

create_op = client.create_operator(
    parent=parent,
    operator=operator,
    operator_id="custom-detector",
)
# Wait on the operation returned by create_operator and keep its result.
created_operator = create_op.result()

# Create multiple processes for batch execution
# Five offline processes, all tagged with the same batch ID.
process_requests = [
    visionai_v1.CreateProcessRequest(
        parent=parent,
        process_id=f"batch-process-{i}",
        process=visionai_v1.Process(
            analysis=f"{parent}/analyses/security-analysis",
            run_mode=visionai_v1.RunMode.OFFLINE,
            batch_id="batch-001",
        ),
    )
    for i in range(5)
]

# Execute batch
batch_op = client.batch_run_process(parent=parent, requests=process_requests)
batch_result = batch_op.result()
# Monitor process status
for request in process_requests:
    process_name = f"{parent}/processes/{request.process_id}"
    process = client.get_process(name=process_name)
    print(f"Process {request.process_id}: {process.run_status}")

Install with Tessl CLI
npx tessl i tessl/pypi-google-cloud-visionai