or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

datastore.md index.md pubsub.md storage.md

docs/pubsub.md

0

# Google Cloud Pub/Sub

1

2

Google Cloud Pub/Sub is a messaging service for asynchronous communication between applications. It provides topic-based publish/subscribe messaging with support for both push and pull subscriptions, automatic scaling, and at-least-once message delivery.

3

4

## Capabilities

5

6

### Client Operations

7

8

High-level client for managing Pub/Sub topics and subscriptions.

9

10

```python { .api }

11

class Client:

12

def __init__(self, project=None, credentials=None, http=None):

13

"""

14

Initialize Pub/Sub client.

15

16

Parameters:

17

- project (str): Google Cloud project ID

18

- credentials: OAuth2 credentials object

19

- http: Optional HTTP client

20

"""

21

22

def list_topics(self, page_size=None, page_token=None):

23

"""

24

List topics in project.

25

26

Parameters:

27

- page_size (int): Maximum results per page

28

- page_token (str): Pagination token

29

30

Returns:

31

Iterator: Topic listing iterator

32

"""

33

34

def list_subscriptions(self, page_size=None, page_token=None, topic_name=None):

35

"""

36

List subscriptions in project.

37

38

Parameters:

39

- page_size (int): Maximum results per page

40

- page_token (str): Pagination token

41

- topic_name (str): Optional topic filter

42

43

Returns:

44

Iterator: Subscription listing iterator

45

"""

46

47

def topic(self, name, timestamp_messages=False):

48

"""

49

Create Topic instance.

50

51

Parameters:

52

- name (str): Topic name

53

- timestamp_messages (bool): Whether to add timestamps to messages

54

55

Returns:

56

Topic: Topic instance

57

"""

58

```

59

60

### Topic Operations

61

62

Topic management for message publishing with support for batch operations and message attributes.

63

64

```python { .api }

65

class Topic:

66

def __init__(self, name, client, timestamp_messages=False):

67

"""

68

Initialize topic.

69

70

Parameters:

71

- name (str): Topic name

72

- client (Client): Pub/Sub client

73

- timestamp_messages (bool): Add timestamps to messages

74

"""

75

76

def subscription(self, name, ack_deadline=None, push_endpoint=None):

77

"""

78

Create subscription instance for this topic.

79

80

Parameters:

81

- name (str): Subscription name

82

- ack_deadline (int): Message acknowledgment deadline in seconds

83

- push_endpoint (str): URL for push subscriptions

84

85

Returns:

86

Subscription: Subscription instance

87

"""

88

89

@classmethod

90

def from_api_repr(cls, resource, client):

91

"""

92

Create topic from API response.

93

94

Parameters:

95

- resource (dict): API response data

96

- client (Client): Pub/Sub client

97

98

Returns:

99

Topic: Topic instance

100

"""

101

102

def create(self, client=None):

103

"""

104

Create topic.

105

106

Parameters:

107

- client (Client): Optional client override

108

"""

109

110

def exists(self, client=None):

111

"""

112

Check if topic exists.

113

114

Parameters:

115

- client (Client): Optional client override

116

117

Returns:

118

bool: True if topic exists

119

"""

120

121

def publish(self, message, client=None, **attrs):

122

"""

123

Publish single message to topic.

124

125

Parameters:

126

- message (str): Message data

127

- client (Client): Optional client override

128

- **attrs: Message attributes as keyword arguments

129

130

Returns:

131

str: Published message ID

132

"""

133

134

def batch(self, client=None):

135

"""

136

Create batch publisher for efficient message batching.

137

138

Parameters:

139

- client (Client): Optional client override

140

141

Returns:

142

Batch: Batch publisher instance

143

"""

144

145

def delete(self, client=None):

146

"""

147

Delete topic.

148

149

Parameters:

150

- client (Client): Optional client override

151

"""

152

153

@property

154

def name(self):

155

"""

156

Topic name.

157

158

Returns:

159

str: Topic name

160

"""

161

162

@property

163

def project(self):

164

"""

165

Project ID.

166

167

Returns:

168

str: Project ID

169

"""

170

171

@property

172

def full_name(self):

173

"""

174

Fully qualified topic name.

175

176

Returns:

177

str: Full topic name

178

"""

179

180

@property

181

def path(self):

182

"""

183

API path for topic.

184

185

Returns:

186

str: Topic API path

187

"""

188

189

@property

190

def timestamp_messages(self):

191

"""

192

Whether messages are automatically timestamped.

193

194

Returns:

195

bool: Timestamp setting

196

"""

197

```

198

199

### Topic Batch Operations

200

201

Batch publisher for efficient message publishing with automatic batching and context manager support.

202

203

```python { .api }

204

class Batch:

205

def __init__(self, topic, client):

206

"""

207

Initialize batch publisher.

208

209

Parameters:

210

- topic (Topic): Associated topic

211

- client (Client): Pub/Sub client

212

"""

213

214

def publish(self, message, **attrs):

215

"""

216

Add message to batch.

217

218

Parameters:

219

- message (str): Message data

220

- **attrs: Message attributes as keyword arguments

221

"""

222

223

def commit(self, client=None):

224

"""

225

Send all batched messages.

226

227

Parameters:

228

- client (Client): Optional client override

229

230

Returns:

231

list[str]: List of published message IDs

232

"""

233

234

def __enter__(self):

235

"""

236

Enter context manager.

237

238

Returns:

239

Batch: Self

240

"""

241

242

def __exit__(self, exc_type, exc_val, exc_tb):

243

"""

244

Exit context manager, committing the batch only if no exception was raised.

245

"""

246

247

def __iter__(self):

248

"""

249

Iterate over published message IDs (after commit).

250

251

Returns:

252

Iterator[str]: Message ID iterator

253

"""

254

```

255

256

### Subscription Operations

257

258

Subscription management for message consumption with support for both pull and push delivery models.

259

260

```python { .api }

261

class Subscription:

262

def __init__(self, name, topic, ack_deadline=None, push_endpoint=None):

263

"""

264

Initialize subscription.

265

266

Parameters:

267

- name (str): Subscription name

268

- topic (Topic): Associated topic

269

- ack_deadline (int): Acknowledgment deadline in seconds

270

- push_endpoint (str): URL for push subscriptions

271

"""

272

273

@classmethod

274

def from_api_repr(cls, resource, client, topics=None):

275

"""

276

Create subscription from API response.

277

278

Parameters:

279

- resource (dict): API response data

280

- client (Client): Pub/Sub client

281

- topics (dict): Optional topic lookup cache

282

283

Returns:

284

Subscription: Subscription instance

285

"""

286

287

def create(self, client=None):

288

"""

289

Create subscription.

290

291

Parameters:

292

- client (Client): Optional client override

293

"""

294

295

def exists(self, client=None):

296

"""

297

Check if subscription exists.

298

299

Parameters:

300

- client (Client): Optional client override

301

302

Returns:

303

bool: True if subscription exists

304

"""

305

306

def reload(self, client=None):

307

"""

308

Reload subscription metadata from API.

309

310

Parameters:

311

- client (Client): Optional client override

312

"""

313

314

def delete(self, client=None):

315

"""

316

Delete subscription.

317

318

Parameters:

319

- client (Client): Optional client override

320

"""

321

322

def modify_push_configuration(self, push_endpoint, client=None):

323

"""

324

Modify push endpoint configuration.

325

326

Parameters:

327

- push_endpoint (str): New push endpoint URL (None for pull)

328

- client (Client): Optional client override

329

"""

330

331

def pull(self, return_immediately=False, max_messages=1, client=None):

332

"""

333

Pull messages from subscription.

334

335

Parameters:

336

- return_immediately (bool): Return immediately if no messages

337

- max_messages (int): Maximum messages to return

338

- client (Client): Optional client override

339

340

Returns:

341

list[tuple[str, Message]]: List of (ack_id, message) pairs for the received messages

342

"""

343

344

def acknowledge(self, ack_ids, client=None):

345

"""

346

Acknowledge received messages.

347

348

Parameters:

349

- ack_ids (list[str]): List of acknowledgment IDs

350

- client (Client): Optional client override

351

"""

352

353

def modify_ack_deadline(self, ack_ids, ack_deadline, client=None):

354

"""

355

Modify acknowledgment deadline for messages.

356

357

Parameters:

358

- ack_ids (list[str]): List of acknowledgment IDs

359

- ack_deadline (int): New deadline in seconds

360

- client (Client): Optional client override

361

"""

362

363

@property

364

def name(self):

365

"""

366

Subscription name.

367

368

Returns:

369

str: Subscription name

370

"""

371

372

@property

373

def topic(self):

374

"""

375

Associated topic.

376

377

Returns:

378

Topic: Associated topic

379

"""

380

381

@property

382

def ack_deadline(self):

383

"""

384

Acknowledgment deadline in seconds.

385

386

Returns:

387

int: Deadline in seconds

388

"""

389

390

@property

391

def push_endpoint(self):

392

"""

393

Push endpoint URL for push subscriptions.

394

395

Returns:

396

str or None: Push endpoint URL

397

"""

398

399

@property

400

def path(self):

401

"""

402

API path for subscription.

403

404

Returns:

405

str: Subscription API path

406

"""

407

```

408

409

### Message Handling

410

411

Message objects representing individual Pub/Sub messages with data and attributes.

412

413

```python { .api }

414

class Message:

415

def __init__(self, data, message_id, attributes=None):

416

"""

417

Initialize message.

418

419

Parameters:

420

- data (bytes): Message data

421

- message_id (str): Message ID assigned by API

422

- attributes (dict): Optional message attributes

423

"""

424

425

@property

426

def data(self):

427

"""

428

Message data.

429

430

Returns:

431

bytes: Message data

432

"""

433

434

@property

435

def message_id(self):

436

"""

437

Message ID assigned by the API.

438

439

Returns:

440

str: Message ID

441

"""

442

443

@property

444

def attributes(self):

445

"""

446

Message attributes dictionary.

447

448

Returns:

449

dict: Message attributes

450

"""

451

452

@property

453

def timestamp(self):

454

"""

455

Message timestamp from attributes (if present).

456

457

Returns:

458

datetime: Timestamp in UTC timezone

459

460

Raises:

461

ValueError: If timestamp not in attributes or invalid format

462

"""

463

464

@classmethod

465

def from_api_repr(cls, api_repr):

466

"""

467

Create message from API representation.

468

469

Parameters:

470

- api_repr (dict): API response data

471

472

Returns:

473

Message: Message instance

474

"""

475

```

476

477

## Usage Examples

478

479

### Basic Topic and Subscription Operations

480

481

```python

482

from gcloud import pubsub

483

484

# Initialize client

485

client = pubsub.Client(project='my-project')

486

487

# Create topic

488

topic = client.topic('my-topic')

489

topic.create()

490

491

# Create subscription

492

subscription = topic.subscription('my-subscription', ack_deadline=60)

493

subscription.create()

494

495

# Publish message

496

message_id = topic.publish('Hello, Pub/Sub!', priority='high', source='app1')

497

print(f"Published message: {message_id}")

498

```

499

500

### Message Publishing

501

502

```python

503

# Publish single message with attributes

504

topic.publish('Order processed', order_id='12345', status='completed')

505

506

# Batch publishing for efficiency

507

with topic.batch() as batch:

508

for i in range(10):

509

batch.publish(f'Message {i}', sequence=str(i))

510

511

# Get message IDs after batch commit

512

message_ids = list(batch)

513

print(f"Published {len(message_ids)} messages")

514

```

515

516

### Message Consumption

517

518

```python

519

# Pull messages from subscription

520

messages = subscription.pull(max_messages=5, return_immediately=True)

521

522

# Process messages

523

ack_ids = []

524

for ack_id, message in messages:

525

print(f"Received: {message.data}")

526

print(f"Attributes: {message.attributes}")

527

528

# Process message here

529

process_message(message.data)

530

531

# Collect acknowledgment ID

532

ack_ids.append(ack_id)

533

534

# Acknowledge processed messages

535

if ack_ids:

536

subscription.acknowledge(ack_ids)

537

```

538

539

### Subscription Management

540

541

```python

542

# Create pull subscription

543

pull_subscription = topic.subscription('pull-sub', ack_deadline=30)

544

pull_subscription.create()

545

546

# Create push subscription

547

push_subscription = topic.subscription(

548

'push-sub',

549

push_endpoint='https://myapp.com/webhook'

550

)

551

push_subscription.create()

552

553

# Modify push configuration

554

subscription.modify_push_configuration('https://newapp.com/webhook')

555

556

# Convert push to pull

557

subscription.modify_push_configuration(None)

558

```

559

560

### Advanced Message Handling

561

562

```python

563

# Pull with immediate return

564

messages = subscription.pull(return_immediately=True, max_messages=10)

565

566

if not messages:

567

print("No messages available")

568

else:

569

# Extend acknowledgment deadline for processing time

570

ack_ids = [ack_id for ack_id, _ in messages]

571

subscription.modify_ack_deadline(ack_ids, 120) # 2 minutes

572

573

# Process messages

574

for ack_id, message in messages:

575

try:

576

process_complex_message(message.data)

577

subscription.acknowledge([ack_id])

578

except Exception as e:

579

print(f"Processing failed: {e}")

580

# Message will be redelivered after ack deadline

581

```