0
# Context Objects
1
2
Transport-specific context objects providing access to underlying protocol details and metadata for message handling, enabling fine-grained control over microservice communication and transport-specific operations.
3
4
## Capabilities
5
6
### Base RPC Context
7
8
Abstract base class for all RPC context implementations providing common context operations.
9
10
```typescript { .api }
11
/**
12
* Base class for all RPC context implementations
13
*/
14
abstract class BaseRpcContext<T = any[]> {
15
/**
16
* Returns the arguments array from the context
17
* @returns Arguments array with type information
18
*/
19
getArgs(): T;
20
21
/**
22
* Returns specific argument by index position
23
* @param index - Zero-based index of the argument
24
* @returns Argument value at the specified index
25
*/
26
getArgByIndex<T = any>(index: number): T;
27
}
28
```
29
30
**Usage Examples:**
31
32
```typescript
33
import { Controller } from '@nestjs/common';
34
import { MessagePattern, Ctx } from '@nestjs/microservices';
35
import { BaseRpcContext } from '@nestjs/microservices';
36
37
@Controller()
38
export class BaseContextController {
39
@MessagePattern('process_args')
40
processWithArgs(@Ctx() context: BaseRpcContext): any {
41
const allArgs = context.getArgs();
42
const firstArg = context.getArgByIndex(0);
43
const secondArg = context.getArgByIndex(1);
44
45
return {
46
argCount: allArgs.length,
47
first: firstArg,
48
second: secondArg
49
};
50
}
51
}
52
```
53
54
### Kafka Context
55
56
Kafka-specific context providing access to Kafka message metadata, consumer, and producer instances.
57
58
```typescript { .api }
59
/**
60
* Kafka-specific context implementation
61
*/
62
class KafkaContext extends BaseRpcContext {
63
/**
64
* Returns the original Kafka message object
65
* @returns Kafka message with headers, key, value, etc.
66
*/
67
getMessage(): any;
68
69
/**
70
* Returns the partition number for the message
71
* @returns Partition number
72
*/
73
getPartition(): number;
74
75
/**
76
* Returns the topic name for the message
77
* @returns Topic name string
78
*/
79
getTopic(): string;
80
81
/**
82
* Returns reference to the Kafka consumer instance
83
* @returns Kafka consumer for manual operations
84
*/
85
getConsumer(): any;
86
87
/**
88
* Returns the heartbeat callback function
89
* @returns Heartbeat function for consumer group coordination
90
*/
91
getHeartbeat(): () => Promise<void>;
92
93
/**
94
* Returns reference to the Kafka producer instance
95
* @returns Kafka producer for sending messages
96
*/
97
getProducer(): any;
98
}
99
```
100
101
**Usage Examples:**
102
103
```typescript
104
import { Controller, Logger } from '@nestjs/common';
105
import { EventPattern, MessagePattern, Payload, Ctx } from '@nestjs/microservices';
106
import { KafkaContext } from '@nestjs/microservices';
107
108
@Controller()
109
export class KafkaController {
110
private readonly logger = new Logger(KafkaController.name);
111
112
@EventPattern('user.events.created')
113
async handleUserCreated(
114
@Payload() userData: any,
115
@Ctx() context: KafkaContext
116
): Promise<void> {
117
const message = context.getMessage();
118
const topic = context.getTopic();
119
const partition = context.getPartition();
120
121
this.logger.log(`Processing user creation from topic: ${topic}, partition: ${partition}`);
122
this.logger.debug(`Message headers:`, message.headers);
123
this.logger.debug(`Message key:`, message.key?.toString());
124
125
// Manual heartbeat to prevent session timeout for long processing
126
const heartbeat = context.getHeartbeat();
127
await heartbeat();
128
129
await this.processUserCreation(userData);
130
}
131
132
@MessagePattern('user.commands.process')
133
async processUserCommand(
134
@Payload() command: any,
135
@Ctx() context: KafkaContext
136
): Promise<any> {
137
const producer = context.getProducer();
138
const consumer = context.getConsumer();
139
140
try {
141
const result = await this.executeCommand(command);
142
143
// Send result to another topic using the producer
144
await producer.send({
145
topic: 'user.events.processed',
146
messages: [{
147
key: command.userId,
148
value: JSON.stringify(result),
149
headers: {
150
'correlation-id': context.getMessage().headers['correlation-id'],
151
'processed-at': Date.now().toString()
152
}
153
}]
154
});
155
156
return result;
157
} catch (error) {
158
// Manual offset management if needed
159
const { topic, partition, offset } = context.getMessage();
160
await consumer.commitOffsets([{
161
topic,
162
partition,
163
offset: (parseInt(offset) + 1).toString()
164
}]);
165
166
throw error;
167
}
168
}
169
}
170
```
171
172
### TCP Context
173
174
TCP-specific context providing access to the underlying socket connection and pattern information.
175
176
```typescript { .api }
177
/**
178
* TCP-specific context implementation
179
*/
180
class TcpContext extends BaseRpcContext {
181
/**
182
* Returns reference to the underlying JSON socket
183
* @returns JSON socket instance for direct socket operations
184
*/
185
getSocketRef(): any;
186
187
/**
188
* Returns the pattern name used for message routing
189
* @returns Pattern identifier
190
*/
191
getPattern(): string;
192
}
193
```
194
195
**Usage Examples:**
196
197
```typescript
198
import { Controller, Logger } from '@nestjs/common';
199
import { MessagePattern, EventPattern, Payload, Ctx } from '@nestjs/microservices';
200
import { TcpContext } from '@nestjs/microservices';
201
202
@Controller()
203
export class TcpController {
204
private readonly logger = new Logger(TcpController.name);
205
206
@MessagePattern({ cmd: 'process_with_socket' })
207
processWithSocket(
208
@Payload() data: any,
209
@Ctx() context: TcpContext
210
): any {
211
const socket = context.getSocketRef();
212
const pattern = context.getPattern();
213
214
this.logger.log(`Processing command: ${JSON.stringify(pattern)}`);
215
216
// Access socket properties
217
const clientAddress = socket.socket.remoteAddress;
218
const clientPort = socket.socket.remotePort;
219
220
this.logger.log(`Request from client: ${clientAddress}:${clientPort}`);
221
222
// Direct socket operations if needed
223
socket.write(JSON.stringify({
224
type: 'ack',
225
pattern: pattern,
226
timestamp: Date.now()
227
}));
228
229
return this.processData(data, { clientAddress, pattern });
230
}
231
232
@EventPattern('tcp_notification')
233
handleTcpNotification(
234
@Payload() notification: any,
235
@Ctx() context: TcpContext
236
): void {
237
const socket = context.getSocketRef();
238
const pattern = context.getPattern();
239
240
// Log connection details
241
this.logger.log(`TCP notification: ${pattern} from ${socket.socket.remoteAddress}`);
242
243
// Send immediate acknowledgment
244
socket.write(JSON.stringify({
245
type: 'notification_received',
246
id: notification.id,
247
timestamp: Date.now()
248
}));
249
250
this.processNotification(notification);
251
}
252
}
253
```
254
255
### NATS Context
256
257
NATS-specific context providing access to subject and message headers.
258
259
```typescript { .api }
260
/**
261
* NATS-specific context implementation
262
*/
263
class NatsContext extends BaseRpcContext {
264
/**
265
* Returns the NATS subject name
266
* @returns Subject string used for message routing
267
*/
268
getSubject(): string;
269
270
/**
271
* Returns message headers if available
272
* @returns Message headers object
273
*/
274
getHeaders(): Record<string, any>;
275
}
276
```
277
278
**Usage Examples:**
279
280
```typescript
281
import { Controller, Logger } from '@nestjs/common';
282
import { MessagePattern, EventPattern, Payload, Ctx } from '@nestjs/microservices';
283
import { NatsContext } from '@nestjs/microservices';
284
285
@Controller()
286
export class NatsController {
287
private readonly logger = new Logger(NatsController.name);
288
289
@MessagePattern('user.*.get')
290
getUserBySubject(
291
@Payload() request: any,
292
@Ctx() context: NatsContext
293
): any {
294
const subject = context.getSubject();
295
const headers = context.getHeaders();
296
297
// Extract user type from subject pattern
298
const userType = subject.split('.')[1]; // e.g., 'admin' from 'user.admin.get'
299
300
this.logger.log(`Getting ${userType} user with ID: ${request.id}`);
301
this.logger.debug(`Request headers:`, headers);
302
303
return this.userService.getByType(userType, request.id, {
304
requestId: headers?.['request-id'],
305
userAgent: headers?.['user-agent']
306
});
307
}
308
309
@EventPattern('events.*.created')
310
handleEntityCreated(
311
@Payload() entity: any,
312
@Ctx() context: NatsContext
313
): void {
314
const subject = context.getSubject();
315
const headers = context.getHeaders();
316
const entityType = subject.split('.')[1]; // Extract entity type from subject
317
318
this.logger.log(`${entityType} created with ID: ${entity.id}`);
319
320
// Use headers for tracking and correlation
321
if (headers?.['correlation-id']) {
322
this.trackingService.correlateEvent(headers['correlation-id'], entity);
323
}
324
325
this.processEntityCreation(entityType, entity);
326
}
327
}
328
```
329
330
### MQTT Context
331
332
MQTT-specific context providing access to topic and original MQTT packet information.
333
334
```typescript { .api }
335
/**
336
* MQTT-specific context implementation
337
*/
338
class MqttContext extends BaseRpcContext {
339
/**
340
* Returns the MQTT topic name
341
* @returns Topic string used for message routing
342
*/
343
getTopic(): string;
344
345
/**
346
* Returns the original MQTT packet
347
* @returns MQTT packet with QoS, retain flag, etc.
348
*/
349
getPacket(): any;
350
}
351
```
352
353
**Usage Examples:**
354
355
```typescript
356
import { Controller, Logger } from '@nestjs/common';
357
import { EventPattern, Payload, Ctx } from '@nestjs/microservices';
358
import { MqttContext } from '@nestjs/microservices';
359
360
@Controller()
361
export class MqttController {
362
private readonly logger = new Logger(MqttController.name);
363
364
@EventPattern('sensors/+/temperature')
365
handleTemperatureReading(
366
@Payload() reading: any,
367
@Ctx() context: MqttContext
368
): void {
369
const topic = context.getTopic();
370
const packet = context.getPacket();
371
372
// Extract sensor ID from topic (e.g., 'sensors/sensor001/temperature')
373
const sensorId = topic.split('/')[1];
374
375
this.logger.log(`Temperature reading from sensor ${sensorId}: ${reading.value}°C`);
376
this.logger.debug(`MQTT QoS: ${packet.qos}, Retain: ${packet.retain}`);
377
378
// Handle retained messages differently
379
if (packet.retain) {
380
this.logger.log('Processing retained temperature message');
381
this.sensorService.updateLastKnownValue(sensorId, reading);
382
} else {
383
this.sensorService.processRealtimeReading(sensorId, reading);
384
}
385
}
386
387
@EventPattern('devices/+/status/#')
388
handleDeviceStatus(
389
@Payload() status: any,
390
@Ctx() context: MqttContext
391
): void {
392
const topic = context.getTopic();
393
const packet = context.getPacket();
394
395
const topicParts = topic.split('/');
396
const deviceId = topicParts[1];
397
const statusType = topicParts.slice(3).join('/'); // Everything after 'status/'
398
399
this.logger.log(`Device ${deviceId} status update: ${statusType}`);
400
401
// Use packet information for processing decisions
402
const priority = packet.qos === 2 ? 'high' : 'normal';
403
404
this.deviceService.updateStatus(deviceId, statusType, status, { priority });
405
}
406
}
407
```
408
409
### Redis Context
410
411
Redis-specific context providing access to channel information for pub/sub operations.
412
413
```typescript { .api }
414
/**
415
* Redis-specific context implementation
416
*/
417
class RedisContext extends BaseRpcContext {
418
/**
419
* Returns the Redis channel name
420
* @returns Channel name used for pub/sub
421
*/
422
getChannel(): string;
423
}
424
```
425
426
**Usage Examples:**
427
428
```typescript
429
import { Controller, Logger } from '@nestjs/common';
430
import { MessagePattern, EventPattern, Payload, Ctx } from '@nestjs/microservices';
431
import { RedisContext } from '@nestjs/microservices';
432
433
@Controller()
434
export class RedisController {
435
private readonly logger = new Logger(RedisController.name);
436
437
@MessagePattern('cache:get:*')
438
getCacheValue(
439
@Payload() request: any,
440
@Ctx() context: RedisContext
441
): any {
442
const channel = context.getChannel();
443
444
// Extract cache key from channel pattern
445
const cacheKey = channel.replace('cache:get:', '');
446
447
this.logger.log(`Cache request for key: ${cacheKey}`);
448
449
return this.cacheService.get(cacheKey, request.options);
450
}
451
452
@EventPattern('notifications:*')
453
handleNotification(
454
@Payload() notification: any,
455
@Ctx() context: RedisContext
456
): void {
457
const channel = context.getChannel();
458
const notificationType = channel.split(':')[1];
459
460
this.logger.log(`Notification received on channel: ${channel}`);
461
462
switch (notificationType) {
463
case 'user':
464
this.userNotificationService.handle(notification);
465
break;
466
case 'system':
467
this.systemNotificationService.handle(notification);
468
break;
469
default:
470
this.logger.warn(`Unknown notification type: ${notificationType}`);
471
}
472
}
473
}
474
```
475
476
### RabbitMQ Context
477
478
RabbitMQ-specific context providing access to message, channel reference, and pattern information.
479
480
```typescript { .api }
481
/**
482
* RabbitMQ-specific context implementation
483
*/
484
class RmqContext extends BaseRpcContext {
485
/**
486
* Returns the original RabbitMQ message
487
* @returns RabbitMQ message with properties and fields
488
*/
489
getMessage(): any;
490
491
/**
492
* Returns reference to the RabbitMQ channel
493
* @returns Channel instance for manual operations
494
*/
495
getChannelRef(): any;
496
497
/**
498
* Returns the pattern name used for routing
499
* @returns Pattern/routing key string
500
*/
501
getPattern(): string;
502
}
503
```
504
505
**Usage Examples:**
506
507
```typescript
508
import { Controller, Logger } from '@nestjs/common';
509
import { MessagePattern, EventPattern, Payload, Ctx } from '@nestjs/microservices';
510
import { RmqContext } from '@nestjs/microservices';
511
512
@Controller()
513
export class RmqController {
514
private readonly logger = new Logger(RmqController.name);
515
516
@MessagePattern('user.commands.process')
517
async processUserCommand(
518
@Payload() command: any,
519
@Ctx() context: RmqContext
520
): Promise<any> {
521
const message = context.getMessage();
522
const channel = context.getChannelRef();
523
const pattern = context.getPattern();
524
525
this.logger.log(`Processing command with routing key: ${pattern}`);
526
this.logger.debug(`Message properties:`, message.properties);
527
528
try {
529
const result = await this.commandService.process(command);
530
531
// Manual acknowledgment after successful processing
532
channel.ack(message);
533
534
// Publish result to another exchange
535
await channel.publish(
536
'results_exchange',
537
'user.results.processed',
538
Buffer.from(JSON.stringify(result)),
539
{
540
correlationId: message.properties.correlationId,
541
timestamp: Date.now(),
542
headers: {
543
'processed-by': 'user-service',
544
'original-routing-key': pattern
545
}
546
}
547
);
548
549
return result;
550
} catch (error) {
551
this.logger.error(`Command processing failed:`, error);
552
553
// Reject message and potentially requeue
554
const requeue = error.retryable === true;
555
channel.nack(message, false, requeue);
556
557
throw error;
558
}
559
}
560
561
@EventPattern('user.events.*')
562
handleUserEvent(
563
@Payload() event: any,
564
@Ctx() context: RmqContext
565
): void {
566
const message = context.getMessage();
567
const channel = context.getChannelRef();
568
const pattern = context.getPattern();
569
570
const eventType = pattern.split('.').pop(); // Extract event type
571
572
this.logger.log(`Handling user event: ${eventType}`);
573
574
try {
575
// Process event based on type
576
this.eventService.handle(eventType, event);
577
578
// Manual acknowledgment
579
channel.ack(message);
580
581
// Publish follow-up events if needed
582
if (eventType === 'created') {
583
channel.publish(
584
'notifications_exchange',
585
'notifications.welcome',
586
Buffer.from(JSON.stringify({ userId: event.id })),
587
{ correlationId: message.properties.correlationId }
588
);
589
}
590
} catch (error) {
591
this.logger.error(`Event handling failed:`, error);
592
channel.nack(message, false, false); // Don't requeue events
593
}
594
}
595
}
596
```