or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli-interface.md, dependency-injection.md, event-system.md, http-interface.md, index.md, rpc-communication.md, service-management.md, standalone-clients.md, testing-framework.md, timer-scheduling.md

docs/dependency-injection.md

0

# Dependency Injection

1

2

System for providing services with access to external resources like databases, configuration, caches, and other services through clean dependency injection patterns and lifecycle management.

3

4

## Capabilities

5

6

### Config Dependency Provider

7

8

Provides access to service configuration with support for nested values, environment variables, and default fallbacks.

9

10

```python { .api }

11

class Config:

12

"""

13

Dependency provider for accessing configuration values.

14

15

Parameters:

16

- key: Optional specific configuration key to bind to

17

"""

18

19

def __init__(self, key=None): ...

20

```

21

22

**Usage Example:**

23

24

```python

25

from nameko.dependency_providers import Config

26

27

class DatabaseService:

28

name = "database_service"

29

30

# Inject entire config

31

config = Config()

32

33

# Inject specific config key

34

db_config = Config('DATABASE')

35

api_key = Config('API_KEY')

36

37

@rpc

38

def connect_to_database(self):

39

# Access nested configuration

40

host = self.db_config['host']

41

port = self.db_config['port']

42

username = self.db_config['username']

43

password = self.db_config['password']

44

45

connection_string = f"postgresql://{username}:{password}@{host}:{port}"

46

return self._connect(connection_string)

47

48

@rpc

49

def get_api_settings(self):

50

# Access configuration with defaults

51

timeout = self.config.get('API_TIMEOUT', 30)

52

retries = self.config.get('API_RETRIES', 3)

53

54

return {

55

'api_key': self.api_key,

56

'timeout': timeout,

57

'retries': retries

58

}

59

```

60

61

### Context Data Providers

62

63

Built-in dependency providers for accessing request context data like user information, authentication tokens, and request metadata.

64

65

```python { .api }

66

class Language:

67

"""

68

Dependency provider for accessing request language context.

69

"""

70

71

def __init__(self): ...

72

73

class UserId:

74

"""

75

Dependency provider for accessing user ID from request context.

76

"""

77

78

def __init__(self): ...

79

80

class UserAgent:

81

"""

82

Dependency provider for accessing user agent from request context.

83

"""

84

85

def __init__(self): ...

86

87

class AuthToken:

88

"""

89

Dependency provider for accessing authentication token from request context.

90

"""

91

92

def __init__(self): ...

93

```

94

95

**Usage Example:**

96

97

```python

98

from nameko.contextdata import Language, UserId, UserAgent, AuthToken

99

from nameko.rpc import rpc

100

101

class UserService:

102

name = "user_service"

103

104

# Context data providers

105

language = Language()

106

user_id = UserId()

107

user_agent = UserAgent()

108

auth_token = AuthToken()

109

110

@rpc

111

def get_user_preferences(self):

112

"""Get user preferences with context awareness"""

113

current_user_id = self.user_id

114

preferred_language = self.language

115

client_info = self.user_agent

116

token = self.auth_token

117

118

return {

119

'user_id': current_user_id,

120

'language': preferred_language,

121

'client': client_info,

122

'authenticated': bool(token)

123

}

124

125

@rpc

126

def log_user_action(self, action):

127

"""Log user action with full context"""

128

context = {

129

'user_id': self.user_id,

130

'language': self.language,

131

'user_agent': self.user_agent,

132

'action': action,

133

'timestamp': time.time()

134

}

135

136

# Log with context information

137

self._log_action(context)

138

return {'status': 'logged'}

139

```

140

141

### Custom Dependency Providers

142

143

Create custom dependency providers for databases, caches, external APIs, and other resources.

144

145

```python { .api }

146

from nameko.extensions import DependencyProvider

147

148

class DatabaseProvider(DependencyProvider):

149

"""

150

Custom dependency provider for database connections.

151

"""

152

153

def setup(self):

154

"""Called when the service container is started"""

155

self.connection_pool = self._create_connection_pool()

156

157

def stop(self):

158

"""Called when the service container is stopped"""

159

self.connection_pool.close()

160

161

def get_dependency(self, worker_ctx):

162

"""Called for each service method invocation"""

163

return self.connection_pool.get_connection()

164

```

165

166

**Usage Example:**

167

168

```python

169

import redis

170

from nameko.extensions import DependencyProvider

171

172

class RedisProvider(DependencyProvider):

173

"""Redis connection provider with connection pooling"""

174

175

def setup(self):

176

config = self.container.config

177

redis_url = config.get('REDIS_URL', 'redis://localhost:6379/0')

178

self.connection_pool = redis.ConnectionPool.from_url(redis_url)

179

180

def stop(self):

181

self.connection_pool.disconnect()

182

183

def get_dependency(self, worker_ctx):

184

return redis.Redis(connection_pool=self.connection_pool)

185

186

class CacheService:

187

name = "cache_service"

188

189

# Custom dependency injection

190

redis = RedisProvider()

191

config = Config()

192

193

@rpc

194

def get_cached_value(self, key):

195

"""Get value from Redis cache"""

196

value = self.redis.get(key)

197

return value.decode('utf-8') if value else None

198

199

@rpc

200

def set_cached_value(self, key, value, ttl=None):

201

"""Set value in Redis cache with optional TTL"""

202

ttl = ttl or self.config.get('DEFAULT_CACHE_TTL', 3600)

203

self.redis.setex(key, ttl, value)

204

return True

205

206

@rpc

207

def clear_cache_pattern(self, pattern):

208

"""Clear cache entries matching pattern"""

209

keys = self.redis.keys(pattern)

210

if keys:

211

self.redis.delete(*keys)

212

return len(keys)

213

```

214

215

### Database Integration

216

217

Common patterns for database integration with connection management and transaction handling.

218

219

```python

220

import sqlalchemy as sa

221

from sqlalchemy.orm import sessionmaker

222

from nameko.extensions import DependencyProvider

223

224

class DatabaseSession(DependencyProvider):

225

"""SQLAlchemy database session provider"""

226

227

def setup(self):

228

config = self.container.config

229

db_url = config['DATABASE_URL']

230

231

self.engine = sa.create_engine(db_url, pool_pre_ping=True)

232

self.Session = sessionmaker(bind=self.engine)

233

234

def stop(self):

235

self.engine.dispose()

236

237

def get_dependency(self, worker_ctx):

238

return self.Session()

239

240

class UserService:

241

name = "user_service"

242

243

db = DatabaseSession()

244

245

@rpc

246

def create_user(self, user_data):

247

"""Create user with automatic session management"""

248

session = self.db

249

try:

250

user = User(**user_data)

251

session.add(user)

252

session.commit()

253

return {'id': user.id, 'email': user.email}

254

except Exception:

255

session.rollback()

256

raise

257

finally:

258

session.close()

259

260

@rpc

261

def get_users_by_status(self, status):

262

"""Query users with session cleanup"""

263

session = self.db

264

try:

265

users = session.query(User).filter(User.status == status).all()

266

return [{'id': u.id, 'email': u.email} for u in users]

267

finally:

268

session.close()

269

```

270

271

### Shared Dependencies

272

273

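Extensions that are shared for resource efficiency. In nameko, a `SharedExtension` is shared between the extensions that declare it within a single service container, so only one instance is created per container; it is not shared across separate service containers.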

274

275

```python { .api }

276

from nameko.extensions import SharedExtension

277

278

class SharedCache(SharedExtension):

279

"""

280

Shared cache instance across multiple services.

281

Only one instance is created per service container.

282

"""

283

284

def setup(self):

285

self.cache = {}

286

self.max_size = 1000

287

288

def get(self, key):

289

return self.cache.get(key)

290

291

def set(self, key, value):

292

if len(self.cache) >= self.max_size:

293

# Simple FIFO eviction: drop the oldest-inserted key (reads do not refresh order, so this is not a true LRU)

294

oldest_key = next(iter(self.cache))

295

del self.cache[oldest_key]

296

self.cache[key] = value

297

```

298

299

**Usage Example:**

300

301

```python

302

class UserService:

303

name = "user_service"

304

305

cache = SharedCache()

306

307

@rpc

308

def get_user(self, user_id):

309

# Check cache first

310

cached_user = self.cache.get(f'user:{user_id}')

311

if cached_user:

312

return cached_user

313

314

# Fetch from database

315

user = self._fetch_user_from_db(user_id)

316

317

# Cache the result

318

self.cache.set(f'user:{user_id}', user)

319

return user

320

321

class OrderService:

322

name = "order_service"

323

324

# Same shared cache instance

325

cache = SharedCache()

326

327

@rpc

328

def invalidate_user_cache(self, user_id):

329

# Invalidate cached user data

330

cache_key = f'user:{user_id}'

331

if self.cache.get(cache_key):

332

del self.cache.cache[cache_key]

333

```

334

335

### Lifecycle Management

336

337

Dependency providers support full lifecycle management with setup, teardown, and worker context handling.

338

339

```python

340

class ManagedResource(DependencyProvider):

341

"""Example of full lifecycle management"""

342

343

def setup(self):

344

"""Called once when service container starts"""

345

print("Setting up managed resource...")

346

self.resource = self._initialize_resource()

347

self.health_checker = self._start_health_checker()

348

349

def stop(self):

350

"""Called once when service container stops"""

351

print("Stopping managed resource...")

352

self.health_checker.stop()

353

self.resource.close()

354

355

def worker_setup(self, worker_ctx):

356

"""Called when a new worker starts"""

357

worker_ctx.managed_resource_session = self._create_session()

358

359

def worker_teardown(self, worker_ctx):

360

"""Called when a worker finishes"""

361

if hasattr(worker_ctx, 'managed_resource_session'):

362

worker_ctx.managed_resource_session.close()

363

364

def get_dependency(self, worker_ctx):

365

"""Called for each service method invocation"""

366

return worker_ctx.managed_resource_session

367

```

368

369

### Dependency Testing

370

371

Utilities for mocking and replacing dependencies in tests.

372

373

```python

374

from nameko.testing.services import worker_factory, replace_dependencies

375

376

class TestUserService:

377

378

def test_create_user_with_mock_db(self):

379

# Mock database dependency

380

mock_db = Mock()

381

mock_db.save_user.return_value = {'id': 123}

382

383

# Create service worker with mocked dependency

384

service = worker_factory(UserService, db=mock_db)

385

386

# Test service method

387

result = service.create_user({'email': 'test@example.com'})

388

389

assert result['id'] == 123

390

mock_db.save_user.assert_called_once()

391

392

def test_service_with_replaced_dependencies(self):

393

# Create container with real service

394

container = ServiceContainer(UserService, config={})

395

396

# Replace dependencies for testing

397

mock_cache = Mock()

398

replace_dependencies(container, cache=mock_cache)

399

400

# Test with replaced dependencies

401

container.start()

402

# ... test logic

403

container.stop()

404

```

405

406

### Configuration-Based Dependencies

407

408

Dependencies whose backend is selected dynamically at setup time from the environment and service configuration.

409

410

```python

411

class ConditionalDependency(DependencyProvider):

412

"""Dependency that changes behavior based on configuration"""

413

414

def setup(self):

415

config = self.container.config

416

417

if config.get('USE_REDIS_CACHE'):

418

self.backend = RedisCache(config['REDIS_URL'])

419

elif config.get('USE_MEMORY_CACHE'):

420

self.backend = MemoryCache(config.get('CACHE_SIZE', 1000))

421

else:

422

self.backend = NoOpCache()

423

424

def get_dependency(self, worker_ctx):

425

return self.backend

426

427

class FlexibleService:

428

name = "flexible_service"

429

430

# Dependency behavior changes based on config

431

cache = ConditionalDependency()

432

433

@rpc

434

def cache_value(self, key, value):

435

"""Caching works regardless of backend"""

436

self.cache.set(key, value)

437

return "cached"

438

```

439

440

### Error Handling in Dependencies

441

442

Proper error handling and recovery in dependency providers.

443

444

```python

445

class ResilientDatabaseProvider(DependencyProvider):

446

"""Database provider with connection retry and recovery"""

447

448

def setup(self):

449

self.max_retries = 3

450

self.retry_delay = 1

451

self._connect_with_retry()

452

453

def _connect_with_retry(self):

454

for attempt in range(self.max_retries):

455

try:

456

config = self.container.config

457

self.connection = self._create_connection(config['DATABASE_URL'])

458

return

459

except Exception as e:

460

if attempt == self.max_retries - 1:

461

raise ConnectionError(f"Failed to connect after {self.max_retries} attempts")

462

time.sleep(self.retry_delay * (2 ** attempt)) # Exponential backoff

463

464

def get_dependency(self, worker_ctx):

465

# Check connection health before returning

466

if not self._is_connection_healthy():

467

self._reconnect()

468

return self.connection

469

470

def _is_connection_healthy(self):

471

try:

472

self.connection.ping()

473

return True

474

except:

475

return False

476

477

def _reconnect(self):

478

try:

479

self.connection.close()

480

except:

481

pass

482

self._connect_with_retry()

483

```