
# Schedulers and Utilities

The Google Cloud Pub/Sub client library provides scheduler classes and utility functions for controlling message processing behavior and resource usage in subscriber operations.

## Capabilities

### Thread Scheduler

Custom scheduler for controlling how messages are processed in subscriber operations.

```python { .api }
class ThreadScheduler:
    """
    A thread pool-based scheduler for processing subscriber messages.

    This scheduler manages the execution of message callbacks using a
    configurable thread pool, allowing control over concurrency and
    resource usage in message processing.
    """

    def __init__(self, executor: Optional[ThreadPoolExecutor] = None):
        """
        Initialize the thread scheduler.

        Parameters:
        - executor: Optional pre-configured ThreadPoolExecutor.
          If not provided, a default executor will be created.
        """

    def schedule(self, callback: Callable, *args, **kwargs) -> Future:
        """
        Schedule a callback function for execution.

        Parameters:
        - callback: Function to execute
        - *args: Positional arguments for the callback
        - **kwargs: Keyword arguments for the callback

        Returns:
        Future representing the scheduled execution
        """

    def shutdown(self, wait: bool = True) -> None:
        """
        Shutdown the scheduler and stop accepting new tasks.

        Parameters:
        - wait: Whether to wait for currently executing tasks to complete
        """

    @property
    def executor(self) -> ThreadPoolExecutor:
        """
        Get the underlying thread pool executor.

        Returns:
        ThreadPoolExecutor instance used by this scheduler
        """
```
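
For direct use outside of a subscription, the following is a minimal sketch that follows the constructor, `schedule`, and `shutdown` signatures documented above; the worker count and the callback are illustrative placeholders.

```python
from concurrent.futures import ThreadPoolExecutor
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler

def handle(item):
    # Placeholder callback; in subscriber use, callbacks receive Pub/Sub messages
    print(f"Handling {item}")

# Wrap a pre-configured executor (the worker count is an arbitrary example value)
scheduler = ThreadScheduler(executor=ThreadPoolExecutor(max_workers=8))

# Submit a callback through the scheduler's thread pool
scheduler.schedule(handle, "example-item")

# Stop accepting new work and wait for in-flight callbacks to finish
scheduler.shutdown(wait=True)
```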

### Future Classes

Future objects for handling asynchronous operations in publisher and subscriber.

```python { .api }
class StreamingPullFuture:
    """
    Future object for managing streaming pull operations.

    This future controls the lifecycle of a streaming pull subscription
    and allows monitoring and cancellation of the operation.
    """

    def cancel(self) -> bool:
        """
        Cancel the streaming pull operation.

        Stops the streaming pull and releases associated resources.

        Returns:
        True if the operation was successfully cancelled
        """

    def cancelled(self) -> bool:
        """
        Check if the streaming pull operation was cancelled.

        Returns:
        True if the operation is cancelled
        """

    def running(self) -> bool:
        """
        Check if the streaming pull operation is currently running.

        Returns:
        True if the operation is active
        """

    def result(self, timeout: Optional[float] = None) -> None:
        """
        Wait for the streaming pull operation to complete.

        This method blocks until the streaming pull stops due to
        cancellation, error, or other termination condition.

        Parameters:
        - timeout: Maximum time to wait in seconds

        Raises:
        TimeoutError: If the timeout is exceeded
        """

    def add_done_callback(self, callback: Callable[["StreamingPullFuture"], None]) -> None:
        """
        Add a callback to be executed when the future completes.

        Parameters:
        - callback: Function to call when the future is done
        """


class PublisherFuture:
    """
    Future object for publisher operations that return message IDs.

    This future represents the result of a publish operation and
    resolves to the server-assigned message ID.
    """

    def result(self, timeout: Optional[float] = None) -> str:
        """
        Get the message ID from the publish operation.

        Parameters:
        - timeout: Maximum time to wait in seconds

        Returns:
        Server-assigned message ID

        Raises:
        TimeoutError: If the timeout is exceeded
        PublishError: If the publish operation failed
        """

    def add_done_callback(self, callback: Callable[["PublisherFuture"], None]) -> None:
        """
        Add a callback to be executed when the future completes.

        Parameters:
        - callback: Function to call with the future as argument
        """

    def cancel(self) -> bool:
        """
        Attempt to cancel the publish operation.

        Returns:
        Always False (Pub/Sub publish operations cannot be cancelled)
        """

    def cancelled(self) -> bool:
        """
        Check if the publish operation was cancelled.

        Returns:
        Always False (Pub/Sub publish operations cannot be cancelled)
        """


class AcknowledgeFuture:
    """
    Future object for acknowledgment operations in exactly-once delivery.

    This future represents the result of an ack/nack operation and
    resolves to an AcknowledgeStatus indicating the result.
    """

    def result(self, timeout: Optional[float] = None) -> AcknowledgeStatus:
        """
        Get the acknowledgment status.

        Parameters:
        - timeout: Maximum time to wait in seconds

        Returns:
        AcknowledgeStatus indicating the result

        Raises:
        TimeoutError: If the timeout is exceeded
        AcknowledgeError: If the acknowledgment operation failed
        """

    def add_done_callback(self, callback: Callable[["AcknowledgeFuture"], None]) -> None:
        """
        Add a callback to be executed when the future completes.

        Parameters:
        - callback: Function to call with the future as argument
        """
```
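
To complement the callback-driven examples later in this document, here is a minimal sketch of blocking on a single publish future with a timeout, based on the `result` behavior documented above; the project, topic, and timeout values are placeholders.

```python
import concurrent.futures
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

future = publisher.publish(topic_path, b"payload")

try:
    # Blocks until the server assigns a message ID or the timeout expires
    message_id = future.result(timeout=10)
    print(f"Published message ID: {message_id}")
except concurrent.futures.TimeoutError:
    print("Publish did not complete within 10 seconds")
except Exception as e:
    # Publish failures (e.g., permission or topic issues) surface here
    print(f"Publish failed: {e}")
```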

### Utility Functions

Helper functions for working with Pub/Sub resources and operations.

```python { .api }
def common_project_path(project: str) -> str:
    """
    Construct a project path string.

    Parameters:
    - project: Project ID

    Returns:
    Project path in the format "projects/{project}"
    """

def common_location_path(project: str, location: str) -> str:
    """
    Construct a location path string.

    Parameters:
    - project: Project ID
    - location: Location/region name

    Returns:
    Location path in the format "projects/{project}/locations/{location}"
    """
```
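
A short sketch of these path helpers, using the classmethod form shown in the Resource Path Utilities example later in this document; the project and location values are placeholders.

```python
from google.cloud.pubsub_v1 import PublisherClient

# Build resource paths from their components
project_path = PublisherClient.common_project_path("my-project")
location_path = PublisherClient.common_location_path("my-project", "us-central1")

print(project_path)   # projects/my-project
print(location_path)  # projects/my-project/locations/us-central1
```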

## Usage Examples

### Custom Thread Scheduler

```python
import concurrent.futures
import time

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler

# Create custom thread pool executor
executor = concurrent.futures.ThreadPoolExecutor(
    max_workers=20,
    thread_name_prefix="pubsub-callback"
)

# Create scheduler with custom executor
scheduler = ThreadScheduler(executor=executor)

# Create the subscriber client
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

def message_callback(message):
    print(f"Processing message: {message.message_id}")
    # Simulate processing work
    time.sleep(1)
    message.ack()

# Use custom scheduler in subscription
streaming_pull_future = subscriber.subscribe(
    subscription_path,
    callback=message_callback,
    scheduler=scheduler
)

try:
    # Let it run for 60 seconds
    streaming_pull_future.result(timeout=60)
except (concurrent.futures.TimeoutError, KeyboardInterrupt):
    # Stop the streaming pull after the timeout or on Ctrl+C
    streaming_pull_future.cancel()
finally:
    # Shutdown scheduler
    scheduler.shutdown(wait=True)
```

### Managing Streaming Pull Future

```python
from google.cloud import pubsub_v1
import threading
import time

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

def callback(message):
    print(f"Received: {message.data.decode('utf-8')}")
    message.ack()

# Start streaming pull
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

print(f"Streaming pull running: {streaming_pull_future.running()}")
print(f"Streaming pull cancelled: {streaming_pull_future.cancelled()}")

# Add completion callback
def on_done(future):
    print("Streaming pull completed")
    print(f"Was cancelled: {future.cancelled()}")

streaming_pull_future.add_done_callback(on_done)

# Run in background thread to allow monitoring
def monitor_future():
    try:
        # Wait for streaming pull to complete
        streaming_pull_future.result()
    except Exception as e:
        print(f"Streaming pull error: {e}")

monitor_thread = threading.Thread(target=monitor_future)
monitor_thread.start()

# Let it run for 30 seconds, then cancel
time.sleep(30)
print("Cancelling streaming pull...")
streaming_pull_future.cancel()

# Wait for monitor thread to complete
monitor_thread.join()
```

### Publisher Future Handling

```python
from google.cloud import pubsub_v1
import concurrent.futures

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

# Publish multiple messages and collect futures
futures = []
for i in range(10):
    future = publisher.publish(topic_path, f"Message {i}".encode())
    futures.append(future)

# Add callbacks to futures
def on_publish_complete(future):
    try:
        message_id = future.result()
        print(f"Published successfully: {message_id}")
    except Exception as e:
        print(f"Publish failed: {e}")

for future in futures:
    future.add_done_callback(on_publish_complete)

# Wait for all futures to complete
try:
    # Use concurrent.futures.as_completed for efficient waiting
    for future in concurrent.futures.as_completed(futures, timeout=30):
        message_id = future.result()
        print(f"Completed: {message_id}")
except concurrent.futures.TimeoutError:
    print("Some publishes did not complete within timeout")
```

### Exactly-Once Delivery with Acknowledge Futures

```python
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeStatus

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

def process_message(data):
    # Placeholder for application-specific processing logic
    ...

def callback(message):
    try:
        # Process the message
        result = process_message(message.data)

        # Acknowledge with response for exactly-once delivery
        ack_future = message.ack_with_response()

        # Add callback to handle ack result
        def on_ack_complete(ack_future):
            try:
                ack_status = ack_future.result()
                if ack_status == AcknowledgeStatus.SUCCESS:
                    print(f"Message {message.message_id} acknowledged successfully")
                else:
                    print(f"Ack failed with status: {ack_status}")
                    # Handle failed acknowledgment
            except Exception as e:
                print(f"Ack operation failed: {e}")

        ack_future.add_done_callback(on_ack_complete)

    except Exception as e:
        print(f"Message processing failed: {e}")
        # Nack with response
        nack_future = message.nack_with_response()
        nack_future.add_done_callback(
            lambda f: print(f"Nack status: {f.result()}")
        )

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
```

### Resource Path Utilities

```python
from google.cloud.pubsub_v1 import PublisherClient, SubscriberClient

# Using path construction utilities
project_id = "my-project"
topic_name = "my-topic"
subscription_name = "my-subscription"

# Construct paths
topic_path = PublisherClient.topic_path(project_id, topic_name)
subscription_path = SubscriberClient.subscription_path(project_id, subscription_name)
project_path = PublisherClient.common_project_path(project_id)

print(f"Topic path: {topic_path}")
print(f"Subscription path: {subscription_path}")
print(f"Project path: {project_path}")

# Parse paths back to components
topic_components = PublisherClient.parse_topic_path(topic_path)
print(f"Topic components: {topic_components}")
# Output: {'project': 'my-project', 'topic': 'my-topic'}

subscription_components = SubscriberClient.parse_subscription_path(subscription_path)
print(f"Subscription components: {subscription_components}")
# Output: {'project': 'my-project', 'subscription': 'my-subscription'}
```

### Thread Pool Configuration

```python
import concurrent.futures
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler

# Configure different thread pools for different workloads
def create_scheduler_for_workload(workload_type: str) -> ThreadScheduler:
    if workload_type == "io_intensive":
        # More threads for I/O bound work
        executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=50,
            thread_name_prefix="pubsub-io"
        )
    elif workload_type == "cpu_intensive":
        # Limit threads for CPU bound work
        executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=4,  # Match CPU cores
            thread_name_prefix="pubsub-cpu"
        )
    else:
        # Default configuration
        executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=10,
            thread_name_prefix="pubsub-default"
        )

    return ThreadScheduler(executor=executor)

# Placeholder callbacks; replace with application-specific message handlers
def io_intensive_callback(message):
    message.ack()

def cpu_intensive_callback(message):
    message.ack()

# Use different schedulers for different subscriptions
io_scheduler = create_scheduler_for_workload("io_intensive")
cpu_scheduler = create_scheduler_for_workload("cpu_intensive")

subscriber = pubsub_v1.SubscriberClient()

# I/O intensive subscription (e.g., API calls, database operations)
io_subscription = subscriber.subscription_path("my-project", "io-intensive-sub")
io_future = subscriber.subscribe(
    io_subscription,
    callback=io_intensive_callback,
    scheduler=io_scheduler
)

# CPU intensive subscription (e.g., data processing, calculations)
cpu_subscription = subscriber.subscription_path("my-project", "cpu-intensive-sub")
cpu_future = subscriber.subscribe(
    cpu_subscription,
    callback=cpu_intensive_callback,
    scheduler=cpu_scheduler
)
```