# RPC Communication

Remote procedure call system that enables synchronous inter-service communication with automatic serialization, load balancing, error handling, and distributed tracing support.

## Capabilities

### RPC Entrypoint Decorator

Decorator that exposes service methods as RPC endpoints, making them callable from other services or standalone clients.

```python { .api }
def rpc(fn=None, expected_exceptions=()):
    """
    Decorator to expose a service method as an RPC endpoint.

    Parameters:
    - fn: The function to decorate (supplied automatically when the decorator is used without arguments)
    - expected_exceptions: Exception type or tuple of types that callers should expect the method to raise (for example, on invalid input)

    Returns:
    Decorated method that can be called remotely
    """
```

**Usage Example:**

```python
from nameko.rpc import rpc
from nameko.exceptions import BadRequest

class CalculatorService:
    name = "calculator"

    @rpc
    def add(self, a, b):
        return a + b

    @rpc(expected_exceptions=(ValueError, BadRequest))
    def divide(self, a, b):
        if b == 0:
            raise ValueError("Division by zero")
        return a / b
```

### RPC Proxy

Dependency provider that enables services to make RPC calls to other services with automatic service discovery and load balancing.

```python { .api }
class RpcProxy:
    """
    Dependency provider for making RPC calls to other services.

    Parameters:
    - target_service: Name of the target service to call
    - **options: Additional options for the RPC proxy
    """

    def __init__(self, target_service, **options): ...
```

**Usage Example:**

```python
from nameko.rpc import rpc, RpcProxy

class UserService:
    name = "user_service"

    # RPC proxy to calculator service
    calculator_rpc = RpcProxy('calculator')

    @rpc
    def calculate_user_score(self, user_id, base_score, multiplier):
        # Call remote service method.
        # Assumes the calculator service also exposes a `multiply` RPC method
        # (the CalculatorService example above only defines add and divide).
        score = self.calculator_rpc.multiply(base_score, multiplier)
        return {'user_id': user_id, 'score': score}
```
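Because the proxy is an ordinary attribute on the service instance, the logic of a method that uses it can be checked without a broker by substituting a mock. The following is a minimal sketch under stated assumptions: `FakeUserService` is a hypothetical plain class that mirrors `UserService` with the `@rpc` decorator omitted, so the example runs without Nameko installed, and the proxy is passed in by hand rather than injected by the framework.

```python
from unittest.mock import Mock


class FakeUserService:
    """Hypothetical stand-in for UserService; @rpc is omitted on purpose."""

    def __init__(self, calculator_rpc):
        # In a running service the framework injects the proxy; here it is passed in.
        self.calculator_rpc = calculator_rpc

    def calculate_user_score(self, user_id, base_score, multiplier):
        score = self.calculator_rpc.multiply(base_score, multiplier)
        return {'user_id': user_id, 'score': score}


# Replace the remote calculator with a mock that returns a canned value.
calculator = Mock()
calculator.multiply.return_value = 42

service = FakeUserService(calculator)
result = service.calculate_user_score('u1', 6, 7)

print(result)
```

The mock records how it was called, so a test can also assert that the remote method received the expected arguments. The standard-library mock is used here only to keep the sketch dependency-free.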

### Asynchronous RPC Calls

RPC proxies support asynchronous calls that return immediately with a reply object, allowing for non-blocking parallel service calls.

```python { .api }
class MethodProxy:
    """
    Proxy for individual RPC methods with synchronous and asynchronous call support.
    """

    def __call__(self, *args, **kwargs):
        """Make synchronous RPC call"""
        ...

    def call_async(self, *args, **kwargs):
        """
        Make asynchronous RPC call.

        Returns:
        RpcReply object that can be used to retrieve the result later
        """
        ...

class RpcReply:
    """
    Represents a pending or completed asynchronous RPC call.
    """

    def result(self):
        """
        Get the result of the asynchronous RPC call.
        Blocks until the result is available.

        Returns:
        The result of the RPC call

        Raises:
        Any exception that occurred during the remote call
        """
        ...
```

**Usage Example:**

```python
from nameko.rpc import rpc, RpcProxy

class OrderService:
    name = "order_service"

    payment_rpc = RpcProxy('payment_service')
    inventory_rpc = RpcProxy('inventory_service')
    email_rpc = RpcProxy('email_service')
    # Proxies used by get_order_summary_async.
    # Target service names here are illustrative placeholders.
    order_db = RpcProxy('order_db_service')
    customer_rpc = RpcProxy('customer_service')
    shipping_rpc = RpcProxy('shipping_service')

    @rpc
    def process_order_parallel(self, order_data):
        """Process order with parallel service calls for better performance"""

        # Start multiple async calls in parallel
        payment_reply = self.payment_rpc.process_payment.call_async(
            order_data['payment_info']
        )
        inventory_reply = self.inventory_rpc.reserve_items.call_async(
            order_data['items']
        )
        email_reply = self.email_rpc.send_confirmation.call_async(
            order_data['customer_email'],
            order_data['order_id']
        )

        # Collect results (each result() blocks until that reply arrives)
        try:
            payment_result = payment_reply.result()
            inventory_result = inventory_reply.result()
            email_result = email_reply.result()

            return {
                'status': 'success',
                'payment': payment_result,
                'inventory': inventory_result,
                'email_sent': email_result
            }
        except Exception as e:
            # Handle any failures in the parallel calls
            return {'status': 'failed', 'error': str(e)}

    @rpc
    def get_order_summary_async(self, order_id):
        """Fetch order data from multiple services asynchronously"""

        # Launch parallel requests
        order_reply = self.order_db.get_order.call_async(order_id)
        customer_reply = self.customer_rpc.get_customer.call_async(order_id)
        shipping_reply = self.shipping_rpc.get_tracking.call_async(order_id)

        # Wait for all results
        order = order_reply.result()
        customer = customer_reply.result()
        shipping = shipping_reply.result()

        return {
            'order': order,
            'customer': customer,
            'shipping': shipping
        }
```
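The "start every call, then collect" pattern can be tried out without a broker. The sketch below is an analogy only, not how the library implements `call_async`: `FakeMethodProxy` and `slow_double` are hypothetical names, a standard-library `Future` plays the role of the reply object, and OS threads are used purely to keep the example self-contained.

```python
import time
from concurrent.futures import ThreadPoolExecutor


class FakeMethodProxy:
    """Hypothetical stand-in for a remote method; not part of Nameko."""

    def __init__(self, executor, fn):
        self._executor = executor
        self._fn = fn

    def __call__(self, *args, **kwargs):
        # Synchronous call: start the work and wait for it.
        return self.call_async(*args, **kwargs).result()

    def call_async(self, *args, **kwargs):
        # Returns immediately; the Future plays the role of the reply object.
        return self._executor.submit(self._fn, *args, **kwargs)


def slow_double(x, delay=0.05):
    time.sleep(delay)  # simulated network and processing latency
    return x * 2


with ThreadPoolExecutor(max_workers=3) as executor:
    double = FakeMethodProxy(executor, slow_double)

    # Start every call first ...
    replies = [double.call_async(n) for n in (1, 2, 3)]
    # ... then collect; each result() blocks until that reply is ready.
    results = [reply.result() for reply in replies]

    sync_result = double(10)

print(results, sync_result)
```

Calling `result()` right after each `call_async()` would serialize the calls again, so the ordering (all starts before any collection) is what makes the requests overlap.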

### Error Handling

RPC calls automatically handle serialization and propagation of exceptions between services.

**Built-in Exception Types:**

```python { .api }
class RemoteError(NamekoException):
    """
    Wraps exceptions that occurred in remote service calls.

    Attributes:
    - exc_type: Original exception type name
    - exc_args: Original exception arguments
    - exc_path: Service path where exception occurred
    """

class ServiceNotFound(NamekoException):
    """Raised when the target service cannot be found"""

class MethodNotFound(NamekoException):
    """Raised when the target method is not found on the service"""
```

**Exception Handling Example:**

```python
from nameko.rpc import rpc, RpcProxy
from nameko.exceptions import RemoteError, ServiceNotFound

class OrderService:
    name = "order_service"

    payment_rpc = RpcProxy('payment_service')

    @rpc
    def process_order(self, order_data):
        try:
            # This might raise a remote exception
            payment_result = self.payment_rpc.process_payment(
                order_data['payment_info']
            )
            return {'status': 'success', 'payment': payment_result}
        except RemoteError as e:
            # Handle remote service errors
            return {'status': 'payment_failed', 'error': str(e)}
        except ServiceNotFound:
            # Handle service discovery failures
            return {'status': 'service_unavailable'}
```
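Since a wrapped remote exception carries the original type as a name rather than as a class, callers that want finer-grained handling typically compare that name against strings. The sketch below illustrates the idea with a hypothetical `FakeRemoteError` that exposes only the `exc_type` and `exc_args` attributes listed above; the status values and the `InsufficientFunds` type name are invented for illustration.

```python
class FakeRemoteError(Exception):
    """Hypothetical stand-in exposing the exc_type and exc_args attributes."""

    def __init__(self, exc_type, exc_args=()):
        super().__init__("{} {}".format(exc_type, exc_args))
        self.exc_type = exc_type
        self.exc_args = exc_args


# Map remote exception type names to caller-facing statuses (illustrative values).
STATUS_BY_REMOTE_TYPE = {
    'ValueError': 'invalid_request',
    'InsufficientFunds': 'payment_declined',
}


def classify(error):
    """Translate a remote failure into a status, defaulting to a generic one."""
    return STATUS_BY_REMOTE_TYPE.get(error.exc_type, 'payment_failed')


def process(call):
    """Run a callable that may raise a remote error and build a response dict."""
    try:
        return {'status': 'success', 'payment': call()}
    except FakeRemoteError as e:
        return {'status': classify(e), 'error': str(e)}


def declined():
    raise FakeRemoteError('InsufficientFunds', ('balance too low',))


print(process(lambda: 'ok'))
print(process(declined))
```

Keeping the mapping in one table makes the fallback explicit: any remote type the caller does not recognize is reported with the generic status instead of leaking through unhandled.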

### Context Propagation

RPC calls automatically propagate context data like correlation IDs, user information, and tracing data across service boundaries.

**Context Data Access:**

```python
import time

from nameko.contextdata import ContextDataProvider
from nameko.rpc import rpc

class AuditService:
    name = "audit_service"

    # Each provider exposes one key from the context data of the current call
    user_id = ContextDataProvider('user_id')
    correlation_id = ContextDataProvider('correlation_id')

    @rpc
    def log_action(self, action, data):
        # Access context passed from calling service
        return {
            'action': action,
            'user_id': self.user_id,
            'correlation_id': self.correlation_id,
            'timestamp': time.time()
        }
```

### Performance Considerations

- **Connection Pooling**: RPC proxies automatically manage connection pools
- **Load Balancing**: Requests are distributed across available service instances
- **Timeouts**: Configure request timeouts to prevent hanging calls
- **Retries**: Built-in retry mechanisms for transient failures

**Configuration Example:**

```yaml
# config.yaml
AMQP_URI: 'amqp://guest:guest@localhost:5672//'
RPC_TIMEOUT: 30  # seconds
RPC_RETRY_POLICY:
  max_retries: 3
  interval_start: 0.1
  interval_step: 0.2
```
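To get a feel for what the retry values above imply, the sketch below computes the wait before each retry. It assumes a linear backoff in which the first wait is `interval_start` and each later wait grows by `interval_step`; whether the library applies exactly this schedule is an assumption, and `retry_delays` and its optional `interval_max` cap are hypothetical names introduced for illustration.

```python
def retry_delays(max_retries, interval_start, interval_step, interval_max=None):
    """Return the wait (in seconds) before each retry under linear backoff."""
    delays = []
    for attempt in range(max_retries):
        delay = interval_start + attempt * interval_step
        if interval_max is not None:
            # Optional cap on any single wait (assumed, not in the config above).
            delay = min(delay, interval_max)
        delays.append(round(delay, 3))
    return delays


policy = {'max_retries': 3, 'interval_start': 0.1, 'interval_step': 0.2}
delays = retry_delays(**policy)

print(delays)                 # [0.1, 0.3, 0.5]
print(round(sum(delays), 3))  # 0.9
```

Under these assumptions the three retries add less than a second of waiting in total, well inside the 30-second timeout from the same configuration.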