or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

app-platform.mdhealth-check.mdindex.mdlive-video-analytics.mdstreaming.mdstreams-management.mdtypes.mdwarehouse.md

streaming.mddocs/

0

# Real-Time Streaming

1

2

Bidirectional streaming capabilities for video packets, events, and lease-based resource management. The StreamingService provides low-latency video streaming with precise resource control and event-driven processing for real-time applications.

3

4

## Capabilities

5

6

### Packet Streaming Operations

7

8

Send and receive video/audio packets in real-time with bidirectional streaming support for interactive applications.

9

10

```python { .api }

11

def send_packets(self, requests: Iterator[SendPacketsRequest]) -> Iterator[SendPacketsResponse]:

12

"""

13

Sends video packets to stream in bidirectional streaming mode.

14

15

Args:

16

requests (Iterator[SendPacketsRequest]): Stream of packet send requests

17

18

Yields:

19

SendPacketsResponse: Response for each sent packet with acknowledgments

20

"""

21

22

def receive_packets(self, requests: Iterator[ReceivePacketsRequest]) -> Iterator[ReceivePacketsResponse]:

23

"""

24

Receives video packets from stream in bidirectional streaming mode.

25

26

Args:

27

requests (Iterator[ReceivePacketsRequest]): Stream of packet receive requests

28

29

Yields:

30

ReceivePacketsResponse: Stream of received packets with metadata

31

"""

32

33

def receive_events(self, requests: Iterator[ReceiveEventsRequest]) -> Iterator[ReceiveEventsResponse]:

34

"""

35

Receives events from stream in bidirectional streaming mode.

36

37

Args:

38

requests (Iterator[ReceiveEventsRequest]): Stream of event receive requests

39

40

Yields:

41

ReceiveEventsResponse: Stream of received events and notifications

42

"""

43

```

44

45

### Lease Management

46

47

Acquire, renew, and release leases on streaming resources to ensure exclusive access and prevent conflicts.

48

49

```python { .api }

50

def acquire_lease(self, request: AcquireLeaseRequest, *, retry=None, timeout=None, metadata=()) -> Lease:

51

"""

52

Acquires a lease on a streaming session.

53

54

Args:

55

request (AcquireLeaseRequest): Required. Request containing lease parameters

56

retry: Retry configuration for the request

57

timeout: Timeout for the request

58

metadata: Additional metadata for the request

59

60

Returns:

61

Lease: Acquired lease with expiration and renewal information

62

"""

63

64

def renew_lease(self, request: RenewLeaseRequest, *, retry=None, timeout=None, metadata=()) -> Lease:

65

"""

66

Renews an existing lease to extend its validity period.

67

68

Args:

69

request (RenewLeaseRequest): Required. Request containing lease renewal parameters

70

retry: Retry configuration for the request

71

timeout: Timeout for the request

72

metadata: Additional metadata for the request

73

74

Returns:

75

Lease: Renewed lease with updated expiration time

76

"""

77

78

def release_lease(self, request: ReleaseLeaseRequest, *, retry=None, timeout=None, metadata=()) -> ReleaseLeaseResponse:

79

"""

80

Releases an active lease to free up streaming resources.

81

82

Args:

83

request (ReleaseLeaseRequest): Required. Request containing lease release parameters

84

retry: Retry configuration for the request

85

timeout: Timeout for the request

86

metadata: Additional metadata for the request

87

88

Returns:

89

ReleaseLeaseResponse: Confirmation of lease release

90

"""

91

```

92

93

## Types

94

95

### Streaming Request and Response Types

96

97

```python { .api }

98

class SendPacketsRequest:

99

"""Request for sending packets to a stream."""

100

# Union field oneof request:

101

setup_request: SendPacketsRequestSetupRequest # Initial setup request

102

packet: Packet # Video/audio packet to send

103

104

class SendPacketsRequestSetupRequest:

105

"""Setup request for packet sending session."""

106

series: str # Series resource path

107

lease_id: str # Lease ID for stream access

108

109

class SendPacketsResponse:

110

"""Response from packet sending operation."""

111

packet_id: str # Unique packet identifier

112

113

class ReceivePacketsRequest:

114

"""Request for receiving packets from a stream."""

115

# Union field oneof request:

116

setup_request: ReceivePacketsRequestSetupRequest # Initial setup request

117

commit_request: CommitRequest # Commit received packets

118

119

class ReceivePacketsRequestSetupRequest:

120

"""Setup request for packet receiving session."""

121

series: str # Series resource path

122

receiver: str # Receiver identifier

123

heartbeat_interval: Duration # Heartbeat frequency

124

writes_done_grace_period: Duration # Grace period for writes completion

125

126

class ReceivePacketsResponse:

127

"""Response containing received packets."""

128

packet: Packet # Received video/audio packet

129

130

class ReceiveEventsRequest:

131

"""Request for receiving events from a stream."""

132

# Union field oneof request:

133

setup_request: ReceiveEventsRequestSetupRequest # Initial setup request

134

commit_request: CommitRequest # Commit received events

135

136

class ReceiveEventsRequestSetupRequest:

137

"""Setup request for event receiving session."""

138

series: str # Series resource path

139

receiver: str # Receiver identifier

140

heartbeat_interval: Duration # Heartbeat frequency

141

writes_done_grace_period: Duration # Grace period for writes completion

142

143

class ReceiveEventsResponse:

144

"""Response containing received events."""

145

event_update: EventUpdate # Event update information

146

147

class CommitRequest:

148

"""Request to commit received data."""

149

offset: int # Offset to commit up to

150

```

151

152

### Packet Types

153

154

```python { .api }

155

class Packet:

156

"""Video or audio packet with metadata."""

157

header: PacketHeader # Packet metadata and timing

158

payload: bytes # Packet data payload

159

160

class PacketHeader:

161

"""Metadata for video/audio packets."""

162

capture_time: Timestamp # When packet was captured

163

server_metadata: ServerMetadata # Server-side metadata

164

series_metadata: SeriesMetadata # Series metadata

165

flags: PacketHeaderFlag # Packet flags

166

trace_context: str # Tracing context

167

# Union field oneof packet_type:

168

gstreamer_buffer_descriptor: GstreamerBufferDescriptor # GStreamer buffer info

169

raw_image_descriptor: RawImageDescriptor # Raw image format info

170

171

class ServerMetadata:

172

"""Server-side metadata for packets."""

173

offset: int # Packet offset in stream

174

ingest_time: Timestamp # Server ingestion time

175

176

class SeriesMetadata:

177

"""Metadata about the packet series."""

178

series: str # Series resource path

179

180

class GstreamerBufferDescriptor:

181

"""GStreamer-specific buffer information."""

182

caps_string: str # GStreamer capabilities string

183

is_key_frame: bool # Whether this is a key frame

184

pts_time: Duration # Presentation timestamp

185

dts_time: Duration # Decode timestamp

186

duration: Duration # Buffer duration

187

188

class RawImageDescriptor:

189

"""Raw image format descriptor."""

190

format: RawImageDescriptorFormat # Image format

191

width: int # Image width in pixels

192

height: int # Image height in pixels

193

194

class PacketHeaderFlag(Enum):

195

"""Flags for packet headers."""

196

FLAG_UNSPECIFIED = 0

197

IMMUTABLE = 1 # Packet is immutable

198

199

class RawImageDescriptorFormat(Enum):

200

"""Raw image format types."""

201

FORMAT_UNSPECIFIED = 0

202

SRGB = 1 # sRGB format

203

```

204

205

### Lease Request Types

206

207

```python { .api }

208

class AcquireLeaseRequest:

209

"""Request message for acquiring a lease."""

210

series: str # The series name

211

owner: str # The owner name

212

term: Duration # The lease term

213

lease_type: LeaseType # The lease type (optional)

214

215

class RenewLeaseRequest:

216

"""Request message for renewing a lease."""

217

id: str # Lease id

218

series: str # Series name

219

owner: str # Lease owner

220

term: Duration # Lease term

221

222

class ReleaseLeaseRequest:

223

"""Request message for releasing lease."""

224

id: str # Lease id

225

series: str # Series name

226

owner: str # Lease owner

227

```

228

229

### Lease Types

230

231

```python { .api }

232

class Lease:

233

"""Lease on streaming resources."""

234

id: str # Unique lease identifier

235

series: str # Series resource path

236

owner: str # Lease owner identifier

237

expire_time: Timestamp # Lease expiration time

238

lease_type: LeaseType # Type of lease

239

240

class LeaseType(Enum):

241

"""Types of streaming leases."""

242

LEASE_TYPE_UNSPECIFIED = 0

243

READER = 1 # Read-only lease

244

WRITER = 2 # Write-only lease

245

246

class ReleaseLeaseResponse:

247

"""Response from lease release operation."""

248

pass # Confirmation response

249

```

250

251

### Event Types

252

253

```python { .api }

254

class EventUpdate:

255

"""Update information for stream events."""

256

stream: str # Stream resource path

257

event: str # Event identifier

258

series: str # Series resource path

259

update_time: Timestamp # Time of update

260

offset: int # Event offset in stream

261

```

262

263

### Streaming Modes

264

265

```python { .api }

266

class RequestMetadata:

267

"""Metadata for streaming requests."""

268

# Union field oneof mode:

269

eager_mode: EagerMode # Eager streaming mode

270

controlled_mode: ControlledMode # Controlled streaming mode

271

272

class EagerMode:

273

"""Eager streaming mode configuration."""

274

pass # Stream data as fast as possible

275

276

class ControlledMode:

277

"""Controlled streaming mode configuration."""

278

starting_logical_offset: str # Starting offset for controlled streaming

279

fallback_starting_offset: str # Fallback offset if starting offset unavailable

280

```

281

282

## Usage Examples

283

284

### Real-Time Packet Streaming

285

286

```python

287

from google.cloud import visionai_v1

288

import asyncio

289

from typing import Iterator

290

291

async def send_video_stream():

292

"""Example of sending video packets to a stream."""

293

294

async with visionai_v1.StreamingServiceAsyncClient() as client:

295

# Create request iterator

296

def request_iterator() -> Iterator[visionai_v1.SendPacketsRequest]:

297

# Send setup request first

298

yield visionai_v1.SendPacketsRequest(

299

setup_request=visionai_v1.SendPacketsRequestSetupRequest(

300

series="projects/my-project/locations/us-central1/clusters/my-cluster/streams/camera-1/series/video",

301

lease_id="lease-12345"

302

)

303

)

304

305

# Send video packets

306

for i in range(100):

307

packet = visionai_v1.Packet(

308

header=visionai_v1.PacketHeader(

309

capture_time={"seconds": int(time.time())},

310

gstreamer_buffer_descriptor=visionai_v1.GstreamerBufferDescriptor(

311

caps_string="video/x-raw,format=RGB,width=1920,height=1080",

312

is_key_frame=(i % 30 == 0), # Key frame every 30 frames

313

pts_time={"nanos": i * 33333333} # 30 FPS

314

)

315

),

316

payload=generate_video_frame() # Your video frame data

317

)

318

319

yield visionai_v1.SendPacketsRequest(packet=packet)

320

321

# Send packets and process responses

322

async for response in client.send_packets(request_iterator()):

323

print(f"Sent packet ID: {response.packet_id}")

324

325

async def receive_video_stream():

326

"""Example of receiving video packets from a stream."""

327

328

async with visionai_v1.StreamingServiceAsyncClient() as client:

329

def request_iterator() -> Iterator[visionai_v1.ReceivePacketsRequest]:

330

# Send setup request

331

yield visionai_v1.ReceivePacketsRequest(

332

setup_request=visionai_v1.ReceivePacketsRequestSetupRequest(

333

series="projects/my-project/locations/us-central1/clusters/my-cluster/streams/output/series/video",

334

receiver="video-receiver-001",

335

heartbeat_interval={"seconds": 30}

336

)

337

)

338

339

# Receive packets

340

async for response in client.receive_packets(request_iterator()):

341

packet = response.packet

342

print(f"Received packet at offset: {packet.header.server_metadata.offset}")

343

process_video_frame(packet.payload) # Your processing logic

344

```

345

346

### Lease-Based Resource Management

347

348

```python

349

from google.cloud import visionai_v1

350

import time

351

352

def manage_stream_lease():

353

"""Example of lease management for exclusive stream access."""

354

355

client = visionai_v1.StreamingServiceClient()

356

357

# Acquire lease

358

session = "projects/my-project/locations/us-central1/clusters/my-cluster/streams/camera-1"

359

owner = "video-processor-001"

360

lease_duration = {"seconds": 300} # 5 minute lease

361

362

lease = client.acquire_lease(

363

request=visionai_v1.AcquireLeaseRequest(

364

series=f"{session}/series/video",

365

owner=owner,

366

term=lease_duration,

367

lease_type=visionai_v1.LeaseType.WRITER

368

)

369

)

370

371

print(f"Acquired lease ID: {lease.id}")

372

print(f"Expires at: {lease.expire_time}")

373

374

try:

375

# Use the stream with exclusive access

376

perform_stream_operations()

377

378

# Renew lease if needed (before expiration)

379

time.sleep(240) # Wait 4 minutes

380

renewed_lease = client.renew_lease(

381

request=visionai_v1.RenewLeaseRequest(

382

id=lease.id,

383

series=f"{session}/series/video",

384

owner=owner,

385

term=lease_duration

386

)

387

)

388

389

print(f"Renewed lease, new expiration: {renewed_lease.expire_time}")

390

391

finally:

392

# Always release lease when done

393

client.release_lease(

394

request=visionai_v1.ReleaseLeaseRequest(

395

id=lease.id,

396

series=f"{session}/series/video",

397

owner=owner

398

)

399

)

400

print("Lease released")

401

402

def perform_stream_operations():

403

"""Placeholder for stream operations while holding lease."""

404

pass

405

406

def generate_video_frame() -> bytes:

407

"""Placeholder for video frame generation."""

408

return b"video_frame_data"

409

410

def process_video_frame(frame_data: bytes):

411

"""Placeholder for video frame processing."""

412

pass

413

```

414

415

### Event Stream Processing

416

417

```python

418

async def process_stream_events():

419

"""Example of receiving and processing stream events."""

420

421

async with visionai_v1.StreamingServiceAsyncClient() as client:

422

def request_iterator() -> Iterator[visionai_v1.ReceiveEventsRequest]:

423

yield visionai_v1.ReceiveEventsRequest(

424

setup_request=visionai_v1.ReceiveEventsRequestSetupRequest(

425

series="projects/my-project/locations/us-central1/clusters/my-cluster/streams/camera-1/series/events",

426

receiver="event-processor-001",

427

heartbeat_interval={"seconds": 60}

428

)

429

)

430

431

# Process incoming events

432

async for response in client.receive_events(request_iterator()):

433

event_update = response.event_update

434

435

print(f"Received event from stream: {event_update.stream}")

436

print(f"Event ID: {event_update.event}")

437

print(f"Update time: {event_update.update_time}")

438

print(f"Offset: {event_update.offset}")

439

440

# Process the event based on your application logic

441

await handle_stream_event(event_update)

442

443

async def handle_stream_event(event_update: visionai_v1.EventUpdate):

444

"""Handle individual stream events."""

445

# Your event processing logic here

446

pass

447

448

# Run the async examples

449

if __name__ == "__main__":

450

asyncio.run(send_video_stream())

451

asyncio.run(receive_video_stream())

452

asyncio.run(process_stream_events())

453

```