# Server Implementation

The server implementation covers server creation and lifecycle management: multiple service registration, port configuration, thread pool management, graceful shutdown, and both synchronous and asynchronous execution models.

## Capabilities

### Server Creation

Creates gRPC servers with a configurable thread pool, interceptors, and channel options for production deployments.

```python { .api }
def server(thread_pool, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None, xds=False) -> Server:
    """
    Creates a Server with which RPCs can be serviced.

    Parameters:
    - thread_pool: A futures.ThreadPoolExecutor for executing RPC handlers
    - handlers: Optional list of GenericRpcHandlers for initial service registration
    - interceptors: Optional list of ServerInterceptors for middleware
    - options: Optional list of key-value pairs for server configuration
    - maximum_concurrent_rpcs: Maximum concurrent RPCs before returning RESOURCE_EXHAUSTED
    - compression: Default compression algorithm for the server lifetime
    - xds: If True, retrieves server configuration via xDS (EXPERIMENTAL)

    Returns:
        Server: A Server object ready for service registration and startup
    """
```

**Usage Examples:**

```python
from concurrent import futures

import grpc

# Basic server with thread pool
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

# Server with interceptors
class LoggingInterceptor(grpc.ServerInterceptor):
    def intercept_service(self, continuation, handler_call_details):
        print(f"Handling {handler_call_details.method}")
        return continuation(handler_call_details)

server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=10),
    interceptors=[LoggingInterceptor()]
)

# Server with advanced options
options = [
    ('grpc.keepalive_time_ms', 30000),
    ('grpc.max_concurrent_streams', 100),
]
server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=20),
    options=options,
    maximum_concurrent_rpcs=1000,
    compression=grpc.Compression.Gzip
)
```
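
An interceptor can also reject a call instead of only observing it, by returning a different handler than the one `continuation` would produce. The sketch below is one way this might look. The header name `x-api-key`, the class `RequireApiKeyInterceptor`, and the `demo.Echo` service are hypothetical names chosen for illustration, and messages are exchanged as raw bytes so that no generated code is required.

```python
from concurrent import futures

import grpc


class RequireApiKeyInterceptor(grpc.ServerInterceptor):
    """Hypothetical interceptor: rejects calls without an 'x-api-key' header."""

    def __init__(self):
        def deny(request, context):
            context.abort(grpc.StatusCode.UNAUTHENTICATED, "missing x-api-key")

        # Handler returned in place of the real one when the check fails.
        self._deny = grpc.unary_unary_rpc_method_handler(deny)

    def intercept_service(self, continuation, handler_call_details):
        keys = {key for key, _ in handler_call_details.invocation_metadata}
        if "x-api-key" in keys:
            return continuation(handler_call_details)
        return self._deny


def echo(request, context):
    return request  # raw bytes in, raw bytes out (no serializers configured)


server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=2),
    interceptors=[RequireApiKeyInterceptor()],
)
server.add_generic_rpc_handlers([
    grpc.method_handlers_generic_handler(
        "demo.Echo", {"Say": grpc.unary_unary_rpc_method_handler(echo)}
    )
])
port = server.add_insecure_port("127.0.0.1:0")
server.start()

with grpc.insecure_channel(f"127.0.0.1:{port}") as channel:
    say = channel.unary_unary("/demo.Echo/Say")
    accepted = say(b"hi", metadata=[("x-api-key", "secret")], timeout=5)
    try:
        say(b"hi", timeout=5)
        rejected_code = None
    except grpc.RpcError as err:
        rejected_code = err.code()

server.stop(None).wait()
```

The denying handler here is unary-unary, which matches the only method in the sketch. A server that also exposes streaming methods would likely need a rejecting handler of the matching kind for each RPC pattern.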

### Server Interface

Core server interface providing service registration, port management, and lifecycle control.

```python { .api }
class Server(abc.ABC):
    """Services RPCs with comprehensive lifecycle management."""

    def add_generic_rpc_handlers(self, generic_rpc_handlers):
        """
        Registers GenericRpcHandlers with this Server.
        Must be called before server is started.

        Parameters:
        - generic_rpc_handlers: Iterable of GenericRpcHandlers
        """

    def add_registered_method_handlers(self, service_name: str, method_handlers):
        """
        Registers method handlers for a specific service.
        Registered handlers take precedence over generic handlers.

        Parameters:
        - service_name: The service name
        - method_handlers: Dictionary mapping method names to RpcMethodHandlers
        """

    def add_insecure_port(self, address: str) -> int:
        """
        Opens an insecure port for accepting RPCs.
        Must be called before starting the server.

        Parameters:
        - address: Address to bind (e.g., '[::]:50051', 'localhost:0')

        Returns:
            int: The actual port number where server will accept requests
        """

    def add_secure_port(self, address: str, server_credentials: ServerCredentials) -> int:
        """
        Opens a secure port for accepting RPCs.
        Must be called before starting the server.

        Parameters:
        - address: Address to bind
        - server_credentials: ServerCredentials object for SSL/TLS

        Returns:
            int: The actual port number where server will accept requests
        """

    def start(self):
        """
        Starts this Server.
        May only be called once (not idempotent).
        """

    def stop(self, grace) -> threading.Event:
        """
        Stops this Server with optional grace period.

        Parameters:
        - grace: Duration in seconds to wait for active RPCs, or None for immediate stop

        Returns:
            threading.Event: Event that will be set when server completely stops
        """

    def wait_for_termination(self, timeout=None) -> bool:
        """
        Block current thread until the server stops.

        Parameters:
        - timeout: Optional timeout in seconds

        Returns:
            bool: True if the timeout elapsed before the server stopped, False otherwise
        """
```

**Usage Examples:**

```python
from concurrent import futures

import grpc

# Generated from the service's .proto definition
import my_service_pb2_grpc

# Complete server lifecycle
def run_server():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

    # Register services (MyServiceServicer is the application's servicer class)
    my_service_pb2_grpc.add_MyServiceServicer_to_server(MyServiceServicer(), server)

    # Add ports (port 0 lets the OS pick a free port)
    insecure_port = server.add_insecure_port('[::]:0')
    print(f"Server will listen on insecure port: {insecure_port}")

    # Start server
    server.start()
    print("Server started")

    try:
        # Keep server running
        server.wait_for_termination()
    except KeyboardInterrupt:
        print("Shutting down server...")
        # Give in-flight RPCs up to 5 seconds, then block until fully stopped
        server.stop(grace=5.0).wait()
        print("Server stopped")

# Secure server setup
def run_secure_server():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

    # Load SSL credentials
    with open('server.key', 'rb') as f:
        private_key = f.read()
    with open('server.crt', 'rb') as f:
        certificate_chain = f.read()

    credentials = grpc.ssl_server_credentials([(private_key, certificate_chain)])

    # Add secure port
    secure_port = server.add_secure_port('[::]:50051', credentials)
    print(f"Secure server listening on port: {secure_port}")

    server.start()
    server.wait_for_termination()
```
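
The return values of the lifecycle methods can be observed without registering any service. The following is a minimal sketch under that assumption; the bind address `127.0.0.1:0` and the timeouts are illustrative choices only.

```python
from concurrent import futures
import threading

import grpc

server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))

# Port 0 asks the OS for a free port; the return value is the port actually bound.
port = server.add_insecure_port("127.0.0.1:0")
server.start()

# With a grace period, stop() hands back an event that is set once shutdown completes.
stopped = server.stop(grace=1.0)
is_event = isinstance(stopped, threading.Event)
finished = stopped.wait(timeout=10)

# Blocks until the server has terminated or the timeout elapses.
server.wait_for_termination(timeout=1)
```

Waiting on the returned event, as in `server.stop(grace).wait()`, is what makes a shutdown path block until in-flight RPCs have drained or the grace period has expired.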

### Service Context

Provides RPC-specific context and control for server-side method implementations.

```python { .api }
class ServicerContext(RpcContext):
    """Context object passed to method implementations."""

    def invocation_metadata(self):
        """
        Accesses the metadata sent by the client.

        Returns:
            Metadata: The invocation metadata key-value pairs
        """

    def peer(self) -> str:
        """
        Identifies the peer that invoked the RPC.

        Returns:
            str: String identifying the peer (format determined by gRPC runtime)
        """

    def peer_identities(self):
        """
        Gets one or more peer identities.

        Returns:
            Iterable of bytes or None: Peer identities if authenticated, None otherwise
        """

    def peer_identity_key(self):
        """
        The auth property used to identify the peer.

        Returns:
            str or None: Auth property name or None if not authenticated
        """

    def auth_context(self):
        """
        Gets the auth context for the call.

        Returns:
            dict: Map of auth properties to iterables of bytes
        """

    def send_initial_metadata(self, initial_metadata):
        """
        Sends the initial metadata value to the client.

        Parameters:
        - initial_metadata: The initial metadata key-value pairs
        """

    def set_trailing_metadata(self, trailing_metadata):
        """
        Sets the trailing metadata for the RPC.

        Parameters:
        - trailing_metadata: The trailing metadata key-value pairs
        """

    def abort(self, code: StatusCode, details: str):
        """
        Raises an exception to terminate the RPC with a non-OK status.

        Parameters:
        - code: A StatusCode object (must not be StatusCode.OK)
        - details: A UTF-8-encodable string for the client

        Raises:
            Exception: Always raised to signal RPC abortion
        """

    def abort_with_status(self, status):
        """
        Raises an exception to terminate the RPC with a status object.

        Parameters:
        - status: A grpc.Status object (EXPERIMENTAL)

        Raises:
            Exception: Always raised to signal RPC abortion
        """

    def set_code(self, code: StatusCode):
        """
        Sets the value to be used as status code upon RPC completion.

        Parameters:
        - code: A StatusCode object to be sent to the client
        """

    def set_details(self, details: str):
        """
        Sets the value to be used as detail string upon RPC completion.

        Parameters:
        - details: A UTF-8-encodable string to be sent to the client
        """

    def set_compression(self, compression):
        """
        Set the compression algorithm to be used for the entire call.

        Parameters:
        - compression: An element of grpc.Compression (e.g., grpc.Compression.Gzip)
        """

    def disable_next_message_compression(self):
        """
        Disables compression for the next response message.
        Overrides any compression configuration.
        """
```

**Usage Examples:**

```python
import grpc

# Generated from the service's .proto definition
import my_service_pb2
import my_service_pb2_grpc

class MyServiceServicer(my_service_pb2_grpc.MyServiceServicer):
    def UnaryMethod(self, request, context):
        # Access client metadata
        metadata = dict(context.invocation_metadata())
        user_agent = metadata.get('user-agent', 'unknown')

        # Check authentication (abort() raises, so execution stops here on failure)
        if not self.is_authenticated(context):
            context.abort(grpc.StatusCode.UNAUTHENTICATED, 'Authentication required')

        # Send initial metadata
        context.send_initial_metadata([('server-version', '1.0')])

        # Process request
        try:
            result = self.process_request(request)

            # Set trailing metadata
            context.set_trailing_metadata([('processed-count', str(len(result)))])

            return my_service_pb2.MyResponse(data=result)
        except ValueError as e:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details(f'Invalid request: {e}')
            raise
        except Exception:
            context.abort(grpc.StatusCode.INTERNAL, 'Internal server error')

    def StreamingMethod(self, request_iterator, context):
        try:
            for request in request_iterator:
                # Check if client cancelled
                if not context.is_active():
                    break

                # Process and yield response
                response = self.process_item(request)
                yield my_service_pb2.MyResponse(data=response)

        except Exception as e:
            context.abort(grpc.StatusCode.INTERNAL, f'Processing error: {e}')
```
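
To see how the context calls surface on the client side, the sketch below runs a handler and a client in one process. It is an illustration under assumptions: the `demo.Greeter` service, the `x-user` header, and the metadata keys are hypothetical, and messages are raw bytes so that no generated code is needed.

```python
from concurrent import futures

import grpc


def greet(request, context):
    # invocation_metadata() yields (key, value) pairs sent by the client.
    metadata = dict(context.invocation_metadata())
    if "x-user" not in metadata:
        # abort() raises, so nothing below runs for the rejected call.
        context.abort(grpc.StatusCode.UNAUTHENTICATED, "x-user header required")
    context.send_initial_metadata([("server-version", "1.0")])
    context.set_trailing_metadata([("request-bytes", str(len(request)))])
    return b"hello " + metadata["x-user"].encode()


server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
server.add_generic_rpc_handlers([
    grpc.method_handlers_generic_handler(
        "demo.Greeter", {"Greet": grpc.unary_unary_rpc_method_handler(greet)}
    )
])
port = server.add_insecure_port("127.0.0.1:0")
server.start()

with grpc.insecure_channel(f"127.0.0.1:{port}") as channel:
    call_greet = channel.unary_unary("/demo.Greeter/Greet")

    # with_call() returns the response together with the call object,
    # which exposes the metadata the server sent.
    reply, call = call_greet.with_call(
        b"ping", metadata=[("x-user", "ada")], timeout=5
    )
    initial = dict(call.initial_metadata())
    trailing = dict(call.trailing_metadata())

    # Without the header, the abort() status arrives as a grpc.RpcError.
    try:
        call_greet(b"ping", timeout=5)
        error = None
    except grpc.RpcError as err:
        error = (err.code(), err.details())

server.stop(None).wait()
```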

### Handler Creation

Creates RPC method handlers for different RPC patterns with serialization support.

```python { .api }
def unary_unary_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None) -> RpcMethodHandler:
    """
    Creates an RpcMethodHandler for a unary-unary RPC method.

    Parameters:
    - behavior: Implementation accepting one request and returning one response
    - request_deserializer: Optional deserializer for request deserialization
    - response_serializer: Optional serializer for response serialization

    Returns:
        RpcMethodHandler: Handler object for use with grpc.Server
    """

def unary_stream_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None) -> RpcMethodHandler:
    """Creates an RpcMethodHandler for a unary-stream RPC method."""

def stream_unary_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None) -> RpcMethodHandler:
    """Creates an RpcMethodHandler for a stream-unary RPC method."""

def stream_stream_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None) -> RpcMethodHandler:
    """Creates an RpcMethodHandler for a stream-stream RPC method."""

def method_handlers_generic_handler(service: str, method_handlers) -> GenericRpcHandler:
    """
    Creates a GenericRpcHandler from RpcMethodHandlers.

    Parameters:
    - service: The name of the service implemented by the method handlers
    - method_handlers: Dictionary mapping method names to RpcMethodHandlers

    Returns:
        GenericRpcHandler: Handler for adding to grpc.Server
    """
```

**Usage Example:**

```python
from concurrent import futures

import grpc

# Generated from the service's .proto definition
import my_service_pb2

def my_unary_unary(request, context):
    return my_service_pb2.MyResponse(message=f"Echo: {request.message}")

def my_unary_stream(request, context):
    for i in range(request.count):
        yield my_service_pb2.MyResponse(message=f"Item {i}")

# Create method handlers
handlers = {
    'UnaryMethod': grpc.unary_unary_rpc_method_handler(
        my_unary_unary,
        request_deserializer=my_service_pb2.MyRequest.FromString,
        response_serializer=my_service_pb2.MyResponse.SerializeToString,
    ),
    'StreamMethod': grpc.unary_stream_rpc_method_handler(
        my_unary_stream,
        request_deserializer=my_service_pb2.MyRequest.FromString,
        response_serializer=my_service_pb2.MyResponse.SerializeToString,
    ),
}

# Create generic handler
generic_handler = grpc.method_handlers_generic_handler('MyService', handlers)

# Add to server
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
server.add_generic_rpc_handlers([generic_handler])
```
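
The streaming handler factories take behaviors with different shapes: a unary-stream behavior is a generator, while a stream-unary behavior consumes an iterator. The sketch below exercises both end to end. The `demo.Counter` service and its method names are hypothetical, and the serializers are omitted so that messages travel as raw bytes.

```python
from concurrent import futures

import grpc


def count_up(request, context):
    # Unary-stream behavior: one request in, a generator of responses out.
    for i in range(int(request)):
        yield str(i).encode()


def total(request_iterator, context):
    # Stream-unary behavior: an iterator of requests in, one response out.
    return str(sum(int(item) for item in request_iterator)).encode()


handler = grpc.method_handlers_generic_handler(
    "demo.Counter",
    {
        "CountUp": grpc.unary_stream_rpc_method_handler(count_up),
        "Total": grpc.stream_unary_rpc_method_handler(total),
    },
)

server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
server.add_generic_rpc_handlers([handler])
port = server.add_insecure_port("127.0.0.1:0")
server.start()

with grpc.insecure_channel(f"127.0.0.1:{port}") as channel:
    # Clients address a method by its full path: '/<service>/<method>'.
    count_up_call = channel.unary_stream("/demo.Counter/CountUp")
    total_call = channel.stream_unary("/demo.Counter/Total")

    items = list(count_up_call(b"3", timeout=5))
    summed = total_call(iter([b"1", b"2", b"3"]), timeout=5)

server.stop(None).wait()
```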

## Types

```python { .api }
class RpcMethodHandler(abc.ABC):
    """
    An implementation of a single RPC method.

    Attributes:
    - request_streaming: Whether RPC supports multiple request messages
    - response_streaming: Whether RPC supports multiple response messages
    - request_deserializer: Callable for request deserialization
    - response_serializer: Callable for response serialization
    - unary_unary: Business logic for unary-unary pattern
    - unary_stream: Business logic for unary-stream pattern
    - stream_unary: Business logic for stream-unary pattern
    - stream_stream: Business logic for stream-stream pattern
    """

class HandlerCallDetails(abc.ABC):
    """
    Describes an RPC that has just arrived for service.

    Attributes:
    - method: The method name of the RPC
    - invocation_metadata: The metadata sent by the client
    """

class GenericRpcHandler(abc.ABC):
    """An implementation of arbitrarily many RPC methods."""

    def service(self, handler_call_details: HandlerCallDetails):
        """
        Returns the handler for servicing the RPC.

        Parameters:
        - handler_call_details: HandlerCallDetails describing the RPC

        Returns:
            RpcMethodHandler or None: Handler if this implementation services the RPC
        """

class ServiceRpcHandler(GenericRpcHandler):
    """
    An implementation of RPC methods belonging to a service.
    Handles RPCs with structured names: '/Service.Name/Service.Method'
    """

    def service_name(self) -> str:
        """Returns this service's name."""
```
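
`GenericRpcHandler` can be subclassed directly when routing does not fit a fixed method table. The sketch below shows one possible shape: `service()` returns a handler for any method under a given prefix and `None` otherwise. The `PrefixRouter` class and the `/demo.Any/` prefix are hypothetical, and messages are raw bytes.

```python
from concurrent import futures

import grpc


class PrefixRouter(grpc.GenericRpcHandler):
    """Hypothetical handler: serves every method under one path prefix."""

    def __init__(self, prefix):
        self._prefix = prefix

    def service(self, handler_call_details):
        method = handler_call_details.method  # e.g. '/demo.Any/Whatever'
        if not method.startswith(self._prefix):
            return None  # not ours; the server tries its other handlers

        def behavior(request, context):
            # Echo the method path back so the routing is visible to the caller.
            return method.encode() + b":" + request

        return grpc.unary_unary_rpc_method_handler(behavior)


server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
server.add_generic_rpc_handlers([PrefixRouter("/demo.Any/")])
port = server.add_insecure_port("127.0.0.1:0")
server.start()

with grpc.insecure_channel(f"127.0.0.1:{port}") as channel:
    handled = channel.unary_unary("/demo.Any/Whatever")(b"x", timeout=5)
    try:
        channel.unary_unary("/other.Service/Method")(b"x", timeout=5)
        unhandled_code = None
    except grpc.RpcError as err:
        unhandled_code = err.code()

server.stop(None).wait()
```

When no registered handler returns an `RpcMethodHandler` for a method, the client in this sketch sees `StatusCode.UNIMPLEMENTED`.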