0
# Real-Time Streaming
1
2
Bidirectional streaming capabilities for video packets, events, and lease-based resource management. The StreamingService provides low-latency video streaming with precise resource control and event-driven processing for real-time applications.
3
4
## Capabilities
5
6
### Packet Streaming Operations
7
8
Send and receive video/audio packets in real-time with bidirectional streaming support for interactive applications.
9
10
```python { .api }
11
def send_packets(self, requests: Iterator[SendPacketsRequest]) -> Iterator[SendPacketsResponse]:
12
"""
13
Sends video packets to stream in bidirectional streaming mode.
14
15
Args:
16
requests (Iterator[SendPacketsRequest]): Stream of packet send requests
17
18
Yields:
19
SendPacketsResponse: Response for each sent packet with acknowledgments
20
"""
21
22
def receive_packets(self, requests: Iterator[ReceivePacketsRequest]) -> Iterator[ReceivePacketsResponse]:
23
"""
24
Receives video packets from stream in bidirectional streaming mode.
25
26
Args:
27
requests (Iterator[ReceivePacketsRequest]): Stream of packet receive requests
28
29
Yields:
30
ReceivePacketsResponse: Stream of received packets with metadata
31
"""
32
33
def receive_events(self, requests: Iterator[ReceiveEventsRequest]) -> Iterator[ReceiveEventsResponse]:
34
"""
35
Receives events from stream in bidirectional streaming mode.
36
37
Args:
38
requests (Iterator[ReceiveEventsRequest]): Stream of event receive requests
39
40
Yields:
41
ReceiveEventsResponse: Stream of received events and notifications
42
"""
43
```
44
45
### Lease Management
46
47
Acquire, renew, and release leases on streaming resources to ensure exclusive access and prevent conflicts.
48
49
```python { .api }
50
def acquire_lease(self, request: AcquireLeaseRequest, *, retry=None, timeout=None, metadata=()) -> Lease:
51
"""
52
Acquires a lease on a streaming session.
53
54
Args:
55
request (AcquireLeaseRequest): Required. Request containing lease parameters
56
retry: Retry configuration for the request
57
timeout: Timeout for the request
58
metadata: Additional metadata for the request
59
60
Returns:
61
Lease: Acquired lease with expiration and renewal information
62
"""
63
64
def renew_lease(self, request: RenewLeaseRequest, *, retry=None, timeout=None, metadata=()) -> Lease:
65
"""
66
Renews an existing lease to extend its validity period.
67
68
Args:
69
request (RenewLeaseRequest): Required. Request containing lease renewal parameters
70
retry: Retry configuration for the request
71
timeout: Timeout for the request
72
metadata: Additional metadata for the request
73
74
Returns:
75
Lease: Renewed lease with updated expiration time
76
"""
77
78
def release_lease(self, request: ReleaseLeaseRequest, *, retry=None, timeout=None, metadata=()) -> ReleaseLeaseResponse:
79
"""
80
Releases an active lease to free up streaming resources.
81
82
Args:
83
request (ReleaseLeaseRequest): Required. Request containing lease release parameters
84
retry: Retry configuration for the request
85
timeout: Timeout for the request
86
metadata: Additional metadata for the request
87
88
Returns:
89
ReleaseLeaseResponse: Confirmation of lease release
90
"""
91
```
92
93
## Types
94
95
### Streaming Request and Response Types
96
97
```python { .api }
98
class SendPacketsRequest:
99
"""Request for sending packets to a stream."""
100
# Union field oneof request:
101
setup_request: SendPacketsRequestSetupRequest # Initial setup request
102
packet: Packet # Video/audio packet to send
103
104
class SendPacketsRequestSetupRequest:
105
"""Setup request for packet sending session."""
106
series: str # Series resource path
107
lease_id: str # Lease ID for stream access
108
109
class SendPacketsResponse:
110
"""Response from packet sending operation."""
111
packet_id: str # Unique packet identifier
112
113
class ReceivePacketsRequest:
114
"""Request for receiving packets from a stream."""
115
# Union field oneof request:
116
setup_request: ReceivePacketsRequestSetupRequest # Initial setup request
117
commit_request: CommitRequest # Commit received packets
118
119
class ReceivePacketsRequestSetupRequest:
120
"""Setup request for packet receiving session."""
121
series: str # Series resource path
122
receiver: str # Receiver identifier
123
heartbeat_interval: Duration # Heartbeat frequency
124
writes_done_grace_period: Duration # Grace period for writes completion
125
126
class ReceivePacketsResponse:
127
"""Response containing received packets."""
128
packet: Packet # Received video/audio packet
129
130
class ReceiveEventsRequest:
131
"""Request for receiving events from a stream."""
132
# Union field oneof request:
133
setup_request: ReceiveEventsRequestSetupRequest # Initial setup request
134
commit_request: CommitRequest # Commit received events
135
136
class ReceiveEventsRequestSetupRequest:
137
"""Setup request for event receiving session."""
138
series: str # Series resource path
139
receiver: str # Receiver identifier
140
heartbeat_interval: Duration # Heartbeat frequency
141
writes_done_grace_period: Duration # Grace period for writes completion
142
143
class ReceiveEventsResponse:
144
"""Response containing received events."""
145
event_update: EventUpdate # Event update information
146
147
class CommitRequest:
148
"""Request to commit received data."""
149
offset: int # Offset to commit up to
150
```
151
152
### Packet Types
153
154
```python { .api }
155
class Packet:
156
"""Video or audio packet with metadata."""
157
header: PacketHeader # Packet metadata and timing
158
payload: bytes # Packet data payload
159
160
class PacketHeader:
161
"""Metadata for video/audio packets."""
162
capture_time: Timestamp # When packet was captured
163
server_metadata: ServerMetadata # Server-side metadata
164
series_metadata: SeriesMetadata # Series metadata
165
flags: PacketHeaderFlag # Packet flags
166
trace_context: str # Tracing context
167
# Union field oneof packet_type:
168
gstreamer_buffer_descriptor: GstreamerBufferDescriptor # GStreamer buffer info
169
raw_image_descriptor: RawImageDescriptor # Raw image format info
170
171
class ServerMetadata:
172
"""Server-side metadata for packets."""
173
offset: int # Packet offset in stream
174
ingest_time: Timestamp # Server ingestion time
175
176
class SeriesMetadata:
177
"""Metadata about the packet series."""
178
series: str # Series resource path
179
180
class GstreamerBufferDescriptor:
181
"""GStreamer-specific buffer information."""
182
caps_string: str # GStreamer capabilities string
183
is_key_frame: bool # Whether this is a key frame
184
pts_time: Duration # Presentation timestamp
185
dts_time: Duration # Decode timestamp
186
duration: Duration # Buffer duration
187
188
class RawImageDescriptor:
189
"""Raw image format descriptor."""
190
format: RawImageDescriptorFormat # Image format
191
width: int # Image width in pixels
192
height: int # Image height in pixels
193
194
class PacketHeaderFlag(Enum):
195
"""Flags for packet headers."""
196
FLAG_UNSPECIFIED = 0
197
IMMUTABLE = 1 # Packet is immutable
198
199
class RawImageDescriptorFormat(Enum):
200
"""Raw image format types."""
201
FORMAT_UNSPECIFIED = 0
202
SRGB = 1 # sRGB format
203
```
204
205
### Lease Request Types
206
207
```python { .api }
208
class AcquireLeaseRequest:
209
"""Request message for acquiring a lease."""
210
series: str # The series name
211
owner: str # The owner name
212
term: Duration # The lease term
213
lease_type: LeaseType # The lease type (optional)
214
215
class RenewLeaseRequest:
216
"""Request message for renewing a lease."""
217
id: str # Lease id
218
series: str # Series name
219
owner: str # Lease owner
220
term: Duration # Lease term
221
222
class ReleaseLeaseRequest:
223
"""Request message for releasing lease."""
224
id: str # Lease id
225
series: str # Series name
226
owner: str # Lease owner
227
```
228
229
### Lease Types
230
231
```python { .api }
232
class Lease:
233
"""Lease on streaming resources."""
234
id: str # Unique lease identifier
235
series: str # Series resource path
236
owner: str # Lease owner identifier
237
expire_time: Timestamp # Lease expiration time
238
lease_type: LeaseType # Type of lease
239
240
class LeaseType(Enum):
241
"""Types of streaming leases."""
242
LEASE_TYPE_UNSPECIFIED = 0
243
READER = 1 # Read-only lease
244
WRITER = 2 # Write-only lease
245
246
class ReleaseLeaseResponse:
247
"""Response from lease release operation."""
248
pass # Confirmation response
249
```
250
251
### Event Types
252
253
```python { .api }
254
class EventUpdate:
255
"""Update information for stream events."""
256
stream: str # Stream resource path
257
event: str # Event identifier
258
series: str # Series resource path
259
update_time: Timestamp # Time of update
260
offset: int # Event offset in stream
261
```
262
263
### Streaming Modes
264
265
```python { .api }
266
class RequestMetadata:
267
"""Metadata for streaming requests."""
268
# Union field oneof mode:
269
eager_mode: EagerMode # Eager streaming mode
270
controlled_mode: ControlledMode # Controlled streaming mode
271
272
class EagerMode:
273
"""Eager streaming mode configuration."""
274
pass # Stream data as fast as possible
275
276
class ControlledMode:
277
"""Controlled streaming mode configuration."""
278
starting_logical_offset: str # Starting offset for controlled streaming
279
fallback_starting_offset: str # Fallback offset if starting offset unavailable
280
```
281
282
## Usage Examples
283
284
### Real-Time Packet Streaming
285
286
```python
287
from google.cloud import visionai_v1
288
import asyncio
289
from typing import Iterator
290
291
async def send_video_stream():
292
"""Example of sending video packets to a stream."""
293
294
async with visionai_v1.StreamingServiceAsyncClient() as client:
295
# Create request iterator
296
def request_iterator() -> Iterator[visionai_v1.SendPacketsRequest]:
297
# Send setup request first
298
yield visionai_v1.SendPacketsRequest(
299
setup_request=visionai_v1.SendPacketsRequestSetupRequest(
300
series="projects/my-project/locations/us-central1/clusters/my-cluster/streams/camera-1/series/video",
301
lease_id="lease-12345"
302
)
303
)
304
305
# Send video packets
306
for i in range(100):
307
packet = visionai_v1.Packet(
308
header=visionai_v1.PacketHeader(
309
capture_time={"seconds": int(time.time())},
310
gstreamer_buffer_descriptor=visionai_v1.GstreamerBufferDescriptor(
311
caps_string="video/x-raw,format=RGB,width=1920,height=1080",
312
is_key_frame=(i % 30 == 0), # Key frame every 30 frames
313
pts_time={"nanos": i * 33333333} # 30 FPS
314
)
315
),
316
payload=generate_video_frame() # Your video frame data
317
)
318
319
yield visionai_v1.SendPacketsRequest(packet=packet)
320
321
# Send packets and process responses
322
async for response in client.send_packets(request_iterator()):
323
print(f"Sent packet ID: {response.packet_id}")
324
325
async def receive_video_stream():
326
"""Example of receiving video packets from a stream."""
327
328
async with visionai_v1.StreamingServiceAsyncClient() as client:
329
def request_iterator() -> Iterator[visionai_v1.ReceivePacketsRequest]:
330
# Send setup request
331
yield visionai_v1.ReceivePacketsRequest(
332
setup_request=visionai_v1.ReceivePacketsRequestSetupRequest(
333
series="projects/my-project/locations/us-central1/clusters/my-cluster/streams/output/series/video",
334
receiver="video-receiver-001",
335
heartbeat_interval={"seconds": 30}
336
)
337
)
338
339
# Receive packets
340
async for response in client.receive_packets(request_iterator()):
341
packet = response.packet
342
print(f"Received packet at offset: {packet.header.server_metadata.offset}")
343
process_video_frame(packet.payload) # Your processing logic
344
```
345
346
### Lease-Based Resource Management
347
348
```python
349
from google.cloud import visionai_v1
350
import time
351
352
def manage_stream_lease():
353
"""Example of lease management for exclusive stream access."""
354
355
client = visionai_v1.StreamingServiceClient()
356
357
# Acquire lease
358
session = "projects/my-project/locations/us-central1/clusters/my-cluster/streams/camera-1"
359
owner = "video-processor-001"
360
lease_duration = {"seconds": 300} # 5 minute lease
361
362
lease = client.acquire_lease(
363
request=visionai_v1.AcquireLeaseRequest(
364
series=f"{session}/series/video",
365
owner=owner,
366
term=lease_duration,
367
lease_type=visionai_v1.LeaseType.WRITER
368
)
369
)
370
371
print(f"Acquired lease ID: {lease.id}")
372
print(f"Expires at: {lease.expire_time}")
373
374
try:
375
# Use the stream with exclusive access
376
perform_stream_operations()
377
378
# Renew lease if needed (before expiration)
379
time.sleep(240) # Wait 4 minutes
380
renewed_lease = client.renew_lease(
381
request=visionai_v1.RenewLeaseRequest(
382
id=lease.id,
383
series=f"{session}/series/video",
384
owner=owner,
385
term=lease_duration
386
)
387
)
388
389
print(f"Renewed lease, new expiration: {renewed_lease.expire_time}")
390
391
finally:
392
# Always release lease when done
393
client.release_lease(
394
request=visionai_v1.ReleaseLeaseRequest(
395
id=lease.id,
396
series=f"{session}/series/video",
397
owner=owner
398
)
399
)
400
print("Lease released")
401
402
def perform_stream_operations():
403
"""Placeholder for stream operations while holding lease."""
404
pass
405
406
def generate_video_frame() -> bytes:
407
"""Placeholder for video frame generation."""
408
return b"video_frame_data"
409
410
def process_video_frame(frame_data: bytes):
411
"""Placeholder for video frame processing."""
412
pass
413
```
414
415
### Event Stream Processing
416
417
```python
418
async def process_stream_events():
419
"""Example of receiving and processing stream events."""
420
421
async with visionai_v1.StreamingServiceAsyncClient() as client:
422
def request_iterator() -> Iterator[visionai_v1.ReceiveEventsRequest]:
423
yield visionai_v1.ReceiveEventsRequest(
424
setup_request=visionai_v1.ReceiveEventsRequestSetupRequest(
425
series="projects/my-project/locations/us-central1/clusters/my-cluster/streams/camera-1/series/events",
426
receiver="event-processor-001",
427
heartbeat_interval={"seconds": 60}
428
)
429
)
430
431
# Process incoming events
432
async for response in client.receive_events(request_iterator()):
433
event_update = response.event_update
434
435
print(f"Received event from stream: {event_update.stream}")
436
print(f"Event ID: {event_update.event}")
437
print(f"Update time: {event_update.update_time}")
438
print(f"Offset: {event_update.offset}")
439
440
# Process the event based on your application logic
441
await handle_stream_event(event_update)
442
443
async def handle_stream_event(event_update: visionai_v1.EventUpdate):
444
"""Handle individual stream events."""
445
# Your event processing logic here
446
pass
447
448
# Run the async examples
449
if __name__ == "__main__":
450
asyncio.run(send_video_stream())
451
asyncio.run(receive_video_stream())
452
asyncio.run(process_stream_events())
453
```