
# Configuration

The Google Cloud Pub/Sub Python client exposes configuration types for batching, flow control, retry logic, and timeouts. These types let you tune publisher and subscriber behavior for different throughput and latency requirements.

## Capabilities

### Batch Settings

Configure message batching behavior for the publisher client.

```python { .api }
class BatchSettings(NamedTuple):
    """
    Settings for batch publishing messages.

    Attributes:
    - max_bytes: Maximum total size of messages in a batch (default: 1MB)
    - max_latency: Maximum time to wait before publishing batch (default: 0.01s)
    - max_messages: Maximum number of messages in a batch (default: 100)
    """

    max_bytes: int = 1000000
    """
    Maximum total size of messages to collect before automatically
    publishing the batch, including byte size overhead of the publish
    request. Maximum server-side limit is 10,000,000 bytes.
    """

    max_latency: float = 0.01
    """
    Maximum number of seconds to wait for additional messages before
    automatically publishing the batch.
    """

    max_messages: int = 100
    """
    Maximum number of messages to collect before automatically
    publishing the batch.
    """
```
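
The three batch thresholds are each described as triggering an automatic publish, so a batch is sent as soon as any one of them is reached. The following is a minimal sketch of that rule, not the client's actual implementation: `BatchSettingsSketch` and `should_flush` are hypothetical names introduced here, and the stand-in class only mirrors the documented defaults.

```python
from typing import NamedTuple


class BatchSettingsSketch(NamedTuple):
    """Local stand-in mirroring the documented defaults (not the library class)."""

    max_bytes: int = 1_000_000
    max_latency: float = 0.01
    max_messages: int = 100


def should_flush(settings, batch_bytes, batch_messages, seconds_waiting):
    """Return True once any one of the three thresholds is reached."""
    return (
        batch_bytes >= settings.max_bytes
        or batch_messages >= settings.max_messages
        or seconds_waiting >= settings.max_latency
    )


settings = BatchSettingsSketch()
print(should_flush(settings, 2_000, 3, 0.002))    # no threshold reached
print(should_flush(settings, 2_000, 100, 0.002))  # message count reached
print(should_flush(settings, 2_000, 3, 0.010))    # latency reached
```

Under this model, lowering `max_latency` trades throughput for faster delivery, while raising `max_messages` or `max_bytes` does the opposite.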

### Publisher Options

Configure publisher client behavior, including message ordering, flow control, retry, timeout, and OpenTelemetry tracing.

```python { .api }
class PublisherOptions(NamedTuple):
    """
    Options for the publisher client.

    Attributes:
    - enable_message_ordering: Whether to order messages by ordering key
    - flow_control: Flow control settings for message publishing
    - retry: Retry settings for publish operations
    - timeout: Timeout settings for publish operations
    - enable_open_telemetry_tracing: Whether to enable OpenTelemetry tracing
    """

    enable_message_ordering: bool = False
    """
    Whether to order messages in a batch by a supplied ordering key.
    """

    flow_control: PublishFlowControl = PublishFlowControl()
    """
    Flow control settings for message publishing by the client.
    By default the publisher client does not do any throttling.
    """

    retry: OptionalRetry = ...
    """
    Retry settings for message publishing by the client.
    Should be an instance of google.api_core.retry.Retry.
    """

    timeout: OptionalTimeout = ConstantTimeout(60)
    """
    Timeout settings for message publishing by the client.
    Should be compatible with pubsub_v1.types.TimeoutType.
    """

    enable_open_telemetry_tracing: bool = False
    """
    Whether to enable OpenTelemetry tracing for publish operations.
    """
```

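### Publish Flow Control

Configure flow control limits for the publisher client to prevent resource exhaustion. With the default `LimitExceededBehavior.IGNORE`, the limits are ignored and publishing continues; choose `BLOCK` or `ERROR` to have them enforced.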

```python { .api }
class PublishFlowControl(NamedTuple):
    """
    Client flow control settings for message publishing.

    Attributes:
    - message_limit: Maximum number of messages awaiting publication
    - byte_limit: Maximum total size of messages awaiting publication
    - limit_exceeded_behavior: Action when flow control limits are exceeded
    """

    message_limit: int = 1000
    """
    Maximum number of messages awaiting to be published.
    """

    byte_limit: int = 10000000
    """
    Maximum total size of messages awaiting to be published.
    """

    limit_exceeded_behavior: LimitExceededBehavior = LimitExceededBehavior.IGNORE
    """
    Action to take when publish flow control limits are exceeded.
    """
```

### Limit Exceeded Behavior

Enum defining the action taken when publish flow control limits are exceeded.

```python { .api }
class LimitExceededBehavior(str, Enum):
    """
    Possible actions when exceeding publish flow control limits.
    """

    IGNORE = "ignore"
    """
    Ignore flow control limits and continue publishing.
    """

    BLOCK = "block"
    """
    Block publishing until flow control limits are no longer exceeded.
    """

    ERROR = "error"
    """
    Raise an error when flow control limits are exceeded.
    """
```
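
To make the difference between the three behaviors concrete, here is a small stand-alone model of the decision a publisher faces when one more message arrives. It is an illustrative sketch only: `Behavior` and `admit` are hypothetical names, `RuntimeError` stands in for whatever exception the real client raises, and actual blocking is represented by returning `"wait"`.

```python
from enum import Enum


class Behavior(str, Enum):
    """Stand-in with the same members as the documented enum."""

    IGNORE = "ignore"
    BLOCK = "block"
    ERROR = "error"


def admit(pending_messages, message_limit, behavior):
    """Decide what a hypothetical publisher does with one more message."""
    if pending_messages < message_limit or behavior is Behavior.IGNORE:
        return "publish"  # under the limit, or limits are not enforced
    if behavior is Behavior.BLOCK:
        return "wait"     # caller would block until capacity frees up
    raise RuntimeError("flow control limit exceeded")


print(admit(10, 10, Behavior.IGNORE))
print(admit(10, 10, Behavior.BLOCK))
```

`BLOCK` applies backpressure to the caller, whereas `ERROR` leaves it to the caller to decide whether to drop, buffer, or retry the message.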

### Subscriber Flow Control

Configure flow control limits for the subscriber client to manage message processing load.

```python { .api }
class FlowControl(NamedTuple):
    """
    Settings for controlling the rate at which messages are pulled
    with an asynchronous subscription.

    Attributes:
    - max_bytes: Maximum total size of outstanding messages
    - max_messages: Maximum number of outstanding messages
    - max_lease_duration: Maximum time to hold a lease on a message
    - min_duration_per_lease_extension: Minimum lease extension duration
    - max_duration_per_lease_extension: Maximum lease extension duration
    """

    max_bytes: int = 104857600  # 100 MiB
    """
    Maximum total size of received - but not yet processed - messages
    before pausing the message stream.
    """

    max_messages: int = 1000
    """
    Maximum number of received - but not yet processed - messages
    before pausing the message stream.
    """

    max_lease_duration: float = 3600  # 1 hour
    """
    Maximum amount of time in seconds to hold a lease on a message
    before dropping it from lease management.
    """

    min_duration_per_lease_extension: float = 0
    """
    Minimum amount of time in seconds for a single lease extension attempt.
    Must be between 10 and 600 seconds (inclusive). Ignored by default,
    but set to 60 seconds if subscription has exactly-once delivery enabled.
    """

    max_duration_per_lease_extension: float = 0
    """
    Maximum amount of time in seconds for a single lease extension attempt.
    Bounds the delay before message redelivery if subscriber fails to extend
    the deadline. Must be between 10 and 600 seconds (inclusive).
    Ignored if set to 0.
    """
```
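
The lease-extension bounds combine two rules: a value of 0 means the setting is ignored, and any other value must fall between 10 and 600 seconds inclusive. A pre-flight check for that rule might look like the sketch below. `lease_extension_ok` is a hypothetical helper, not part of the library, and whether the client itself validates these values is not covered here.

```python
def lease_extension_ok(seconds: float) -> bool:
    """Check one lease-extension bound against the documented rule.

    0 means "ignored"; otherwise the value must lie in [10, 600] seconds.
    """
    return seconds == 0 or 10 <= seconds <= 600


# Try a few candidate values, including both boundaries of the valid range
for value in (0, 5, 10, 600, 900):
    print(value, lease_extension_ok(value))
```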

### Subscriber Options

Configure subscriber client behavior including OpenTelemetry tracing.

```python { .api }
class SubscriberOptions(NamedTuple):
    """
    Options for the subscriber client.

    Attributes:
    - enable_open_telemetry_tracing: Whether to enable OpenTelemetry tracing
    """

    enable_open_telemetry_tracing: bool = False
    """
    Whether to enable OpenTelemetry tracing for subscriber operations.
    """
```

## Usage Examples

### Custom Batch Settings

```python
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types

# Configure custom batching
batch_settings = types.BatchSettings(
    max_bytes=500000,   # 500KB batches
    max_latency=0.05,   # 50ms max wait
    max_messages=50     # Max 50 messages per batch
)

# Create publisher with custom batching
publisher = pubsub_v1.PublisherClient(batch_settings=batch_settings)
```

### Publisher with Message Ordering

```python
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types

# Configure publisher for message ordering
publisher_options = types.PublisherOptions(
    enable_message_ordering=True,
    enable_open_telemetry_tracing=True
)

publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)

# Publish ordered messages
topic_path = publisher.topic_path("my-project", "my-topic")
for i in range(10):
    future = publisher.publish(
        topic_path,
        f"Message {i}".encode(),
        ordering_key="user-123"
    )
```

### Publisher Flow Control

```python
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types

# Configure publisher flow control
flow_control = types.PublishFlowControl(
    message_limit=500,      # Max 500 pending messages
    byte_limit=5000000,     # Max 5MB pending bytes
    limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK
)

publisher_options = types.PublisherOptions(
    flow_control=flow_control
)

publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)
```

### Subscriber Flow Control

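```python
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types

# Configure subscriber flow control
flow_control = types.FlowControl(
    max_messages=50,                      # Max 50 outstanding messages
    max_bytes=50 * 1024 * 1024,           # Max 50 MiB outstanding
    max_lease_duration=300,               # 5 minute max lease
    min_duration_per_lease_extension=60,  # Min 60s lease extensions
    max_duration_per_lease_extension=120  # Max 120s lease extensions
)

subscriber = pubsub_v1.SubscriberClient()
# "my-project" and "my-subscription" are placeholder names
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

def callback(message):
    message.ack()  # Acknowledge once processing is done

# Flow control is passed to subscribe(), not to the client constructor
streaming_pull_future = subscriber.subscribe(
    subscription_path, callback=callback, flow_control=flow_control
)
```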

### Retry and Timeout Configuration

```python
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types
from google.api_core import retry
from google.api_core.timeout import ConstantTimeout

# Configure custom retry policy
custom_retry = retry.Retry(
    initial=1.0,      # Initial delay
    maximum=60.0,     # Maximum delay
    multiplier=2.0,   # Backoff multiplier
    deadline=300.0    # Total deadline
)

# Configure custom timeout
custom_timeout = ConstantTimeout(120)

publisher_options = types.PublisherOptions(
    retry=custom_retry,
    timeout=custom_timeout
)

publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)
```
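
To see what `initial`, `maximum`, and `multiplier` imply for the retry policy above, the delay schedule can be computed by hand. The sketch below shows only the un-jittered exponential schedule; the real `google.api_core.retry.Retry` randomizes its sleeps, so actual delays will differ. `backoff_schedule` is a hypothetical helper written for this illustration.

```python
def backoff_schedule(initial, maximum, multiplier, attempts):
    """Return the un-jittered delay before each of the first `attempts` retries."""
    delays = []
    delay = initial
    for _ in range(attempts):
        delays.append(min(delay, maximum))  # cap each delay at `maximum`
        delay *= multiplier                 # grow exponentially
    return delays


print(backoff_schedule(1.0, 60.0, 2.0, 8))
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

With these values the delay reaches the 60-second cap on the seventh retry, and the eight delays shown sum to 183 seconds, which is within the 300-second overall deadline.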

### High-Throughput Configuration

```python
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types

# High-throughput publisher configuration
batch_settings = types.BatchSettings(
    max_bytes=5000000,   # 5MB batches
    max_latency=0.1,     # 100ms max wait
    max_messages=1000    # Large batches
)

flow_control = types.PublishFlowControl(
    message_limit=10000,     # High message limit
    byte_limit=100000000,    # 100MB limit
    limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK
)

publisher_options = types.PublisherOptions(
    flow_control=flow_control
)

publisher = pubsub_v1.PublisherClient(
    batch_settings=batch_settings,
    publisher_options=publisher_options
)

# High-throughput subscriber configuration
subscriber_flow_control = types.FlowControl(
    max_messages=1000,             # High concurrency
    max_bytes=200 * 1024 * 1024,   # 200 MiB outstanding
    max_lease_duration=1800        # 30 minute max lease
)

subscriber = pubsub_v1.SubscriberClient()
# "my-project" and "my-subscription" are placeholder names
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

def callback(message):
    message.ack()  # Acknowledge once processing is done

# Flow control is passed to subscribe(), not to the client constructor
streaming_pull_future = subscriber.subscribe(
    subscription_path, callback=callback, flow_control=subscriber_flow_control
)
```

### Low-Latency Configuration

```python
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types

# Low-latency publisher configuration
batch_settings = types.BatchSettings(
    max_bytes=100000,    # Small batches for low latency
    max_latency=0.001,   # 1ms max wait
    max_messages=10      # Small batches
)

publisher = pubsub_v1.PublisherClient(batch_settings=batch_settings)

# Low-latency subscriber configuration
flow_control = types.FlowControl(
    max_messages=10,         # Low concurrency for predictable latency
    max_bytes=1024 * 1024,   # 1 MiB outstanding
    max_lease_duration=60    # Short lease duration
)

subscriber = pubsub_v1.SubscriberClient()
# "my-project" and "my-subscription" are placeholder names
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

def callback(message):
    message.ack()  # Acknowledge once processing is done

# Flow control is passed to subscribe(), not to the client constructor
streaming_pull_future = subscriber.subscribe(
    subscription_path, callback=callback, flow_control=flow_control
)
```