or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.mdexceptions.mdindex.mdmessage-handling.mdpublisher.mdschedulers.mdschema-service.mdsubscriber.mdtypes.md

subscriber.mddocs/

0

# Subscriber Client

1

2

The SubscriberClient provides high-level functionality for subscribing to Google Cloud Pub/Sub subscriptions. It handles automatic message acknowledgment, flow control, scheduling, and OpenTelemetry integration.

3

4

## Capabilities

5

6

### Client Initialization

7

8

Create and configure a SubscriberClient with custom flow control and subscriber options.

9

10

```python { .api }

11

class SubscriberClient:

12

def __init__(

13

self,

14

flow_control: Optional[FlowControl] = None,

15

subscriber_options: Optional[SubscriberOptions] = None,

16

**kwargs

17

):

18

"""

19

Initialize the subscriber client.

20

21

Parameters:

22

- flow_control: Settings for message flow control

23

- subscriber_options: Options for subscriber client behavior

24

- **kwargs: Additional arguments passed to underlying GAPIC client

25

"""

26

27

@classmethod

28

def from_service_account_file(

29

cls,

30

filename: str,

31

**kwargs

32

) -> "SubscriberClient":

33

"""

34

Create client from service account file.

35

36

Parameters:

37

- filename: Path to service account JSON file

38

- **kwargs: Additional arguments for client initialization

39

40

Returns:

41

SubscriberClient instance

42

"""

43

```

44

45

### Message Subscription

46

47

Subscribe to messages from subscriptions with callback-based processing.

48

49

```python { .api }

50

def subscribe(

51

self,

52

subscription: str,

53

callback: Callable[[Message], Any],

54

flow_control: Union[FlowControl, Sequence] = (),

55

scheduler: Optional[ThreadScheduler] = None,

56

use_legacy_flow_control: bool = False,

57

await_callbacks_on_shutdown: bool = False

58

) -> StreamingPullFuture:

59

"""

60

Subscribe to messages from a subscription.

61

62

Parameters:

63

- subscription: Full subscription path (e.g., "projects/my-project/subscriptions/my-sub")

64

- callback: Function to process received messages

65

- flow_control: Flow control settings or legacy sequence format

66

- scheduler: Custom scheduler for message processing

67

- use_legacy_flow_control: Whether to use legacy flow control behavior

68

- await_callbacks_on_shutdown: Whether to wait for callbacks on shutdown

69

70

Returns:

71

StreamingPullFuture that can be used to control the subscription

72

"""

73

```

74

75

### Subscription Management

76

77

Create, retrieve, update, and delete subscriptions using the underlying GAPIC client methods.

78

79

```python { .api }

80

def create_subscription(

81

self,

82

request: Optional[CreateSubscriptionRequest] = None,

83

*,

84

name: Optional[str] = None,

85

topic: Optional[str] = None,

86

**kwargs

87

) -> Subscription:

88

"""

89

Create a new subscription.

90

91

Parameters:

92

- request: The request object for creating a subscription

93

- name: Subscription name (e.g., "projects/my-project/subscriptions/my-sub")

94

- topic: Topic name to subscribe to

95

- **kwargs: Additional keyword arguments

96

97

Returns:

98

Created Subscription object

99

"""

100

101

def get_subscription(

102

self,

103

request: Optional[GetSubscriptionRequest] = None,

104

*,

105

subscription: Optional[str] = None,

106

**kwargs

107

) -> Subscription:

108

"""

109

Get a subscription.

110

111

Parameters:

112

- request: The request object for getting a subscription

113

- subscription: Subscription name to retrieve

114

- **kwargs: Additional keyword arguments

115

116

Returns:

117

Subscription object

118

"""

119

120

def update_subscription(

121

self,

122

request: Optional[UpdateSubscriptionRequest] = None,

123

*,

124

subscription: Optional[Subscription] = None,

125

update_mask: Optional[FieldMask] = None,

126

**kwargs

127

) -> Subscription:

128

"""

129

Update a subscription.

130

131

Parameters:

132

- request: The request object for updating a subscription

133

- subscription: Updated subscription configuration

134

- update_mask: Fields to update

135

- **kwargs: Additional keyword arguments

136

137

Returns:

138

Updated Subscription object

139

"""

140

141

def list_subscriptions(

142

self,

143

request: Optional[ListSubscriptionsRequest] = None,

144

*,

145

project: Optional[str] = None,

146

**kwargs

147

) -> ListSubscriptionsResponse:

148

"""

149

List subscriptions in a project.

150

151

Parameters:

152

- request: The request object for listing subscriptions

153

- project: Project path (e.g., "projects/my-project")

154

- **kwargs: Additional keyword arguments

155

156

Returns:

157

ListSubscriptionsResponse with subscriptions

158

"""

159

160

def delete_subscription(

161

self,

162

request: Optional[DeleteSubscriptionRequest] = None,

163

*,

164

subscription: Optional[str] = None,

165

**kwargs

166

) -> None:

167

"""

168

Delete a subscription.

169

170

Parameters:

171

- request: The request object for deleting a subscription

172

- subscription: Subscription name to delete

173

- **kwargs: Additional keyword arguments

174

"""

175

```

176

177

### Message Operations

178

179

Low-level message operations for acknowledgment and deadline modification.

180

181

```python { .api }

182

def acknowledge(

183

self,

184

request: Optional[AcknowledgeRequest] = None,

185

*,

186

subscription: Optional[str] = None,

187

ack_ids: Optional[Sequence[str]] = None,

188

**kwargs

189

) -> None:

190

"""

191

Acknowledge messages by their acknowledgment IDs.

192

193

Parameters:

194

- request: The request object for acknowledging messages

195

- subscription: Subscription name

196

- ack_ids: List of acknowledgment IDs to acknowledge

197

- **kwargs: Additional keyword arguments

198

"""

199

200

def modify_ack_deadline(

201

self,

202

request: Optional[ModifyAckDeadlineRequest] = None,

203

*,

204

subscription: Optional[str] = None,

205

ack_ids: Optional[Sequence[str]] = None,

206

ack_deadline_seconds: Optional[int] = None,

207

**kwargs

208

) -> None:

209

"""

210

Modify acknowledgment deadline for messages.

211

212

Parameters:

213

- request: The request object for modifying acknowledgment deadlines

214

- subscription: Subscription name

215

- ack_ids: List of acknowledgment IDs to modify

216

- ack_deadline_seconds: New deadline in seconds

217

- **kwargs: Additional keyword arguments

218

"""

219

220

def pull(

221

self,

222

request: Optional[PullRequest] = None,

223

*,

224

subscription: Optional[str] = None,

225

max_messages: Optional[int] = None,

226

**kwargs

227

) -> PullResponse:

228

"""

229

Pull messages from a subscription synchronously.

230

231

Parameters:

232

- request: The request object for pulling messages

233

- subscription: Subscription name to pull from

234

- max_messages: Maximum number of messages to return

235

- **kwargs: Additional keyword arguments

236

237

Returns:

238

PullResponse with received messages

239

"""

240

241

def streaming_pull(

242

self,

243

requests: Iterator[StreamingPullRequest],

244

**kwargs

245

) -> Iterator[StreamingPullResponse]:

246

"""

247

Establish a streaming pull connection.

248

249

Parameters:

250

- requests: Iterator of streaming pull requests

251

- **kwargs: Additional keyword arguments

252

253

Returns:

254

Iterator of streaming pull responses

255

"""

256

```

257

258

### Snapshot Operations

259

260

Create and manage snapshots for seeking to specific points in time.

261

262

```python { .api }

263

def create_snapshot(

264

self,

265

request: Optional[CreateSnapshotRequest] = None,

266

*,

267

name: Optional[str] = None,

268

subscription: Optional[str] = None,

269

**kwargs

270

) -> Snapshot:

271

"""

272

Create a snapshot of a subscription.

273

274

Parameters:

275

- request: The request object for creating a snapshot

276

- name: Snapshot name (e.g., "projects/my-project/snapshots/my-snapshot")

277

- subscription: Subscription to create snapshot from

278

- **kwargs: Additional keyword arguments

279

280

Returns:

281

Created Snapshot object

282

"""

283

284

def get_snapshot(

285

self,

286

request: Optional[GetSnapshotRequest] = None,

287

*,

288

snapshot: Optional[str] = None,

289

**kwargs

290

) -> Snapshot:

291

"""

292

Get a snapshot.

293

294

Parameters:

295

- request: The request object for getting a snapshot

296

- snapshot: Snapshot name to retrieve

297

- **kwargs: Additional keyword arguments

298

299

Returns:

300

Snapshot object

301

"""

302

303

def list_snapshots(

304

self,

305

request: Optional[ListSnapshotsRequest] = None,

306

*,

307

project: Optional[str] = None,

308

**kwargs

309

) -> ListSnapshotsResponse:

310

"""

311

List snapshots in a project.

312

313

Parameters:

314

- request: The request object for listing snapshots

315

- project: Project path (e.g., "projects/my-project")

316

- **kwargs: Additional keyword arguments

317

318

Returns:

319

ListSnapshotsResponse with snapshots

320

"""

321

322

def update_snapshot(

323

self,

324

request: Optional[UpdateSnapshotRequest] = None,

325

*,

326

snapshot: Optional[Snapshot] = None,

327

update_mask: Optional[FieldMask] = None,

328

**kwargs

329

) -> Snapshot:

330

"""

331

Update a snapshot.

332

333

Parameters:

334

- request: The request object for updating a snapshot

335

- snapshot: Updated snapshot configuration

336

- update_mask: Fields to update

337

- **kwargs: Additional keyword arguments

338

339

Returns:

340

Updated Snapshot object

341

"""

342

343

def delete_snapshot(

344

self,

345

request: Optional[DeleteSnapshotRequest] = None,

346

*,

347

snapshot: Optional[str] = None,

348

**kwargs

349

) -> None:

350

"""

351

Delete a snapshot.

352

353

Parameters:

354

- request: The request object for deleting a snapshot

355

- snapshot: Snapshot name to delete

356

- **kwargs: Additional keyword arguments

357

"""

358

359

def seek(

360

self,

361

request: Optional[SeekRequest] = None,

362

*,

363

subscription: Optional[str] = None,

364

**kwargs

365

) -> SeekResponse:

366

"""

367

Seek a subscription to a specific snapshot or time.

368

369

Parameters:

370

- request: The request object for seeking

371

- subscription: Subscription name to seek

372

- **kwargs: Additional keyword arguments

373

374

Returns:

375

SeekResponse indicating seek result

376

"""

377

```

378

379

### Path Helper Methods

380

381

Utility methods for constructing and parsing resource paths.

382

383

```python { .api }

384

@staticmethod

385

def subscription_path(project: str, subscription: str) -> str:

386

"""

387

Construct a subscription path from project ID and subscription name.

388

389

Parameters:

390

- project: Project ID

391

- subscription: Subscription name

392

393

Returns:

394

Full subscription path string

395

"""

396

397

@staticmethod

398

def snapshot_path(project: str, snapshot: str) -> str:

399

"""

400

Construct a snapshot path from project ID and snapshot name.

401

402

Parameters:

403

- project: Project ID

404

- snapshot: Snapshot name

405

406

Returns:

407

Full snapshot path string

408

"""

409

410

@staticmethod

411

def topic_path(project: str, topic: str) -> str:

412

"""

413

Construct a topic path from project ID and topic name.

414

415

Parameters:

416

- project: Project ID

417

- topic: Topic name

418

419

Returns:

420

Full topic path string

421

"""

422

423

@staticmethod

424

def parse_subscription_path(path: str) -> Dict[str, str]:

425

"""

426

Parse a subscription path into its components.

427

428

Parameters:

429

- path: Subscription path string

430

431

Returns:

432

Dictionary with 'project' and 'subscription' keys

433

"""

434

435

@staticmethod

436

def parse_snapshot_path(path: str) -> Dict[str, str]:

437

"""

438

Parse a snapshot path into its components.

439

440

Parameters:

441

- path: Snapshot path string

442

443

Returns:

444

Dictionary with 'project' and 'snapshot' keys

445

"""

446

447

@staticmethod

448

def parse_topic_path(path: str) -> Dict[str, str]:

449

"""

450

Parse a topic path into its components.

451

452

Parameters:

453

- path: Topic path string

454

455

Returns:

456

Dictionary with 'project' and 'topic' keys

457

"""

458

```

459

460

### Client Management

461

462

Control client lifecycle and access underlying components.

463

464

```python { .api }

465

def close(self) -> None:

466

"""

467

Close the subscriber client and stop all subscriptions.

468

"""

469

470

@property

471

def target(self) -> str:

472

"""

473

Get the target endpoint for the client.

474

475

Returns:

476

Target endpoint URL

477

"""

478

479

@property

480

def api(self):

481

"""

482

Get the underlying GAPIC subscriber client.

483

484

Returns:

485

GAPIC SubscriberClient instance

486

"""

487

488

@property

489

def closed(self) -> bool:

490

"""

491

Check if the client is closed.

492

493

Returns:

494

True if client is closed

495

"""

496

497

@property

498

def open_telemetry_enabled(self) -> bool:

499

"""

500

Check if OpenTelemetry tracing is enabled.

501

502

Returns:

503

True if OpenTelemetry is enabled

504

"""

505

```

506

507

### Context Manager Support

508

509

Use SubscriberClient as a context manager for automatic cleanup.

510

511

```python { .api }

512

def __enter__(self) -> "SubscriberClient":

513

"""

514

Enter context manager.

515

516

Returns:

517

Self

518

"""

519

520

def __exit__(self, exc_type, exc_val, exc_tb) -> None:

521

"""

522

Exit context manager and close client.

523

"""

524

```

525

526

### Streaming Pull Future

527

528

Control and monitor streaming pull operations.

529

530

```python { .api }

531

class StreamingPullFuture:

532

def cancel(self) -> bool:

533

"""

534

Cancel the streaming pull operation.

535

536

Returns:

537

True if cancellation was successful

538

"""

539

540

def cancelled(self) -> bool:

541

"""

542

Check if the operation was cancelled.

543

544

Returns:

545

True if operation is cancelled

546

"""

547

548

def running(self) -> bool:

549

"""

550

Check if the operation is currently running.

551

552

Returns:

553

True if operation is running

554

"""

555

556

def result(self, timeout: Optional[float] = None) -> None:

557

"""

558

Wait for the streaming pull to complete.

559

560

Parameters:

561

- timeout: Maximum time to wait in seconds

562

563

Raises:

564

TimeoutError: If timeout is reached

565

"""

566

```

567

568

## Usage Examples

569

570

### Basic Subscription

571

572

```python

573

from google.cloud import pubsub_v1

574

575

# Create subscriber client

576

subscriber = pubsub_v1.SubscriberClient()

577

578

def callback(message):

579

print(f"Received: {message.data.decode('utf-8')}")

580

print(f"Attributes: {message.attributes}")

581

message.ack()

582

583

# Subscribe to messages

584

subscription_path = subscriber.subscription_path("my-project", "my-subscription")

585

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

586

587

print(f"Listening for messages on {subscription_path}...")

588

589

# Keep the main thread running

590

try:

591

streaming_pull_future.result()

592

except KeyboardInterrupt:

593

streaming_pull_future.cancel()

594

subscriber.close()

595

```

596

597

### Custom Flow Control

598

599

```python

600

from google.cloud.pubsub_v1 import types

601

602

# Configure flow control

603

flow_control = types.FlowControl(

604

max_messages=100, # Process up to 100 messages concurrently

605

max_bytes=10 * 1024 * 1024, # 10MB max outstanding bytes

606

max_lease_duration=600 # 10 minute max lease duration

607

)

608

609

subscriber = pubsub_v1.SubscriberClient(flow_control=flow_control)

610

```

611

612

### Message Processing with Error Handling

613

614

```python

615

def callback(message):

616

try:

617

# Process the message

618

data = message.data.decode('utf-8')

619

print(f"Processing: {data}")

620

621

# Simulate processing

622

process_message(data)

623

624

# Acknowledge successful processing

625

message.ack()

626

627

except Exception as e:

628

print(f"Error processing message: {e}")

629

# Negative acknowledge to retry later

630

message.nack()

631

632

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

633

```

634

635

### Context Manager Usage

636

637

```python

638

from google.cloud import pubsub_v1

639

640

def callback(message):

641

print(f"Received: {message.data.decode('utf-8')}")

642

message.ack()

643

644

# Use subscriber as context manager

645

with pubsub_v1.SubscriberClient() as subscriber:

646

subscription_path = subscriber.subscription_path("my-project", "my-subscription")

647

648

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

649

650

try:

651

# Listen for messages for 30 seconds

652

streaming_pull_future.result(timeout=30)

653

except Exception:

654

streaming_pull_future.cancel()

655

# Client automatically closed when exiting context

656

```

657

658

### Multiple Subscriptions

659

660

```python

661

import concurrent.futures

662

from google.cloud import pubsub_v1

663

664

subscriber = pubsub_v1.SubscriberClient()

665

666

def callback_a(message):

667

print(f"Subscription A: {message.data.decode('utf-8')}")

668

message.ack()

669

670

def callback_b(message):

671

print(f"Subscription B: {message.data.decode('utf-8')}")

672

message.ack()

673

674

# Subscribe to multiple subscriptions

675

sub_a_path = subscriber.subscription_path("my-project", "subscription-a")

676

sub_b_path = subscriber.subscription_path("my-project", "subscription-b")

677

678

future_a = subscriber.subscribe(sub_a_path, callback=callback_a)

679

future_b = subscriber.subscribe(sub_b_path, callback=callback_b)

680

681

# Wait for any subscription to complete or fail

682

futures = [future_a, future_b]

683

try:

684

concurrent.futures.as_completed(futures, timeout=300)

685

except KeyboardInterrupt:

686

for future in futures:

687

future.cancel()

688

finally:

689

subscriber.close()

690

```

691

692

### Message Deadline Modification

693

694

```python

695

def callback(message):

696

print(f"Processing: {message.data.decode('utf-8')}")

697

698

# Extend deadline if processing takes longer

699

message.modify_ack_deadline(60) # Extend by 60 seconds

700

701

try:

702

# Long processing operation

703

long_running_task(message.data)

704

message.ack()

705

except Exception as e:

706

print(f"Processing failed: {e}")

707

message.nack()

708

```