or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli-interface.md · dependency-injection.md · event-system.md · http-interface.md · index.md · rpc-communication.md · service-management.md · standalone-clients.md · testing-framework.md · timer-scheduling.md

docs/standalone-clients.md

0

# Standalone Clients

1

2

Client libraries for interacting with nameko services from non-nameko applications, supporting both RPC calls and event publishing with connection management and error handling.

3

4

## Capabilities

5

6

### Service RPC Proxy

7

8

Client for making RPC calls to specific nameko services from standalone applications.

9

10

```python { .api }

11

class ServiceRpcProxy:
    """
    Standalone RPC client for calling a specific nameko service.

    Use it as a context manager, or call ``start()`` / ``stop()`` explicitly
    when the connection has to outlive a single ``with`` block.

    Parameters:
    - service_name: Name of the target service
    - config: Configuration dictionary with AMQP connection details
    - context_data: Optional context data to pass with all calls
    - timeout: Request timeout in seconds
    """

    def __init__(self, service_name, config, context_data=None, timeout=None): ...

    # Opens the connection and returns the object on which service methods
    # are called (the same object the ``with`` statement binds).
    def start(self): ...

    # Closes the connection opened by ``start()``.
    def stop(self): ...

    def __enter__(self): ...

    def __exit__(self, exc_type, exc_val, exc_tb): ...

27

```

28

29

**Usage Example:**

30

31

```python

32

from nameko.standalone.rpc import ServiceRpcProxy

33

34

# Configuration for AMQP connection
config = {
    'AMQP_URI': 'amqp://guest:guest@localhost:5672//'
}

# Using context manager (recommended)
with ServiceRpcProxy('user_service', config) as user_rpc:
    # Call service methods
    user = user_rpc.get_user(user_id=123)
    print(f"User: {user['name']} ({user['email']})")

    # Create new user
    new_user = user_rpc.create_user({
        'name': 'John Doe',
        'email': 'john@example.com'
    })
    print(f"Created user with ID: {new_user['user_id']}")

# Manual connection management
proxy = ServiceRpcProxy('user_service', config)
# start() opens the connection and returns the object that exposes the
# service's methods; calls must go through that object, not the proxy itself.
user_rpc = proxy.start()
try:
    user = user_rpc.get_user(user_id=456)
    print(user)
finally:
    proxy.stop()  # Always close connection

59

```

60

61

### Cluster RPC Proxy

62

63

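Client that can call any service in the cluster through a single proxy: the target service is selected by attribute name (for example `cluster_rpc.user_service.get_user(...)`), so one client can be shared across calls to several services.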

64

65

```python { .api }

66

class ClusterRpcProxy:
    """
    Standalone RPC client for calling multiple nameko services.

    Use it as a context manager, or call ``start()`` / ``stop()`` explicitly
    when the connection has to outlive a single ``with`` block.

    Parameters:
    - config: Configuration dictionary with AMQP connection details
    - context_data: Optional context data to pass with all calls
    - timeout: Request timeout in seconds
    """

    def __init__(self, config, context_data=None, timeout=None): ...

    # Opens the connection and returns the object on which services are
    # looked up by attribute name (the same object the ``with`` statement binds).
    def start(self): ...

    # Closes the connection opened by ``start()``.
    def stop(self): ...

    def __enter__(self): ...

    def __exit__(self, exc_type, exc_val, exc_tb): ...

    def __getattr__(self, service_name): ...

83

```

84

85

**Usage Example:**

86

87

```python

88

import time

from nameko.standalone.rpc import ClusterRpcProxy

config = {
    'AMQP_URI': 'amqp://guest:guest@localhost:5672//',
    # NOTE(review): confirm that this key is honoured by the installed nameko
    # version; the per-client `timeout=` constructor argument is the documented
    # way to bound a call.
    'RPC_TIMEOUT': 30  # Default timeout for all calls
}

# Using context manager for multiple services
with ClusterRpcProxy(config) as cluster_rpc:
    # Call different services through dynamic attributes
    user = cluster_rpc.user_service.get_user(123)
    order = cluster_rpc.order_service.create_order({
        'user_id': user['user_id'],
        'items': [{'product_id': 1, 'quantity': 2}]
    })

    # Send notification about the order
    cluster_rpc.notification_service.send_notification({
        'user_id': user['user_id'],
        'message': f'Order {order["order_id"]} created successfully'
    })

# With context data for request tracing
context_data = {
    'correlation_id': 'web-request-456',
    'user_id': 123
}

with ClusterRpcProxy(config, context_data=context_data) as cluster_rpc:
    # All calls will include the context data
    result = cluster_rpc.audit_service.log_action('user_login', {
        'timestamp': time.time()
    })

121

```

122

123

### Event Dispatcher

124

125

Function for publishing events to nameko services from standalone applications.

126

127

```python { .api }

128

def event_dispatcher(config):
    """
    Create an event dispatcher for publishing events.

    Parameters:
    - config: Configuration dictionary with AMQP connection details

    Returns:
    Event dispatcher function that can publish events. It is called as
    ``dispatch(service_name, event_type, event_data)``: the name of the
    service the event is published as, the event type, and the payload.
    """

138

```

139

140

**Usage Example:**

141

142

```python

143

import time

from nameko.standalone.events import event_dispatcher

config = {
    'AMQP_URI': 'amqp://guest:guest@localhost:5672//'
}

# Create event dispatcher
dispatch_event = event_dispatcher(config)

# Publish events to services
dispatch_event('external_system', 'data_imported', {
    'file_path': '/data/import/users.csv',
    'record_count': 1000,
    'import_id': 'import-123',
    'timestamp': time.time()
})

dispatch_event('payment_gateway', 'payment_received', {
    'order_id': 'order-456',
    'amount': 99.99,
    'currency': 'USD',
    'payment_method': 'credit_card',
    'transaction_id': 'txn-789'
})

# The dispatcher automatically handles connection cleanup

169

```

170

171

### Event Exchange Management

172

173

Function for getting event exchanges for manual event publishing with more control.

174

175

```python { .api }

176

def get_event_exchange(config):
    """
    Get event exchange for manual event publishing.

    Parameters:
    - config: Configuration dictionary with AMQP connection details

    Returns:
    Exchange object for publishing events with full control
    """
    # NOTE(review): upstream nameko documents this helper as being keyed by the
    # publishing service's name - confirm the expected argument against the
    # installed version before relying on this signature.

186

```

187

188

**Usage Example:**

189

190

```python

191

import json
import time

from nameko.standalone.events import get_event_exchange

config = {
    'AMQP_URI': 'amqp://guest:guest@localhost:5672//'
}

# Get exchange for manual publishing
exchange = get_event_exchange(config)

# NOTE(review): this assumes the returned object exposes publish() and
# close() directly; confirm against the installed nameko/kombu versions, where
# publishing may need the exchange to be bound to a connection first.
try:
    # Manual event publishing with full control
    event_data = {
        'user_id': 123,
        'action': 'profile_updated',
        'changes': ['email', 'phone'],
        'timestamp': time.time()
    }

    # Publish with specific routing key and headers
    exchange.publish(
        message=json.dumps(event_data),
        routing_key='user_service.profile_updated',
        headers={
            'source_service': 'web_app',
            'event_type': 'profile_updated',
            'correlation_id': 'web-session-789'
        }
    )

finally:
    exchange.close()

223

```

224

225

### Web Application Integration

226

227

Common patterns for integrating nameko services with web frameworks.

228

229

**Flask Integration Example:**

230

231

```python

232

from flask import Flask, request, jsonify

233

from nameko.standalone.rpc import ClusterRpcProxy

234

235

app = Flask(__name__)

# Nameko configuration
NAMEKO_CONFIG = {
    'AMQP_URI': 'amqp://guest:guest@localhost:5672//'
}

241

242

@app.route('/api/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
    """Get user via nameko service.

    Returns the user as JSON. Any exception raised while calling the service
    is reported to the client as a 404 with the error text.
    """
    with ClusterRpcProxy(NAMEKO_CONFIG) as rpc:
        try:
            user = rpc.user_service.get_user(user_id)
            return jsonify(user)
        except Exception as e:
            return jsonify({'error': str(e)}), 404

251

252

@app.route('/api/users', methods=['POST'])
def create_user():
    """Create user via nameko service.

    Forwards the request's JSON body to ``user_service.create_user`` and
    returns the created user with status 201. Any exception raised by the
    call is reported to the client as a 400 with the error text.
    """
    user_data = request.get_json()

    with ClusterRpcProxy(NAMEKO_CONFIG) as rpc:
        try:
            user = rpc.user_service.create_user(user_data)
            return jsonify(user), 201
        except Exception as e:
            return jsonify({'error': str(e)}), 400

263

264

@app.route('/api/orders', methods=['POST'])
def create_order():
    """Create order and send notification.

    The order is created first; a failure there is reported as a 400. The
    confirmation is sent afterwards on a best-effort basis: if it fails the
    error is logged and the request still returns 201, because the order
    already exists and a client retry would create a duplicate.
    """
    order_data = request.get_json()

    # Add correlation ID for tracing
    context_data = {
        'correlation_id': f'web-{request.headers.get("X-Request-ID", "unknown")}',
        'source': 'web_api'
    }

    with ClusterRpcProxy(NAMEKO_CONFIG, context_data=context_data) as rpc:
        try:
            # Create order
            order = rpc.order_service.create_order(order_data)
        except Exception as e:
            return jsonify({'error': str(e)}), 400

        try:
            # Send notification
            rpc.notification_service.send_order_confirmation({
                'order_id': order['order_id'],
                'user_id': order_data['user_id']
            })
        except Exception:
            # Do not fail the request: the order has already been created.
            app.logger.exception('Failed to send order confirmation')

        return jsonify(order), 201

289

290

if __name__ == '__main__':
    # Development server with the interactive debugger enabled; do not use
    # debug=True in a production deployment.
    app.run(debug=True)

292

```

293

294

**Django Integration Example:**

295

296

```python

297

# django_app/services.py

298

from django.conf import settings

299

from nameko.standalone.rpc import ClusterRpcProxy

300

from nameko.standalone.events import event_dispatcher

301

302

class NamekoService:
    """Django service layer for nameko integration"""

    def __init__(self):
        """Build the AMQP config from Django settings and create the event dispatcher."""
        self.config = {
            'AMQP_URI': settings.NAMEKO_AMQP_URI
        }
        self.dispatch_event = event_dispatcher(self.config)

    def get_user_data(self, user_id):
        """Get enriched user data from nameko services.

        Returns the user record from ``user_service`` merged with a
        ``'preferences'`` entry fetched from ``preference_service``.
        """
        with ClusterRpcProxy(self.config) as rpc:
            user = rpc.user_service.get_user(user_id)
            preferences = rpc.preference_service.get_user_preferences(user_id)

        return {
            **user,
            'preferences': preferences
        }

    def notify_user_action(self, user_id, action, data):
        """Notify nameko services of user actions.

        Dispatches a ``user_<action>`` event as ``django_app`` carrying the
        user id, the action, the caller's data and the current timestamp.
        """
        # Imported here because this module's import block does not bring
        # `time` into scope.
        import time

        self.dispatch_event('django_app', f'user_{action}', {
            'user_id': user_id,
            'action': action,
            'data': data,
            'timestamp': time.time()
        })

330

331

# django_app/views.py

332

from django.http import JsonResponse

333

from django.views import View

334

from .services import NamekoService

335

336

class UserAPIView(View):
    """JSON endpoints that read user data from, and report user actions to, nameko."""

    def __init__(self, **kwargs):
        # Forward keyword arguments so the view keeps working when it is
        # instantiated with init kwargs (e.g. via as_view(**initkwargs)).
        super().__init__(**kwargs)
        self.nameko = NamekoService()

    def get(self, request, user_id):
        """Return the enriched user data, or a 400 with the error text on failure."""
        try:
            user_data = self.nameko.get_user_data(user_id)
            return JsonResponse(user_data)
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=400)

    def post(self, request, user_id):
        """Dispatch the posted user action as an event.

        The body must be a JSON object with an ``action`` field and an
        optional ``data`` field; anything else is rejected with a 400.
        """
        # Imported here because this module's import block does not bring
        # `json` into scope.
        import json

        # Handle user action
        try:
            action_data = json.loads(request.body)
            action = action_data['action']
        except (ValueError, KeyError, TypeError):
            # Malformed JSON, a non-object body, or a missing "action" field.
            return JsonResponse(
                {'error': 'Request body must be a JSON object with an "action" field'},
                status=400
            )

        # Notify nameko services
        self.nameko.notify_user_action(
            user_id,
            action,
            action_data.get('data', {})
        )

        return JsonResponse({'status': 'success'})

360

```

361

362

### Error Handling and Retries

363

364

Robust error handling patterns for standalone clients.

365

366

```python

367

from nameko.standalone.rpc import ServiceRpcProxy

368

from nameko.exceptions import RemoteError, ServiceNotFound

369

import time

370

import logging

371

372

class ResilientNamekoClient:
    """Wrapper for resilient nameko service calls.

    Each call opens a fresh ``ServiceRpcProxy`` and is attempted up to
    ``max_retries + 1`` times.
    """

    def __init__(self, config, max_retries=3, retry_delay=1):
        """
        Parameters:
        - config: Configuration dictionary passed to ``ServiceRpcProxy``
        - max_retries: Number of retries after the first attempt (>= 0)
        - retry_delay: Base delay in seconds between attempts

        Raises:
        - ValueError: if ``max_retries`` is negative (the call loop would
          otherwise never run and silently return ``None``)
        """
        if max_retries < 0:
            raise ValueError("max_retries must be >= 0")
        self.config = config
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.logger = logging.getLogger(__name__)

    def call_service(self, service_name, method_name, *args, **kwargs):
        """Call service method with automatic retries.

        Returns the remote method's result. Re-raises the last error once
        the retries are exhausted; a missing service is never retried.
        """
        for attempt in range(self.max_retries + 1):
            try:
                with ServiceRpcProxy(service_name, self.config) as rpc:
                    method = getattr(rpc, method_name)
                    return method(*args, **kwargs)

            # NOTE(review): confirm this exception name exists in
            # nameko.exceptions for the installed version (upstream names the
            # unknown-service error `UnknownService`).
            except ServiceNotFound:
                self.logger.error("Service %s not found", service_name)
                raise  # Don't retry for service not found

            except RemoteError as e:
                self.logger.warning(
                    "Remote error on attempt %s: %s", attempt + 1, e
                )
                if attempt == self.max_retries:
                    raise
                time.sleep(self.retry_delay * (2 ** attempt))  # Exponential backoff

            except Exception as e:
                self.logger.error(
                    "Unexpected error on attempt %s: %s", attempt + 1, e
                )
                if attempt == self.max_retries:
                    raise
                # Unexpected errors are retried after a fixed delay.
                time.sleep(self.retry_delay)

405

406

# Usage
config = {
    'AMQP_URI': 'amqp://guest:guest@localhost:5672//'
}
client = ResilientNamekoClient(config, max_retries=3)

try:
    user = client.call_service('user_service', 'get_user', user_id=123)
    print(user)
except Exception as e:
    print(f"Failed to get user after retries: {e}")

414

```

415

416

### Connection Pooling

417

418

Advanced connection management for high-throughput applications.

419

420

```python

421

from nameko.standalone.rpc import ClusterRpcProxy

422

from contextlib import contextmanager

423

import threading

424

import queue

425

426

class NamekoConnectionPool:
    """Connection pool for nameko RPC clients.

    The pool eagerly starts ``pool_size`` ``ClusterRpcProxy`` instances and
    hands them out one at a time through ``get_proxy()``.
    """

    def __init__(self, config, pool_size=10):
        """
        Parameters:
        - config: Configuration dictionary passed to each ``ClusterRpcProxy``
        - pool_size: Number of connections to open up front
        """
        self.config = config
        self.pool_size = pool_size
        self.pool = queue.Queue(maxsize=pool_size)
        self.lock = threading.Lock()  # not used internally; kept for callers

        # Pre-populate pool. Each entry keeps both the proxy object (needed
        # to stop the connection) and the object returned by start() (the
        # one service calls are made on).
        try:
            for _ in range(pool_size):
                owner = ClusterRpcProxy(config)
                client = owner.start()  # Initialize connection
                self.pool.put((owner, client))
        except Exception:
            # Do not leak the connections that were already started.
            self.close_all()
            raise

    @contextmanager
    def get_proxy(self, timeout=None):
        """Get proxy from pool with automatic return.

        Parameters:
        - timeout: Seconds to wait for a free proxy; ``None`` waits forever

        Raises:
        - queue.Empty: if no proxy became available within ``timeout``
        """
        entry = self.pool.get(timeout=timeout)
        try:
            yield entry[1]
        finally:
            self.pool.put(entry)

    def close_all(self):
        """Close all connections in pool.

        Only proxies currently in the pool are stopped; one that is checked
        out at this moment is not.
        """
        while True:
            try:
                owner, _client = self.pool.get_nowait()
            except queue.Empty:
                break
            owner.stop()

455

456

# Usage in high-throughput application
import atexit

config = {
    'AMQP_URI': 'amqp://guest:guest@localhost:5672//'
}
pool = NamekoConnectionPool(config, pool_size=20)


def process_request(request_data):
    """Process one request on a pooled connection and return the service's result."""
    with pool.get_proxy() as rpc:
        # Process request using pooled connection
        result = rpc.processor_service.process_data(request_data)
        return result


# Cleanup on application shutdown
atexit.register(pool.close_all)

468

```