or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

app-platform.mdhealth-check.mdindex.mdlive-video-analytics.mdstreaming.mdstreams-management.mdtypes.mdwarehouse.md

streams-management.mddocs/

0

# Stream Infrastructure Management

1

2

Infrastructure management for clusters, streams, events, and time series data organization. The StreamsService provides the foundational infrastructure for organizing and managing video streaming resources at scale.

3

4

## Capabilities

5

6

### Cluster Management

7

8

Create and manage streaming clusters that serve as containers for streams and provide resource isolation and scaling.

9

10

```python { .api }

11

def list_clusters(self, parent: str, page_size: int = None, page_token: str = None, filter: str = None, order_by: str = None) -> ListClustersResponse:

12

"""

13

Lists clusters in a project and location.

14

15

Args:

16

parent (str): Required. Parent resource path "projects/{project}/locations/{location}"

17

page_size (int): Maximum number of clusters to return

18

page_token (str): Token for pagination

19

filter (str): Filter expression for clusters

20

order_by (str): Sort order for results

21

22

Returns:

23

ListClustersResponse: Response with clusters and pagination

24

"""

25

26

def get_cluster(self, name: str) -> Cluster:

27

"""

28

Retrieves cluster details and configuration.

29

30

Args:

31

name (str): Required. Cluster resource path

32

"projects/{project}/locations/{location}/clusters/{cluster}"

33

34

Returns:

35

Cluster: The cluster resource with configuration and status

36

"""

37

38

def create_cluster(self, parent: str, cluster: Cluster, cluster_id: str) -> Operation:

39

"""

40

Creates a new streaming cluster.

41

42

Args:

43

parent (str): Required. Parent resource path "projects/{project}/locations/{location}"

44

cluster (Cluster): Required. Cluster configuration

45

cluster_id (str): Required. ID for the new cluster

46

47

Returns:

48

Operation: Long-running operation for cluster creation

49

"""

50

51

def update_cluster(self, cluster: Cluster, update_mask: FieldMask = None) -> Operation:

52

"""

53

Updates cluster configuration and settings.

54

55

Args:

56

cluster (Cluster): Required. Updated cluster configuration

57

update_mask (FieldMask): Fields to update

58

59

Returns:

60

Operation: Long-running operation for cluster update

61

"""

62

63

def delete_cluster(self, name: str) -> Operation:

64

"""

65

Deletes a streaming cluster and all contained resources.

66

67

Args:

68

name (str): Required. Cluster resource path to delete

69

70

Returns:

71

Operation: Long-running operation for cluster deletion

72

"""

73

```

74

75

### Stream Management

76

77

Manage individual streams within clusters for organizing video data flows and processing pipelines.

78

79

```python { .api }

80

def list_streams(self, parent: str, page_size: int = None, page_token: str = None, filter: str = None, order_by: str = None) -> ListStreamsResponse:

81

"""

82

Lists streams in a cluster.

83

84

Args:

85

parent (str): Required. Cluster resource path

86

page_size (int): Maximum number of streams to return

87

page_token (str): Token for pagination

88

filter (str): Filter expression for streams

89

order_by (str): Sort order for results

90

91

Returns:

92

ListStreamsResponse: Response with streams and pagination

93

"""

94

95

def get_stream(self, name: str) -> Stream:

96

"""

97

Retrieves stream details and configuration.

98

99

Args:

100

name (str): Required. Stream resource path

101

"projects/{project}/locations/{location}/clusters/{cluster}/streams/{stream}"

102

103

Returns:

104

Stream: The stream resource with configuration and status

105

"""

106

107

def create_stream(self, parent: str, stream: Stream, stream_id: str) -> Operation:

108

"""

109

Creates a new stream within a cluster.

110

111

Args:

112

parent (str): Required. Cluster resource path

113

stream (Stream): Required. Stream configuration

114

stream_id (str): Required. ID for the new stream

115

116

Returns:

117

Operation: Long-running operation for stream creation

118

"""

119

120

def update_stream(self, stream: Stream, update_mask: FieldMask = None) -> Operation:

121

"""

122

Updates stream configuration and settings.

123

124

Args:

125

stream (Stream): Required. Updated stream configuration

126

update_mask (FieldMask): Fields to update

127

128

Returns:

129

Operation: Long-running operation for stream update

130

"""

131

132

def delete_stream(self, name: str) -> Operation:

133

"""

134

Deletes a stream and associated data.

135

136

Args:

137

name (str): Required. Stream resource path to delete

138

139

Returns:

140

Operation: Long-running operation for stream deletion

141

"""

142

143

def get_stream_thumbnail(self, stream: str, gcs_object_name: str, event: str = None) -> Operation:

144

"""

145

Generates and retrieves a thumbnail image for a stream.

146

147

Args:

148

stream (str): Required. Stream resource path

149

gcs_object_name (str): Required. GCS object name for thumbnail storage

150

event (str): Specific event for thumbnail generation

151

152

Returns:

153

Operation: Long-running operation for thumbnail generation

154

"""

155

156

def generate_stream_hls_token(self, stream: str) -> GenerateStreamHlsTokenResponse:

157

"""

158

Generates HLS streaming token for stream access.

159

160

Args:

161

stream (str): Required. Stream resource path

162

163

Returns:

164

GenerateStreamHlsTokenResponse: HLS token and streaming URLs

165

"""

166

```

167

168

### Event Management

169

170

Manage events within streams for tracking significant occurrences and organizing temporal data.

171

172

```python { .api }

173

def list_events(self, parent: str, page_size: int = None, page_token: str = None, filter: str = None, order_by: str = None) -> ListEventsResponse:

174

"""

175

Lists events in a cluster.

176

177

Args:

178

parent (str): Required. Cluster resource path

179

page_size (int): Maximum number of events to return

180

page_token (str): Token for pagination

181

filter (str): Filter expression for events

182

order_by (str): Sort order for results

183

184

Returns:

185

ListEventsResponse: Response with events and pagination

186

"""

187

188

def get_event(self, name: str) -> Event:

189

"""

190

Retrieves event details and metadata.

191

192

Args:

193

name (str): Required. Event resource path

194

"projects/{project}/locations/{location}/clusters/{cluster}/events/{event}"

195

196

Returns:

197

Event: The event resource with metadata and timing

198

"""

199

200

def create_event(self, parent: str, event: Event, event_id: str) -> Operation:

201

"""

202

Creates a new event in a cluster.

203

204

Args:

205

parent (str): Required. Cluster resource path

206

event (Event): Required. Event configuration and metadata

207

event_id (str): Required. ID for the new event

208

209

Returns:

210

Operation: Long-running operation for event creation

211

"""

212

213

def update_event(self, event: Event, update_mask: FieldMask = None) -> Operation:

214

"""

215

Updates event metadata and configuration.

216

217

Args:

218

event (Event): Required. Updated event configuration

219

update_mask (FieldMask): Fields to update

220

221

Returns:

222

Operation: Long-running operation for event update

223

"""

224

225

def delete_event(self, name: str) -> Operation:

226

"""

227

Deletes an event and associated metadata.

228

229

Args:

230

name (str): Required. Event resource path to delete

231

232

Returns:

233

Operation: Long-running operation for event deletion

234

"""

235

```

236

237

### Series Management

238

239

Manage time series data within streams for organizing sequential data and enabling temporal analysis.

240

241

```python { .api }

242

def list_series(self, parent: str, page_size: int = None, page_token: str = None, filter: str = None, order_by: str = None) -> ListSeriesResponse:

243

"""

244

Lists series in a cluster.

245

246

Args:

247

parent (str): Required. Cluster resource path

248

page_size (int): Maximum number of series to return

249

page_token (str): Token for pagination

250

filter (str): Filter expression for series

251

order_by (str): Sort order for results

252

253

Returns:

254

ListSeriesResponse: Response with series and pagination

255

"""

256

257

def get_series(self, name: str) -> Series:

258

"""

259

Retrieves series details and metadata.

260

261

Args:

262

name (str): Required. Series resource path

263

"projects/{project}/locations/{location}/clusters/{cluster}/series/{series}"

264

265

Returns:

266

Series: The series resource with metadata and configuration

267

"""

268

269

def create_series(self, parent: str, series: Series, series_id: str) -> Operation:

270

"""

271

Creates a new time series in a cluster.

272

273

Args:

274

parent (str): Required. Cluster resource path

275

series (Series): Required. Series configuration and metadata

276

series_id (str): Required. ID for the new series

277

278

Returns:

279

Operation: Long-running operation for series creation

280

"""

281

282

def update_series(self, series: Series, update_mask: FieldMask = None) -> Operation:

283

"""

284

Updates series metadata and configuration.

285

286

Args:

287

series (Series): Required. Updated series configuration

288

update_mask (FieldMask): Fields to update

289

290

Returns:

291

Operation: Long-running operation for series update

292

"""

293

294

def delete_series(self, name: str) -> Operation:

295

"""

296

Deletes a series and associated data.

297

298

Args:

299

name (str): Required. Series resource path to delete

300

301

Returns:

302

Operation: Long-running operation for series deletion

303

"""

304

```

305

306

### Channel Operations

307

308

Manage channels for materializing data streams and enabling data access patterns.

309

310

```python { .api }

311

def materialize_channel(self, parent: str, channel_id: str, channel: Channel) -> Operation:

312

"""

313

Materializes a channel from series data.

314

315

Args:

316

parent (str): Required. Parent resource path

317

channel_id (str): Required. ID for the materialized channel

318

channel (Channel): Required. Channel configuration

319

320

Returns:

321

Operation: Long-running operation for channel materialization

322

"""

323

```

324

325

## Types

326

327

### Cluster Resources

328

329

```python { .api }

330

class Cluster:

331

"""Streaming cluster configuration and status."""

332

name: str # Resource name

333

display_name: str # Human-readable name

334

description: str # Cluster description

335

state: ClusterState # Current cluster state

336

psc_target: str # Private Service Connect target

337

create_time: Timestamp # Creation timestamp

338

update_time: Timestamp # Last update timestamp

339

labels: Dict[str, str] # Resource labels

340

341

class ClusterState(Enum):

342

"""Cluster operational states."""

343

STATE_UNSPECIFIED = 0

344

PROVISIONING = 1 # Cluster being provisioned

345

RUNNING = 2 # Cluster operational

346

STOPPING = 3 # Cluster being stopped

347

ERROR = 4 # Cluster in error state

348

```

349

350

### Stream Resources

351

352

```python { .api }

353

class Stream:

354

"""Stream configuration and metadata."""

355

name: str # Resource name

356

display_name: str # Human-readable name

357

description: str # Stream description

358

enable_hls_playback: bool # Enable HLS playback

359

media_warehouse_asset: str # Associated warehouse asset

360

create_time: Timestamp # Creation timestamp

361

update_time: Timestamp # Last update timestamp

362

labels: Dict[str, str] # Resource labels

363

364

class GenerateStreamHlsTokenResponse:

365

"""Response for HLS token generation."""

366

token: str # HLS streaming token

367

expiration_time: Timestamp # Token expiration time

368

```

369

370

### Event Resources

371

372

```python { .api }

373

class Event:

374

"""Event metadata and timing information."""

375

name: str # Resource name

376

display_name: str # Human-readable name

377

description: str # Event description

378

create_time: Timestamp # Creation timestamp

379

update_time: Timestamp # Last update timestamp

380

labels: Dict[str, str] # Resource labels

381

# Union field oneof clock:

382

grace_period: Duration # Grace period for event

383

alignment_clock: AlignmentClock # Clock alignment for event

384

385

class AlignmentClock(Enum):

386

"""Clock alignment types for events."""

387

ALIGNMENT_CLOCK_UNSPECIFIED = 0

388

LIVE = 1 # Live clock alignment

389

CAPTURE = 2 # Capture time alignment

390

```

391

392

### Series Resources

393

394

```python { .api }

395

class Series:

396

"""Time series metadata and configuration."""

397

name: str # Resource name

398

display_name: str # Human-readable name

399

description: str # Series description

400

stream: str # Associated stream resource

401

event: str # Associated event resource

402

create_time: Timestamp # Creation timestamp

403

update_time: Timestamp # Last update timestamp

404

labels: Dict[str, str] # Resource labels

405

```

406

407

### Channel Resources

408

409

```python { .api }

410

class Channel:

411

"""Channel configuration for data materialization."""

412

stream: str # Source stream for channel

413

event: str # Source event for channel

414

# Union field oneof input_config:

415

input_config: ChannelInputConfig # Input configuration

416

417

class ChannelInputConfig:

418

"""Input configuration for channel."""

419

pass # Configuration for channel inputs

420

```

421

422

## Usage Examples

423

424

### Infrastructure Setup Workflow

425

426

```python

427

from google.cloud import visionai_v1

428

429

# Create client

430

client = visionai_v1.StreamsServiceClient()

431

432

# Define paths

433

parent = "projects/my-project/locations/us-central1"

434

435

# Step 1: Create cluster

436

cluster = visionai_v1.Cluster(

437

display_name="Video Processing Cluster",

438

description="Cluster for real-time video processing",

439

labels={

440

"environment": "production",

441

"team": "video-analytics"

442

}

443

)

444

445

create_cluster_op = client.create_cluster(

446

parent=parent,

447

cluster=cluster,

448

cluster_id="video-cluster"

449

)

450

451

cluster_result = create_cluster_op.result()

452

cluster_path = cluster_result.name

453

454

print(f"Created cluster: {cluster_path}")

455

456

# Step 2: Create streams within cluster

457

streams = [

458

{

459

"id": "camera-1-stream",

460

"config": visionai_v1.Stream(

461

display_name="Camera 1 Stream",

462

description="Security camera 1 video stream",

463

enable_hls_playback=True

464

)

465

},

466

{

467

"id": "camera-2-stream",

468

"config": visionai_v1.Stream(

469

display_name="Camera 2 Stream",

470

description="Security camera 2 video stream",

471

enable_hls_playback=True

472

)

473

}

474

]

475

476

created_streams = []

477

for stream_info in streams:

478

create_stream_op = client.create_stream(

479

parent=cluster_path,

480

stream=stream_info["config"],

481

stream_id=stream_info["id"]

482

)

483

484

stream_result = create_stream_op.result()

485

created_streams.append(stream_result)

486

print(f"Created stream: {stream_result.name}")

487

488

# Step 3: Create events for temporal organization

489

events = [

490

{

491

"id": "motion-detection-event",

492

"config": visionai_v1.Event(

493

display_name="Motion Detection Event",

494

description="Motion detected in surveillance area",

495

grace_period={"seconds": 30}

496

)

497

},

498

{

499

"id": "person-detected-event",

500

"config": visionai_v1.Event(

501

display_name="Person Detected Event",

502

description="Person detected in restricted area",

503

grace_period={"seconds": 10}

504

)

505

}

506

]

507

508

created_events = []

509

for event_info in events:

510

create_event_op = client.create_event(

511

parent=cluster_path,

512

event=event_info["config"],

513

event_id=event_info["id"]

514

)

515

516

event_result = create_event_op.result()

517

created_events.append(event_result)

518

print(f"Created event: {event_result.name}")

519

520

# Step 4: Create time series for data organization

521

for i, stream in enumerate(created_streams):

522

for j, event in enumerate(created_events):

523

series = visionai_v1.Series(

524

display_name=f"Series {i+1}-{j+1}",

525

description=f"Time series for stream {i+1} and event {j+1}",

526

stream=stream.name,

527

event=event.name

528

)

529

530

create_series_op = client.create_series(

531

parent=cluster_path,

532

series=series,

533

series_id=f"series-{i+1}-{j+1}"

534

)

535

536

series_result = create_series_op.result()

537

print(f"Created series: {series_result.name}")

538

```

539

540

### Stream Management Operations

541

542

```python

543

def manage_stream_lifecycle():

544

"""Example of managing stream lifecycle operations."""

545

546

client = visionai_v1.StreamsServiceClient()

547

548

# List all streams in cluster

549

cluster_path = "projects/my-project/locations/us-central1/clusters/video-cluster"

550

streams = client.list_streams(parent=cluster_path)

551

552

for stream in streams:

553

print(f"Stream: {stream.name}")

554

print(f" Display Name: {stream.display_name}")

555

print(f" HLS Enabled: {stream.enable_hls_playback}")

556

print(f" Created: {stream.create_time}")

557

558

# Generate HLS token for streaming

559

if stream.enable_hls_playback:

560

hls_response = client.generate_stream_hls_token(stream=stream.name)

561

print(f" HLS Token: {hls_response.token}")

562

print(f" Token Expires: {hls_response.expiration_time}")

563

564

# Generate thumbnail

565

thumbnail_op = client.get_stream_thumbnail(

566

stream=stream.name,

567

gcs_object_name=f"thumbnails/{stream.name.split('/')[-1]}.jpg"

568

)

569

570

thumbnail_result = thumbnail_op.result()

571

print(f" Thumbnail generated")

572

573

def monitor_events_and_series():

574

"""Monitor events and series in a cluster."""

575

576

client = visionai_v1.StreamsServiceClient()

577

cluster_path = "projects/my-project/locations/us-central1/clusters/video-cluster"

578

579

# List and monitor events

580

events = client.list_events(

581

parent=cluster_path,

582

filter='labels.priority="high"',

583

order_by="create_time desc"

584

)

585

586

for event in events:

587

print(f"Event: {event.display_name}")

588

print(f" Description: {event.description}")

589

print(f" Created: {event.create_time}")

590

591

# Find associated series

592

series_list = client.list_series(

593

parent=cluster_path,

594

filter=f'event="{event.name}"'

595

)

596

597

for series in series_list:

598

print(f" Associated Series: {series.display_name}")

599

print(f" Stream: {series.stream}")

600

```

601

602

### Channel Materialization

603

604

```python

605

def materialize_data_channels():

606

"""Example of materializing channels from series data."""

607

608

client = visionai_v1.StreamsServiceClient()

609

cluster_path = "projects/my-project/locations/us-central1/clusters/video-cluster"

610

611

# Define channel configuration

612

channel = visionai_v1.Channel(

613

stream="projects/my-project/locations/us-central1/clusters/video-cluster/streams/camera-1-stream",

614

event="projects/my-project/locations/us-central1/clusters/video-cluster/events/motion-detection-event",

615

input_config=visionai_v1.ChannelInputConfig()

616

)

617

618

# Materialize the channel

619

materialize_op = client.materialize_channel(

620

parent=cluster_path,

621

channel_id="motion-detection-channel",

622

channel=channel

623

)

624

625

channel_result = materialize_op.result()

626

print(f"Materialized channel: {channel_result}")

627

628

def cleanup_resources():

629

"""Clean up streaming resources."""

630

631

client = visionai_v1.StreamsServiceClient()

632

cluster_path = "projects/my-project/locations/us-central1/clusters/video-cluster"

633

634

# Delete series first (dependent resources)

635

series_list = client.list_series(parent=cluster_path)

636

for series in series_list:

637

delete_op = client.delete_series(name=series.name)

638

delete_op.result()

639

print(f"Deleted series: {series.name}")

640

641

# Delete events

642

events_list = client.list_events(parent=cluster_path)

643

for event in events_list:

644

delete_op = client.delete_event(name=event.name)

645

delete_op.result()

646

print(f"Deleted event: {event.name}")

647

648

# Delete streams

649

streams_list = client.list_streams(parent=cluster_path)

650

for stream in streams_list:

651

delete_op = client.delete_stream(name=stream.name)

652

delete_op.result()

653

print(f"Deleted stream: {stream.name}")

654

655

# Finally delete cluster

656

delete_cluster_op = client.delete_cluster(name=cluster_path)

657

delete_cluster_op.result()

658

print(f"Deleted cluster: {cluster_path}")

659

```