or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md, exceptions.md, index.md, message-handling.md, publisher.md, schedulers.md, schema-service.md, subscriber.md, types.md

docs/publisher.md

0

# Publisher Client

1

2

The PublisherClient provides high-level functionality for publishing messages to Google Cloud Pub/Sub topics. It handles automatic batching, flow control, message ordering, retry logic, and OpenTelemetry integration.

3

4

## Capabilities

5

6

### Client Initialization

7

8

Create and configure a PublisherClient with custom batching and flow control settings.

9

10

```python { .api }

11

class PublisherClient:

12

def __init__(

13

self,

14

batch_settings: Optional[BatchSettings] = None,

15

publisher_options: Optional[PublisherOptions] = None,

16

**kwargs

17

):

18

"""

19

Initialize the publisher client.

20

21

Parameters:

22

- batch_settings: Settings for message batching behavior

23

- publisher_options: Options for publisher client behavior

24

- **kwargs: Additional arguments passed to underlying GAPIC client

25

"""

26

27

@classmethod

28

def from_service_account_file(

29

cls,

30

filename: str,

31

**kwargs

32

) -> "PublisherClient":

33

"""

34

Create client from service account file.

35

36

Parameters:

37

- filename: Path to service account JSON file

38

- **kwargs: Additional arguments for client initialization

39

40

Returns:

41

PublisherClient instance

42

"""

43

```

44

45

### Message Publishing

46

47

Publish messages to topics with support for attributes, ordering keys, and futures for result handling.

48

49

```python { .api }

50

def publish(

51

self,

52

topic: str,

53

data: bytes,

54

ordering_key: str = "",

55

retry: OptionalRetry = DEFAULT,

56

timeout: OptionalTimeout = DEFAULT,

57

**attrs: Union[bytes, str]

58

) -> Future:

59

"""

60

Publish a message to a topic.

61

62

Parameters:

63

- topic: Full topic path (e.g., "projects/my-project/topics/my-topic")

64

- data: Message payload as bytes

65

- ordering_key: Optional key for message ordering (default: "")

66

- retry: Retry configuration for the publish operation

67

- timeout: Timeout configuration for the publish operation

68

- **attrs: Message attributes as keyword arguments (values can be bytes or str)

69

70

Returns:

71

Future that resolves to message ID string

72

"""

73

```

74

75

### Message Ordering

76

77

Resume publishing for an ordering key after an error has occurred.

78

79

```python { .api }

80

def resume_publish(self, topic: str, ordering_key: str) -> None:

81

"""

82

Resume publishing for ordering key after error.

83

84

Parameters:

85

- topic: Full topic path

86

- ordering_key: Ordering key to resume

87

"""

88

```

89

90

### Topic Management

91

92

Create, retrieve, list, update, and delete topics (and list a topic's subscriptions and snapshots) using the underlying GAPIC client methods.

93

94

```python { .api }

95

def create_topic(

96

self,

97

request: Optional[CreateTopicRequest] = None,

98

*,

99

name: Optional[str] = None,

100

**kwargs

101

) -> Topic:

102

"""

103

Create a new topic.

104

105

Parameters:

106

- request: The request object for creating a topic

107

- name: Topic name (e.g., "projects/my-project/topics/my-topic")

108

- **kwargs: Additional keyword arguments

109

110

Returns:

111

Created Topic object

112

"""

113

114

def get_topic(

115

self,

116

request: Optional[GetTopicRequest] = None,

117

*,

118

topic: Optional[str] = None,

119

**kwargs

120

) -> Topic:

121

"""

122

Get a topic.

123

124

Parameters:

125

- request: The request object for getting a topic

126

- topic: Topic name to retrieve

127

- **kwargs: Additional keyword arguments

128

129

Returns:

130

Topic object

131

"""

132

133

def list_topics(

134

self,

135

request: Optional[ListTopicsRequest] = None,

136

*,

137

project: Optional[str] = None,

138

**kwargs

139

) -> ListTopicsResponse:

140

"""

141

List topics in a project.

142

143

Parameters:

144

- request: The request object for listing topics

145

- project: Project path (e.g., "projects/my-project")

146

- **kwargs: Additional keyword arguments

147

148

Returns:

149

ListTopicsResponse with topics

150

"""

151

152

def list_topic_subscriptions(

153

self,

154

request: Optional[ListTopicSubscriptionsRequest] = None,

155

*,

156

topic: Optional[str] = None,

157

**kwargs

158

) -> ListTopicSubscriptionsResponse:

159

"""

160

List subscriptions attached to a topic.

161

162

Parameters:

163

- request: The request object for listing topic subscriptions

164

- topic: Topic name

165

- **kwargs: Additional keyword arguments

166

167

Returns:

168

ListTopicSubscriptionsResponse with subscription names

169

"""

170

171

def list_topic_snapshots(

172

self,

173

request: Optional[ListTopicSnapshotsRequest] = None,

174

*,

175

topic: Optional[str] = None,

176

**kwargs

177

) -> ListTopicSnapshotsResponse:

178

"""

179

List snapshots for a topic.

180

181

Parameters:

182

- request: The request object for listing topic snapshots

183

- topic: Topic name

184

- **kwargs: Additional keyword arguments

185

186

Returns:

187

ListTopicSnapshotsResponse with snapshot names

188

"""

189

190

def update_topic(

191

self,

192

request: Optional[UpdateTopicRequest] = None,

193

*,

194

topic: Optional[Topic] = None,

195

update_mask: Optional[FieldMask] = None,

196

**kwargs

197

) -> Topic:

198

"""

199

Update a topic.

200

201

Parameters:

202

- request: The request object for updating a topic

203

- topic: Updated topic configuration

204

- update_mask: Fields to update

205

- **kwargs: Additional keyword arguments

206

207

Returns:

208

Updated Topic object

209

"""

210

211

def delete_topic(

212

self,

213

request: Optional[DeleteTopicRequest] = None,

214

*,

215

topic: Optional[str] = None,

216

**kwargs

217

) -> None:

218

"""

219

Delete a topic.

220

221

Parameters:

222

- request: The request object for deleting a topic

223

- topic: Topic name to delete

224

- **kwargs: Additional keyword arguments

225

"""

226

```

227

228

### Path Helper Methods

229

230

Utility methods for constructing and parsing resource paths.

231

232

```python { .api }

233

@staticmethod

234

def topic_path(project: str, topic: str) -> str:

235

"""

236

Construct a topic path from project ID and topic name.

237

238

Parameters:

239

- project: Project ID

240

- topic: Topic name

241

242

Returns:

243

Full topic path string

244

"""

245

246

@staticmethod

247

def subscription_path(project: str, subscription: str) -> str:

248

"""

249

Construct a subscription path from project ID and subscription name.

250

251

Parameters:

252

- project: Project ID

253

- subscription: Subscription name

254

255

Returns:

256

Full subscription path string

257

"""

258

259

@staticmethod

260

def schema_path(project: str, schema: str) -> str:

261

"""

262

Construct a schema path from project ID and schema name.

263

264

Parameters:

265

- project: Project ID

266

- schema: Schema name

267

268

Returns:

269

Full schema path string

270

"""

271

272

@staticmethod

273

def parse_topic_path(path: str) -> Dict[str, str]:

274

"""

275

Parse a topic path into its components.

276

277

Parameters:

278

- path: Topic path string

279

280

Returns:

281

Dictionary with 'project' and 'topic' keys

282

"""

283

284

@staticmethod

285

def parse_subscription_path(path: str) -> Dict[str, str]:

286

"""

287

Parse a subscription path into its components.

288

289

Parameters:

290

- path: Subscription path string

291

292

Returns:

293

Dictionary with 'project' and 'subscription' keys

294

"""

295

296

@staticmethod

297

def parse_schema_path(path: str) -> Dict[str, str]:

298

"""

299

Parse a schema path into its components.

300

301

Parameters:

302

- path: Schema path string

303

304

Returns:

305

Dictionary with 'project' and 'schema' keys

306

"""

307

```

308

309

### Client Management

310

311

Control client lifecycle and access underlying components.

312

313

```python { .api }

314

def stop(self) -> None:

315

"""

316

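Stop the publisher client: send all outstanding batches and reject further publish() calls. This call is non-blocking; use the futures returned by earlier publish() calls to confirm that each message completed.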

317

"""

318

319

@property

320

def target(self) -> str:

321

"""

322

Get the target endpoint for the client.

323

324

Returns:

325

Target endpoint URL

326

"""

327

328

@property

329

def api(self):

330

"""

331

Get the underlying GAPIC publisher client.

332

333

Returns:

334

GAPIC PublisherClient instance

335

"""

336

337

@property

338

def open_telemetry_enabled(self) -> bool:

339

"""

340

Check if OpenTelemetry tracing is enabled.

341

342

Returns:

343

True if OpenTelemetry is enabled

344

"""

345

```

346

347

### Future Objects

348

349

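`publish()` returns a Future object for asynchronous result handling: call `result()` to block until the message ID is available, or `add_done_callback()` to be notified when the publish completes.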

350

351

```python { .api }

352

class Future:

353

def result(self, timeout: Optional[float] = None) -> str:

354

"""

355

Get the message ID or raise an exception.

356

357

Parameters:

358

- timeout: Number of seconds to wait before timeout

359

360

Returns:

361

Message ID string

362

363

Raises:

364

TimeoutError: If operation times out

365

Exception: For other errors in publish operation

366

"""

367

368

def add_done_callback(

369

self,

370

callback: Callable[["Future"], None]

371

) -> None:

372

"""

373

Add callback to be called when future completes.

374

375

Parameters:

376

- callback: Function to call with future as argument

377

"""

378

379

def cancel(self) -> bool:

380

"""

381

Attempt to cancel the operation.

382

383

Returns:

384

Always False (Pub/Sub operations cannot be cancelled)

385

"""

386

387

def cancelled(self) -> bool:

388

"""

389

Check if operation was cancelled.

390

391

Returns:

392

Always False (Pub/Sub operations cannot be cancelled)

393

"""

394

```

395

396

## Usage Examples

397

398

### Basic Publishing

399

400

```python

401

from google.cloud import pubsub_v1

402

403

# Create publisher client

404

publisher = pubsub_v1.PublisherClient()

405

406

# Publish a message

407

topic_path = publisher.topic_path("my-project", "my-topic")

408

message_data = b"Hello, World!"

409

future = publisher.publish(topic_path, message_data)

410

411

# Get message ID

412

try:

413

message_id = future.result(timeout=30)

414

print(f"Published message with ID: {message_id}")

415

except Exception as e:

416

print(f"Failed to publish: {e}")

417

```

418

419

### Publishing with Attributes

420

421

```python

422

# Publish message with attributes

423

future = publisher.publish(

424

topic_path,

425

b"Message with metadata",

426

event_type="user_action",

427

user_id="12345",

428

timestamp="2024-01-01T00:00:00Z"

429

)

430

```

431

432

### Message Ordering

433

434

```python

435

from google.cloud.pubsub_v1 import types

436

437

# Configure for message ordering

438

publisher_options = types.PublisherOptions(

439

enable_message_ordering=True

440

)

441

publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)

442

443

# Publish ordered messages

444

for i in range(5):

445

future = publisher.publish(

446

topic_path,

447

f"Message {i}".encode(),

448

ordering_key="user-123"

449

)

450

```

451

452

### Custom Batching

453

454

```python

455

# Configure custom batching

456

batch_settings = types.BatchSettings(

457

max_bytes=500000, # 500KB

458

max_latency=0.05, # 50ms

459

max_messages=50

460

)

461

462

publisher = pubsub_v1.PublisherClient(batch_settings=batch_settings)

463

```

464

465

### Callback Handling

466

467

```python

468

def publish_callback(future):

469

try:

470

message_id = future.result()

471

print(f"Published: {message_id}")

472

except Exception as e:

473

print(f"Publish failed: {e}")

474

475

# Publish with callback

476

future = publisher.publish(topic_path, b"Async message")

477

future.add_done_callback(publish_callback)

478

```