or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-api.md channel-management.md error-handling.md index.md interceptors.md protobuf-integration.md rpc-patterns.md security-authentication.md server-implementation.md

docs/server-implementation.md

0

# Server Implementation

1

2

The server implementation API covers server creation and lifecycle management: registering multiple services, configuring ports, managing the thread pool, shutting down gracefully, and running under either the synchronous or the asynchronous execution model.

3

4

## Capabilities

5

6

### Server Creation

7

8

Creates gRPC servers with configurable thread pools, interceptors, and advanced options for production deployments.

9

10

```python { .api }

11

def server(thread_pool, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None, xds=False) -> Server:

12

"""

13

Creates a Server with which RPCs can be serviced.

14

15

Parameters:

16

- thread_pool: A futures.ThreadPoolExecutor for executing RPC handlers

17

- handlers: Optional list of GenericRpcHandlers for initial service registration

18

- interceptors: Optional list of ServerInterceptors for middleware

19

- options: Optional list of key-value pairs for server configuration

20

- maximum_concurrent_rpcs: Maximum concurrent RPCs before returning RESOURCE_EXHAUSTED

21

- compression: Default compression algorithm for the server lifetime

22

- xds: If True, retrieves server configuration via xDS (EXPERIMENTAL)

23

24

Returns:

25

Server: A Server object ready for service registration and startup

26

"""

27

```

28

29

**Usage Examples:**

30

31

```python

32

from concurrent import futures

33

34

# Basic server with thread pool

35

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

36

37

# Server with interceptors

38

class LoggingInterceptor(grpc.ServerInterceptor):

39

def intercept_service(self, continuation, handler_call_details):

40

print(f"Handling {handler_call_details.method}")

41

return continuation(handler_call_details)

42

43

server = grpc.server(

44

futures.ThreadPoolExecutor(max_workers=10),

45

interceptors=[LoggingInterceptor()]

46

)

47

48

# Server with advanced options

49

options = [

50

('grpc.keepalive_time_ms', 30000),

51

('grpc.max_concurrent_streams', 100),

52

]

53

server = grpc.server(

54

futures.ThreadPoolExecutor(max_workers=20),

55

options=options,

56

maximum_concurrent_rpcs=1000,

57

compression=grpc.Compression.Gzip

58

)

59

```

60

61

### Server Interface

62

63

Core server interface providing service registration, port management, and lifecycle control.

64

65

```python { .api }

66

class Server(abc.ABC):

67

"""Services RPCs with comprehensive lifecycle management."""

68

69

def add_generic_rpc_handlers(self, generic_rpc_handlers):

70

"""

71

Registers GenericRpcHandlers with this Server.

72

Must be called before server is started.

73

74

Parameters:

75

- generic_rpc_handlers: Iterable of GenericRpcHandlers

76

"""

77

78

def add_registered_method_handlers(self, service_name: str, method_handlers):

79

"""

80

Registers method handlers for a specific service.

81

Registered handlers take precedence over generic handlers.

82

83

Parameters:

84

- service_name: The service name

85

- method_handlers: Dictionary mapping method names to RpcMethodHandlers

86

"""

87

88

def add_insecure_port(self, address: str) -> int:

89

"""

90

Opens an insecure port for accepting RPCs.

91

Must be called before starting the server.

92

93

Parameters:

94

- address: Address to bind (e.g., '[::]:50051', 'localhost:0')

95

96

Returns:

97

int: The actual port number where server will accept requests

98

"""

99

100

def add_secure_port(self, address: str, server_credentials: ServerCredentials) -> int:

101

"""

102

Opens a secure port for accepting RPCs.

103

Must be called before starting the server.

104

105

Parameters:

106

- address: Address to bind

107

- server_credentials: ServerCredentials object for SSL/TLS

108

109

Returns:

110

int: The actual port number where server will accept requests

111

"""

112

113

def start(self):

114

"""

115

Starts this Server.

116

May only be called once (not idempotent).

117

"""

118

119

def stop(self, grace) -> threading.Event:

120

"""

121

Stops this Server with optional grace period.

122

123

Parameters:

124

- grace: Duration in seconds to wait for active RPCs, or None for immediate stop

125

126

Returns:

127

threading.Event: Event that will be set when server completely stops

128

"""

129

130

def wait_for_termination(self, timeout=None) -> bool:

131

"""

132

Block current thread until the server stops.

133

134

Parameters:

135

- timeout: Optional timeout in seconds

136

137

Returns:

138

bool: True if the timeout elapsed before the server stopped, False if the server terminated

139

"""

140

```

141

142

**Usage Examples:**

143

144

```python

145

from concurrent import futures

146

import time

147

148

# Complete server lifecycle

149

def run_server():

150

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

151

152

# Register services

153

add_MyServiceServicer_to_server(MyServiceServicer(), server)

154

155

# Add ports

156

insecure_port = server.add_insecure_port('[::]:0')

157

print(f"Server will listen on insecure port: {insecure_port}")

158

159

# Start server

160

server.start()

161

print("Server started")

162

163

try:

164

# Keep server running

165

server.wait_for_termination()

166

except KeyboardInterrupt:

167

print("Shutting down server...")

168

server.stop(grace=5.0).wait()

169

print("Server stopped")

170

171

# Secure server setup

172

def run_secure_server():

173

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

174

175

# Load SSL credentials

176

with open('server.key', 'rb') as f:

177

private_key = f.read()

178

with open('server.crt', 'rb') as f:

179

certificate_chain = f.read()

180

181

credentials = grpc.ssl_server_credentials([(private_key, certificate_chain)])

182

183

# Add secure port

184

secure_port = server.add_secure_port('[::]:50051', credentials)

185

print(f"Secure server listening on port: {secure_port}")

186

187

server.start()

188

server.wait_for_termination()

189

```

190

191

### Service Context

192

193

Provides RPC-specific context and control for server-side method implementations.

194

195

```python { .api }

196

class ServicerContext(RpcContext):

197

"""Context object passed to method implementations."""

198

199

def invocation_metadata(self):

200

"""

201

Accesses the metadata sent by the client.

202

203

Returns:

204

Metadata: The invocation metadata key-value pairs

205

"""

206

207

def peer(self) -> str:

208

"""

209

Identifies the peer that invoked the RPC.

210

211

Returns:

212

str: String identifying the peer (format determined by gRPC runtime)

213

"""

214

215

def peer_identities(self):

216

"""

217

Gets one or more peer identities.

218

219

Returns:

220

Iterable of bytes or None: Peer identities if authenticated, None otherwise

221

"""

222

223

def peer_identity_key(self):

224

"""

225

The auth property used to identify the peer.

226

227

Returns:

228

str or None: Auth property name or None if not authenticated

229

"""

230

231

def auth_context(self):

232

"""

233

Gets the auth context for the call.

234

235

Returns:

236

dict: Map of auth properties to iterables of bytes

237

"""

238

239

def send_initial_metadata(self, initial_metadata):

240

"""

241

Sends the initial metadata value to the client.

242

243

Parameters:

244

- initial_metadata: The initial metadata key-value pairs

245

"""

246

247

def set_trailing_metadata(self, trailing_metadata):

248

"""

249

Sets the trailing metadata for the RPC.

250

251

Parameters:

252

- trailing_metadata: The trailing metadata key-value pairs

253

"""

254

255

def abort(self, code: StatusCode, details: str):

256

"""

257

Raises an exception to terminate the RPC with a non-OK status.

258

259

Parameters:

260

- code: A StatusCode object (must not be StatusCode.OK)

261

- details: A UTF-8-encodable string for the client

262

263

Raises:

264

Exception: Always raised to signal RPC abortion

265

"""

266

267

def abort_with_status(self, status):

268

"""

269

Raises an exception to terminate the RPC with a status object.

270

271

Parameters:

272

- status: A grpc.Status object (EXPERIMENTAL)

273

274

Raises:

275

Exception: Always raised to signal RPC abortion

276

"""

277

278

def set_code(self, code: StatusCode):

279

"""

280

Sets the value to be used as status code upon RPC completion.

281

282

Parameters:

283

- code: A StatusCode object to be sent to the client

284

"""

285

286

def set_details(self, details: str):

287

"""

288

Sets the value to be used as detail string upon RPC completion.

289

290

Parameters:

291

- details: A UTF-8-encodable string to be sent to the client

292

"""

293

294

def set_compression(self, compression):

295

"""

296

Set the compression algorithm to be used for the entire call.

297

298

Parameters:

299

- compression: A member of the grpc.Compression enum (e.g., grpc.Compression.Gzip)

300

"""

301

302

def disable_next_message_compression(self):

303

"""

304

Disables compression for the next response message.

305

Overrides any compression configuration.

306

"""

307

```

308

309

**Usage Examples:**

310

311

```python

312

class MyServiceServicer(my_service_pb2_grpc.MyServiceServicer):

313

def UnaryMethod(self, request, context):

314

# Access client metadata

315

metadata = dict(context.invocation_metadata())

316

user_agent = metadata.get('user-agent', 'unknown')

317

318

# Check authentication

319

if not self.is_authenticated(context):

320

context.abort(grpc.StatusCode.UNAUTHENTICATED, 'Authentication required')

321

322

# Send initial metadata

323

context.send_initial_metadata([('server-version', '1.0')])

324

325

# Process request

326

try:

327

result = self.process_request(request)

328

329

# Set trailing metadata

330

context.set_trailing_metadata([('processed-count', str(len(result)))])

331

332

return my_service_pb2.MyResponse(data=result)

333

except ValueError as e:

334

context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

335

context.set_details(f'Invalid request: {str(e)}')

336

raise

337

except Exception as e:

338

context.abort(grpc.StatusCode.INTERNAL, 'Internal server error')

339

340

def StreamingMethod(self, request_iterator, context):

341

try:

342

for request in request_iterator:

343

# Check if client cancelled

344

if not context.is_active():

345

break

346

347

# Process and yield response

348

response = self.process_item(request)

349

yield my_service_pb2.MyResponse(data=response)

350

351

except Exception as e:

352

context.abort(grpc.StatusCode.INTERNAL, f'Processing error: {str(e)}')

353

```

354

355

### Handler Creation

356

357

Creates RPC method handlers for different RPC patterns with serialization support.

358

359

```python { .api }

360

def unary_unary_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None) -> RpcMethodHandler:

361

"""

362

Creates an RpcMethodHandler for a unary-unary RPC method.

363

364

Parameters:

365

- behavior: Implementation accepting one request and returning one response

366

- request_deserializer: Optional deserializer for request deserialization

367

- response_serializer: Optional serializer for response serialization

368

369

Returns:

370

RpcMethodHandler: Handler object for use with grpc.Server

371

"""

372

373

def unary_stream_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None) -> RpcMethodHandler:

374

"""Creates an RpcMethodHandler for a unary-stream RPC method."""

375

376

def stream_unary_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None) -> RpcMethodHandler:

377

"""Creates an RpcMethodHandler for a stream-unary RPC method."""

378

379

def stream_stream_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None) -> RpcMethodHandler:

380

"""Creates an RpcMethodHandler for a stream-stream RPC method."""

381

382

def method_handlers_generic_handler(service: str, method_handlers) -> GenericRpcHandler:

383

"""

384

Creates a GenericRpcHandler from RpcMethodHandlers.

385

386

Parameters:

387

- service: The name of the service implemented by the method handlers

388

- method_handlers: Dictionary mapping method names to RpcMethodHandlers

389

390

Returns:

391

GenericRpcHandler: Handler for adding to grpc.Server

392

"""

393

```

394

395

**Usage Example:**

396

397

```python

398

def my_unary_unary(request, context):

399

return my_service_pb2.MyResponse(message=f"Echo: {request.message}")

400

401

def my_unary_stream(request, context):

402

for i in range(request.count):

403

yield my_service_pb2.MyResponse(message=f"Item {i}")

404

405

# Create method handlers

406

handlers = {

407

'UnaryMethod': grpc.unary_unary_rpc_method_handler(

408

my_unary_unary,

409

request_deserializer=my_service_pb2.MyRequest.FromString,

410

response_serializer=my_service_pb2.MyResponse.SerializeToString,

411

),

412

'StreamMethod': grpc.unary_stream_rpc_method_handler(

413

my_unary_stream,

414

request_deserializer=my_service_pb2.MyRequest.FromString,

415

response_serializer=my_service_pb2.MyResponse.SerializeToString,

416

),

417

}

418

419

# Create generic handler

420

generic_handler = grpc.method_handlers_generic_handler('MyService', handlers)

421

422

# Add to server

423

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

424

server.add_generic_rpc_handlers([generic_handler])

425

```

426

427

## Types

428

429

```python { .api }

430

class RpcMethodHandler(abc.ABC):

431

"""

432

An implementation of a single RPC method.

433

434

Attributes:

435

- request_streaming: Whether RPC supports multiple request messages

436

- response_streaming: Whether RPC supports multiple response messages

437

- request_deserializer: Callable for request deserialization

438

- response_serializer: Callable for response serialization

439

- unary_unary: Business logic for unary-unary pattern

440

- unary_stream: Business logic for unary-stream pattern

441

- stream_unary: Business logic for stream-unary pattern

442

- stream_stream: Business logic for stream-stream pattern

443

"""

444

445

class HandlerCallDetails(abc.ABC):

446

"""

447

Describes an RPC that has just arrived for service.

448

449

Attributes:

450

- method: The method name of the RPC

451

- invocation_metadata: The metadata sent by the client

452

"""

453

454

class GenericRpcHandler(abc.ABC):

455

"""An implementation of arbitrarily many RPC methods."""

456

457

def service(self, handler_call_details: HandlerCallDetails):

458

"""

459

Returns the handler for servicing the RPC.

460

461

Parameters:

462

- handler_call_details: HandlerCallDetails describing the RPC

463

464

Returns:

465

RpcMethodHandler or None: Handler if this implementation services the RPC

466

"""

467

468

class ServiceRpcHandler(GenericRpcHandler):

469

"""

470

An implementation of RPC methods belonging to a service.

471

Handles RPCs with structured names: '/Service.Name/Service.Method'

472

"""

473

474

def service_name(self) -> str:

475

"""Returns this service's name."""

476

```