0
# Dependency Injection
1
2
System for providing services with access to external resources like databases, configuration, caches, and other services through clean dependency injection patterns and lifecycle management.
3
4
## Capabilities
5
6
### Config Dependency Provider
7
8
Provides access to service configuration with support for nested values, environment variables, and default fallbacks.
9
10
```python { .api }
11
class Config:
    """
    Dependency provider for accessing configuration values.

    Parameters:
    - key: Optional specific configuration key to bind to
    """

    # NOTE(review): upstream nameko's Config provider appears to take no
    # arguments and inject the whole config mapping -- confirm that the `key`
    # parameter exists in the nameko version this document targets.
    def __init__(self, key=None): ...
20
```
21
22
**Usage Example:**
23
24
```python
25
from urllib.parse import quote_plus

from nameko.dependency_providers import Config
from nameko.rpc import rpc


class DatabaseService:
    name = "database_service"

    # Inject entire config
    config = Config()

    # Inject specific config key
    db_config = Config('DATABASE')
    api_key = Config('API_KEY')

    @rpc
    def connect_to_database(self):
        # Access nested configuration
        host = self.db_config['host']
        port = self.db_config['port']
        # Escape credentials so characters such as '@', ':' or '/' in a
        # password cannot corrupt the URL structure.
        username = quote_plus(self.db_config['username'])
        password = quote_plus(self.db_config['password'])

        connection_string = f"postgresql://{username}:{password}@{host}:{port}"
        return self._connect(connection_string)

    @rpc
    def get_api_settings(self):
        # Access configuration with defaults
        timeout = self.config.get('API_TIMEOUT', 30)
        retries = self.config.get('API_RETRIES', 3)

        return {
            'api_key': self.api_key,
            'timeout': timeout,
            'retries': retries,
        }
59
```
60
61
### Context Data Providers
62
63
Built-in dependency providers for accessing request context data like user information, authentication tokens, and request metadata.
64
65
```python { .api }
66
class Language:
    """
    Dependency provider for accessing request language context.
    """

    def __init__(self): ...


class UserId:
    """
    Dependency provider for accessing user ID from request context.
    """

    def __init__(self): ...


class UserAgent:
    """
    Dependency provider for accessing user agent from request context.
    """

    def __init__(self): ...


class AuthToken:
    """
    Dependency provider for accessing authentication token from request context.
    """

    def __init__(self): ...
93
```
94
95
**Usage Example:**
96
97
```python
98
import time

from nameko.contextdata import Language, UserId, UserAgent, AuthToken
from nameko.rpc import rpc


class UserService:
    name = "user_service"

    # Context data providers
    language = Language()
    user_id = UserId()
    user_agent = UserAgent()
    auth_token = AuthToken()

    @rpc
    def get_user_preferences(self):
        """Get user preferences with context awareness"""
        return {
            'user_id': self.user_id,
            'language': self.language,
            'client': self.user_agent,
            # Only report whether a token is present; never echo the token itself
            'authenticated': bool(self.auth_token),
        }

    @rpc
    def log_user_action(self, action):
        """Log user action with full context"""
        context = {
            'user_id': self.user_id,
            'language': self.language,
            'user_agent': self.user_agent,
            'action': action,
            'timestamp': time.time(),
        }

        # Log with context information
        self._log_action(context)
        return {'status': 'logged'}
139
```
140
141
### Custom Dependency Providers
142
143
Create custom dependency providers for databases, caches, external APIs, and other resources.
144
145
```python { .api }
146
from nameko.extensions import DependencyProvider


class DatabaseProvider(DependencyProvider):
    """
    Custom dependency provider for database connections.
    """

    def setup(self):
        """Called once, before the service container starts"""
        self.connection_pool = self._create_connection_pool()

    def stop(self):
        """Called when the service container begins to shut down"""
        self.connection_pool.close()

    def get_dependency(self, worker_ctx):
        """Called before each worker runs; the return value is injected into the service"""
        # NOTE(review): nothing here hands the connection back to the pool;
        # presumably a worker_teardown() should release it -- confirm against
        # the pool implementation behind _create_connection_pool().
        return self.connection_pool.get_connection()
164
```
165
166
**Usage Example:**
167
168
```python
169
import redis
from nameko.dependency_providers import Config
from nameko.extensions import DependencyProvider
from nameko.rpc import rpc


class RedisProvider(DependencyProvider):
    """Redis connection provider with connection pooling"""

    def setup(self):
        config = self.container.config
        redis_url = config.get('REDIS_URL', 'redis://localhost:6379/0')
        self.connection_pool = redis.ConnectionPool.from_url(redis_url)

    def stop(self):
        self.connection_pool.disconnect()

    def get_dependency(self, worker_ctx):
        # Clients are cheap; every worker gets its own client on the shared pool
        return redis.Redis(connection_pool=self.connection_pool)


class CacheService:
    name = "cache_service"

    # Custom dependency injection
    redis = RedisProvider()
    config = Config()

    @rpc
    def get_cached_value(self, key):
        """Get value from Redis cache"""
        value = self.redis.get(key)
        # Compare against None so a cached empty string is returned as ''
        # rather than being reported as a cache miss.
        return value.decode('utf-8') if value is not None else None

    @rpc
    def set_cached_value(self, key, value, ttl=None):
        """Set value in Redis cache with optional TTL"""
        ttl = ttl or self.config.get('DEFAULT_CACHE_TTL', 3600)
        self.redis.setex(key, ttl, value)
        return True

    @rpc
    def clear_cache_pattern(self, pattern):
        """Clear cache entries matching pattern"""
        # SCAN walks the keyspace incrementally; KEYS would block the Redis
        # server for the whole scan on a large database.
        keys = list(self.redis.scan_iter(match=pattern))
        if keys:
            self.redis.delete(*keys)
        return len(keys)
213
```
214
215
### Database Integration
216
217
Common patterns for database integration with connection management and transaction handling.
218
219
```python
220
from weakref import WeakKeyDictionary

import sqlalchemy as sa
from sqlalchemy.orm import sessionmaker
from nameko.extensions import DependencyProvider
from nameko.rpc import rpc


class DatabaseSession(DependencyProvider):
    """SQLAlchemy database session provider"""

    def setup(self):
        config = self.container.config
        db_url = config['DATABASE_URL']

        self.engine = sa.create_engine(db_url, pool_pre_ping=True)
        self.Session = sessionmaker(bind=self.engine)
        # Sessions handed out to in-flight workers, keyed by worker context
        self.sessions = WeakKeyDictionary()

    def stop(self):
        self.engine.dispose()

    def get_dependency(self, worker_ctx):
        session = self.Session()
        self.sessions[worker_ctx] = session
        return session

    def worker_teardown(self, worker_ctx):
        # Close the worker's session here so service methods do not each
        # need their own try/finally cleanup.
        session = self.sessions.pop(worker_ctx, None)
        if session is not None:
            session.close()


class UserService:
    name = "user_service"

    db = DatabaseSession()

    @rpc
    def create_user(self, user_data):
        """Create user with automatic session management"""
        session = self.db
        try:
            user = User(**user_data)
            session.add(user)
            session.commit()
        except Exception:
            session.rollback()
            raise
        return {'id': user.id, 'email': user.email}

    @rpc
    def get_users_by_status(self, status):
        """Query users; the provider closes the session after the worker finishes"""
        users = self.db.query(User).filter(User.status == status).all()
        return [{'id': u.id, 'email': u.email} for u in users]
269
```
270
271
### Shared Dependencies
272
273
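Extensions that are shared within a service container, so that an expensive resource is set up only once however many dependency providers or entrypoints in that container use it.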
274
275
```python { .api }
276
from collections import OrderedDict

from nameko.extensions import SharedExtension


class SharedCache(SharedExtension):
    """
    In-memory LRU cache shared within a service container.
    Only one instance is created per service container.
    """

    # NOTE(review): a SharedExtension is presumably meant to be declared on
    # another extension (e.g. a DependencyProvider) rather than directly on a
    # service class -- confirm against the nameko extension documentation.

    def setup(self):
        # Ordered oldest -> most recently used
        self.cache = OrderedDict()
        self.max_size = 1000

    def get(self, key):
        if key not in self.cache:
            return None
        # A read counts as a use, so the entry becomes the newest
        self.cache.move_to_end(key)
        return self.cache[key]

    def set(self, key, value):
        if key in self.cache:
            # Overwriting an existing entry must not evict an unrelated one
            self.cache.move_to_end(key)
        elif len(self.cache) >= self.max_size:
            # LRU eviction: drop the least recently used entry
            self.cache.popitem(last=False)
        self.cache[key] = value
297
```
298
299
**Usage Example:**
300
301
```python
302
from nameko.rpc import rpc


# NOTE(review): each nameko service presumably runs in its own container, so
# the two services below would not see the same SharedCache instance, and a
# SharedExtension declared directly on a service class may not be injected at
# all -- confirm against the nameko extension documentation.
class UserService:
    name = "user_service"

    cache = SharedCache()

    @rpc
    def get_user(self, user_id):
        cache_key = f'user:{user_id}'

        # Check cache first; compare against None so falsy cached values still hit
        cached_user = self.cache.get(cache_key)
        if cached_user is not None:
            return cached_user

        # Fetch from database
        user = self._fetch_user_from_db(user_id)

        # Cache the result
        self.cache.set(cache_key, user)
        return user


class OrderService:
    name = "order_service"

    # Declared the same way as in UserService
    cache = SharedCache()

    @rpc
    def invalidate_user_cache(self, user_id):
        # Invalidate cached user data; pop() with a default also removes
        # falsy values and is a no-op when the key is absent.
        self.cache.cache.pop(f'user:{user_id}', None)
333
```
334
335
### Lifecycle Management
336
337
Dependency providers support full lifecycle management with setup, teardown, and worker context handling.
338
339
```python
340
from nameko.extensions import DependencyProvider


class ManagedResource(DependencyProvider):
    """Example of full lifecycle management"""

    def setup(self):
        """Called once before the service container starts"""
        print("Setting up managed resource...")
        self.resource = self._initialize_resource()
        self.health_checker = self._start_health_checker()

    def stop(self):
        """Called once when service container stops"""
        print("Stopping managed resource...")
        self.health_checker.stop()
        self.resource.close()

    def _session_for(self, worker_ctx):
        """Return this worker's session, creating it on first use"""
        if not hasattr(worker_ctx, 'managed_resource_session'):
            worker_ctx.managed_resource_session = self._create_session()
        return worker_ctx.managed_resource_session

    def get_dependency(self, worker_ctx):
        """Called for each worker, before worker_setup"""
        # nameko injects dependencies before it calls worker_setup, so the
        # session must be created here rather than assumed to exist already.
        return self._session_for(worker_ctx)

    def worker_setup(self, worker_ctx):
        """Called before a worker executes its task"""
        self._session_for(worker_ctx)

    def worker_teardown(self, worker_ctx):
        """Called when a worker finishes"""
        if hasattr(worker_ctx, 'managed_resource_session'):
            worker_ctx.managed_resource_session.close()
367
```
368
369
### Dependency Testing
370
371
Testing utilities for mocking and replacing dependencies during testing.
372
373
```python
374
from unittest.mock import Mock

from nameko.containers import ServiceContainer
from nameko.testing.services import worker_factory, replace_dependencies


class TestUserService:

    def test_create_user_with_mock_db(self):
        # Mock database dependency.
        # Assumes a UserService whose create_user() delegates to
        # db.save_user() and returns its result.
        mock_db = Mock()
        mock_db.save_user.return_value = {'id': 123}

        # Create service worker with mocked dependency
        service = worker_factory(UserService, db=mock_db)

        # Test service method
        result = service.create_user({'email': 'test@example.com'})

        assert result['id'] == 123
        mock_db.save_user.assert_called_once()

    def test_service_with_replaced_dependencies(self):
        # Create container with real service
        container = ServiceContainer(UserService, config={})

        # Replace dependencies for testing (must happen before start())
        mock_cache = Mock()
        replace_dependencies(container, cache=mock_cache)

        # Test with replaced dependencies
        container.start()
        try:
            # ... test logic
            pass
        finally:
            # Always stop the container, even when an assertion fails
            container.stop()
404
```
405
406
### Configuration-Based Dependencies
407
408
Choose a dependency's backend at setup time based on the service configuration.
409
410
```python
411
from nameko.extensions import DependencyProvider
from nameko.rpc import rpc


class ConditionalDependency(DependencyProvider):
    """Dependency that changes behavior based on configuration"""

    def setup(self):
        config = self.container.config

        # RedisCache, MemoryCache and NoOpCache are application-defined
        # backends; they are assumed to share the same set() interface.
        if config.get('USE_REDIS_CACHE'):
            self.backend = RedisCache(config['REDIS_URL'])
        elif config.get('USE_MEMORY_CACHE'):
            self.backend = MemoryCache(config.get('CACHE_SIZE', 1000))
        else:
            self.backend = NoOpCache()

    def get_dependency(self, worker_ctx):
        return self.backend


class FlexibleService:
    name = "flexible_service"

    # Dependency behavior changes based on config
    cache = ConditionalDependency()

    @rpc
    def cache_value(self, key, value):
        """Caching works regardless of backend"""
        self.cache.set(key, value)
        return "cached"
438
```
439
440
### Error Handling in Dependencies
441
442
Proper error handling and recovery in dependency providers.
443
444
```python
445
import time

from nameko.extensions import DependencyProvider


class ResilientDatabaseProvider(DependencyProvider):
    """Database provider with connection retry and recovery"""

    def setup(self):
        self.max_retries = 3
        self.retry_delay = 1
        self._connect_with_retry()

    def _connect_with_retry(self):
        """Connect, retrying with exponential backoff.

        Raises ConnectionError (chained to the last failure) once every
        attempt has failed.
        """
        # Read the URL outside the loop: a missing key is a configuration
        # error that retrying cannot fix.
        db_url = self.container.config['DATABASE_URL']

        for attempt in range(self.max_retries):
            try:
                self.connection = self._create_connection(db_url)
                return
            except Exception as exc:
                if attempt == self.max_retries - 1:
                    # Chain the cause so the real failure stays in the traceback
                    raise ConnectionError(
                        f"Failed to connect after {self.max_retries} attempts"
                    ) from exc
                time.sleep(self.retry_delay * (2 ** attempt))  # Exponential backoff

    def get_dependency(self, worker_ctx):
        # Check connection health before returning
        if not self._is_connection_healthy():
            self._reconnect()
        return self.connection

    def _is_connection_healthy(self):
        try:
            self.connection.ping()
        except Exception:
            # Not a bare except: KeyboardInterrupt/SystemExit must propagate
            return False
        return True

    def _reconnect(self):
        try:
            self.connection.close()
        except Exception:
            # Best effort: the old connection is already considered broken
            pass
        self._connect_with_retry()
483
```