CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-google-cloud-visionai

Google Cloud Vision AI API client library for building and deploying Vertex AI Vision applications

Pending
Overview
Eval results
Files

streaming.mddocs/

Real-Time Streaming

Bidirectional streaming capabilities for video packets, events, and lease-based resource management. The StreamingService provides low-latency video streaming with precise resource control and event-driven processing for real-time applications.

Capabilities

Packet Streaming Operations

Send and receive video/audio packets in real-time with bidirectional streaming support for interactive applications.

def send_packets(self, requests: Iterator[SendPacketsRequest]) -> Iterator[SendPacketsResponse]:
    """
    Sends video packets to stream in bidirectional streaming mode.
    
    Args:
        requests (Iterator[SendPacketsRequest]): Stream of packet send requests
    
    Yields:
        SendPacketsResponse: Response for each sent packet with acknowledgments
    """

def receive_packets(self, requests: Iterator[ReceivePacketsRequest]) -> Iterator[ReceivePacketsResponse]:
    """
    Receives video packets from stream in bidirectional streaming mode.
    
    Args:
        requests (Iterator[ReceivePacketsRequest]): Stream of packet receive requests
    
    Yields:
        ReceivePacketsResponse: Stream of received packets with metadata
    """

def receive_events(self, requests: Iterator[ReceiveEventsRequest]) -> Iterator[ReceiveEventsResponse]:
    """
    Receives events from stream in bidirectional streaming mode.
    
    Args:
        requests (Iterator[ReceiveEventsRequest]): Stream of event receive requests
    
    Yields:
        ReceiveEventsResponse: Stream of received events and notifications
    """

Lease Management

Acquire, renew, and release leases on streaming resources to ensure exclusive access and prevent conflicts.

def acquire_lease(self, request: AcquireLeaseRequest, *, retry=None, timeout=None, metadata=()) -> Lease:
    """
    Acquires a lease on a streaming session.
    
    Args:
        request (AcquireLeaseRequest): Required. Request containing lease parameters
        retry: Retry configuration for the request
        timeout: Timeout for the request
        metadata: Additional metadata for the request
    
    Returns:
        Lease: Acquired lease with expiration and renewal information
    """

def renew_lease(self, request: RenewLeaseRequest, *, retry=None, timeout=None, metadata=()) -> Lease:
    """
    Renews an existing lease to extend its validity period.
    
    Args:
        request (RenewLeaseRequest): Required. Request containing lease renewal parameters
        retry: Retry configuration for the request
        timeout: Timeout for the request
        metadata: Additional metadata for the request
    
    Returns:
        Lease: Renewed lease with updated expiration time
    """

def release_lease(self, request: ReleaseLeaseRequest, *, retry=None, timeout=None, metadata=()) -> ReleaseLeaseResponse:
    """
    Releases an active lease to free up streaming resources.
    
    Args:
        request (ReleaseLeaseRequest): Required. Request containing lease release parameters
        retry: Retry configuration for the request
        timeout: Timeout for the request
        metadata: Additional metadata for the request
    
    Returns:
        ReleaseLeaseResponse: Confirmation of lease release
    """

Types

Streaming Request and Response Types

class SendPacketsRequest:
    """Request for sending packets to a stream."""
    # Union field oneof request:
    setup_request: SendPacketsRequestSetupRequest  # Initial setup request
    packet: Packet  # Video/audio packet to send

class SendPacketsRequestSetupRequest:
    """Setup request for packet sending session."""
    series: str  # Series resource path
    lease_id: str  # Lease ID for stream access

class SendPacketsResponse:
    """Response from packet sending operation."""
    packet_id: str  # Unique packet identifier
    
class ReceivePacketsRequest:
    """Request for receiving packets from a stream."""
    # Union field oneof request:
    setup_request: ReceivePacketsRequestSetupRequest  # Initial setup request
    commit_request: CommitRequest  # Commit received packets

class ReceivePacketsRequestSetupRequest:
    """Setup request for packet receiving session."""
    series: str  # Series resource path
    receiver: str  # Receiver identifier
    heartbeat_interval: Duration  # Heartbeat frequency
    writes_done_grace_period: Duration  # Grace period for writes completion

class ReceivePacketsResponse:
    """Response containing received packets."""
    packet: Packet  # Received video/audio packet

class ReceiveEventsRequest:
    """Request for receiving events from a stream."""
    # Union field oneof request:
    setup_request: ReceiveEventsRequestSetupRequest  # Initial setup request
    commit_request: CommitRequest  # Commit received events

class ReceiveEventsRequestSetupRequest:
    """Setup request for event receiving session."""
    series: str  # Series resource path
    receiver: str  # Receiver identifier
    heartbeat_interval: Duration  # Heartbeat frequency
    writes_done_grace_period: Duration  # Grace period for writes completion

class ReceiveEventsResponse:
    """Response containing received events."""
    event_update: EventUpdate  # Event update information

class CommitRequest:
    """Request to commit received data."""
    offset: int  # Offset to commit up to

Packet Types

class Packet:
    """Video or audio packet with metadata."""
    header: PacketHeader  # Packet metadata and timing
    payload: bytes  # Packet data payload

class PacketHeader:
    """Metadata for video/audio packets."""
    capture_time: Timestamp  # When packet was captured
    server_metadata: ServerMetadata  # Server-side metadata
    series_metadata: SeriesMetadata  # Series metadata
    flags: PacketHeaderFlag  # Packet flags
    trace_context: str  # Tracing context
    # Union field oneof packet_type:
    gstreamer_buffer_descriptor: GstreamerBufferDescriptor  # GStreamer buffer info
    raw_image_descriptor: RawImageDescriptor  # Raw image format info

class ServerMetadata:
    """Server-side metadata for packets."""
    offset: int  # Packet offset in stream
    ingest_time: Timestamp  # Server ingestion time

class SeriesMetadata:
    """Metadata about the packet series.""" 
    series: str  # Series resource path
    
class GstreamerBufferDescriptor:
    """GStreamer-specific buffer information."""
    caps_string: str  # GStreamer capabilities string
    is_key_frame: bool  # Whether this is a key frame
    pts_time: Duration  # Presentation timestamp
    dts_time: Duration  # Decode timestamp
    duration: Duration  # Buffer duration

class RawImageDescriptor:
    """Raw image format descriptor."""
    format: RawImageDescriptorFormat  # Image format
    width: int  # Image width in pixels
    height: int  # Image height in pixels

class PacketHeaderFlag(Enum):
    """Flags for packet headers."""
    FLAG_UNSPECIFIED = 0
    IMMUTABLE = 1  # Packet is immutable
    
class RawImageDescriptorFormat(Enum):
    """Raw image format types."""
    FORMAT_UNSPECIFIED = 0
    SRGB = 1  # sRGB format

Lease Request Types

class AcquireLeaseRequest:
    """Request message for acquiring a lease."""
    series: str  # The series name
    owner: str  # The owner name
    term: Duration  # The lease term
    lease_type: LeaseType  # The lease type (optional)

class RenewLeaseRequest:
    """Request message for renewing a lease."""
    id: str  # Lease id
    series: str  # Series name
    owner: str  # Lease owner
    term: Duration  # Lease term

class ReleaseLeaseRequest:
    """Request message for releasing lease."""
    id: str  # Lease id
    series: str  # Series name
    owner: str  # Lease owner

Lease Types

class Lease:
    """Lease on streaming resources."""
    id: str  # Unique lease identifier
    series: str  # Series resource path
    owner: str  # Lease owner identifier
    expire_time: Timestamp  # Lease expiration time
    lease_type: LeaseType  # Type of lease

class LeaseType(Enum):
    """Types of streaming leases."""
    LEASE_TYPE_UNSPECIFIED = 0
    READER = 1  # Read-only lease
    WRITER = 2  # Write-only lease

class ReleaseLeaseResponse:
    """Response from lease release operation."""
    pass  # Confirmation response

Event Types

class EventUpdate:
    """Update information for stream events."""
    stream: str  # Stream resource path
    event: str  # Event identifier
    series: str  # Series resource path
    update_time: Timestamp  # Time of update
    offset: int  # Event offset in stream

Streaming Modes

class RequestMetadata:
    """Metadata for streaming requests."""
    # Union field oneof mode:
    eager_mode: EagerMode  # Eager streaming mode
    controlled_mode: ControlledMode  # Controlled streaming mode

class EagerMode:
    """Eager streaming mode configuration."""
    pass  # Stream data as fast as possible

class ControlledMode:
    """Controlled streaming mode configuration."""
    starting_logical_offset: str  # Starting offset for controlled streaming
    fallback_starting_offset: str  # Fallback offset if starting offset unavailable

Usage Examples

Real-Time Packet Streaming

from google.cloud import visionai_v1
import asyncio
from typing import Iterator

async def send_video_stream():
    """Example of sending video packets to a stream."""
    
    async with visionai_v1.StreamingServiceAsyncClient() as client:
        # Create request iterator
        def request_iterator() -> Iterator[visionai_v1.SendPacketsRequest]:
            # Send setup request first
            yield visionai_v1.SendPacketsRequest(
                setup_request=visionai_v1.SendPacketsRequestSetupRequest(
                    series="projects/my-project/locations/us-central1/clusters/my-cluster/streams/camera-1/series/video",
                    lease_id="lease-12345"
                )
            )
            
            # Send video packets
            for i in range(100):
                packet = visionai_v1.Packet(
                    header=visionai_v1.PacketHeader(
                        capture_time={"seconds": int(time.time())},
                        gstreamer_buffer_descriptor=visionai_v1.GstreamerBufferDescriptor(
                            caps_string="video/x-raw,format=RGB,width=1920,height=1080",
                            is_key_frame=(i % 30 == 0),  # Key frame every 30 frames
                            pts_time={"nanos": i * 33333333}  # 30 FPS
                        )
                    ),
                    payload=generate_video_frame()  # Your video frame data
                )
                
                yield visionai_v1.SendPacketsRequest(packet=packet)
        
        # Send packets and process responses
        async for response in client.send_packets(request_iterator()):
            print(f"Sent packet ID: {response.packet_id}")

async def receive_video_stream():
    """Example of receiving video packets from a stream."""
    
    async with visionai_v1.StreamingServiceAsyncClient() as client:
        def request_iterator() -> Iterator[visionai_v1.ReceivePacketsRequest]:
            # Send setup request
            yield visionai_v1.ReceivePacketsRequest(
                setup_request=visionai_v1.ReceivePacketsRequestSetupRequest(
                    series="projects/my-project/locations/us-central1/clusters/my-cluster/streams/output/series/video",
                    receiver="video-receiver-001",
                    heartbeat_interval={"seconds": 30}
                )
            )
        
        # Receive packets
        async for response in client.receive_packets(request_iterator()):
            packet = response.packet
            print(f"Received packet at offset: {packet.header.server_metadata.offset}")
            process_video_frame(packet.payload)  # Your processing logic

Lease-Based Resource Management

from google.cloud import visionai_v1
import time

def manage_stream_lease():
    """Example of lease management for exclusive stream access."""
    
    client = visionai_v1.StreamingServiceClient()
    
    # Acquire lease
    session = "projects/my-project/locations/us-central1/clusters/my-cluster/streams/camera-1"
    owner = "video-processor-001"
    lease_duration = {"seconds": 300}  # 5 minute lease
    
    lease = client.acquire_lease(
        request=visionai_v1.AcquireLeaseRequest(
            series=f"{session}/series/video",
            owner=owner,
            term=lease_duration,
            lease_type=visionai_v1.LeaseType.WRITER
        )
    )
    
    print(f"Acquired lease ID: {lease.id}")
    print(f"Expires at: {lease.expire_time}")
    
    try:
        # Use the stream with exclusive access
        perform_stream_operations()
        
        # Renew lease if needed (before expiration)
        time.sleep(240)  # Wait 4 minutes
        renewed_lease = client.renew_lease(
            request=visionai_v1.RenewLeaseRequest(
                id=lease.id,
                series=f"{session}/series/video",
                owner=owner,
                term=lease_duration
            )
        )
        
        print(f"Renewed lease, new expiration: {renewed_lease.expire_time}")
        
    finally:
        # Always release lease when done
        client.release_lease(
            request=visionai_v1.ReleaseLeaseRequest(
                id=lease.id,
                series=f"{session}/series/video",
                owner=owner
            )
        )
        print("Lease released")

def perform_stream_operations():
    """Placeholder for stream operations while holding lease."""
    pass

def generate_video_frame() -> bytes:
    """Placeholder for video frame generation."""
    return b"video_frame_data"

def process_video_frame(frame_data: bytes):
    """Placeholder for video frame processing."""
    pass

Event Stream Processing

async def process_stream_events():
    """Example of receiving and processing stream events."""
    
    async with visionai_v1.StreamingServiceAsyncClient() as client:
        def request_iterator() -> Iterator[visionai_v1.ReceiveEventsRequest]:
            yield visionai_v1.ReceiveEventsRequest(
                setup_request=visionai_v1.ReceiveEventsRequestSetupRequest(
                    series="projects/my-project/locations/us-central1/clusters/my-cluster/streams/camera-1/series/events",
                    receiver="event-processor-001",
                    heartbeat_interval={"seconds": 60}
                )
            )
        
        # Process incoming events
        async for response in client.receive_events(request_iterator()):
            event_update = response.event_update
            
            print(f"Received event from stream: {event_update.stream}")
            print(f"Event ID: {event_update.event}")
            print(f"Update time: {event_update.update_time}")
            print(f"Offset: {event_update.offset}")
            
            # Process the event based on your application logic
            await handle_stream_event(event_update)

async def handle_stream_event(event_update: visionai_v1.EventUpdate):
    """Handle individual stream events."""
    # Your event processing logic here
    pass

# Run the async examples
if __name__ == "__main__":
    asyncio.run(send_video_stream())
    asyncio.run(receive_video_stream())
    asyncio.run(process_stream_events())

Install with Tessl CLI

npx tessl i tessl/pypi-google-cloud-visionai

docs

app-platform.md

health-check.md

index.md

live-video-analytics.md

streaming.md

streams-management.md

types.md

warehouse.md

tile.json