or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli-interface.md · dependency-injection.md · event-system.md · http-interface.md · index.md · rpc-communication.md · service-management.md · standalone-clients.md · testing-framework.md · timer-scheduling.md

docs/testing-framework.md

0

# Testing Framework

1

2

Comprehensive testing utilities for unit testing services, mocking dependencies, integration testing with real message brokers, and end-to-end testing patterns.

3

4

## Capabilities

5

6

### Worker Factory

7

8

Creates service worker instances for isolated unit testing without requiring full service containers or message brokers.

9

10

```python { .api }

11

def worker_factory(service_cls, **dependencies):
    """
    Create a service worker instance for testing.

    Parameters:
    - service_cls: The service class to instantiate
    - **dependencies: Keyword arguments to override service dependencies,
      keyed by the dependency's attribute name on the service class

    Returns:
    Service worker instance with mocked or provided dependencies
    """

22

```

23

24

**Usage Example:**

25

26

```python

27

from nameko.testing.services import worker_factory

28

from unittest.mock import Mock

29

30

class UserService:
    """Example service with two injected dependencies and one RPC entrypoint."""

    name = "user_service"

    # DatabaseProvider and CacheProvider are placeholders for project-specific
    # dependency providers; this example does not define or import them.
    database = DatabaseProvider()
    cache = CacheProvider()

    @rpc
    def create_user(self, user_data):
        """Save ``user_data``, cache it under ``user:<id>`` and return the id."""
        user_id = self.database.save_user(user_data)
        self.cache.set(f'user:{user_id}', user_data)
        return {'user_id': user_id}

41

42

def test_create_user():
    """Exercise ``UserService.create_user`` against mocked dependencies."""
    user_data = {'name': 'John', 'email': 'john@example.com'}

    # Mock dependencies
    mock_database = Mock()
    mock_database.save_user.return_value = 123
    mock_cache = Mock()

    # Create worker with mocked dependencies
    worker = worker_factory(UserService, database=mock_database, cache=mock_cache)

    # Test the service method
    result = worker.create_user(user_data)

    # Verify behavior
    assert result['user_id'] == 123
    mock_database.save_user.assert_called_once_with(user_data)
    mock_cache.set.assert_called_once_with('user:123', user_data)

58

```

59

60

### Entrypoint Hook

61

62

Provides hooks for testing specific entrypoints (RPC methods, event handlers, HTTP endpoints) in isolation.

63

64

```python { .api }

65

def entrypoint_hook(container, method_name, context_data=None, timeout=30):
    """
    Create a hook for testing a specific entrypoint method.

    Parameters:
    - container: Service container instance
    - method_name: Name of the service method to hook
    - context_data: Optional context data for the worker that runs the method
    - timeout: Maximum time in seconds to wait for the hooked call to finish

    Returns:
    Context manager yielding a callable that invokes the hooked method.
    Use it as ``with entrypoint_hook(container, name) as hook: hook(...)``;
    the object returned by ``entrypoint_hook`` itself is not callable.
    """

76

```

77

78

**Usage Example:**

79

80

```python

81

from nameko.testing.services import entrypoint_hook

82

from nameko.containers import ServiceContainer

83

84

class EmailService:
    """Example service reacting to ``user_registered`` events from ``user_service``."""

    name = "email_service"

    @event_handler('user_service', 'user_registered')
    def send_welcome_email(self, payload):
        """Return a confirmation string for the address in ``payload['email']``."""
        email = payload['email']
        # Send email logic
        return f"Welcome email sent to {email}"

92

93

def test_welcome_email_handler():
    """Fire ``EmailService.send_welcome_email`` through an entrypoint hook."""
    # Create service container
    container = ServiceContainer(EmailService, config={})
    container.start()

    try:
        # entrypoint_hook is a context manager: the callable it yields runs
        # the method in a worker and hands back the method's return value.
        with entrypoint_hook(container, 'send_welcome_email') as send_welcome_email:
            # Test the event handler directly
            result = send_welcome_email({'email': 'user@example.com'})

        assert result == "Welcome email sent to user@example.com"
    finally:
        container.stop()

108

```

109

110

### Entrypoint Waiter

111

112

Utility for testing asynchronous operations and waiting for entrypoints to complete execution.

113

114

```python { .api }

115

def entrypoint_waiter(container, method_name, timeout=None):
    """
    Wait for an entrypoint method to be called and complete.

    Parameters:
    - container: Service container instance
    - method_name: Name of the service method to wait for
    - timeout: Maximum time to wait in seconds

    Returns:
    Context manager that waits for method completion: the ``with`` block
    does not exit until the named entrypoint has fired and finished.
    The value bound by ``with ... as result`` exposes the entrypoint's
    return value through ``result.get()``.
    """

127

```

128

129

**Usage Example:**

130

131

```python

132

import threading
import time

from nameko.containers import ServiceContainer
from nameko.events import event_handler
from nameko.testing.services import entrypoint_hook, entrypoint_waiter


class AsyncProcessingService:
    """Example service whose event handler takes a noticeable time to finish."""

    name = "async_service"

    @event_handler('data_service', 'data_received')
    def process_data_async(self, payload):
        """Sleep briefly, then return a string naming ``payload['data_id']``."""
        # Simulate async processing
        time.sleep(0.1)
        return f"Processed {payload['data_id']}"

143

144

def test_async_processing():
    """Check that ``process_data_async`` runs to completion and returns its result."""
    # Imported locally so this example does not depend on the surrounding imports
    from nameko.testing.services import entrypoint_hook

    container = ServiceContainer(AsyncProcessingService, config={})
    container.start()

    try:
        # Wait for the event handler to complete
        with entrypoint_waiter(container, 'process_data_async', timeout=5) as result:
            # Trigger the event handler
            # (In real test, this would be triggered by actual event)
            # entrypoint_hook is a context manager; the yielded callable blocks
            # until the worker finishes, so no extra thread is needed here.
            with entrypoint_hook(container, 'process_data_async') as hook:
                hook({'data_id': 'test-123'})

        # Assert on the handler's actual return value rather than `assert True`
        assert result.get() == "Processed test-123"
    finally:
        container.stop()

166

```

167

168

### Dependency Replacement

169

170

Utility for replacing service dependencies with test doubles, mocks, or alternative implementations.

171

172

```python { .api }

173

def replace_dependencies(container, *dependencies, **dependency_map):
    """
    Replace dependencies in a service container for testing.

    Must be called before the container is started.

    Parameters:
    - container: Service container instance
    - *dependencies: Names of dependencies to replace with automatically
      created mock objects
    - **dependency_map: Keyword arguments mapping dependency names to replacement objects

    Returns:
    The mock created for each name passed positionally (a single object
    when exactly one name is given); nothing is returned for replacements
    supplied as keyword arguments.
    """

181

```

182

183

**Usage Example:**

184

185

```python

186

from nameko.testing.services import replace_dependencies

187

from unittest.mock import Mock

188

189

class OrderService:
    """Example service that stores an order and charges it via another service."""

    name = "order_service"

    payment_service = RpcProxy('payment_service')
    # DatabaseProvider is a placeholder dependency provider, not defined here
    database = DatabaseProvider()

    @rpc
    def create_order(self, order_data):
        """Save the order, process ``order_data['payment']`` and report both outcomes."""
        # Save order to database
        order_id = self.database.save_order(order_data)

        # Process payment
        payment_result = self.payment_service.process_payment(order_data['payment'])

        return {
            'order_id': order_id,
            'payment_status': payment_result['status'],
        }

207

208

def test_create_order_with_mocked_dependencies():
    """Run ``OrderService.create_order`` in a container with replaced dependencies."""
    # Imported locally so this example does not depend on the surrounding imports
    from nameko.testing.services import entrypoint_hook

    order = {
        'items': [{'id': 1, 'quantity': 2}],
        'payment': {'method': 'credit_card', 'amount': 100},
    }

    # Create service container
    container = ServiceContainer(OrderService, config={})

    # Create mocks
    mock_payment_service = Mock()
    mock_payment_service.process_payment.return_value = {'status': 'success'}

    mock_database = Mock()
    mock_database.save_order.return_value = 'order-123'

    # Replace dependencies (must happen before the container starts)
    replace_dependencies(
        container,
        payment_service=mock_payment_service,
        database=mock_database,
    )

    container.start()

    try:
        # A container has no long-lived service instance to call directly;
        # workers are created per call, so go through an entrypoint hook.
        with entrypoint_hook(container, 'create_order') as create_order:
            result = create_order(order)

        assert result['order_id'] == 'order-123'
        assert result['payment_status'] == 'success'

        # Verify mock calls
        mock_database.save_order.assert_called_once_with(order)
        mock_payment_service.process_payment.assert_called_once_with(order['payment'])
    finally:
        container.stop()

244

```

245

246

### Entrypoint Restriction

247

248

Utility for limiting which entrypoints are active during testing, useful for isolating specific functionality.

249

250

```python { .api }

251

def restrict_entrypoints(container, *entrypoints):
    """
    Restrict active entrypoints in a service container to specified ones.

    Parameters:
    - container: Service container instance
    - *entrypoints: Names of entrypoints to keep active (all others disabled)
    """

259

```

260

261

**Usage Example:**

262

263

```python

264

from nameko.testing.services import restrict_entrypoints

265

266

class MultiEntrypointService:
    """Example service exposing one entrypoint of each kind (RPC, HTTP, event, timer)."""

    name = "multi_service"

    @rpc
    def rpc_method(self):
        """RPC entrypoint."""
        return "rpc response"

    @http('GET', '/api/data')
    def http_method(self, request):
        """HTTP entrypoint for ``GET /api/data``."""
        return "http response"

    @event_handler('other_service', 'test_event')
    def event_method(self, payload):
        """Handler for ``test_event`` events emitted by ``other_service``."""
        return "event processed"

    @timer(interval=60)
    def timer_method(self):
        """Timer entrypoint declared with ``interval=60``."""
        return "timer executed"

284

285

def test_only_rpc_entrypoint():
    """Test with only RPC entrypoint active"""
    # Imported locally: the example's import line only brings in restrict_entrypoints
    from nameko.testing.services import entrypoint_hook

    container = ServiceContainer(MultiEntrypointService, config={})

    # Restrict to only RPC entrypoint
    restrict_entrypoints(container, 'rpc_method')

    container.start()

    try:
        # RPC method should work; entrypoint_hook is a context manager that
        # yields the callable used to fire the entrypoint
        with entrypoint_hook(container, 'rpc_method') as hook:
            result = hook()
        assert result == "rpc response"

        # Other entrypoints should be disabled
        # (HTTP, event, timer entrypoints won't be active)
    finally:
        container.stop()

305

306

def test_multiple_entrypoints():
    """Test with multiple specific entrypoints active"""
    # Imported locally: the example's import line only brings in restrict_entrypoints
    from nameko.testing.services import entrypoint_hook

    container = ServiceContainer(MultiEntrypointService, config={})

    # Keep both RPC and event entrypoints active
    restrict_entrypoints(container, 'rpc_method', 'event_method')

    container.start()

    try:
        # Both RPC and event methods should work; each hook is a context
        # manager yielding the callable that fires the entrypoint
        with entrypoint_hook(container, 'rpc_method') as rpc_hook:
            assert rpc_hook() == "rpc response"

        with entrypoint_hook(container, 'event_method') as event_hook:
            assert event_hook({'test': 'data'}) == "event processed"

        # HTTP and timer entrypoints are disabled
    finally:
        container.stop()

327

```

328

329

### Mock Entrypoints and Extensions

330

331

Testing utilities for creating mock entrypoints and dependency providers.

332

333

```python { .api }

334

class MockDependencyProvider:
    """
    Mock dependency provider for testing.

    Parameters:
    - attr_name: Name of the dependency attribute on the service
    - dependency: Mock object to inject (defaults to Mock())
    """

    def __init__(self, attr_name, dependency=None): ...

344

345

def once(*args, **kwargs):
    """
    Decorator that creates an entrypoint that fires only once for testing.
    Useful for testing specific execution paths without ongoing triggers.
    """


# Usage sketch: in a real service this is a method on the service class
@once
def test_entrypoint(self):
    """Example once-only entrypoint"""
    ...

355

356

def dummy(*args, **kwargs):
    """
    Decorator that creates a dummy entrypoint for testing.
    The entrypoint is registered but does nothing, useful for testing
    service structure without external triggers.
    """


# Usage sketch: in a real service this is a method on the service class
@dummy
def placeholder_entrypoint(self):
    """Example dummy entrypoint"""
    ...

367

```

368

369

**Mock Usage Example:**

370

371

```python

372

from nameko.testing.services import MockDependencyProvider, worker_factory

373

from unittest.mock import Mock

374

375

class ServiceWithDependencies:
    """Example service whose single RPC method touches three dependencies."""

    name = "test_service"

    # DatabaseProvider, CacheProvider and HttpClient are placeholder
    # dependency providers; this example does not define or import them.
    database = DatabaseProvider()
    cache = CacheProvider()
    external_api = HttpClient()

    @rpc
    def complex_operation(self, data):
        """Query the database, read the cache, post ``data`` to the API; return all three results."""
        # Use multiple dependencies
        db_result = self.database.query(data['query'])
        cached_data = self.cache.get(data['cache_key'])
        api_response = self.external_api.post('/endpoint', data)

        return {
            'db_result': db_result,
            'cached_data': cached_data,
            'api_response': api_response,
        }

394

395

def test_with_mock_dependencies():
    """Exercise ``complex_operation`` with every dependency replaced by a mock."""
    request_data = {
        'query': 'SELECT * FROM users',
        'cache_key': 'user:123',
    }

    # Create specific mocks for each dependency
    mock_db = Mock()
    mock_db.query.return_value = {'id': 1, 'name': 'test'}

    mock_cache = Mock()
    mock_cache.get.return_value = 'cached_value'

    mock_api = Mock()
    mock_api.post.return_value = {'status': 'success'}

    # Create worker with all mocked dependencies
    worker = worker_factory(
        ServiceWithDependencies,
        database=mock_db,
        cache=mock_cache,
        external_api=mock_api,
    )

    # Test the service method
    result = worker.complex_operation(request_data)

    # Verify results
    assert result['db_result']['name'] == 'test'
    assert result['cached_data'] == 'cached_value'
    assert result['api_response']['status'] == 'success'

    # Verify mock interactions
    mock_db.query.assert_called_once_with('SELECT * FROM users')
    mock_cache.get.assert_called_once_with('user:123')
    # The service forwards the whole request dict to the API
    mock_api.post.assert_called_once_with('/endpoint', request_data)

429

```

430

431

### Integration Testing

432

433

Tools for integration testing with real message brokers and external services.

434

435

```python { .api }

436

from nameko.testing.pytest import NamekoTestEnvironment

437

438

class NamekoTestEnvironment:
    """
    Test environment for integration testing with real AMQP broker.

    Provides utilities for running services with real message passing
    while maintaining test isolation.
    """
    # NOTE(review): confirm that the installed nameko version actually exports
    # this class from nameko.testing.pytest before relying on this interface.

    def __init__(self, config=None): ...

    def start_service(self, service_cls): ...

    def stop_all_services(self): ...

    def get_rpc_proxy(self, service_name): ...

    def dispatch_event(self, source_service, event_type, event_data): ...

455

```

456

457

**Integration Test Example:**

458

459

```python

460

import pytest

461

from nameko.testing.pytest import NamekoTestEnvironment

462

463

@pytest.fixture
def test_env():
    """Create test environment with real AMQP broker"""
    config = {
        'AMQP_URI': 'amqp://guest:guest@localhost:5672/test_vhost',
    }
    env = NamekoTestEnvironment(config)
    yield env
    # Teardown: runs once the test using this fixture has finished
    env.stop_all_services()

472

473

def test_service_integration(test_env):
    """Run three services together and drive them over RPC and events."""
    # Start multiple services
    test_env.start_service(UserService)
    test_env.start_service(EmailService)
    test_env.start_service(OrderService)

    # Get RPC proxy for testing
    user_rpc = test_env.get_rpc_proxy('user_service')

    # Test inter-service communication
    user_result = user_rpc.create_user({
        'name': 'Test User',
        'email': 'test@example.com',
    })

    assert user_result['user_id'] is not None

    # Test event-driven behavior
    # The event type must match what EmailService subscribes to
    # ('user_registered'), otherwise no handler would ever run.
    test_env.dispatch_event('user_service', 'user_registered', {
        'user_id': user_result['user_id'],
        'email': 'test@example.com',
    })

    # Verify that event was processed (check side effects)
    # This would typically involve checking database state,
    # file system, or other observable effects

499

```

500

501

### HTTP Endpoint Testing

502

503

Testing utilities specifically for HTTP endpoints and web interfaces.

504

505

```python

506

from nameko.testing.services import worker_factory

507

from werkzeug.test import Client

508

from werkzeug.wrappers import Response

509

510

class APIService:
    """Example service exposing two HTTP entrypoints."""

    name = "api_service"

    @http('GET', '/users/<int:user_id>')
    def get_user(self, request, user_id):
        """Return a fixed user record carrying the requested ``user_id``."""
        return {'user_id': user_id, 'name': 'Test User'}

    @http('POST', '/users')
    def create_user(self, request):
        """Read the JSON body and return ``(user record, 201)``."""
        data = request.get_json()
        return {'user_id': 123, 'name': data['name']}, 201

521

522

def test_http_endpoints():
    """Call the HTTP handler methods directly with hand-built werkzeug requests."""
    from werkzeug.test import EnvironBuilder
    from werkzeug.wrappers import Request

    # Create worker for HTTP service
    worker = worker_factory(APIService)

    # Test GET endpoint
    # Create mock request
    builder = EnvironBuilder(path='/users/456', method='GET')
    request = Request(builder.get_environ())

    response = worker.get_user(request, user_id=456)
    assert response['user_id'] == 456

    # Test POST endpoint
    builder = EnvironBuilder(
        path='/users',
        method='POST',
        data='{"name": "New User"}',
        content_type='application/json',
    )
    request = Request(builder.get_environ())

    # The handler is called as a plain method here, so its (body, status)
    # tuple comes back unchanged
    response, status_code = worker.create_user(request)
    assert status_code == 201
    assert response['name'] == 'New User'

549

```

550

551

### Test Configuration

552

553

Best practices for test configuration and environment management.

554

555

```python

556

# conftest.py - pytest configuration

557

import pytest

558

from nameko.testing.pytest import NamekoTestEnvironment

559

560

@pytest.fixture(scope='session')
def test_config():
    """Test configuration with isolated resources"""
    return {
        'AMQP_URI': 'amqp://guest:guest@localhost:5672/test_vhost',
        'DATABASE_URL': 'sqlite:///:memory:',
        'REDIS_URL': 'redis://localhost:6379/15',  # Use test database
        'DEBUG': True,
        'TESTING': True,
    }

570

571

@pytest.fixture
def test_env(test_config):
    """Isolated test environment per test"""
    env = NamekoTestEnvironment(test_config)
    yield env
    # Teardown: runs once the test using this fixture has finished
    env.stop_all_services()

577

578

# Test database setup

579

@pytest.fixture
def clean_database():
    """Ensure clean database state for each test"""
    # Setup test database
    # (setup_test_database / cleanup_test_database are project helpers that
    # this example does not define)
    setup_test_database()
    yield
    # Cleanup test database
    cleanup_test_database()

587

```

588

589

### Performance Testing

590

591

Utilities for load testing and performance measurement.

592

593

```python

594

import time

595

from concurrent.futures import ThreadPoolExecutor, as_completed

596

597

def load_test_rpc_service(service_name, method_name, payload, num_requests=100, concurrency=10):
    """Load test an RPC service method

    Parameters:
    - service_name: Name passed to ``get_rpc_proxy`` to obtain a proxy
    - method_name: Name of the proxy method to call
    - payload: Single argument passed to every call
    - num_requests: Total number of calls to issue
    - concurrency: Number of worker threads issuing calls in parallel

    Returns:
    Dict with ``avg_response_time`` (seconds), ``success_rate`` (0..1),
    ``total_requests`` and ``concurrency``. Both statistics are 0.0 when
    ``num_requests`` is 0.

    ``get_rpc_proxy`` is not defined in this example; the test module is
    expected to provide it.
    """

    def make_request():
        """Issue one call; return ``(duration, succeeded, result_or_error_text)``."""
        # perf_counter is monotonic, so durations cannot be skewed by clock changes
        start_time = time.perf_counter()
        try:
            # Make RPC call
            rpc_proxy = get_rpc_proxy(service_name)
            result = getattr(rpc_proxy, method_name)(payload)
            return time.perf_counter() - start_time, True, result
        except Exception as e:
            # Deliberately broad: a failed request is counted, never raised
            return time.perf_counter() - start_time, False, str(e)

    response_times = []
    success_count = 0

    # Execute concurrent requests
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        futures = [executor.submit(make_request) for _ in range(num_requests)]

        for future in as_completed(futures):
            duration, success, _result = future.result()
            response_times.append(duration)
            if success:
                success_count += 1

    # Calculate statistics (guarded so an empty run does not divide by zero)
    if response_times:
        avg_response_time = sum(response_times) / len(response_times)
        success_rate = success_count / num_requests
    else:
        avg_response_time = 0.0
        success_rate = 0.0

    return {
        'avg_response_time': avg_response_time,
        'success_rate': success_rate,
        'total_requests': num_requests,
        'concurrency': concurrency,
    }

633

634

def test_service_performance():
    """Test service performance under load"""
    stats = load_test_rpc_service(
        'user_service',
        'get_user',
        {'user_id': 123},
        num_requests=1000,
        concurrency=50,
    )

    # Assert performance requirements
    assert stats['avg_response_time'] < 0.1  # Less than 100ms average
    assert stats['success_rate'] > 0.99  # 99% success rate

647

```