0
# Standalone Clients
1
2
Client libraries for interacting with nameko services from non-nameko applications, supporting both RPC calls and event publishing with connection management and error handling.
3
4
## Capabilities
5
6
### Service RPC Proxy
7
8
Client for making RPC calls to specific nameko services from standalone applications.
9
10
```python { .api }
11
class ServiceRpcProxy:
12
"""
13
Standalone RPC client for calling a specific nameko service.
14
15
Parameters:
16
- service_name: Name of the target service
17
- config: Configuration dictionary with AMQP connection details
18
- context_data: Optional context data to pass with all calls
19
- timeout: Request timeout in seconds
20
"""
21
22
def __init__(self, service_name, config, context_data=None, timeout=None): ...
23
24
def __enter__(self): ...
25
26
def __exit__(self, exc_type, exc_val, exc_tb): ...
27
```
28
29
**Usage Example:**
30
31
```python
32
from nameko.standalone.rpc import ServiceRpcProxy
33
34
# Configuration for AMQP connection
35
config = {
36
'AMQP_URI': 'amqp://guest:guest@localhost:5672//'
37
}
38
39
# Using context manager (recommended)
40
with ServiceRpcProxy('user_service', config) as user_rpc:
41
# Call service methods
42
user = user_rpc.get_user(user_id=123)
43
print(f"User: {user['name']} ({user['email']})")
44
45
# Create new user
46
new_user = user_rpc.create_user({
47
'name': 'John Doe',
48
'email': 'john@example.com'
49
})
50
print(f"Created user with ID: {new_user['user_id']}")
51
52
# Manual connection management
53
user_rpc = ServiceRpcProxy('user_service', config)
54
try:
55
user = user_rpc.get_user(user_id=456)
56
print(user)
57
finally:
58
user_rpc.stop() # Always close connection
59
```
60
61
### Cluster RPC Proxy
62
63
Client that can call multiple services with service discovery and connection pooling.
64
65
```python { .api }
66
class ClusterRpcProxy:
67
"""
68
Standalone RPC client for calling multiple nameko services.
69
70
Parameters:
71
- config: Configuration dictionary with AMQP connection details
72
- context_data: Optional context data to pass with all calls
73
- timeout: Request timeout in seconds
74
"""
75
76
def __init__(self, config, context_data=None, timeout=None): ...
77
78
def __enter__(self): ...
79
80
def __exit__(self, exc_type, exc_val, exc_tb): ...
81
82
def __getattr__(self, service_name): ...
83
```
84
85
**Usage Example:**
86
87
```python
88
from nameko.standalone.rpc import ClusterRpcProxy
89
90
config = {
91
'AMQP_URI': 'amqp://guest:guest@localhost:5672//',
92
'RPC_TIMEOUT': 30 # Default timeout for all calls
93
}
94
95
# Using context manager for multiple services
96
with ClusterRpcProxy(config) as cluster_rpc:
97
# Call different services through dynamic attributes
98
user = cluster_rpc.user_service.get_user(123)
99
order = cluster_rpc.order_service.create_order({
100
'user_id': user['user_id'],
101
'items': [{'product_id': 1, 'quantity': 2}]
102
})
103
104
# Send notification about the order
105
cluster_rpc.notification_service.send_notification({
106
'user_id': user['user_id'],
107
'message': f'Order {order["order_id"]} created successfully'
108
})
109
110
# With context data for request tracing
111
context_data = {
112
'correlation_id': 'web-request-456',
113
'user_id': 123
114
}
115
116
with ClusterRpcProxy(config, context_data=context_data) as cluster_rpc:
117
# All calls will include the context data
118
result = cluster_rpc.audit_service.log_action('user_login', {
119
'timestamp': time.time()
120
})
121
```
122
123
### Event Dispatcher
124
125
Function for publishing events to nameko services from standalone applications.
126
127
```python { .api }
128
def event_dispatcher(config):
129
"""
130
Create an event dispatcher for publishing events.
131
132
Parameters:
133
- config: Configuration dictionary with AMQP connection details
134
135
Returns:
136
Event dispatcher function that can publish events
137
"""
138
```
139
140
**Usage Example:**
141
142
```python
143
from nameko.standalone.events import event_dispatcher
144
145
config = {
146
'AMQP_URI': 'amqp://guest:guest@localhost:5672//'
147
}
148
149
# Create event dispatcher
150
dispatch_event = event_dispatcher(config)
151
152
# Publish events to services
153
dispatch_event('external_system', 'data_imported', {
154
'file_path': '/data/import/users.csv',
155
'record_count': 1000,
156
'import_id': 'import-123',
157
'timestamp': time.time()
158
})
159
160
dispatch_event('payment_gateway', 'payment_received', {
161
'order_id': 'order-456',
162
'amount': 99.99,
163
'currency': 'USD',
164
'payment_method': 'credit_card',
165
'transaction_id': 'txn-789'
166
})
167
168
# The dispatcher automatically handles connection cleanup
169
```
170
171
### Event Exchange Management
172
173
Function for getting event exchanges for manual event publishing with more control.
174
175
```python { .api }
176
def get_event_exchange(config):
177
"""
178
Get event exchange for manual event publishing.
179
180
Parameters:
181
- config: Configuration dictionary with AMQP connection details
182
183
Returns:
184
Exchange object for publishing events with full control
185
"""
186
```
187
188
**Usage Example:**
189
190
```python
191
from nameko.standalone.events import get_event_exchange
192
import json
193
194
config = {
195
'AMQP_URI': 'amqp://guest:guest@localhost:5672//'
196
}
197
198
# Get exchange for manual publishing
199
exchange = get_event_exchange(config)
200
201
try:
202
# Manual event publishing with full control
203
event_data = {
204
'user_id': 123,
205
'action': 'profile_updated',
206
'changes': ['email', 'phone'],
207
'timestamp': time.time()
208
}
209
210
# Publish with specific routing key and headers
211
exchange.publish(
212
message=json.dumps(event_data),
213
routing_key='user_service.profile_updated',
214
headers={
215
'source_service': 'web_app',
216
'event_type': 'profile_updated',
217
'correlation_id': 'web-session-789'
218
}
219
)
220
221
finally:
222
exchange.close()
223
```
224
225
### Web Application Integration
226
227
Common patterns for integrating nameko services with web frameworks.
228
229
**Flask Integration Example:**
230
231
```python
232
from flask import Flask, request, jsonify
233
from nameko.standalone.rpc import ClusterRpcProxy
234
235
app = Flask(__name__)
236
237
# Nameko configuration
238
NAMEKO_CONFIG = {
239
'AMQP_URI': 'amqp://guest:guest@localhost:5672//'
240
}
241
242
@app.route('/api/users/<int:user_id>', methods=['GET'])
243
def get_user(user_id):
244
"""Get user via nameko service"""
245
with ClusterRpcProxy(NAMEKO_CONFIG) as rpc:
246
try:
247
user = rpc.user_service.get_user(user_id)
248
return jsonify(user)
249
except Exception as e:
250
return jsonify({'error': str(e)}), 404
251
252
@app.route('/api/users', methods=['POST'])
253
def create_user():
254
"""Create user via nameko service"""
255
user_data = request.get_json()
256
257
with ClusterRpcProxy(NAMEKO_CONFIG) as rpc:
258
try:
259
user = rpc.user_service.create_user(user_data)
260
return jsonify(user), 201
261
except Exception as e:
262
return jsonify({'error': str(e)}), 400
263
264
@app.route('/api/orders', methods=['POST'])
265
def create_order():
266
"""Create order and send notification"""
267
order_data = request.get_json()
268
269
# Add correlation ID for tracing
270
context_data = {
271
'correlation_id': f'web-{request.headers.get("X-Request-ID", "unknown")}',
272
'source': 'web_api'
273
}
274
275
with ClusterRpcProxy(NAMEKO_CONFIG, context_data=context_data) as rpc:
276
try:
277
# Create order
278
order = rpc.order_service.create_order(order_data)
279
280
# Send notification
281
rpc.notification_service.send_order_confirmation({
282
'order_id': order['order_id'],
283
'user_id': order_data['user_id']
284
})
285
286
return jsonify(order), 201
287
except Exception as e:
288
return jsonify({'error': str(e)}), 400
289
290
if __name__ == '__main__':
291
app.run(debug=True)
292
```
293
294
**Django Integration Example:**
295
296
```python
297
# django_app/services.py
298
from django.conf import settings
299
from nameko.standalone.rpc import ClusterRpcProxy
300
from nameko.standalone.events import event_dispatcher
301
302
class NamekoService:
303
"""Django service layer for nameko integration"""
304
305
def __init__(self):
306
self.config = {
307
'AMQP_URI': settings.NAMEKO_AMQP_URI
308
}
309
self.dispatch_event = event_dispatcher(self.config)
310
311
def get_user_data(self, user_id):
312
"""Get enriched user data from nameko services"""
313
with ClusterRpcProxy(self.config) as rpc:
314
user = rpc.user_service.get_user(user_id)
315
preferences = rpc.preference_service.get_user_preferences(user_id)
316
317
return {
318
**user,
319
'preferences': preferences
320
}
321
322
def notify_user_action(self, user_id, action, data):
323
"""Notify nameko services of user actions"""
324
self.dispatch_event('django_app', f'user_{action}', {
325
'user_id': user_id,
326
'action': action,
327
'data': data,
328
'timestamp': time.time()
329
})
330
331
# django_app/views.py
332
from django.http import JsonResponse
333
from django.views import View
334
from .services import NamekoService
335
336
class UserAPIView(View):
337
def __init__(self):
338
super().__init__()
339
self.nameko = NamekoService()
340
341
def get(self, request, user_id):
342
try:
343
user_data = self.nameko.get_user_data(user_id)
344
return JsonResponse(user_data)
345
except Exception as e:
346
return JsonResponse({'error': str(e)}, status=400)
347
348
def post(self, request, user_id):
349
# Handle user action
350
action_data = json.loads(request.body)
351
352
# Notify nameko services
353
self.nameko.notify_user_action(
354
user_id,
355
action_data['action'],
356
action_data.get('data', {})
357
)
358
359
return JsonResponse({'status': 'success'})
360
```
361
362
### Error Handling and Retries
363
364
Robust error handling patterns for standalone clients.
365
366
```python
367
from nameko.standalone.rpc import ServiceRpcProxy
368
from nameko.exceptions import RemoteError, ServiceNotFound
369
import time
370
import logging
371
372
class ResilientNamekoClient:
373
"""Wrapper for resilient nameko service calls"""
374
375
def __init__(self, config, max_retries=3, retry_delay=1):
376
self.config = config
377
self.max_retries = max_retries
378
self.retry_delay = retry_delay
379
self.logger = logging.getLogger(__name__)
380
381
def call_service(self, service_name, method_name, *args, **kwargs):
382
"""Call service method with automatic retries"""
383
384
for attempt in range(self.max_retries + 1):
385
try:
386
with ServiceRpcProxy(service_name, self.config) as rpc:
387
method = getattr(rpc, method_name)
388
return method(*args, **kwargs)
389
390
except ServiceNotFound:
391
self.logger.error(f"Service {service_name} not found")
392
raise # Don't retry for service not found
393
394
except RemoteError as e:
395
self.logger.warning(f"Remote error on attempt {attempt + 1}: {e}")
396
if attempt == self.max_retries:
397
raise
398
time.sleep(self.retry_delay * (2 ** attempt)) # Exponential backoff
399
400
except Exception as e:
401
self.logger.error(f"Unexpected error on attempt {attempt + 1}: {e}")
402
if attempt == self.max_retries:
403
raise
404
time.sleep(self.retry_delay)
405
406
# Usage
407
client = ResilientNamekoClient(config, max_retries=3)
408
409
try:
410
user = client.call_service('user_service', 'get_user', user_id=123)
411
print(user)
412
except Exception as e:
413
print(f"Failed to get user after retries: {e}")
414
```
415
416
### Connection Pooling
417
418
Advanced connection management for high-throughput applications.
419
420
```python
421
from nameko.standalone.rpc import ClusterRpcProxy
422
from contextlib import contextmanager
423
import threading
424
import queue
425
426
class NamekoConnectionPool:
427
"""Connection pool for nameko RPC clients"""
428
429
def __init__(self, config, pool_size=10):
430
self.config = config
431
self.pool_size = pool_size
432
self.pool = queue.Queue(maxsize=pool_size)
433
self.lock = threading.Lock()
434
435
# Pre-populate pool
436
for _ in range(pool_size):
437
proxy = ClusterRpcProxy(config)
438
proxy.start() # Initialize connection
439
self.pool.put(proxy)
440
441
@contextmanager
442
def get_proxy(self):
443
"""Get proxy from pool with automatic return"""
444
proxy = self.pool.get()
445
try:
446
yield proxy
447
finally:
448
self.pool.put(proxy)
449
450
def close_all(self):
451
"""Close all connections in pool"""
452
while not self.pool.empty():
453
proxy = self.pool.get()
454
proxy.stop()
455
456
# Usage in high-throughput application
457
pool = NamekoConnectionPool(config, pool_size=20)
458
459
def process_request(request_data):
460
with pool.get_proxy() as rpc:
461
# Process request using pooled connection
462
result = rpc.processor_service.process_data(request_data)
463
return result
464
465
# Cleanup on application shutdown
466
import atexit
467
atexit.register(pool.close_all)
468
```