0
# Stream Infrastructure Management
1
2
This document covers infrastructure management for clusters, streams, events, and time series data organization. The StreamsService (accessed in Python through `visionai_v1.StreamsServiceClient`) provides the foundational infrastructure for organizing and managing video streaming resources at scale.
3
4
## Capabilities
5
6
### Cluster Management
7
8
Create and manage streaming clusters that serve as containers for streams and provide resource isolation and scaling.
9
10
```python { .api }
11
def list_clusters(self, parent: str, page_size: int = None, page_token: str = None, filter: str = None, order_by: str = None) -> ListClustersResponse:
12
"""
13
Lists clusters in a project and location.
14
15
Args:
16
parent (str): Required. Parent resource path "projects/{project}/locations/{location}"
17
page_size (int): Maximum number of clusters to return
18
page_token (str): Token for pagination
19
filter (str): Filter expression for clusters
20
order_by (str): Sort order for results
21
22
Returns:
23
ListClustersResponse: Response with clusters and pagination
24
"""
25
26
def get_cluster(self, name: str) -> Cluster:
27
"""
28
Retrieves cluster details and configuration.
29
30
Args:
31
name (str): Required. Cluster resource path
32
"projects/{project}/locations/{location}/clusters/{cluster}"
33
34
Returns:
35
Cluster: The cluster resource with configuration and status
36
"""
37
38
def create_cluster(self, parent: str, cluster: Cluster, cluster_id: str) -> Operation:
39
"""
40
Creates a new streaming cluster.
41
42
Args:
43
parent (str): Required. Parent resource path "projects/{project}/locations/{location}"
44
cluster (Cluster): Required. Cluster configuration
45
cluster_id (str): Required. ID for the new cluster
46
47
Returns:
48
Operation: Long-running operation for cluster creation
49
"""
50
51
def update_cluster(self, cluster: Cluster, update_mask: FieldMask = None) -> Operation:
52
"""
53
Updates cluster configuration and settings.
54
55
Args:
56
cluster (Cluster): Required. Updated cluster configuration
57
update_mask (FieldMask): Fields to update
58
59
Returns:
60
Operation: Long-running operation for cluster update
61
"""
62
63
def delete_cluster(self, name: str) -> Operation:
64
"""
65
Deletes a streaming cluster and all contained resources.
66
67
Args:
68
name (str): Required. Cluster resource path to delete
69
70
Returns:
71
Operation: Long-running operation for cluster deletion
72
"""
73
```
74
75
### Stream Management
76
77
Manage individual streams within clusters for organizing video data flows and processing pipelines.
78
79
```python { .api }
80
def list_streams(self, parent: str, page_size: int = None, page_token: str = None, filter: str = None, order_by: str = None) -> ListStreamsResponse:
81
"""
82
Lists streams in a cluster.
83
84
Args:
85
parent (str): Required. Cluster resource path
86
page_size (int): Maximum number of streams to return
87
page_token (str): Token for pagination
88
filter (str): Filter expression for streams
89
order_by (str): Sort order for results
90
91
Returns:
92
ListStreamsResponse: Response with streams and pagination
93
"""
94
95
def get_stream(self, name: str) -> Stream:
96
"""
97
Retrieves stream details and configuration.
98
99
Args:
100
name (str): Required. Stream resource path
101
"projects/{project}/locations/{location}/clusters/{cluster}/streams/{stream}"
102
103
Returns:
104
Stream: The stream resource with configuration and status
105
"""
106
107
def create_stream(self, parent: str, stream: Stream, stream_id: str) -> Operation:
108
"""
109
Creates a new stream within a cluster.
110
111
Args:
112
parent (str): Required. Cluster resource path
113
stream (Stream): Required. Stream configuration
114
stream_id (str): Required. ID for the new stream
115
116
Returns:
117
Operation: Long-running operation for stream creation
118
"""
119
120
def update_stream(self, stream: Stream, update_mask: FieldMask = None) -> Operation:
121
"""
122
Updates stream configuration and settings.
123
124
Args:
125
stream (Stream): Required. Updated stream configuration
126
update_mask (FieldMask): Fields to update
127
128
Returns:
129
Operation: Long-running operation for stream update
130
"""
131
132
def delete_stream(self, name: str) -> Operation:
133
"""
134
Deletes a stream and associated data.
135
136
Args:
137
name (str): Required. Stream resource path to delete
138
139
Returns:
140
Operation: Long-running operation for stream deletion
141
"""
142
143
def get_stream_thumbnail(self, stream: str, gcs_object_name: str, event: str = None) -> Operation:
144
"""
145
Generates and retrieves a thumbnail image for a stream.
146
147
Args:
148
stream (str): Required. Stream resource path
149
gcs_object_name (str): Required. GCS object name for thumbnail storage
150
event (str): Specific event for thumbnail generation
151
152
Returns:
153
Operation: Long-running operation for thumbnail generation
154
"""
155
156
def generate_stream_hls_token(self, stream: str) -> GenerateStreamHlsTokenResponse:
157
"""
158
Generates HLS streaming token for stream access.
159
160
Args:
161
stream (str): Required. Stream resource path
162
163
Returns:
164
GenerateStreamHlsTokenResponse: HLS token and streaming URLs
165
"""
166
```
167
168
### Event Management
169
170
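Manage events within clusters for tracking significant occurrences and organizing temporal data. Events are cluster-level resources (`projects/{project}/locations/{location}/clusters/{cluster}/events/{event}`); they are associated with streams through series.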
171
172
```python { .api }
173
def list_events(self, parent: str, page_size: int = None, page_token: str = None, filter: str = None, order_by: str = None) -> ListEventsResponse:
174
"""
175
Lists events in a cluster.
176
177
Args:
178
parent (str): Required. Cluster resource path
179
page_size (int): Maximum number of events to return
180
page_token (str): Token for pagination
181
filter (str): Filter expression for events
182
order_by (str): Sort order for results
183
184
Returns:
185
ListEventsResponse: Response with events and pagination
186
"""
187
188
def get_event(self, name: str) -> Event:
189
"""
190
Retrieves event details and metadata.
191
192
Args:
193
name (str): Required. Event resource path
194
"projects/{project}/locations/{location}/clusters/{cluster}/events/{event}"
195
196
Returns:
197
Event: The event resource with metadata and timing
198
"""
199
200
def create_event(self, parent: str, event: Event, event_id: str) -> Operation:
201
"""
202
Creates a new event in a cluster.
203
204
Args:
205
parent (str): Required. Cluster resource path
206
event (Event): Required. Event configuration and metadata
207
event_id (str): Required. ID for the new event
208
209
Returns:
210
Operation: Long-running operation for event creation
211
"""
212
213
def update_event(self, event: Event, update_mask: FieldMask = None) -> Operation:
214
"""
215
Updates event metadata and configuration.
216
217
Args:
218
event (Event): Required. Updated event configuration
219
update_mask (FieldMask): Fields to update
220
221
Returns:
222
Operation: Long-running operation for event update
223
"""
224
225
def delete_event(self, name: str) -> Operation:
226
"""
227
Deletes an event and associated metadata.
228
229
Args:
230
name (str): Required. Event resource path to delete
231
232
Returns:
233
Operation: Long-running operation for event deletion
234
"""
235
```
236
237
### Series Management
238
239
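Manage time series within clusters for organizing sequential data and enabling temporal analysis. Each series is a cluster-level resource (`projects/{project}/locations/{location}/clusters/{cluster}/series/{series}`) that references one stream and one event.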
240
241
```python { .api }
242
def list_series(self, parent: str, page_size: int = None, page_token: str = None, filter: str = None, order_by: str = None) -> ListSeriesResponse:
243
"""
244
Lists series in a cluster.
245
246
Args:
247
parent (str): Required. Cluster resource path
248
page_size (int): Maximum number of series to return
249
page_token (str): Token for pagination
250
filter (str): Filter expression for series
251
order_by (str): Sort order for results
252
253
Returns:
254
ListSeriesResponse: Response with series and pagination
255
"""
256
257
def get_series(self, name: str) -> Series:
258
"""
259
Retrieves series details and metadata.
260
261
Args:
262
name (str): Required. Series resource path
263
"projects/{project}/locations/{location}/clusters/{cluster}/series/{series}"
264
265
Returns:
266
Series: The series resource with metadata and configuration
267
"""
268
269
def create_series(self, parent: str, series: Series, series_id: str) -> Operation:
270
"""
271
Creates a new time series in a cluster.
272
273
Args:
274
parent (str): Required. Cluster resource path
275
series (Series): Required. Series configuration and metadata
276
series_id (str): Required. ID for the new series
277
278
Returns:
279
Operation: Long-running operation for series creation
280
"""
281
282
def update_series(self, series: Series, update_mask: FieldMask = None) -> Operation:
283
"""
284
Updates series metadata and configuration.
285
286
Args:
287
series (Series): Required. Updated series configuration
288
update_mask (FieldMask): Fields to update
289
290
Returns:
291
Operation: Long-running operation for series update
292
"""
293
294
def delete_series(self, name: str) -> Operation:
295
"""
296
Deletes a series and associated data.
297
298
Args:
299
name (str): Required. Series resource path to delete
300
301
Returns:
302
Operation: Long-running operation for series deletion
303
"""
304
```
305
306
### Channel Operations
307
308
Manage channels for materializing data streams and enabling data access patterns.
309
310
```python { .api }
311
def materialize_channel(self, parent: str, channel_id: str, channel: Channel) -> Operation:
312
"""
313
Materializes a channel from series data.
314
315
Args:
316
parent (str): Required. Parent cluster resource path, e.g. "projects/{project}/locations/{location}/clusters/{cluster}"
317
channel_id (str): Required. ID for the materialized channel
318
channel (Channel): Required. Channel configuration
319
320
Returns:
321
Operation: Long-running operation for channel materialization
322
"""
323
```
324
325
## Types
326
327
### Cluster Resources
328
329
```python { .api }
330
class Cluster:
331
"""Streaming cluster configuration and status."""
332
name: str # Resource name
333
display_name: str # Human-readable name
334
description: str # Cluster description
335
state: ClusterState # Current cluster state
336
psc_target: str # Private Service Connect target
337
create_time: Timestamp # Creation timestamp
338
update_time: Timestamp # Last update timestamp
339
labels: Dict[str, str] # Resource labels
340
341
class ClusterState(Enum):
342
"""Cluster operational states."""
343
STATE_UNSPECIFIED = 0
344
PROVISIONING = 1 # Cluster being provisioned
345
RUNNING = 2 # Cluster operational
346
STOPPING = 3 # Cluster being stopped
347
ERROR = 4 # Cluster in error state
348
```
349
350
### Stream Resources
351
352
```python { .api }
353
class Stream:
354
"""Stream configuration and metadata."""
355
name: str # Resource name
356
display_name: str # Human-readable name
357
description: str # Stream description
358
enable_hls_playback: bool # Enable HLS playback
359
media_warehouse_asset: str # Associated warehouse asset
360
create_time: Timestamp # Creation timestamp
361
update_time: Timestamp # Last update timestamp
362
labels: Dict[str, str] # Resource labels
363
364
class GenerateStreamHlsTokenResponse:
365
"""Response for HLS token generation."""
366
token: str # HLS streaming token
367
expiration_time: Timestamp # Token expiration time
368
```
369
370
### Event Resources
371
372
```python { .api }
373
class Event:
374
"""Event metadata and timing information."""
375
name: str # Resource name
376
display_name: str # Human-readable name
377
description: str # Event description
378
create_time: Timestamp # Creation timestamp
379
update_time: Timestamp # Last update timestamp
380
labels: Dict[str, str] # Resource labels
381
# Union field oneof clock:
382
grace_period: Duration # Grace period for event
383
alignment_clock: AlignmentClock # Clock alignment for event
384
385
class AlignmentClock(Enum):
386
"""Clock alignment types for events."""
387
ALIGNMENT_CLOCK_UNSPECIFIED = 0
388
LIVE = 1 # Live clock alignment
389
CAPTURE = 2 # Capture time alignment
390
```
391
392
### Series Resources
393
394
```python { .api }
395
class Series:
396
"""Time series metadata and configuration."""
397
name: str # Resource name
398
display_name: str # Human-readable name
399
description: str # Series description
400
stream: str # Associated stream resource
401
event: str # Associated event resource
402
create_time: Timestamp # Creation timestamp
403
update_time: Timestamp # Last update timestamp
404
labels: Dict[str, str] # Resource labels
405
```
406
407
### Channel Resources
408
409
```python { .api }
410
class Channel:
411
"""Channel configuration for data materialization."""
412
stream: str # Source stream for channel
413
event: str # Source event for channel
414
# Union field oneof input_config:
415
input_config: ChannelInputConfig # Input configuration
416
417
class ChannelInputConfig:
418
"""Input configuration for channel."""
419
pass # Configuration for channel inputs
420
```
421
422
## Usage Examples
423
424
### Infrastructure Setup Workflow
425
426
```python
427
from google.cloud import visionai_v1
428
429
# Create client
client = visionai_v1.StreamsServiceClient()

# Location that will contain the new cluster
parent = "projects/my-project/locations/us-central1"

# Step 1: Create the cluster and wait for the long-running operation's result
cluster = visionai_v1.Cluster(
    display_name="Video Processing Cluster",
    description="Cluster for real-time video processing",
    labels={"environment": "production", "team": "video-analytics"},
)
create_cluster_op = client.create_cluster(
    parent=parent,
    cluster=cluster,
    cluster_id="video-cluster",
)
cluster_result = create_cluster_op.result()
cluster_path = cluster_result.name
print(f"Created cluster: {cluster_path}")

# Step 2: Create streams within the cluster (one entry per camera)
streams = [
    {
        "id": "camera-1-stream",
        "config": visionai_v1.Stream(
            display_name="Camera 1 Stream",
            description="Security camera 1 video stream",
            enable_hls_playback=True,
        ),
    },
    {
        "id": "camera-2-stream",
        "config": visionai_v1.Stream(
            display_name="Camera 2 Stream",
            description="Security camera 2 video stream",
            enable_hls_playback=True,
        ),
    },
]

created_streams = []
for stream_info in streams:
    # Each stream is created under the cluster path returned in step 1
    stream_result = client.create_stream(
        parent=cluster_path,
        stream=stream_info["config"],
        stream_id=stream_info["id"],
    ).result()
    created_streams.append(stream_result)
    print(f"Created stream: {stream_result.name}")

# Step 3: Create events for temporal organization
events = [
    {
        "id": "motion-detection-event",
        "config": visionai_v1.Event(
            display_name="Motion Detection Event",
            description="Motion detected in surveillance area",
            grace_period={"seconds": 30},
        ),
    },
    {
        "id": "person-detected-event",
        "config": visionai_v1.Event(
            display_name="Person Detected Event",
            description="Person detected in restricted area",
            grace_period={"seconds": 10},
        ),
    },
]

created_events = []
for event_info in events:
    # Events are created under the cluster, like streams
    event_result = client.create_event(
        parent=cluster_path,
        event=event_info["config"],
        event_id=event_info["id"],
    ).result()
    created_events.append(event_result)
    print(f"Created event: {event_result.name}")

# Step 4: Create one time series for every (stream, event) pair.
# Numbering is 1-based so series IDs read "series-1-1", "series-1-2", ...
for stream_no, stream in enumerate(created_streams, start=1):
    for event_no, event in enumerate(created_events, start=1):
        series = visionai_v1.Series(
            display_name=f"Series {stream_no}-{event_no}",
            description=f"Time series for stream {stream_no} and event {event_no}",
            stream=stream.name,
            event=event.name,
        )
        series_result = client.create_series(
            parent=cluster_path,
            series=series,
            series_id=f"series-{stream_no}-{event_no}",
        ).result()
        print(f"Created series: {series_result.name}")
538
```
539
540
### Stream Management Operations
541
542
```python
543
def manage_stream_lifecycle():
    """Example of managing stream lifecycle operations.

    Lists the streams of one cluster and, for each stream, prints its name,
    display name, HLS flag and creation time. When HLS playback is enabled
    an HLS token is requested and printed. A thumbnail is then requested for
    the stream, stored under a GCS object name built from the last segment
    of the stream's resource name.
    """
    client = visionai_v1.StreamsServiceClient()

    # List all streams in cluster
    cluster_path = "projects/my-project/locations/us-central1/clusters/video-cluster"
    streams = client.list_streams(parent=cluster_path)

    for stream in streams:
        print(f"Stream: {stream.name}")
        print(f" Display Name: {stream.display_name}")
        print(f" HLS Enabled: {stream.enable_hls_playback}")
        print(f" Created: {stream.create_time}")

        # Generate HLS token for streaming
        if stream.enable_hls_playback:
            hls_response = client.generate_stream_hls_token(stream=stream.name)
            print(f" HLS Token: {hls_response.token}")
            print(f" Token Expires: {hls_response.expiration_time}")

        # Generate thumbnail; the object name uses the stream ID (last path segment)
        thumbnail_op = client.get_stream_thumbnail(
            stream=stream.name,
            gcs_object_name=f"thumbnails/{stream.name.split('/')[-1]}.jpg",
        )

        # Wait for the operation; its result value is not needed here,
        # so it is not bound to a name.
        thumbnail_op.result()
        print(" Thumbnail generated")
572
573
def monitor_events_and_series():
    """Print a cluster's high-priority events and the series tied to each one."""
    streams_client = visionai_v1.StreamsServiceClient()
    cluster = "projects/my-project/locations/us-central1/clusters/video-cluster"

    # Events filtered on the priority label, ordered by create_time descending
    high_priority_events = streams_client.list_events(
        parent=cluster,
        filter='labels.priority="high"',
        order_by="create_time desc",
    )

    for evt in high_priority_events:
        print(f"Event: {evt.display_name}")
        print(f" Description: {evt.description}")
        print(f" Created: {evt.create_time}")

        # Look up the series whose `event` field matches this event's name
        series_filter = f'event="{evt.name}"'
        matching_series = streams_client.list_series(
            parent=cluster,
            filter=series_filter,
        )
        for ser in matching_series:
            print(f" Associated Series: {ser.display_name}")
            print(f" Stream: {ser.stream}")
600
```
601
602
### Channel Materialization
603
604
```python
605
def materialize_data_channels():
    """Materialize a channel for one stream/event pair in a cluster.

    Builds a ``Channel`` from full stream and event resource names, submits
    it with ``materialize_channel`` under the cluster, waits for the
    operation's result and prints it.
    """
    streams_client = visionai_v1.StreamsServiceClient()
    cluster_path = "projects/my-project/locations/us-central1/clusters/video-cluster"

    # Source stream and event, given as full resource names
    source_stream = "projects/my-project/locations/us-central1/clusters/video-cluster/streams/camera-1-stream"
    source_event = "projects/my-project/locations/us-central1/clusters/video-cluster/events/motion-detection-event"

    channel_config = visionai_v1.Channel(
        stream=source_stream,
        event=source_event,
        input_config=visionai_v1.ChannelInputConfig(),
    )

    # Submit the request and wait for the long-running operation's result
    channel_result = streams_client.materialize_channel(
        parent=cluster_path,
        channel_id="motion-detection-channel",
        channel=channel_config,
    ).result()
    print(f"Materialized channel: {channel_result}")
627
628
def cleanup_resources():
    """Delete all series, events and streams of a cluster, then the cluster.

    Deletion order is series -> events -> streams -> cluster. The result of
    each delete operation is awaited before the next request is issued.
    """
    streams_client = visionai_v1.StreamsServiceClient()
    cluster_path = "projects/my-project/locations/us-central1/clusters/video-cluster"

    # (label, list method, delete method) in deletion order. Series go first
    # because they reference streams and events (dependent resources).
    teardown_plan = (
        ("series", streams_client.list_series, streams_client.delete_series),
        ("event", streams_client.list_events, streams_client.delete_event),
        ("stream", streams_client.list_streams, streams_client.delete_stream),
    )

    for label, list_resources, delete_resource in teardown_plan:
        for resource in list_resources(parent=cluster_path):
            delete_resource(name=resource.name).result()
            print(f"Deleted {label}: {resource.name}")

    # Finally delete cluster
    streams_client.delete_cluster(name=cluster_path).result()
    print(f"Deleted cluster: {cluster_path}")
659
```