or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-proxies.md, context-objects.md, exceptions.md, grpc.md, index.md, message-patterns.md, modules.md, transports.md

context-objects.mddocs/

0

# Context Objects

1

2

Transport-specific context objects providing access to underlying protocol details and metadata for message handling, enabling fine-grained control over microservice communication and transport-specific operations.

3

4

## Capabilities

5

6

### Base RPC Context

7

8

Abstract base class for all RPC context implementations providing common context operations.

9

10

```typescript { .api }

11

/**
 * Base class for all RPC context implementations.
 *
 * @typeParam T - Type of the arguments collection held by the context
 *   (defaults to `any[]`).
 */
abstract class BaseRpcContext<T = any[]> {
  /**
   * Returns the arguments array from the context.
   *
   * @returns Arguments array with type information
   */
  getArgs(): T;

  /**
   * Returns a specific argument by index position.
   *
   * The method-level type parameter is named `TArg` so that it does not
   * shadow the class-level `T` (which describes the whole arguments array,
   * not a single element).
   *
   * @typeParam TArg - Expected type of the returned argument
   * @param index - Zero-based index of the argument
   * @returns Argument value at the specified index
   */
  getArgByIndex<TArg = any>(index: number): TArg;
}

28

```

29

30

**Usage Examples:**

31

32

```typescript

33

import { Controller } from '@nestjs/common';

34

import { MessagePattern, Ctx } from '@nestjs/microservices';

35

import { BaseRpcContext } from '@nestjs/microservices';

36

37

@Controller()
export class BaseContextController {
  /**
   * Handles the `process_args` pattern by reading the raw context arguments.
   *
   * @param context - Transport-agnostic RPC context injected via `@Ctx()`
   * @returns The number of arguments plus the first two argument values
   */
  @MessagePattern('process_args')
  processWithArgs(@Ctx() context: BaseRpcContext): any {
    const args = context.getArgs();

    // Look up the first two positional arguments individually.
    const [first, second] = [0, 1].map((position) =>
      context.getArgByIndex(position),
    );

    return { argCount: args.length, first, second };
  }
}

52

```

53

54

### Kafka Context

55

56

Kafka-specific context providing access to Kafka message metadata, consumer, and producer instances.

57

58

```typescript { .api }

59

/**
 * Context implementation specific to the Kafka transport.
 */
class KafkaContext extends BaseRpcContext {
  /**
   * Gives access to the original Kafka message object.
   *
   * @returns Kafka message with headers, key, value, etc.
   */
  getMessage(): any;

  /**
   * Gives the partition the message belongs to.
   *
   * @returns Partition number
   */
  getPartition(): number;

  /**
   * Gives the topic the message belongs to.
   *
   * @returns Topic name string
   */
  getTopic(): string;

  /**
   * Gives a reference to the Kafka consumer instance.
   *
   * @returns Kafka consumer for manual operations
   */
  getConsumer(): any;

  /**
   * Gives the heartbeat callback function.
   *
   * @returns Heartbeat function for consumer group coordination
   */
  getHeartbeat(): () => Promise<void>;

  /**
   * Gives a reference to the Kafka producer instance.
   *
   * @returns Kafka producer for sending messages
   */
  getProducer(): any;
}

99

```

100

101

**Usage Examples:**

102

103

```typescript

104

import { Controller, Logger } from '@nestjs/common';

105

import { EventPattern, MessagePattern, Payload, Ctx } from '@nestjs/microservices';

106

import { KafkaContext } from '@nestjs/microservices';

107

108

@Controller()
export class KafkaController {
  private readonly logger = new Logger(KafkaController.name);

  /**
   * Handles `user.events.created` events.
   *
   * Logs the topic, partition, headers and key of the incoming message,
   * sends one manual heartbeat, then delegates to `processUserCreation`
   * (not shown in this example).
   *
   * @param userData - Deserialized event payload
   * @param context - Kafka context injected via `@Ctx()`
   */
  @EventPattern('user.events.created')
  async handleUserCreated(
    @Payload() userData: any,
    @Ctx() context: KafkaContext
  ): Promise<void> {
    const message = context.getMessage();
    const topic = context.getTopic();
    const partition = context.getPartition();

    this.logger.log(`Processing user creation from topic: ${topic}, partition: ${partition}`);
    this.logger.debug(`Message headers:`, message.headers);
    this.logger.debug(`Message key:`, message.key?.toString());

    // Manual heartbeat to prevent session timeout for long processing
    const heartbeat = context.getHeartbeat();
    await heartbeat();

    await this.processUserCreation(userData);
  }

  /**
   * Handles `user.commands.process` requests.
   *
   * Runs the command via `executeCommand` (not shown in this example),
   * publishes the result to `user.events.processed` with the context's
   * producer, and returns it. On failure the offset following the current
   * message is committed and the error is rethrown.
   *
   * @param command - Deserialized command payload; `command.userId` is used as the message key
   * @param context - Kafka context injected via `@Ctx()`
   * @returns The result produced by `executeCommand`
   * @throws Rethrows any error raised while executing or publishing
   */
  @MessagePattern('user.commands.process')
  async processUserCommand(
    @Payload() command: any,
    @Ctx() context: KafkaContext
  ): Promise<any> {
    const producer = context.getProducer();
    const consumer = context.getConsumer();

    try {
      const result = await this.executeCommand(command);

      // Headers are optional on a Kafka message, so read the correlation id
      // defensively and only forward it when the incoming message carried one.
      const correlationId = context.getMessage().headers?.['correlation-id'];

      // Send result to another topic using the producer
      await producer.send({
        topic: 'user.events.processed',
        messages: [{
          key: command.userId,
          value: JSON.stringify(result),
          headers: {
            ...(correlationId != null ? { 'correlation-id': correlationId } : {}),
            'processed-at': Date.now().toString()
          }
        }]
      });

      return result;
    } catch (error) {
      // Manual offset management if needed
      const { topic, partition, offset } = context.getMessage();
      await consumer.commitOffsets([{
        topic,
        partition,
        // Offsets are 64-bit integers carried as strings; BigInt keeps the
        // increment exact where a Number round-trip could lose precision.
        offset: (BigInt(offset) + 1n).toString()
      }]);

      throw error;
    }
  }
}

170

```

171

172

### TCP Context

173

174

TCP-specific context providing access to the underlying socket connection and pattern information.

175

176

```typescript { .api }

177

/**
 * Context implementation specific to the TCP transport.
 */
class TcpContext extends BaseRpcContext {
  /**
   * Gives a reference to the underlying JSON socket.
   *
   * @returns JSON socket instance for direct socket operations
   */
  getSocketRef(): any;

  /**
   * Gives the pattern name that was used to route the message.
   *
   * @returns Pattern identifier
   */
  getPattern(): string;
}

193

```

194

195

**Usage Examples:**

196

197

```typescript

198

import { Controller, Logger } from '@nestjs/common';

199

import { MessagePattern, EventPattern, Payload, Ctx } from '@nestjs/microservices';

200

import { TcpContext } from '@nestjs/microservices';

201

202

@Controller()
export class TcpController {
  private readonly logger = new Logger(TcpController.name);

  /**
   * Handles the `{ cmd: 'process_with_socket' }` pattern.
   *
   * Logs the pattern and the remote address/port of the client, sends an
   * `ack` message over the socket, then delegates to `processData`
   * (not shown in this example).
   *
   * @param data - Deserialized request payload
   * @param context - TCP context injected via `@Ctx()`
   * @returns Whatever `processData` returns
   */
  @MessagePattern({ cmd: 'process_with_socket' })
  processWithSocket(
    @Payload() data: any,
    @Ctx() context: TcpContext
  ): any {
    const socket = context.getSocketRef();
    const pattern = context.getPattern();

    this.logger.log(`Processing command: ${JSON.stringify(pattern)}`);

    // Access socket properties (the raw stream is exposed as `socket.socket`)
    const clientAddress = socket.socket.remoteAddress;
    const clientPort = socket.socket.remotePort;

    this.logger.log(`Request from client: ${clientAddress}:${clientPort}`);

    // Direct socket operations if needed.
    // The JSON socket wrapper exposes `sendMessage`, not `write`; it serializes
    // and frames the payload itself, so a plain object is passed rather than a
    // pre-stringified one.
    socket.sendMessage({
      type: 'ack',
      pattern: pattern,
      timestamp: Date.now()
    });

    return this.processData(data, { clientAddress, pattern });
  }

  /**
   * Handles `tcp_notification` events.
   *
   * Logs the pattern and remote address, sends a `notification_received`
   * message over the socket, then delegates to `processNotification`
   * (not shown in this example).
   *
   * @param notification - Deserialized event payload; `notification.id` is echoed back
   * @param context - TCP context injected via `@Ctx()`
   */
  @EventPattern('tcp_notification')
  handleTcpNotification(
    @Payload() notification: any,
    @Ctx() context: TcpContext
  ): void {
    const socket = context.getSocketRef();
    const pattern = context.getPattern();

    // Log connection details
    this.logger.log(`TCP notification: ${pattern} from ${socket.socket.remoteAddress}`);

    // Send immediate acknowledgment through the wrapper's framed send
    // (see the note in `processWithSocket` about `sendMessage` vs. `write`).
    socket.sendMessage({
      type: 'notification_received',
      id: notification.id,
      timestamp: Date.now()
    });

    this.processNotification(notification);
  }
}

253

```

254

255

### NATS Context

256

257

NATS-specific context providing access to subject and message headers.

258

259

```typescript { .api }

260

/**
 * Context implementation specific to the NATS transport.
 */
class NatsContext extends BaseRpcContext {
  /**
   * Gives the NATS subject name.
   *
   * @returns Subject string used for message routing
   */
  getSubject(): string;

  /**
   * Gives the message headers, if available.
   *
   * @returns Message headers object
   */
  getHeaders(): Record<string, any>;
}

276

```

277

278

**Usage Examples:**

279

280

```typescript

281

import { Controller, Logger } from '@nestjs/common';

282

import { MessagePattern, EventPattern, Payload, Ctx } from '@nestjs/microservices';

283

import { NatsContext } from '@nestjs/microservices';

284

285

@Controller()
export class NatsController {
  private readonly logger = new Logger(NatsController.name);

  /**
   * Handles requests on subjects matching `user.*.get`.
   *
   * The second subject token is treated as the user type and forwarded,
   * together with selected headers, to `userService.getByType`
   * (the service is not shown in this example).
   *
   * @param request - Deserialized request payload; `request.id` identifies the user
   * @param context - NATS context injected via `@Ctx()`
   * @returns Whatever `userService.getByType` returns
   */
  @MessagePattern('user.*.get')
  getUserBySubject(
    @Payload() request: any,
    @Ctx() context: NatsContext
  ): any {
    const subject = context.getSubject();
    const headers = context.getHeaders();

    // Second token of the subject, e.g. 'admin' in 'user.admin.get'
    const [, userType] = subject.split('.');

    this.logger.log(`Getting ${userType} user with ID: ${request.id}`);
    this.logger.debug(`Request headers:`, headers);

    const lookupOptions = {
      requestId: headers?.['request-id'],
      userAgent: headers?.['user-agent']
    };
    return this.userService.getByType(userType, request.id, lookupOptions);
  }

  /**
   * Handles events on subjects matching `events.*.created`.
   *
   * Logs the entity, correlates it through `trackingService` when a
   * `correlation-id` header is present, then calls `processEntityCreation`
   * (neither collaborator is shown in this example).
   *
   * @param entity - Deserialized event payload; `entity.id` is logged
   * @param context - NATS context injected via `@Ctx()`
   */
  @EventPattern('events.*.created')
  handleEntityCreated(
    @Payload() entity: any,
    @Ctx() context: NatsContext
  ): void {
    const subject = context.getSubject();
    const headers = context.getHeaders();

    // Second token of the subject names the entity type
    const [, entityType] = subject.split('.');

    this.logger.log(`${entityType} created with ID: ${entity.id}`);

    // Correlate only when the sender supplied a correlation id
    const correlationId = headers?.['correlation-id'];
    if (correlationId) {
      this.trackingService.correlateEvent(correlationId, entity);
    }

    this.processEntityCreation(entityType, entity);
  }
}

328

```

329

330

### MQTT Context

331

332

MQTT-specific context providing access to topic and original MQTT packet information.

333

334

```typescript { .api }

335

/**
 * Context implementation specific to the MQTT transport.
 */
class MqttContext extends BaseRpcContext {
  /**
   * Gives the MQTT topic name.
   *
   * @returns Topic string used for message routing
   */
  getTopic(): string;

  /**
   * Gives the original MQTT packet.
   *
   * @returns MQTT packet with QoS, retain flag, etc.
   */
  getPacket(): any;
}

351

```

352

353

**Usage Examples:**

354

355

```typescript

356

import { Controller, Logger } from '@nestjs/common';

357

import { EventPattern, Payload, Ctx } from '@nestjs/microservices';

358

import { MqttContext } from '@nestjs/microservices';

359

360

@Controller()
export class MqttController {
  private readonly logger = new Logger(MqttController.name);

  /**
   * Handles events on topics matching `sensors/+/temperature`.
   *
   * Retained messages update the last known value; all other messages are
   * processed as real-time readings (`sensorService` is not shown in this
   * example).
   *
   * @param reading - Deserialized payload; `reading.value` is logged
   * @param context - MQTT context injected via `@Ctx()`
   */
  @EventPattern('sensors/+/temperature')
  handleTemperatureReading(
    @Payload() reading: any,
    @Ctx() context: MqttContext
  ): void {
    const topic = context.getTopic();
    const packet = context.getPacket();

    // Second topic level is the sensor id (e.g. 'sensors/sensor001/temperature')
    const [, sensorId] = topic.split('/');

    this.logger.log(`Temperature reading from sensor ${sensorId}: ${reading.value}°C`);
    this.logger.debug(`MQTT QoS: ${packet.qos}, Retain: ${packet.retain}`);

    // Live (non-retained) readings take the real-time path and finish here
    if (!packet.retain) {
      this.sensorService.processRealtimeReading(sensorId, reading);
      return;
    }

    // Retained messages are handled differently
    this.logger.log('Processing retained temperature message');
    this.sensorService.updateLastKnownValue(sensorId, reading);
  }

  /**
   * Handles events on topics matching `devices/+/status/#`.
   *
   * Derives the device id and status type from the topic and a priority
   * from the packet's QoS, then forwards everything to
   * `deviceService.updateStatus` (not shown in this example).
   *
   * @param status - Deserialized status payload
   * @param context - MQTT context injected via `@Ctx()`
   */
  @EventPattern('devices/+/status/#')
  handleDeviceStatus(
    @Payload() status: any,
    @Ctx() context: MqttContext
  ): void {
    const topic = context.getTopic();
    const packet = context.getPacket();

    // Levels: [0] prefix, [1] device id, [2] 'status', [3..] status type
    const [, deviceId, , ...statusLevels] = topic.split('/');
    const statusType = statusLevels.join('/');

    this.logger.log(`Device ${deviceId} status update: ${statusType}`);

    // QoS 2 messages are treated as high priority
    let priority = 'normal';
    if (packet.qos === 2) {
      priority = 'high';
    }

    this.deviceService.updateStatus(deviceId, statusType, status, { priority });
  }
}

407

```

408

409

### Redis Context

410

411

Redis-specific context providing access to channel information for pub/sub operations.

412

413

```typescript { .api }

414

/**
 * Context implementation specific to the Redis transport.
 */
class RedisContext extends BaseRpcContext {
  /**
   * Gives the Redis channel name.
   *
   * @returns Channel name used for pub/sub
   */
  getChannel(): string;
}

424

```

425

426

**Usage Examples:**

427

428

```typescript

429

import { Controller, Logger } from '@nestjs/common';

430

import { MessagePattern, EventPattern, Payload, Ctx } from '@nestjs/microservices';

431

import { RedisContext } from '@nestjs/microservices';

432

433

@Controller()
export class RedisController {
  private readonly logger = new Logger(RedisController.name);

  /**
   * Handles requests on channels matching `cache:get:*`.
   *
   * Strips the `cache:get:` marker from the channel name to obtain the cache
   * key and looks it up through `cacheService` (not shown in this example).
   *
   * @param request - Deserialized request payload; `request.options` is forwarded
   * @param context - Redis context injected via `@Ctx()`
   * @returns Whatever `cacheService.get` returns
   */
  @MessagePattern('cache:get:*')
  getCacheValue(
    @Payload() request: any,
    @Ctx() context: RedisContext
  ): any {
    // The cache key is whatever remains once the marker is removed
    const cacheKey = context.getChannel().replace('cache:get:', '');

    this.logger.log(`Cache request for key: ${cacheKey}`);

    return this.cacheService.get(cacheKey, request.options);
  }

  /**
   * Handles events on channels matching `notifications:*`.
   *
   * Routes the notification to the user or system notification service
   * based on the second `:`-separated segment of the channel name, and
   * logs a warning for any other type (the services are not shown in this
   * example).
   *
   * @param notification - Deserialized notification payload
   * @param context - Redis context injected via `@Ctx()`
   */
  @EventPattern('notifications:*')
  handleNotification(
    @Payload() notification: any,
    @Ctx() context: RedisContext
  ): void {
    const channel = context.getChannel();
    const [, notificationType] = channel.split(':');

    this.logger.log(`Notification received on channel: ${channel}`);

    if (notificationType === 'user') {
      this.userNotificationService.handle(notification);
    } else if (notificationType === 'system') {
      this.systemNotificationService.handle(notification);
    } else {
      this.logger.warn(`Unknown notification type: ${notificationType}`);
    }
  }
}

474

```

475

476

### RabbitMQ Context

477

478

RabbitMQ-specific context providing access to message, channel reference, and pattern information.

479

480

```typescript { .api }

481

/**
 * Context implementation specific to the RabbitMQ transport.
 */
class RmqContext extends BaseRpcContext {
  /**
   * Gives the original RabbitMQ message.
   *
   * @returns RabbitMQ message with properties and fields
   */
  getMessage(): any;

  /**
   * Gives a reference to the RabbitMQ channel.
   *
   * @returns Channel instance for manual operations
   */
  getChannelRef(): any;

  /**
   * Gives the pattern name that was used for routing.
   *
   * @returns Pattern/routing key string
   */
  getPattern(): string;
}

503

```

504

505

**Usage Examples:**

506

507

```typescript

508

import { Controller, Logger } from '@nestjs/common';

509

import { MessagePattern, EventPattern, Payload, Ctx } from '@nestjs/microservices';

510

import { RmqContext } from '@nestjs/microservices';

511

512

@Controller()
export class RmqController {
  private readonly logger = new Logger(RmqController.name);

  /**
   * Handles `user.commands.process` requests with manual acknowledgment.
   *
   * Processes the command through `commandService` (not shown in this
   * example), acknowledges the message, publishes the result to
   * `results_exchange`, and returns it. If processing fails before the
   * acknowledgment, the message is rejected and requeued only when the
   * error carries `retryable === true`.
   *
   * @param command - Deserialized command payload
   * @param context - RabbitMQ context injected via `@Ctx()`
   * @returns The result produced by `commandService.process`
   * @throws Rethrows any error raised while processing or publishing
   */
  @MessagePattern('user.commands.process')
  async processUserCommand(
    @Payload() command: any,
    @Ctx() context: RmqContext
  ): Promise<any> {
    const message = context.getMessage();
    const channel = context.getChannelRef();
    const pattern = context.getPattern();

    this.logger.log(`Processing command with routing key: ${pattern}`);
    this.logger.debug(`Message properties:`, message.properties);

    // A delivery may be settled (ack or nack) only once. Track the ack so a
    // failure in the later publish step does not nack an already-acked message.
    let acknowledged = false;

    try {
      const result = await this.commandService.process(command);

      // Manual acknowledgment after successful processing
      channel.ack(message);
      acknowledged = true;

      // Publish result to another exchange
      await channel.publish(
        'results_exchange',
        'user.results.processed',
        Buffer.from(JSON.stringify(result)),
        {
          correlationId: message.properties.correlationId,
          timestamp: Date.now(),
          headers: {
            'processed-by': 'user-service',
            'original-routing-key': pattern
          }
        }
      );

      return result;
    } catch (error: unknown) {
      this.logger.error(`Command processing failed:`, error);

      if (!acknowledged) {
        // Reject message and potentially requeue. The caught value is
        // narrowed first because anything (including null) can be thrown.
        const requeue =
          typeof error === 'object' &&
          error !== null &&
          (error as { retryable?: unknown }).retryable === true;
        channel.nack(message, false, requeue);
      }

      throw error;
    }
  }

  /**
   * Handles events matching `user.events.*` with manual acknowledgment.
   *
   * Uses the last `.`-separated segment of the pattern as the event type,
   * passes the event to `eventService` (not shown in this example),
   * acknowledges the message, and publishes a welcome notification for
   * `created` events. Failures are logged; the message is rejected without
   * requeue unless it was already acknowledged.
   *
   * @param event - Deserialized event payload; `event.id` is used for the follow-up
   * @param context - RabbitMQ context injected via `@Ctx()`
   */
  @EventPattern('user.events.*')
  handleUserEvent(
    @Payload() event: any,
    @Ctx() context: RmqContext
  ): void {
    const message = context.getMessage();
    const channel = context.getChannelRef();
    const pattern = context.getPattern();

    const eventType = pattern.split('.').pop(); // Extract event type

    this.logger.log(`Handling user event: ${eventType}`);

    // See processUserCommand: never nack a delivery that was already acked.
    let acknowledged = false;

    try {
      // Process event based on type
      this.eventService.handle(eventType, event);

      // Manual acknowledgment
      channel.ack(message);
      acknowledged = true;

      // Publish follow-up events if needed
      if (eventType === 'created') {
        channel.publish(
          'notifications_exchange',
          'notifications.welcome',
          Buffer.from(JSON.stringify({ userId: event.id })),
          { correlationId: message.properties.correlationId }
        );
      }
    } catch (error: unknown) {
      this.logger.error(`Event handling failed:`, error);

      if (!acknowledged) {
        channel.nack(message, false, false); // Don't requeue events
      }
    }
  }
}

596

```