or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-api.mdchannel-management.mderror-handling.mdindex.mdinterceptors.mdprotobuf-integration.mdrpc-patterns.mdsecurity-authentication.mdserver-implementation.md

rpc-patterns.mddocs/

0

# RPC Patterns and Multi-Callables

1

2

Support for all four RPC patterns (unary-unary, unary-stream, stream-unary, stream-stream) with synchronous and asynchronous invocation methods, comprehensive timeout handling, metadata passing, and credential specification for flexible client-side RPC execution.

3

4

## Capabilities

5

6

### Unary-Unary Pattern

7

8

Single request to single response RPC pattern with synchronous, asynchronous, and callback-based invocation modes.

9

10

```python { .api }

11

class UnaryUnaryMultiCallable(abc.ABC):

12

"""Affords invoking a unary-unary RPC from client-side."""

13

14

def __call__(self, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):

15

"""

16

Synchronously invokes the underlying RPC.

17

18

Parameters:

19

- request: The request value for the RPC

20

- timeout: Optional duration in seconds to allow for the RPC

21

- metadata: Optional metadata to be transmitted to the service-side

22

- credentials: Optional CallCredentials for the RPC (secure Channel only)

23

- wait_for_ready: Optional flag to enable wait_for_ready mechanism

24

- compression: Optional compression element (e.g., grpc.compression.Gzip)

25

26

Returns:

27

The response value for the RPC

28

29

Raises:

30

RpcError: Indicating that the RPC terminated with non-OK status

31

"""

32

33

def with_call(self, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):

34

"""

35

Synchronously invokes the underlying RPC and returns Call object.

36

37

Parameters:

38

Same as __call__()

39

40

Returns:

41

tuple: (response_value, Call) - Response and Call object for RPC metadata

42

43

Raises:

44

RpcError: Indicating that the RPC terminated with non-OK status

45

"""

46

47

def future(self, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):

48

"""

49

Asynchronously invokes the underlying RPC.

50

51

Parameters:

52

Same as __call__()

53

54

Returns:

55

Future: Call-Future object; result() returns response, exception() returns RpcError

56

"""

57

```

58

59

**Usage Examples:**

60

61

```python

62

# Create channel and stub

63

channel = grpc.insecure_channel('localhost:50051')

64

stub = my_service_pb2_grpc.MyServiceStub(channel)

65

66

# Synchronous call

67

request = my_service_pb2.MyRequest(message="Hello")

68

response = stub.UnaryMethod(request, timeout=5.0)

69

print(response.reply)

70

71

# Synchronous call with metadata and call info

72

metadata = [('user-agent', 'my-client/1.0')]

73

response, call = stub.UnaryMethod.with_call(

74

request,

75

timeout=5.0,

76

metadata=metadata

77

)

78

print(f"Response: {response.reply}")

79

print(f"Status code: {call.code()}")

80

print(f"Status details: {call.details()}")

81

82

# Asynchronous call with Future

83

future = stub.UnaryMethod.future(request, timeout=5.0)

84

try:

85

response = future.result(timeout=10.0)

86

print(response.reply)

87

except grpc.RpcError as e:

88

print(f"RPC failed: {e.code()} - {e.details()}")

89

except grpc.FutureTimeoutError:

90

print("Future timed out")

91

92

# With credentials and compression

93

ssl_creds = grpc.ssl_channel_credentials()

94

call_creds = grpc.access_token_call_credentials("token")

95

channel_creds = grpc.composite_channel_credentials(ssl_creds, call_creds)

96

secure_channel = grpc.secure_channel('secure-server.com:443', channel_creds)

97

secure_stub = my_service_pb2_grpc.MyServiceStub(secure_channel)

98

99

response = secure_stub.UnaryMethod(

100

request,

101

timeout=30.0,

102

compression=grpc.compression.Gzip,

103

metadata=[('request-id', 'req-123')]

104

)

105

```

106

107

### Unary-Stream Pattern

108

109

Single request to multiple response RPC pattern with iterator-based response consumption.

110

111

```python { .api }

112

class UnaryStreamMultiCallable(abc.ABC):

113

"""Affords invoking a unary-stream RPC from client-side."""

114

115

def __call__(self, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):

116

"""

117

Invokes the underlying RPC.

118

119

Parameters:

120

- request: The request value for the RPC

121

- timeout: Optional duration in seconds (None for infinite)

122

- metadata: Optional metadata to be transmitted to the service-side

123

- credentials: Optional CallCredentials for the RPC (secure Channel only)

124

- wait_for_ready: Optional flag to enable wait_for_ready mechanism

125

- compression: Optional compression element

126

127

Returns:

128

Iterator: Call-iterator for response values and Future for RPC completion

129

130

Note:

131

Drawing response values may raise RpcError indicating non-OK termination

132

"""

133

```

134

135

**Usage Examples:**

136

137

```python

138

# Unary-stream call

139

request = my_service_pb2.StreamRequest(count=5)

140

response_iterator = stub.UnaryStreamMethod(request, timeout=30.0)

141

142

# Iterate over responses

143

try:

144

for response in response_iterator:

145

print(f"Received: {response.message}")

146

# Can break early if needed

147

if should_stop():

148

break

149

except grpc.RpcError as e:

150

print(f"Stream failed: {e.code()} - {e.details()}")

151

152

# Access call information

153

response_iterator = stub.UnaryStreamMethod(request)

154

print(f"Initial metadata: {response_iterator.initial_metadata()}")

155

156

try:

157

responses = list(response_iterator) # Consume all responses

158

print(f"Trailing metadata: {response_iterator.trailing_metadata()}")

159

print(f"Final status: {response_iterator.code()}")

160

except grpc.RpcError as e:

161

print(f"Stream terminated with error: {e}")

162

```

163

164

### Stream-Unary Pattern

165

166

Multiple request to single response RPC pattern with iterator-based request sending.

167

168

```python { .api }

169

class StreamUnaryMultiCallable(abc.ABC):

170

"""Affords invoking a stream-unary RPC from client-side."""

171

172

def __call__(self, request_iterator, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):

173

"""

174

Synchronously invokes the underlying RPC.

175

176

Parameters:

177

- request_iterator: An iterator that yields request values for the RPC

178

- timeout: Optional duration in seconds (None for infinite)

179

- metadata: Optional metadata to be transmitted to the service-side

180

- credentials: Optional CallCredentials for the RPC (secure Channel only)

181

- wait_for_ready: Optional flag to enable wait_for_ready mechanism

182

- compression: Optional compression element

183

184

Returns:

185

The response value for the RPC

186

187

Raises:

188

RpcError: Indicating that the RPC terminated with non-OK status

189

"""

190

191

def with_call(self, request_iterator, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):

192

"""

193

Synchronously invokes the underlying RPC and returns Call object.

194

195

Returns:

196

tuple: (response_value, Call) - Response and Call object for RPC metadata

197

"""

198

199

def future(self, request_iterator, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):

200

"""

201

Asynchronously invokes the underlying RPC.

202

203

Returns:

204

Future: Call-Future object for asynchronous result retrieval

205

"""

206

```

207

208

**Usage Examples:**

209

210

```python

211

# Stream-unary call with generator

212

def request_generator():

213

for i in range(10):

214

yield my_service_pb2.StreamRequest(data=f"item-{i}")

215

time.sleep(0.1) # Simulate processing time

216

217

response = stub.StreamUnaryMethod(request_generator(), timeout=30.0)

218

print(f"Final result: {response.summary}")

219

220

# Stream-unary with list of requests

221

requests = [

222

my_service_pb2.StreamRequest(data="first"),

223

my_service_pb2.StreamRequest(data="second"),

224

my_service_pb2.StreamRequest(data="third"),

225

]

226

227

response, call = stub.StreamUnaryMethod.with_call(iter(requests))

228

print(f"Response: {response.summary}")

229

print(f"Metadata: {dict(call.trailing_metadata())}")

230

231

# Asynchronous stream-unary

232

def async_request_generator():

233

for i in range(100):

234

yield my_service_pb2.StreamRequest(data=f"batch-{i}")

235

236

future = stub.StreamUnaryMethod.future(async_request_generator(), timeout=60.0)

237

238

# Do other work while RPC executes

239

do_other_work()

240

241

# Get result when ready

242

try:

243

response = future.result(timeout=10.0)

244

print(f"Async result: {response.summary}")

245

except grpc.FutureTimeoutError:

246

print("Still waiting for result...")

247

response = future.result() # Wait indefinitely

248

```

249

250

### Stream-Stream Pattern

251

252

Bidirectional streaming RPC pattern with full-duplex communication capabilities.

253

254

```python { .api }

255

class StreamStreamMultiCallable(abc.ABC):

256

"""Affords invoking a stream-stream RPC on client-side."""

257

258

def __call__(self, request_iterator, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):

259

"""

260

Invokes the underlying RPC on the client.

261

262

Parameters:

263

- request_iterator: An iterator that yields request values for the RPC

264

- timeout: Optional duration in seconds (None for infinite)

265

- metadata: Optional metadata to be transmitted to the service-side

266

- credentials: Optional CallCredentials for the RPC (secure Channel only)

267

- wait_for_ready: Optional flag to enable wait_for_ready mechanism

268

- compression: Optional compression element

269

270

Returns:

271

Iterator: Call-iterator for response values and Future for RPC completion

272

273

Note:

274

Drawing response values may raise RpcError indicating non-OK termination

275

"""

276

```

277

278

**Usage Examples:**

279

280

```python

281

# Bidirectional streaming

282

def request_generator():

283

for i in range(5):

284

yield my_service_pb2.ChatMessage(user="client", message=f"Message {i}")

285

time.sleep(1)

286

287

response_iterator = stub.StreamStreamMethod(request_generator())

288

289

# Process responses as they arrive

290

try:

291

for response in response_iterator:

292

print(f"Server says: {response.message}")

293

except grpc.RpcError as e:

294

print(f"Stream error: {e}")

295

296

# Chat-like bidirectional streaming

297

import threading

298

import queue

299

300

def chat_client():

301

# Queue for sending messages

302

message_queue = queue.Queue()

303

304

def request_generator():

305

while True:

306

try:

307

message = message_queue.get(timeout=1.0)

308

if message is None: # Sentinel to stop

309

break

310

yield my_service_pb2.ChatMessage(user="client", message=message)

311

except queue.Empty:

312

continue

313

314

# Start streaming RPC

315

response_iterator = stub.ChatMethod(request_generator())

316

317

# Thread to handle incoming messages

318

def handle_responses():

319

try:

320

for response in response_iterator:

321

print(f"[{response.user}]: {response.message}")

322

except grpc.RpcError as e:

323

print(f"Chat ended: {e}")

324

325

response_thread = threading.Thread(target=handle_responses)

326

response_thread.start()

327

328

# Send messages from user input

329

try:

330

while True:

331

user_input = input("You: ")

332

if user_input.lower() == '/quit':

333

break

334

message_queue.put(user_input)

335

finally:

336

message_queue.put(None) # Signal to stop request generator

337

response_thread.join()

338

339

# Advanced bidirectional streaming with flow control

340

class FlowControlledStreaming:

341

def __init__(self, stub):

342

self.stub = stub

343

self.request_queue = queue.Queue(maxsize=10) # Limit pending requests

344

self.stop_event = threading.Event()

345

346

def request_generator(self):

347

while not self.stop_event.is_set():

348

try:

349

request = self.request_queue.get(timeout=0.5)

350

yield request

351

except queue.Empty:

352

continue

353

354

def start_streaming(self):

355

response_iterator = self.stub.StreamStreamMethod(

356

self.request_generator(),

357

timeout=300.0

358

)

359

360

for response in response_iterator:

361

self.process_response(response)

362

363

# Flow control: only send new requests after processing response

364

if not self.request_queue.full():

365

self.maybe_send_request()

366

367

def send_request(self, request):

368

if not self.request_queue.full():

369

self.request_queue.put(request)

370

else:

371

print("Request queue full, dropping request")

372

373

def stop(self):

374

self.stop_event.set()

375

```

376

377

### Call Objects and Context

378

379

Access to RPC metadata, status, and control information through Call objects.

380

381

```python { .api }

382

class Call(RpcContext):

383

"""Invocation-side utility object for an RPC."""

384

385

def initial_metadata(self):

386

"""

387

Accesses the initial metadata sent by the server.

388

This method blocks until the value is available.

389

390

Returns:

391

Metadata: The initial metadata key-value pairs

392

"""

393

394

def trailing_metadata(self):

395

"""

396

Accesses the trailing metadata sent by the server.

397

This method blocks until the value is available.

398

399

Returns:

400

Metadata: The trailing metadata key-value pairs

401

"""

402

403

def code(self) -> StatusCode:

404

"""

405

Accesses the status code sent by the server.

406

This method blocks until the value is available.

407

408

Returns:

409

StatusCode: The status code value for the RPC

410

"""

411

412

def details(self) -> str:

413

"""

414

Accesses the details sent by the server.

415

This method blocks until the value is available.

416

417

Returns:

418

str: The details string of the RPC

419

"""

420

421

def is_active(self) -> bool:

422

"""

423

Describes whether the RPC is active or has terminated.

424

425

Returns:

426

bool: True if RPC is active, False otherwise

427

"""

428

429

def time_remaining(self):

430

"""

431

Describes the length of allowed time remaining for the RPC.

432

433

Returns:

434

float or None: Seconds remaining for RPC completion, or None if no deadline

435

"""

436

437

def cancel(self):

438

"""

439

Cancels the RPC.

440

Idempotent and has no effect if the RPC has already terminated.

441

"""

442

443

def add_callback(self, callback) -> bool:

444

"""

445

Registers a callback to be called on RPC termination.

446

447

Parameters:

448

- callback: A no-parameter callable to be called on RPC termination

449

450

Returns:

451

bool: True if callback was added, False if RPC already terminated

452

"""

453

```

454

455

**Usage Examples:**

456

457

```python

458

# Access call metadata and status

459

response, call = stub.UnaryMethod.with_call(request)

460

461

print(f"Initial metadata: {dict(call.initial_metadata())}")

462

print(f"Status code: {call.code()}")

463

print(f"Status details: {call.details()}")

464

print(f"Trailing metadata: {dict(call.trailing_metadata())}")

465

466

# RPC cancellation

467

future = stub.LongRunningMethod.future(request)

468

469

# Cancel after 5 seconds if not done

470

def cancel_if_needed():

471

time.sleep(5)

472

if not future.done():

473

future.cancel()

474

print("RPC cancelled due to timeout")

475

476

threading.Thread(target=cancel_if_needed).start()

477

478

try:

479

response = future.result()

480

except grpc.FutureCancelledError:

481

print("RPC was cancelled")

482

483

# Streaming with call control

484

response_iterator = stub.UnaryStreamMethod(request)

485

486

def handle_cancellation():

487

time.sleep(10)

488

if response_iterator.is_active():

489

response_iterator.cancel()

490

print("Stream cancelled")

491

492

threading.Thread(target=handle_cancellation).start()

493

494

try:

495

for response in response_iterator:

496

print(f"Response: {response}")

497

if not response_iterator.is_active():

498

break

499

except grpc.RpcError as e:

500

if e.code() == grpc.StatusCode.CANCELLED:

501

print("Stream was cancelled")

502

```

503

504

## Types

505

506

```python { .api }

507

class ClientCallDetails(abc.ABC):

508

"""

509

Describes an RPC to be invoked.

510

511

Attributes:

512

- method: The method name of the RPC

513

- timeout: Optional duration of time in seconds to allow for the RPC

514

- metadata: Optional metadata to be transmitted to the service-side

515

- credentials: Optional CallCredentials for the RPC

516

- wait_for_ready: Optional flag to enable wait_for_ready mechanism

517

- compression: Optional compression element

518

"""

519

520

class RpcContext(abc.ABC):

521

"""Provides RPC-related information and action."""

522

523

def is_active(self) -> bool:

524

"""Returns True if RPC is active, False otherwise."""

525

526

def time_remaining(self):

527

"""Returns seconds remaining for RPC or None if no deadline."""

528

529

def cancel(self):

530

"""Cancels the RPC. Idempotent."""

531

532

def add_callback(self, callback) -> bool:

533

"""Registers callback for RPC termination."""

534

```