0
# Protocol Buffers Integration
1
2
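Load and compile Protocol Buffer definitions from `.proto` files at runtime. This enables dynamic service discovery and client generation without a separate `protoc` pre-compilation step, while still exposing the full set of generated message, enum, and service types. Note that these functions are experimental gRPC Python APIs: they require the `grpcio-tools` package to be installed, and the `.proto` path (along with everything it imports) must be resolvable relative to an entry on `sys.path`.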
3
4
## Capabilities
5
6
### Runtime Proto Loading
7
8
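Three entry points load `.proto` definitions at runtime: `protos()` returns the message and enum classes, `services()` returns the client stubs and servicer base classes, and `protos_and_services()` returns both in a single call. No generated `_pb2.py` / `_pb2_grpc.py` files need to exist beforehand, which enables dynamic service discovery and flexible development workflows.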
9
10
```python { .api }
11
def protos(protobuf_path: str):
12
"""
13
Loads protobuf definitions from .proto files.
14
15
Parameters:
16
- protobuf_path: Path to .proto files or directory containing .proto files
17
18
Returns:
19
Module-like object containing protobuf message classes and enums
20
"""
21
22
def services(protobuf_path: str):
23
"""
24
Loads service definitions from .proto files.
25
26
Parameters:
27
- protobuf_path: Path to .proto files or directory containing .proto files
28
29
Returns:
30
Module-like object containing service stub classes and servicer base classes
31
"""
32
33
def protos_and_services(protobuf_path: str):
34
"""
35
Loads both protobuf and service definitions from .proto files.
36
37
Parameters:
38
- protobuf_path: Path to .proto files or directory containing .proto files
39
40
Returns:
41
Tuple of (protos_module, services_module)
42
"""
43
```
44
45
**Usage Examples:**
46
47
```python
48
import grpc
49
50
# Load protobuf definitions from a single file
51
protos = grpc.protos("my_service.proto")
52
53
# Access message classes
54
request = protos.MyRequest(message="Hello", count=5)
55
print(f"Request: {request}")
56
57
# Load service definitions
58
services = grpc.services("my_service.proto")
59
60
# Create client stub
61
channel = grpc.insecure_channel('localhost:50051')
62
stub = services.MyServiceStub(channel)
63
64
# Make RPC call
65
response = stub.UnaryMethod(request)
66
print(f"Response: {response.reply}")
67
68
# Load both protos and services together
69
protos, services = grpc.protos_and_services("my_service.proto")
70
71
# Use both message classes and service stubs
72
request = protos.MyRequest(message="Hello")
73
stub = services.MyServiceStub(channel)
74
response = stub.UnaryMethod(request)
75
76
# Load from directory containing multiple .proto files
77
protos = grpc.protos("proto_directory/")
78
services = grpc.services("proto_directory/")
79
80
# Access types from different proto files
81
user_request = protos.user_pb2.GetUserRequest(user_id="123")
82
order_request = protos.order_pb2.CreateOrderRequest(user_id="123", items=[])
83
```
84
85
### Dynamic Service Implementation
86
87
Implement gRPC services dynamically using runtime-loaded protobuf definitions without requiring pre-generated code.
88
89
**Usage Examples:**
90
91
```python
92
# Load service definitions
93
protos, services = grpc.protos_and_services("my_service.proto")
94
95
# Implement servicer using runtime-loaded definitions
96
class MyServiceServicer(services.MyServiceServicer):
97
def UnaryMethod(self, request, context):
98
# Access request fields dynamically
99
message = request.message
100
count = request.count
101
102
# Create response using runtime-loaded message class
103
response = protos.MyResponse()
104
response.reply = f"Processed: {message} (count: {count})"
105
response.timestamp = int(time.time())
106
107
return response
108
109
def StreamingMethod(self, request, context):
110
for i in range(request.count):
111
response = protos.MyStreamResponse()
112
response.index = i
113
response.data = f"Item {i} for {request.message}"
114
yield response
115
116
def BidirectionalMethod(self, request_iterator, context):
117
for request in request_iterator:
118
# Process each request
119
response = protos.MyBidirectionalResponse()
120
response.echo = f"Echo: {request.data}"
121
response.processed_at = time.time()
122
yield response
123
124
# Set up server with dynamic servicer
125
def serve():
126
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
127
128
# Add servicer to server using runtime-loaded service definition
129
services.add_MyServiceServicer_to_server(MyServiceServicer(), server)
130
131
server.add_insecure_port('[::]:50051')
132
server.start()
133
print("Server started with dynamically loaded service")
134
server.wait_for_termination()
135
136
if __name__ == '__main__':
137
serve()
138
```
139
140
### Multi-File Proto Projects
141
142
Handle complex protobuf projects with multiple .proto files, imports, and dependencies.
143
144
**Example Proto Structure:**
145
```
146
protos/
147
├── common/
148
│ ├── types.proto
149
│ └── errors.proto
150
├── user/
151
│ └── user_service.proto
152
└── order/
153
└── order_service.proto
154
```
155
156
**Usage Examples:**
157
158
```python
159
# Load entire proto directory structure
160
protos, services = grpc.protos_and_services("protos/")
161
162
# Access types from different proto files
163
# Assuming types.proto defines common types
164
common_timestamp = protos.common.types_pb2.Timestamp()
165
common_timestamp.seconds = int(time.time())
166
167
# User service types and stubs
168
user_request = protos.user.user_service_pb2.GetUserRequest(user_id="123")
169
user_stub = services.user.user_service_pb2_grpc.UserServiceStub(channel)
170
171
# Order service types and stubs
172
order_request = protos.order.order_service_pb2.CreateOrderRequest(
173
user_id="123",
174
timestamp=common_timestamp
175
)
176
order_stub = services.order.order_service_pb2_grpc.OrderServiceStub(channel)
177
178
# Make calls to different services
179
user = user_stub.GetUser(user_request)
180
order = order_stub.CreateOrder(order_request)
181
```
182
183
### Dynamic Type Inspection
184
185
Inspect and work with protobuf message types dynamically at runtime.
186
187
**Usage Examples:**
188
189
```python
190
# Load protos
191
protos = grpc.protos("my_service.proto")
192
193
# Get message class
194
MyRequest = protos.MyRequest
195
196
# Inspect message fields
197
print("MyRequest fields:")
198
for field in MyRequest.DESCRIPTOR.fields:
199
print(f" {field.name}: {field.type}")
200
201
# Create message with dynamic field access
202
request = MyRequest()
203
204
# Set fields dynamically
205
for field in MyRequest.DESCRIPTOR.fields:
206
if field.name == "message":
207
setattr(request, field.name, "Dynamic message")
208
elif field.name == "count":
209
setattr(request, field.name, 42)
210
211
print(f"Dynamic request: {request}")
212
213
# Check if field exists
214
if hasattr(request, 'optional_field'):
215
print(f"Optional field value: {request.optional_field}")
216
217
# Handle repeated fields
218
if hasattr(request, 'items'):
219
request.items.extend(["item1", "item2", "item3"])
220
221
# Handle nested messages
222
if hasattr(request, 'metadata'):
223
request.metadata.created_by = "dynamic_client"
224
request.metadata.created_at = int(time.time())
225
```
226
227
### Protobuf Serialization and Deserialization
228
229
Work with protobuf serialization at runtime for custom transport or storage scenarios.
230
231
**Usage Examples:**
232
233
```python
234
# Load protos
235
protos = grpc.protos("my_service.proto")
236
237
# Create message
238
request = protos.MyRequest(message="Hello", count=5)
239
240
# Serialize to bytes
241
serialized_data = request.SerializeToString()
242
print(f"Serialized size: {len(serialized_data)} bytes")
243
244
# Deserialize from bytes
245
new_request = protos.MyRequest()
246
new_request.ParseFromString(serialized_data)
247
print(f"Deserialized: {new_request}")
248
249
# JSON serialization (json_format ships with the protobuf package; no extra install needed)
250
from google.protobuf import json_format
251
252
# Convert to JSON
253
json_data = json_format.MessageToJson(request)
254
print(f"JSON: {json_data}")
255
256
# Convert from JSON
257
json_request = protos.MyRequest()
258
json_format.Parse(json_data, json_request)
259
print(f"From JSON: {json_request}")
260
261
# Custom serialization for storage
262
def store_message(message, filename):
263
with open(filename, 'wb') as f:
264
f.write(message.SerializeToString())
265
266
def load_message(message_class, filename):
267
message = message_class()
268
with open(filename, 'rb') as f:
269
message.ParseFromString(f.read())
270
return message
271
272
# Store and load messages
273
store_message(request, "request.bin")
274
loaded_request = load_message(protos.MyRequest, "request.bin")
275
```
276
277
### Error Handling with Dynamic Protos
278
279
Handle errors and edge cases when working with runtime-loaded protobuf definitions.
280
281
**Usage Examples:**
282
283
```python
284
import os
285
import sys
286
287
def load_protos_safely(proto_path):
288
"""Safely load protobuf definitions with error handling."""
289
try:
290
if not os.path.exists(proto_path):
291
raise FileNotFoundError(f"Proto path not found: {proto_path}")
292
293
# Load protos and services
294
protos, services = grpc.protos_and_services(proto_path)
295
return protos, services
296
297
except Exception as e:
298
print(f"Error loading protos from {proto_path}: {e}")
299
sys.exit(1)
300
301
def validate_proto_structure(protos, expected_messages):
302
"""Validate that expected message types are available."""
303
missing_messages = []
304
305
for message_name in expected_messages:
306
if not hasattr(protos, message_name):
307
missing_messages.append(message_name)
308
309
if missing_messages:
310
raise ValueError(f"Missing expected messages: {missing_messages}")
311
312
# Safe proto loading
313
try:
314
protos, services = load_protos_safely("my_service.proto")
315
316
# Validate expected structure
317
expected_messages = ["MyRequest", "MyResponse", "StreamRequest"]
318
validate_proto_structure(protos, expected_messages)
319
320
print("Protos loaded and validated successfully")
321
322
except Exception as e:
323
print(f"Failed to load or validate protos: {e}")
324
sys.exit(1)
325
326
# Handle missing fields gracefully
327
def create_request_safely(protos, **kwargs):
328
"""Create request message with safe field setting."""
329
request = protos.MyRequest()
330
331
# Get available fields
332
available_fields = {field.name for field in request.DESCRIPTOR.fields}
333
334
# Set only available fields
335
for field_name, value in kwargs.items():
336
if field_name in available_fields:
337
setattr(request, field_name, value)
338
else:
339
print(f"Warning: Field '{field_name}' not available in MyRequest")
340
341
return request
342
343
# Usage
344
request = create_request_safely(
345
protos,
346
message="Hello",
347
count=5,
348
unknown_field="ignored" # Will be ignored with warning
349
)
350
```
351
352
### Advanced Proto Features
353
354
Work with advanced protobuf features such as `oneof` fields, map fields, repeated fields, and nested messages.
355
356
**Usage Examples:**
357
358
```python
359
# Load protos with advanced features
360
protos = grpc.protos("advanced_service.proto")
361
362
# Handle oneof fields
363
request = protos.AdvancedRequest()
364
365
# Set oneof field (only one can be set at a time)
366
request.text_data = "Hello, world!"
367
# request.binary_data = b"Binary data" # Would clear text_data
368
369
# Check which oneof field is set
370
which_data = request.WhichOneof('data')
371
print(f"Active data field: {which_data}")
372
373
# Handle map fields
374
if hasattr(request, 'metadata'):
375
request.metadata['key1'] = 'value1'
376
request.metadata['key2'] = 'value2'
377
378
# Iterate over map
379
for key, value in request.metadata.items():
380
print(f"Metadata: {key} = {value}")
381
382
# Handle repeated fields
383
if hasattr(request, 'tags'):
384
request.tags.extend(['tag1', 'tag2', 'tag3'])
385
386
# Modify repeated field
387
request.tags.append('tag4')
388
del request.tags[0] # Remove first tag
389
390
# Handle nested messages
391
if hasattr(request, 'config'):
392
request.config.timeout = 30
393
request.config.retries = 3
394
request.config.debug = True
395
396
print(f"Advanced request: {request}")
397
```
398
399
## Integration Patterns
400
401
### Service Discovery Pattern
402
403
```python
404
def discover_services(proto_directory):
405
"""Discover all available services from proto directory."""
406
protos, services = grpc.protos_and_services(proto_directory)
407
408
discovered_services = {}
409
410
# Inspect services module for available stubs
411
for attr_name in dir(services):
412
if attr_name.endswith('Stub'):
413
service_name = attr_name[:-4] # Remove 'Stub' suffix
414
stub_class = getattr(services, attr_name)
415
servicer_name = f"{service_name}Servicer"
416
417
if hasattr(services, servicer_name):
418
servicer_class = getattr(services, servicer_name)
419
discovered_services[service_name] = {
420
'stub': stub_class,
421
'servicer': servicer_class,
422
'methods': [method for method in dir(servicer_class)
423
if not method.startswith('_')]
424
}
425
426
return discovered_services, protos, services
427
428
# Usage
429
services_info, protos, services = discover_services("protos/")
430
for service_name, info in services_info.items():
431
print(f"Service: {service_name}")
432
print(f" Methods: {info['methods']}")
433
```
434
435
### Configuration-Driven Client
436
437
```python
438
import yaml
439
440
def create_clients_from_config(config_file):
441
"""Create gRPC clients based on configuration file."""
442
with open(config_file, 'r') as f:
443
config = yaml.safe_load(f)
444
445
clients = {}
446
447
for service_config in config['services']:
448
service_name = service_config['name']
449
proto_path = service_config['proto_path']
450
server_address = service_config['address']
451
452
# Load protos for this service
453
_, services = grpc.protos_and_services(proto_path)
454
455
# Create channel
456
if service_config.get('secure', False):
457
credentials = grpc.ssl_channel_credentials()
458
channel = grpc.secure_channel(server_address, credentials)
459
else:
460
channel = grpc.insecure_channel(server_address)
461
462
# Create stub
463
stub_name = f"{service_name}Stub"
464
if hasattr(services, stub_name):
465
stub_class = getattr(services, stub_name)
466
clients[service_name] = stub_class(channel)
467
468
return clients
469
470
# Example config.yaml:
471
# services:
472
# - name: UserService
473
# proto_path: protos/user_service.proto
474
# address: localhost:50051
475
# secure: false
476
# - name: OrderService
477
# proto_path: protos/order_service.proto
478
# address: secure-orders.example.com:443
479
# secure: true
480
481
# Usage
482
clients = create_clients_from_config("config.yaml")
483
user_client = clients['UserService']
484
order_client = clients['OrderService']
485
```
486
487
## Types
488
489
```python { .api }
490
# The protos() and services() functions return module-like objects
491
# that contain the dynamically loaded protobuf classes and service definitions
492
493
# Example of what's available after loading:
494
# protos = grpc.protos("my_service.proto")
495
# - protos.MyRequest (message class)
496
# - protos.MyResponse (message class)
497
# - protos.StatusEnum (enum class)
498
499
# services = grpc.services("my_service.proto")
500
# - services.MyServiceStub (client stub class)
501
# - services.MyServiceServicer (server servicer base class)
502
# - services.add_MyServiceServicer_to_server (function to add servicer to server)
503
504
# The exact contents depend on what's defined in the .proto files
505
```