0
# Testing Framework
1
2
Comprehensive testing utilities for unit testing services, mocking dependencies, integration testing with real message brokers, and end-to-end testing patterns.
3
4
## Capabilities
5
6
### Worker Factory
7
8
Creates service worker instances for isolated unit testing without requiring full service containers or message brokers.
9
10
```python { .api }
11
def worker_factory(service_cls, **dependencies):
12
"""
13
Create a service worker instance for testing.
14
15
Parameters:
16
- service_cls: The service class to instantiate
17
- **dependencies: Keyword arguments to override service dependencies
18
19
Returns:
20
Service worker instance with mocked or provided dependencies
21
"""
22
```
23
24
**Usage Example:**
25
26
```python
27
from nameko.testing.services import worker_factory
28
from unittest.mock import Mock
29
30
class UserService:
31
name = "user_service"
32
33
database = DatabaseProvider()
34
cache = CacheProvider()
35
36
@rpc
37
def create_user(self, user_data):
38
user_id = self.database.save_user(user_data)
39
self.cache.set(f'user:{user_id}', user_data)
40
return {'user_id': user_id}
41
42
def test_create_user():
43
# Mock dependencies
44
mock_database = Mock()
45
mock_database.save_user.return_value = 123
46
mock_cache = Mock()
47
48
# Create worker with mocked dependencies
49
worker = worker_factory(UserService, database=mock_database, cache=mock_cache)
50
51
# Test the service method
52
result = worker.create_user({'name': 'John', 'email': 'john@example.com'})
53
54
# Verify behavior
55
assert result['user_id'] == 123
56
mock_database.save_user.assert_called_once_with({'name': 'John', 'email': 'john@example.com'})
57
mock_cache.set.assert_called_once_with('user:123', {'name': 'John', 'email': 'john@example.com'})
58
```
59
60
### Entrypoint Hook
61
62
Provides hooks for testing specific entrypoints (RPC methods, event handlers, HTTP endpoints) in isolation.
63
64
```python { .api }
65
def entrypoint_hook(container, method_name):
66
"""
67
Create a hook for testing a specific entrypoint method.
68
69
Parameters:
70
- container: Service container instance
71
- method_name: Name of the service method to hook
72
73
Returns:
74
Callable that can invoke the hooked method
75
"""
76
```
77
78
**Usage Example:**
79
80
```python
81
from nameko.testing.services import entrypoint_hook
82
from nameko.containers import ServiceContainer
83
84
class EmailService:
85
name = "email_service"
86
87
@event_handler('user_service', 'user_registered')
88
def send_welcome_email(self, payload):
89
email = payload['email']
90
# Send email logic
91
return f"Welcome email sent to {email}"
92
93
def test_welcome_email_handler():
94
# Create service container
95
container = ServiceContainer(EmailService, config={})
96
container.start()
97
98
try:
99
# Hook the event handler method
100
send_welcome_email = entrypoint_hook(container, 'send_welcome_email')
101
102
# Test the event handler directly
103
result = send_welcome_email({'email': 'user@example.com'})
104
105
assert result == "Welcome email sent to user@example.com"
106
finally:
107
container.stop()
108
```
109
110
### Entrypoint Waiter
111
112
Utility for testing asynchronous operations and waiting for entrypoints to complete execution.
113
114
```python { .api }
115
def entrypoint_waiter(container, method_name, timeout=None):
116
"""
117
Wait for an entrypoint method to be called and complete.
118
119
Parameters:
120
- container: Service container instance
121
- method_name: Name of the service method to wait for
122
- timeout: Maximum time to wait in seconds
123
124
Returns:
125
Context manager that waits for method completion
126
"""
127
```
128
129
**Usage Example:**
130
131
```python
132
from nameko.testing.services import entrypoint_waiter
133
import threading
134
135
class AsyncProcessingService:
136
name = "async_service"
137
138
@event_handler('data_service', 'data_received')
139
def process_data_async(self, payload):
140
# Simulate async processing
141
time.sleep(0.1)
142
return f"Processed {payload['data_id']}"
143
144
def test_async_processing():
145
container = ServiceContainer(AsyncProcessingService, config={})
146
container.start()
147
148
try:
149
# Wait for the event handler to complete
150
with entrypoint_waiter(container, 'process_data_async', timeout=5):
151
# Trigger the event handler
152
# (In real test, this would be triggered by actual event)
153
hook = entrypoint_hook(container, 'process_data_async')
154
155
# Run in separate thread to simulate async behavior
156
def trigger_event():
157
hook({'data_id': 'test-123'})
158
159
thread = threading.Thread(target=trigger_event)
160
thread.start()
161
162
# If we reach here, the handler completed successfully
163
assert True
164
finally:
165
container.stop()
166
```
167
168
### Dependency Replacement
169
170
Utility for replacing service dependencies with test doubles, mocks, or alternative implementations.
171
172
```python { .api }
173
def replace_dependencies(container, **dependencies):
174
"""
175
Replace dependencies in a service container for testing.
176
177
Parameters:
178
- container: Service container instance
179
- **dependencies: Keyword arguments mapping dependency names to replacement objects
180
"""
181
```
182
183
**Usage Example:**
184
185
```python
186
from nameko.testing.services import replace_dependencies
187
from unittest.mock import Mock
188
189
class OrderService:
190
name = "order_service"
191
192
payment_service = RpcProxy('payment_service')
193
database = DatabaseProvider()
194
195
@rpc
196
def create_order(self, order_data):
197
# Save order to database
198
order_id = self.database.save_order(order_data)
199
200
# Process payment
201
payment_result = self.payment_service.process_payment(order_data['payment'])
202
203
return {
204
'order_id': order_id,
205
'payment_status': payment_result['status']
206
}
207
208
def test_create_order_with_mocked_dependencies():
209
# Create service container
210
container = ServiceContainer(OrderService, config={})
211
212
# Create mocks
213
mock_payment_service = Mock()
214
mock_payment_service.process_payment.return_value = {'status': 'success'}
215
216
mock_database = Mock()
217
mock_database.save_order.return_value = 'order-123'
218
219
# Replace dependencies
220
replace_dependencies(
221
container,
222
payment_service=mock_payment_service,
223
database=mock_database
224
)
225
226
container.start()
227
228
try:
229
# Get service worker and test
230
worker = container.service
231
result = worker.create_order({
232
'items': [{'id': 1, 'quantity': 2}],
233
'payment': {'method': 'credit_card', 'amount': 100}
234
})
235
236
assert result['order_id'] == 'order-123'
237
assert result['payment_status'] == 'success'
238
239
# Verify mock calls
240
mock_database.save_order.assert_called_once()
241
mock_payment_service.process_payment.assert_called_once()
242
finally:
243
container.stop()
244
```
245
246
### Entrypoint Restriction
247
248
Utility for limiting which entrypoints are active during testing, useful for isolating specific functionality.
249
250
```python { .api }
251
def restrict_entrypoints(container, *entrypoints):
252
"""
253
Restrict active entrypoints in a service container to specified ones.
254
255
Parameters:
256
- container: Service container instance
257
- *entrypoints: Names of entrypoints to keep active (all others disabled)
258
"""
259
```
260
261
**Usage Example:**
262
263
```python
264
from nameko.testing.services import restrict_entrypoints
265
266
class MultiEntrypointService:
267
name = "multi_service"
268
269
@rpc
270
def rpc_method(self):
271
return "rpc response"
272
273
@http('GET', '/api/data')
274
def http_method(self, request):
275
return "http response"
276
277
@event_handler('other_service', 'test_event')
278
def event_method(self, payload):
279
return "event processed"
280
281
@timer(interval=60)
282
def timer_method(self):
283
return "timer executed"
284
285
def test_only_rpc_entrypoint():
286
"""Test with only RPC entrypoint active"""
287
container = ServiceContainer(MultiEntrypointService, config={})
288
289
# Restrict to only RPC entrypoint
290
restrict_entrypoints(container, 'rpc_method')
291
292
container.start()
293
294
try:
295
# RPC method should work
296
hook = entrypoint_hook(container, 'rpc_method')
297
result = hook()
298
assert result == "rpc response"
299
300
# Other entrypoints should be disabled
301
# (HTTP, event, timer entrypoints won't be active)
302
303
finally:
304
container.stop()
305
306
def test_multiple_entrypoints():
307
"""Test with multiple specific entrypoints active"""
308
container = ServiceContainer(MultiEntrypointService, config={})
309
310
# Keep both RPC and event entrypoints active
311
restrict_entrypoints(container, 'rpc_method', 'event_method')
312
313
container.start()
314
315
try:
316
# Both RPC and event methods should work
317
rpc_hook = entrypoint_hook(container, 'rpc_method')
318
event_hook = entrypoint_hook(container, 'event_method')
319
320
assert rpc_hook() == "rpc response"
321
assert event_hook({'test': 'data'}) == "event processed"
322
323
# HTTP and timer entrypoints are disabled
324
325
finally:
326
container.stop()
327
```
328
329
### Mock Entrypoints and Extensions
330
331
Testing utilities for creating mock entrypoints and dependency providers.
332
333
```python { .api }
334
class MockDependencyProvider:
335
"""
336
Mock dependency provider for testing.
337
338
Parameters:
339
- attr_name: Name of the dependency attribute on the service
340
- dependency: Mock object to inject (defaults to Mock())
341
"""
342
343
def __init__(self, attr_name, dependency=None): ...
344
345
def once(*args, **kwargs):
346
"""
347
Decorator that creates an entrypoint that fires only once for testing.
348
Useful for testing specific execution paths without ongoing triggers.
349
"""
350
351
@once
352
def test_entrypoint(self):
353
"""Example once-only entrypoint"""
354
...
355
356
def dummy(*args, **kwargs):
357
"""
358
Decorator that creates a dummy entrypoint for testing.
359
The entrypoint is registered but does nothing, useful for testing
360
service structure without external triggers.
361
"""
362
363
@dummy
364
def placeholder_entrypoint(self):
365
"""Example dummy entrypoint"""
366
...
367
```
368
369
**Mock Usage Example:**
370
371
```python
372
from nameko.testing.services import MockDependencyProvider, worker_factory
373
from unittest.mock import Mock
374
375
class ServiceWithDependencies:
376
name = "test_service"
377
378
database = DatabaseProvider()
379
cache = CacheProvider()
380
external_api = HttpClient()
381
382
@rpc
383
def complex_operation(self, data):
384
# Use multiple dependencies
385
db_result = self.database.query(data['query'])
386
cached_data = self.cache.get(data['cache_key'])
387
api_response = self.external_api.post('/endpoint', data)
388
389
return {
390
'db_result': db_result,
391
'cached_data': cached_data,
392
'api_response': api_response
393
}
394
395
def test_with_mock_dependencies():
396
# Create specific mocks for each dependency
397
mock_db = Mock()
398
mock_db.query.return_value = {'id': 1, 'name': 'test'}
399
400
mock_cache = Mock()
401
mock_cache.get.return_value = 'cached_value'
402
403
mock_api = Mock()
404
mock_api.post.return_value = {'status': 'success'}
405
406
# Create worker with all mocked dependencies
407
worker = worker_factory(
408
ServiceWithDependencies,
409
database=mock_db,
410
cache=mock_cache,
411
external_api=mock_api
412
)
413
414
# Test the service method
415
result = worker.complex_operation({
416
'query': 'SELECT * FROM users',
417
'cache_key': 'user:123'
418
})
419
420
# Verify results
421
assert result['db_result']['name'] == 'test'
422
assert result['cached_data'] == 'cached_value'
423
assert result['api_response']['status'] == 'success'
424
425
# Verify mock interactions
426
mock_db.query.assert_called_once_with('SELECT * FROM users')
427
mock_cache.get.assert_called_once_with('user:123')
428
mock_api.post.assert_called_once()
429
```
430
431
### Integration Testing
432
433
Tools for integration testing with real message brokers and external services.
434
435
```python { .api }
436
from nameko.testing.pytest import NamekoTestEnvironment
437
438
class NamekoTestEnvironment:
439
"""
440
Test environment for integration testing with real AMQP broker.
441
442
Provides utilities for running services with real message passing
443
while maintaining test isolation.
444
"""
445
446
def __init__(self, config=None): ...
447
448
def start_service(self, service_cls): ...
449
450
def stop_all_services(self): ...
451
452
def get_rpc_proxy(self, service_name): ...
453
454
def dispatch_event(self, source_service, event_type, event_data): ...
455
```
456
457
**Integration Test Example:**
458
459
```python
460
import pytest
461
from nameko.testing.pytest import NamekoTestEnvironment
462
463
@pytest.fixture
464
def test_env():
465
"""Create test environment with real AMQP broker"""
466
config = {
467
'AMQP_URI': 'amqp://guest:guest@localhost:5672/test_vhost'
468
}
469
env = NamekoTestEnvironment(config)
470
yield env
471
env.stop_all_services()
472
473
def test_service_integration(test_env):
474
# Start multiple services
475
test_env.start_service(UserService)
476
test_env.start_service(EmailService)
477
test_env.start_service(OrderService)
478
479
# Get RPC proxy for testing
480
user_rpc = test_env.get_rpc_proxy('user_service')
481
482
# Test inter-service communication
483
user_result = user_rpc.create_user({
484
'name': 'Test User',
485
'email': 'test@example.com'
486
})
487
488
assert user_result['user_id'] is not None
489
490
# Test event-driven behavior
491
test_env.dispatch_event('user_service', 'user_created', {
492
'user_id': user_result['user_id'],
493
'email': 'test@example.com'
494
})
495
496
# Verify that event was processed (check side effects)
497
# This would typically involve checking database state,
498
# file system, or other observable effects
499
```
500
501
### HTTP Endpoint Testing
502
503
Testing utilities specifically for HTTP endpoints and web interfaces.
504
505
```python
506
from nameko.testing.services import worker_factory
507
from werkzeug.test import Client
508
from werkzeug.wrappers import Response
509
510
class APIService:
511
name = "api_service"
512
513
@http('GET', '/users/<int:user_id>')
514
def get_user(self, request, user_id):
515
return {'user_id': user_id, 'name': 'Test User'}
516
517
@http('POST', '/users')
518
def create_user(self, request):
519
data = request.get_json()
520
return {'user_id': 123, 'name': data['name']}, 201
521
522
def test_http_endpoints():
523
# Create worker for HTTP service
524
worker = worker_factory(APIService)
525
526
# Test GET endpoint
527
from werkzeug.test import EnvironBuilder
528
from werkzeug.wrappers import Request
529
530
# Create mock request
531
builder = EnvironBuilder(path='/users/456', method='GET')
532
request = Request(builder.get_environ())
533
534
response = worker.get_user(request, user_id=456)
535
assert response['user_id'] == 456
536
537
# Test POST endpoint
538
builder = EnvironBuilder(
539
path='/users',
540
method='POST',
541
data='{"name": "New User"}',
542
content_type='application/json'
543
)
544
request = Request(builder.get_environ())
545
546
response, status_code = worker.create_user(request)
547
assert status_code == 201
548
assert response['name'] == 'New User'
549
```
550
551
### Test Configuration
552
553
Best practices for test configuration and environment management.
554
555
```python
556
# conftest.py - pytest configuration
557
import pytest
558
from nameko.testing.pytest import NamekoTestEnvironment
559
560
@pytest.fixture(scope='session')
561
def test_config():
562
"""Test configuration with isolated resources"""
563
return {
564
'AMQP_URI': 'amqp://guest:guest@localhost:5672/test_vhost',
565
'DATABASE_URL': 'sqlite:///:memory:',
566
'REDIS_URL': 'redis://localhost:6379/15', # Use test database
567
'DEBUG': True,
568
'TESTING': True
569
}
570
571
@pytest.fixture
572
def test_env(test_config):
573
"""Isolated test environment per test"""
574
env = NamekoTestEnvironment(test_config)
575
yield env
576
env.stop_all_services()
577
578
# Test database setup
579
@pytest.fixture
580
def clean_database():
581
"""Ensure clean database state for each test"""
582
# Setup test database
583
setup_test_database()
584
yield
585
# Cleanup test database
586
cleanup_test_database()
587
```
588
589
### Performance Testing
590
591
Utilities for load testing and performance measurement.
592
593
```python
594
import time
595
from concurrent.futures import ThreadPoolExecutor, as_completed
596
597
def load_test_rpc_service(service_name, method_name, payload, num_requests=100, concurrency=10):
598
"""Load test an RPC service method"""
599
600
def make_request():
601
start_time = time.time()
602
try:
603
# Make RPC call
604
rpc_proxy = get_rpc_proxy(service_name)
605
result = getattr(rpc_proxy, method_name)(payload)
606
return time.time() - start_time, True, result
607
except Exception as e:
608
return time.time() - start_time, False, str(e)
609
610
# Execute concurrent requests
611
with ThreadPoolExecutor(max_workers=concurrency) as executor:
612
futures = [executor.submit(make_request) for _ in range(num_requests)]
613
614
response_times = []
615
success_count = 0
616
617
for future in as_completed(futures):
618
duration, success, result = future.result()
619
response_times.append(duration)
620
if success:
621
success_count += 1
622
623
# Calculate statistics
624
avg_response_time = sum(response_times) / len(response_times)
625
success_rate = success_count / num_requests
626
627
return {
628
'avg_response_time': avg_response_time,
629
'success_rate': success_rate,
630
'total_requests': num_requests,
631
'concurrency': concurrency
632
}
633
634
def test_service_performance():
635
"""Test service performance under load"""
636
stats = load_test_rpc_service(
637
'user_service',
638
'get_user',
639
{'user_id': 123},
640
num_requests=1000,
641
concurrency=50
642
)
643
644
# Assert performance requirements
645
assert stats['avg_response_time'] < 0.1 # Less than 100ms average
646
assert stats['success_rate'] > 0.99 # 99% success rate
647
```