Google Cloud Vision AI API client library for building and deploying Vertex AI Vision applications
—
Bidirectional streaming capabilities for video packets, events, and lease-based resource management. The StreamingService provides low-latency video streaming with precise resource control and event-driven processing for real-time applications.
Send and receive video/audio packets in real-time with bidirectional streaming support for interactive applications.
def send_packets(self, requests: Iterator[SendPacketsRequest], *, retry=None, timeout=None, metadata=()) -> Iterator[SendPacketsResponse]:
    """
    Send video packets to a stream in bidirectional streaming mode.

    Args:
        requests (Iterator[SendPacketsRequest]): Stream of packet send requests.
        retry: Retry configuration for the request.
        timeout: Timeout for the request.
        metadata: Additional metadata for the request.

    Returns:
        Iterator[SendPacketsResponse]: A response for each sent packet,
        carrying its acknowledgment.
    """
def receive_packets(self, requests: Iterator[ReceivePacketsRequest], *, retry=None, timeout=None, metadata=()) -> Iterator[ReceivePacketsResponse]:
    """
    Receive video packets from a stream in bidirectional streaming mode.

    Args:
        requests (Iterator[ReceivePacketsRequest]): Stream of packet receive
            requests.
        retry: Retry configuration for the request.
        timeout: Timeout for the request.
        metadata: Additional metadata for the request.

    Returns:
        Iterator[ReceivePacketsResponse]: Stream of received packets with
        their metadata.
    """
def receive_events(self, requests: Iterator[ReceiveEventsRequest]) -> Iterator[ReceiveEventsResponse]:
"""
Receives events from stream in bidirectional streaming mode.
Args:
requests (Iterator[ReceiveEventsRequest]): Stream of event receive requests
Yields:
ReceiveEventsResponse: Stream of received events and notifications
"""Acquire, renew, and release leases on streaming resources to ensure exclusive access and prevent conflicts.
def acquire_lease(self, request: AcquireLeaseRequest, *, retry=None, timeout=None, metadata=()) -> Lease:
    """
    Acquire a lease on a streaming session.

    Args:
        request (AcquireLeaseRequest): Required. Request containing the lease
            parameters.
        retry: Retry configuration for the request.
        timeout: Timeout for the request.
        metadata: Additional metadata for the request.

    Returns:
        Lease: The acquired lease, with expiration and renewal information.
    """
def renew_lease(self, request: RenewLeaseRequest, *, retry=None, timeout=None, metadata=()) -> Lease:
    """
    Renew an existing lease to extend its validity period.

    Args:
        request (RenewLeaseRequest): Required. Request containing the lease
            renewal parameters.
        retry: Retry configuration for the request.
        timeout: Timeout for the request.
        metadata: Additional metadata for the request.

    Returns:
        Lease: The renewed lease, with its updated expiration time.
    """
def release_lease(self, request: ReleaseLeaseRequest, *, retry=None, timeout=None, metadata=()) -> ReleaseLeaseResponse:
    """
    Release an active lease to free up streaming resources.

    Args:
        request (ReleaseLeaseRequest): Required. Request containing the lease
            release parameters.
        retry: Retry configuration for the request.
        timeout: Timeout for the request.
        metadata: Additional metadata for the request.

    Returns:
        ReleaseLeaseResponse: Confirmation of the lease release.
    """


class SendPacketsRequest:
    """Request for sending packets to a stream."""

    # Union field oneof request:
    setup_request: SendPacketsRequestSetupRequest  # Initial setup request
    packet: Packet  # Video/audio packet to send
class SendPacketsRequestSetupRequest:
    """Setup request for a packet sending session."""

    series: str  # Series resource path
    lease_id: str  # Lease ID for stream access
class SendPacketsResponse:
    """Response from a packet sending operation."""

    packet_id: str  # Unique packet identifier
class ReceivePacketsRequest:
    """Request for receiving packets from a stream."""

    # Union field oneof request:
    setup_request: ReceivePacketsRequestSetupRequest  # Initial setup request
    commit_request: CommitRequest  # Commit received packets
class ReceivePacketsRequestSetupRequest:
    """Setup request for a packet receiving session."""

    series: str  # Series resource path
    receiver: str  # Receiver identifier
    heartbeat_interval: Duration  # Heartbeat frequency
    writes_done_grace_period: Duration  # Grace period for writes completion
class ReceivePacketsResponse:
    """Response containing a received packet."""

    packet: Packet  # Received video/audio packet
class ReceiveEventsRequest:
    """Request for receiving events from a stream."""

    # Union field oneof request:
    setup_request: ReceiveEventsRequestSetupRequest  # Initial setup request
    commit_request: CommitRequest  # Commit received events
class ReceiveEventsRequestSetupRequest:
    """Setup request for an event receiving session."""

    series: str  # Series resource path
    receiver: str  # Receiver identifier
    heartbeat_interval: Duration  # Heartbeat frequency
    writes_done_grace_period: Duration  # Grace period for writes completion
class ReceiveEventsResponse:
    """Response containing a received event."""

    event_update: EventUpdate  # Event update information
class CommitRequest:
    """Request to commit received data."""

    offset: int  # Offset to commit up to


class Packet:
    """Video or audio packet with metadata."""

    header: PacketHeader  # Packet metadata and timing
    payload: bytes  # Packet data payload
class PacketHeader:
    """Metadata for video/audio packets."""

    capture_time: Timestamp  # When packet was captured
    server_metadata: ServerMetadata  # Server-side metadata
    series_metadata: SeriesMetadata  # Series metadata
    flags: PacketHeaderFlag  # Packet flags
    trace_context: str  # Tracing context
    # Union field oneof packet_type:
    gstreamer_buffer_descriptor: GstreamerBufferDescriptor  # GStreamer buffer info
    raw_image_descriptor: RawImageDescriptor  # Raw image format info
class ServerMetadata:
    """Server-side metadata for packets."""

    offset: int  # Packet offset in stream
    ingest_time: Timestamp  # Server ingestion time
class SeriesMetadata:
    """Metadata about the packet series."""

    series: str  # Series resource path
class GstreamerBufferDescriptor:
    """GStreamer-specific buffer information."""

    caps_string: str  # GStreamer capabilities string
    is_key_frame: bool  # Whether this is a key frame
    pts_time: Duration  # Presentation timestamp
    dts_time: Duration  # Decode timestamp
    duration: Duration  # Buffer duration
class RawImageDescriptor:
    """Raw image format descriptor."""

    format: RawImageDescriptorFormat  # Image format
    width: int  # Image width in pixels
    height: int  # Image height in pixels
class PacketHeaderFlag(Enum):
    """Flags for packet headers."""

    FLAG_UNSPECIFIED = 0
    IMMUTABLE = 1  # Packet is immutable
class RawImageDescriptorFormat(Enum):
    """Raw image format types."""

    FORMAT_UNSPECIFIED = 0
    SRGB = 1  # sRGB format


class AcquireLeaseRequest:
    """Request message for acquiring a lease."""

    series: str  # The series name
    owner: str  # The owner name
    term: Duration  # The lease term
    lease_type: LeaseType  # The lease type (optional)
class RenewLeaseRequest:
    """Request message for renewing a lease."""

    id: str  # Lease id
    series: str  # Series name
    owner: str  # Lease owner
    term: Duration  # Lease term
class ReleaseLeaseRequest:
    """Request message for releasing a lease."""

    id: str  # Lease id
    series: str  # Series name
    owner: str  # Lease owner


class Lease:
    """Lease on streaming resources."""

    id: str  # Unique lease identifier
    series: str  # Series resource path
    owner: str  # Lease owner identifier
    expire_time: Timestamp  # Lease expiration time
    lease_type: LeaseType  # Type of lease
class LeaseType(Enum):
    """Types of streaming leases."""

    # NOTE(review): the published google-cloud-visionai enum may prefix these
    # members with LEASE_TYPE_ (e.g. LEASE_TYPE_WRITER) -- confirm against the
    # installed library before relying on the short names.
    LEASE_TYPE_UNSPECIFIED = 0
    READER = 1  # Read-only lease
    WRITER = 2  # Write-only lease
class ReleaseLeaseResponse:
    """Response from a lease release operation."""

    pass  # Confirmation response


class EventUpdate:
    """Update information for stream events."""

    stream: str  # Stream resource path
    event: str  # Event identifier
    series: str  # Series resource path
    update_time: Timestamp  # Time of update
    offset: int  # Event offset in stream


class RequestMetadata:
    """Metadata for streaming requests."""

    # Union field oneof mode:
    eager_mode: EagerMode  # Eager streaming mode
    controlled_mode: ControlledMode  # Controlled streaming mode
class EagerMode:
    """Eager streaming mode configuration."""

    pass  # Stream data as fast as possible
class ControlledMode:
    """Controlled streaming mode configuration."""

    starting_logical_offset: str  # Starting offset for controlled streaming
    fallback_starting_offset: str  # Fallback offset if starting offset unavailable


import asyncio
import time  # used by send_video_stream() for packet capture timestamps
from typing import Iterator

from google.cloud import visionai_v1
async def send_video_stream():
    """Example of sending video packets to a stream.

    Opens an async streaming client, sends one setup request followed by 100
    packets, and prints the packet ID carried by each response.
    """
    async with visionai_v1.StreamingServiceAsyncClient() as client:

        # Create request iterator
        def request_iterator() -> Iterator[visionai_v1.SendPacketsRequest]:
            # Send setup request first
            yield visionai_v1.SendPacketsRequest(
                setup_request=visionai_v1.SendPacketsRequestSetupRequest(
                    series="projects/my-project/locations/us-central1/clusters/my-cluster/streams/camera-1/series/video",
                    lease_id="lease-12345",
                )
            )
            # Send video packets
            for i in range(100):
                # 30 FPS presentation time. Duration.nanos must stay below one
                # second, which a bare i * 33333333 exceeds from frame 30 on
                # (and overflows int32 from frame 65), so carry into seconds.
                pts_seconds, pts_nanos = divmod(i * 33333333, 1_000_000_000)
                packet = visionai_v1.Packet(
                    header=visionai_v1.PacketHeader(
                        capture_time={"seconds": int(time.time())},
                        gstreamer_buffer_descriptor=visionai_v1.GstreamerBufferDescriptor(
                            caps_string="video/x-raw,format=RGB,width=1920,height=1080",
                            is_key_frame=(i % 30 == 0),  # Key frame every 30 frames
                            pts_time={"seconds": pts_seconds, "nanos": pts_nanos},
                        ),
                    ),
                    payload=generate_video_frame(),  # Your video frame data
                )
                yield visionai_v1.SendPacketsRequest(packet=packet)

        # Send packets and process responses. The async client hands back an
        # awaitable that resolves to the response stream, so it has to be
        # awaited before it can be iterated with `async for`.
        stream = await client.send_packets(requests=request_iterator())
        async for response in stream:
            print(f"Sent packet ID: {response.packet_id}")
async def receive_video_stream():
    """Example of receiving video packets from a stream.

    Opens an async streaming client, sends a single setup request, then prints
    the server offset of every received packet and hands its payload to
    ``process_video_frame``.
    """
    async with visionai_v1.StreamingServiceAsyncClient() as client:

        def request_iterator() -> Iterator[visionai_v1.ReceivePacketsRequest]:
            # Send setup request
            yield visionai_v1.ReceivePacketsRequest(
                setup_request=visionai_v1.ReceivePacketsRequestSetupRequest(
                    series="projects/my-project/locations/us-central1/clusters/my-cluster/streams/output/series/video",
                    receiver="video-receiver-001",
                    heartbeat_interval={"seconds": 30},
                )
            )

        # Receive packets. The async client call must be awaited to obtain the
        # response stream before iterating it.
        stream = await client.receive_packets(requests=request_iterator())
        async for response in stream:
            packet = response.packet
            print(f"Received packet at offset: {packet.header.server_metadata.offset}")
            process_video_frame(packet.payload)  # Your processing logic


import time

from google.cloud import visionai_v1
def manage_stream_lease():
    """Example of lease management for exclusive stream access.

    Acquires a five-minute writer lease on a series, performs the stream work,
    renews the lease after four minutes, and always releases it on the way
    out, even if the work or the renewal raises.
    """
    client = visionai_v1.StreamingServiceClient()

    # Acquire lease
    stream_path = "projects/my-project/locations/us-central1/clusters/my-cluster/streams/camera-1"
    # Every lease call targets the same series, so build its path once.
    series = f"{stream_path}/series/video"
    owner = "video-processor-001"
    lease_duration = {"seconds": 300}  # 5 minute lease

    lease = client.acquire_lease(
        request=visionai_v1.AcquireLeaseRequest(
            series=series,
            owner=owner,
            term=lease_duration,
            lease_type=visionai_v1.LeaseType.WRITER,
        )
    )
    print(f"Acquired lease ID: {lease.id}")
    print(f"Expires at: {lease.expire_time}")

    try:
        # Use the stream with exclusive access
        perform_stream_operations()

        # Renew lease if needed (before expiration)
        time.sleep(240)  # Wait 4 minutes
        renewed_lease = client.renew_lease(
            request=visionai_v1.RenewLeaseRequest(
                id=lease.id,
                series=series,
                owner=owner,
                term=lease_duration,
            )
        )
        print(f"Renewed lease, new expiration: {renewed_lease.expire_time}")
    finally:
        # Always release lease when done
        client.release_lease(
            request=visionai_v1.ReleaseLeaseRequest(
                id=lease.id,
                series=series,
                owner=owner,
            )
        )
        print("Lease released")
def perform_stream_operations():
    """Placeholder for stream operations performed while holding the lease."""
    pass
def generate_video_frame() -> bytes:
    """Placeholder for video frame generation; returns a fixed dummy payload."""
    return b"video_frame_data"
def process_video_frame(frame_data: bytes):
    """Placeholder for video frame processing."""
    pass


async def process_stream_events():
    """Example of receiving and processing stream events.

    Opens an async streaming client, sends a single setup request, then prints
    the details of each event update and passes it to ``handle_stream_event``.
    """
    async with visionai_v1.StreamingServiceAsyncClient() as client:

        def request_iterator() -> Iterator[visionai_v1.ReceiveEventsRequest]:
            yield visionai_v1.ReceiveEventsRequest(
                setup_request=visionai_v1.ReceiveEventsRequestSetupRequest(
                    series="projects/my-project/locations/us-central1/clusters/my-cluster/streams/camera-1/series/events",
                    receiver="event-processor-001",
                    heartbeat_interval={"seconds": 60},
                )
            )

        # Process incoming events. The async client call must be awaited to
        # obtain the response stream before iterating it.
        stream = await client.receive_events(requests=request_iterator())
        async for response in stream:
            event_update = response.event_update
            print(f"Received event from stream: {event_update.stream}")
            print(f"Event ID: {event_update.event}")
            print(f"Update time: {event_update.update_time}")
            print(f"Offset: {event_update.offset}")
            # Process the event based on your application logic
            await handle_stream_event(event_update)
async def handle_stream_event(event_update: visionai_v1.EventUpdate):
    """Handle an individual stream event (placeholder)."""
    # Your event processing logic here
    pass
# Run the async examples
if __name__ == "__main__":
asyncio.run(send_video_stream())
asyncio.run(receive_video_stream())
asyncio.run(process_stream_events())Install with Tessl CLI
npx tessl i tessl/pypi-google-cloud-visionai