CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-google-cloud-visionai

Google Cloud Vision AI API client library for building and deploying Vertex AI Vision applications

Pending
Overview
Eval results
Files

streams-management.mddocs/

Stream Infrastructure Management

Infrastructure management for clusters, streams, events, and time series data organization. The StreamsService provides the foundational infrastructure for organizing and managing video streaming resources at scale.

Capabilities

Cluster Management

Create and manage streaming clusters that serve as containers for streams and provide resource isolation and scaling.

def list_clusters(self, parent: str, page_size: int = None, page_token: str = None, filter: str = None, order_by: str = None) -> ListClustersResponse:
    """
    Lists clusters in a project and location.
    
    Args:
        parent (str): Required. Parent resource path "projects/{project}/locations/{location}"
        page_size (int): Maximum number of clusters to return
        page_token (str): Token for pagination
        filter (str): Filter expression for clusters
        order_by (str): Sort order for results
        
    Returns:
        ListClustersResponse: Response with clusters and pagination
    """

def get_cluster(self, name: str) -> Cluster:
    """
    Retrieves cluster details and configuration.
    
    Args:
        name (str): Required. Cluster resource path
                   "projects/{project}/locations/{location}/clusters/{cluster}"
    
    Returns:
        Cluster: The cluster resource with configuration and status
    """

def create_cluster(self, parent: str, cluster: Cluster, cluster_id: str) -> Operation:
    """
    Creates a new streaming cluster.
    
    Args:
        parent (str): Required. Parent resource path "projects/{project}/locations/{location}"
        cluster (Cluster): Required. Cluster configuration
        cluster_id (str): Required. ID for the new cluster
    
    Returns:
        Operation: Long-running operation for cluster creation
    """

def update_cluster(self, cluster: Cluster, update_mask: FieldMask = None) -> Operation:
    """
    Updates cluster configuration and settings.
    
    Args:
        cluster (Cluster): Required. Updated cluster configuration
        update_mask (FieldMask): Fields to update
    
    Returns:
        Operation: Long-running operation for cluster update
    """

def delete_cluster(self, name: str) -> Operation:
    """
    Deletes a streaming cluster and all contained resources.
    
    Args:
        name (str): Required. Cluster resource path to delete
    
    Returns:
        Operation: Long-running operation for cluster deletion
    """

Stream Management

Manage individual streams within clusters for organizing video data flows and processing pipelines.

def list_streams(self, parent: str, page_size: int = None, page_token: str = None, filter: str = None, order_by: str = None) -> ListStreamsResponse:
    """
    Lists streams in a cluster.
    
    Args:
        parent (str): Required. Cluster resource path
        page_size (int): Maximum number of streams to return
        page_token (str): Token for pagination
        filter (str): Filter expression for streams
        order_by (str): Sort order for results
        
    Returns:
        ListStreamsResponse: Response with streams and pagination
    """

def get_stream(self, name: str) -> Stream:
    """
    Retrieves stream details and configuration.
    
    Args:
        name (str): Required. Stream resource path
                   "projects/{project}/locations/{location}/clusters/{cluster}/streams/{stream}"
    
    Returns:
        Stream: The stream resource with configuration and status
    """

def create_stream(self, parent: str, stream: Stream, stream_id: str) -> Operation:
    """
    Creates a new stream within a cluster.
    
    Args:
        parent (str): Required. Cluster resource path
        stream (Stream): Required. Stream configuration
        stream_id (str): Required. ID for the new stream
    
    Returns:
        Operation: Long-running operation for stream creation
    """

def update_stream(self, stream: Stream, update_mask: FieldMask = None) -> Operation:
    """
    Updates stream configuration and settings.
    
    Args:
        stream (Stream): Required. Updated stream configuration
        update_mask (FieldMask): Fields to update
    
    Returns:
        Operation: Long-running operation for stream update
    """

def delete_stream(self, name: str) -> Operation:
    """
    Deletes a stream and associated data.
    
    Args:
        name (str): Required. Stream resource path to delete
    
    Returns:
        Operation: Long-running operation for stream deletion
    """

def get_stream_thumbnail(self, stream: str, gcs_object_name: str, event: str = None) -> Operation:
    """
    Generates and retrieves a thumbnail image for a stream.
    
    Args:
        stream (str): Required. Stream resource path
        gcs_object_name (str): Required. GCS object name for thumbnail storage
        event (str): Specific event for thumbnail generation
    
    Returns:
        Operation: Long-running operation for thumbnail generation
    """

def generate_stream_hls_token(self, stream: str) -> GenerateStreamHlsTokenResponse:
    """
    Generates HLS streaming token for stream access.
    
    Args:
        stream (str): Required. Stream resource path
    
    Returns:
        GenerateStreamHlsTokenResponse: HLS token and streaming URLs
    """

Event Management

Manage events within streams for tracking significant occurrences and organizing temporal data.

def list_events(self, parent: str, page_size: int = None, page_token: str = None, filter: str = None, order_by: str = None) -> ListEventsResponse:
    """
    Lists events in a cluster.
    
    Args:
        parent (str): Required. Cluster resource path
        page_size (int): Maximum number of events to return
        page_token (str): Token for pagination
        filter (str): Filter expression for events
        order_by (str): Sort order for results
        
    Returns:
        ListEventsResponse: Response with events and pagination
    """

def get_event(self, name: str) -> Event:
    """
    Retrieves event details and metadata.
    
    Args:
        name (str): Required. Event resource path
                   "projects/{project}/locations/{location}/clusters/{cluster}/events/{event}"
    
    Returns:
        Event: The event resource with metadata and timing
    """

def create_event(self, parent: str, event: Event, event_id: str) -> Operation:
    """
    Creates a new event in a cluster.
    
    Args:
        parent (str): Required. Cluster resource path
        event (Event): Required. Event configuration and metadata
        event_id (str): Required. ID for the new event
    
    Returns:
        Operation: Long-running operation for event creation
    """

def update_event(self, event: Event, update_mask: FieldMask = None) -> Operation:
    """
    Updates event metadata and configuration.
    
    Args:
        event (Event): Required. Updated event configuration
        update_mask (FieldMask): Fields to update
    
    Returns:
        Operation: Long-running operation for event update
    """

def delete_event(self, name: str) -> Operation:
    """
    Deletes an event and associated metadata.
    
    Args:
        name (str): Required. Event resource path to delete
    
    Returns:
        Operation: Long-running operation for event deletion
    """

Series Management

Manage time series data within streams for organizing sequential data and enabling temporal analysis.

def list_series(self, parent: str, page_size: int = None, page_token: str = None, filter: str = None, order_by: str = None) -> ListSeriesResponse:
    """
    Lists series in a cluster.
    
    Args:
        parent (str): Required. Cluster resource path
        page_size (int): Maximum number of series to return
        page_token (str): Token for pagination
        filter (str): Filter expression for series
        order_by (str): Sort order for results
        
    Returns:
        ListSeriesResponse: Response with series and pagination
    """

def get_series(self, name: str) -> Series:
    """
    Retrieves series details and metadata.
    
    Args:
        name (str): Required. Series resource path
                   "projects/{project}/locations/{location}/clusters/{cluster}/series/{series}"
    
    Returns:
        Series: The series resource with metadata and configuration
    """

def create_series(self, parent: str, series: Series, series_id: str) -> Operation:
    """
    Creates a new time series in a cluster.
    
    Args:
        parent (str): Required. Cluster resource path
        series (Series): Required. Series configuration and metadata
        series_id (str): Required. ID for the new series
    
    Returns:
        Operation: Long-running operation for series creation
    """

def update_series(self, series: Series, update_mask: FieldMask = None) -> Operation:
    """
    Updates series metadata and configuration.
    
    Args:
        series (Series): Required. Updated series configuration
        update_mask (FieldMask): Fields to update
    
    Returns:
        Operation: Long-running operation for series update
    """

def delete_series(self, name: str) -> Operation:
    """
    Deletes a series and associated data.
    
    Args:
        name (str): Required. Series resource path to delete
    
    Returns:
        Operation: Long-running operation for series deletion
    """

Channel Operations

Manage channels for materializing data streams and enabling data access patterns.

def materialize_channel(self, parent: str, channel_id: str, channel: Channel) -> Operation:
    """
    Materializes a channel from series data.
    
    Args:
        parent (str): Required. Parent resource path
        channel_id (str): Required. ID for the materialized channel
        channel (Channel): Required. Channel configuration
    
    Returns:
        Operation: Long-running operation for channel materialization
    """

Types

Cluster Resources

class Cluster:
    """Streaming cluster configuration and status."""
    name: str  # Resource name
    display_name: str  # Human-readable name
    description: str  # Cluster description
    state: ClusterState  # Current cluster state
    psc_target: str  # Private Service Connect target
    create_time: Timestamp  # Creation timestamp
    update_time: Timestamp  # Last update timestamp
    labels: Dict[str, str]  # Resource labels

class ClusterState(Enum):
    """Cluster operational states."""
    STATE_UNSPECIFIED = 0
    PROVISIONING = 1  # Cluster being provisioned
    RUNNING = 2  # Cluster operational
    STOPPING = 3  # Cluster being stopped
    ERROR = 4  # Cluster in error state

Stream Resources

class Stream:
    """Stream configuration and metadata."""
    name: str  # Resource name
    display_name: str  # Human-readable name
    description: str  # Stream description
    enable_hls_playback: bool  # Enable HLS playback
    media_warehouse_asset: str  # Associated warehouse asset
    create_time: Timestamp  # Creation timestamp
    update_time: Timestamp  # Last update timestamp
    labels: Dict[str, str]  # Resource labels

class GenerateStreamHlsTokenResponse:
    """Response for HLS token generation."""
    token: str  # HLS streaming token
    expiration_time: Timestamp  # Token expiration time

Event Resources

class Event:
    """Event metadata and timing information."""
    name: str  # Resource name
    display_name: str  # Human-readable name
    description: str  # Event description
    create_time: Timestamp  # Creation timestamp
    update_time: Timestamp  # Last update timestamp
    labels: Dict[str, str]  # Resource labels
    # Union field oneof clock:
    grace_period: Duration  # Grace period for event
    alignment_clock: AlignmentClock  # Clock alignment for event

class AlignmentClock(Enum):
    """Clock alignment types for events."""
    ALIGNMENT_CLOCK_UNSPECIFIED = 0
    LIVE = 1  # Live clock alignment
    CAPTURE = 2  # Capture time alignment

Series Resources

class Series:
    """Time series metadata and configuration.""" 
    name: str  # Resource name
    display_name: str  # Human-readable name
    description: str  # Series description
    stream: str  # Associated stream resource
    event: str  # Associated event resource
    create_time: Timestamp  # Creation timestamp
    update_time: Timestamp  # Last update timestamp
    labels: Dict[str, str]  # Resource labels

Channel Resources

class Channel:
    """Channel configuration for data materialization."""
    stream: str  # Source stream for channel
    event: str  # Source event for channel
    # Union field oneof input_config:
    input_config: ChannelInputConfig  # Input configuration
    
class ChannelInputConfig:
    """Input configuration for channel."""
    pass  # Configuration for channel inputs

Usage Examples

Infrastructure Setup Workflow

from google.cloud import visionai_v1

# Create client
client = visionai_v1.StreamsServiceClient()

# Define paths
parent = "projects/my-project/locations/us-central1"

# Step 1: Create cluster
cluster = visionai_v1.Cluster(
    display_name="Video Processing Cluster",
    description="Cluster for real-time video processing",
    labels={
        "environment": "production",
        "team": "video-analytics"
    }
)

create_cluster_op = client.create_cluster(
    parent=parent,
    cluster=cluster,
    cluster_id="video-cluster"
)

cluster_result = create_cluster_op.result()
cluster_path = cluster_result.name

print(f"Created cluster: {cluster_path}")

# Step 2: Create streams within cluster
streams = [
    {
        "id": "camera-1-stream",
        "config": visionai_v1.Stream(
            display_name="Camera 1 Stream",
            description="Security camera 1 video stream",
            enable_hls_playback=True
        )
    },
    {
        "id": "camera-2-stream", 
        "config": visionai_v1.Stream(
            display_name="Camera 2 Stream",
            description="Security camera 2 video stream",
            enable_hls_playback=True
        )
    }
]

created_streams = []
for stream_info in streams:
    create_stream_op = client.create_stream(
        parent=cluster_path,
        stream=stream_info["config"],
        stream_id=stream_info["id"]
    )
    
    stream_result = create_stream_op.result()
    created_streams.append(stream_result)
    print(f"Created stream: {stream_result.name}")

# Step 3: Create events for temporal organization
events = [
    {
        "id": "motion-detection-event",
        "config": visionai_v1.Event(
            display_name="Motion Detection Event",
            description="Motion detected in surveillance area",
            grace_period={"seconds": 30}
        )
    },
    {
        "id": "person-detected-event",
        "config": visionai_v1.Event(
            display_name="Person Detected Event", 
            description="Person detected in restricted area",
            grace_period={"seconds": 10}
        )
    }
]

created_events = []
for event_info in events:
    create_event_op = client.create_event(
        parent=cluster_path,
        event=event_info["config"],
        event_id=event_info["id"]
    )
    
    event_result = create_event_op.result()
    created_events.append(event_result)
    print(f"Created event: {event_result.name}")

# Step 4: Create time series for data organization
for i, stream in enumerate(created_streams):
    for j, event in enumerate(created_events):
        series = visionai_v1.Series(
            display_name=f"Series {i+1}-{j+1}",
            description=f"Time series for stream {i+1} and event {j+1}",
            stream=stream.name,
            event=event.name
        )
        
        create_series_op = client.create_series(
            parent=cluster_path,
            series=series,
            series_id=f"series-{i+1}-{j+1}"
        )
        
        series_result = create_series_op.result()
        print(f"Created series: {series_result.name}")

Stream Management Operations

def manage_stream_lifecycle():
    """Example of managing stream lifecycle operations."""
    
    client = visionai_v1.StreamsServiceClient()
    
    # List all streams in cluster
    cluster_path = "projects/my-project/locations/us-central1/clusters/video-cluster"
    streams = client.list_streams(parent=cluster_path)
    
    for stream in streams:
        print(f"Stream: {stream.name}")
        print(f"  Display Name: {stream.display_name}")
        print(f"  HLS Enabled: {stream.enable_hls_playback}")
        print(f"  Created: {stream.create_time}")
        
        # Generate HLS token for streaming
        if stream.enable_hls_playback:
            hls_response = client.generate_stream_hls_token(stream=stream.name)
            print(f"  HLS Token: {hls_response.token}")
            print(f"  Token Expires: {hls_response.expiration_time}")
        
        # Generate thumbnail
        thumbnail_op = client.get_stream_thumbnail(
            stream=stream.name,
            gcs_object_name=f"thumbnails/{stream.name.split('/')[-1]}.jpg"
        )
        
        thumbnail_result = thumbnail_op.result()
        print(f"  Thumbnail generated")

def monitor_events_and_series():
    """Monitor events and series in a cluster."""
    
    client = visionai_v1.StreamsServiceClient()
    cluster_path = "projects/my-project/locations/us-central1/clusters/video-cluster"
    
    # List and monitor events
    events = client.list_events(
        parent=cluster_path,
        filter='labels.priority="high"',
        order_by="create_time desc"
    )
    
    for event in events:
        print(f"Event: {event.display_name}")
        print(f"  Description: {event.description}")
        print(f"  Created: {event.create_time}")
        
        # Find associated series
        series_list = client.list_series(
            parent=cluster_path,
            filter=f'event="{event.name}"'
        )
        
        for series in series_list:
            print(f"  Associated Series: {series.display_name}")
            print(f"    Stream: {series.stream}")

Channel Materialization

def materialize_data_channels():
    """Example of materializing channels from series data."""
    
    client = visionai_v1.StreamsServiceClient()
    cluster_path = "projects/my-project/locations/us-central1/clusters/video-cluster"
    
    # Define channel configuration
    channel = visionai_v1.Channel(
        stream="projects/my-project/locations/us-central1/clusters/video-cluster/streams/camera-1-stream",
        event="projects/my-project/locations/us-central1/clusters/video-cluster/events/motion-detection-event",
        input_config=visionai_v1.ChannelInputConfig()
    )
    
    # Materialize the channel
    materialize_op = client.materialize_channel(
        parent=cluster_path,
        channel_id="motion-detection-channel",
        channel=channel
    )
    
    channel_result = materialize_op.result()
    print(f"Materialized channel: {channel_result}")

def cleanup_resources():
    """Clean up streaming resources."""
    
    client = visionai_v1.StreamsServiceClient()
    cluster_path = "projects/my-project/locations/us-central1/clusters/video-cluster"
    
    # Delete series first (dependent resources)
    series_list = client.list_series(parent=cluster_path)
    for series in series_list:
        delete_op = client.delete_series(name=series.name)
        delete_op.result()
        print(f"Deleted series: {series.name}")
    
    # Delete events
    events_list = client.list_events(parent=cluster_path) 
    for event in events_list:
        delete_op = client.delete_event(name=event.name)
        delete_op.result()
        print(f"Deleted event: {event.name}")
    
    # Delete streams
    streams_list = client.list_streams(parent=cluster_path)
    for stream in streams_list:
        delete_op = client.delete_stream(name=stream.name)
        delete_op.result()
        print(f"Deleted stream: {stream.name}")
    
    # Finally delete cluster
    delete_cluster_op = client.delete_cluster(name=cluster_path)
    delete_cluster_op.result()
    print(f"Deleted cluster: {cluster_path}")

Install with Tessl CLI

npx tessl i tessl/pypi-google-cloud-visionai

docs

app-platform.md

health-check.md

index.md

live-video-analytics.md

streaming.md

streams-management.md

types.md

warehouse.md

tile.json