0
# Message Patterns & Event Handling
1
2
Decorators and patterns for defining message handlers that use the request-response and event-driven communication models, with support for pattern-based routing and payload extraction in distributed microservice architectures.
3
4
## Capabilities
5
6
### Message Pattern Decorator
7
8
Decorator for subscribing to incoming messages using the request-response pattern, where a response is expected.
9
10
```typescript { .api }
11
/**
12
* Subscribes to incoming messages with request-response pattern
13
* @param metadata - Pattern metadata for message routing
14
* @param transport - Specific transport to bind the pattern to
15
* @param extras - Additional configuration options
16
* @returns Method decorator
17
*/
18
function MessagePattern<T = string>(
19
metadata?: T,
20
transport?: Transport | symbol,
21
extras?: Record<string, any>
22
): MethodDecorator;
23
```
24
25
**Usage Examples:**
26
27
```typescript
28
import { Controller } from '@nestjs/common';
29
import { MessagePattern, Payload } from '@nestjs/microservices';
30
31
@Controller()
32
export class MathController {
33
@MessagePattern({ cmd: 'sum' })
34
accumulate(@Payload() data: number[]): number {
35
return (data || []).reduce((a, b) => a + b, 0);
36
}
37
38
@MessagePattern('get_user')
39
getUser(@Payload() id: number): Promise<User> {
40
return this.userService.findById(id);
41
}
42
43
// With transport-specific binding
44
@MessagePattern({ cmd: 'process' }, Transport.REDIS)
45
processOnRedis(@Payload() data: any): any {
46
return this.processData(data);
47
}
48
49
// With metadata and extras
50
@MessagePattern(
51
{ service: 'user', method: 'create' },
52
Transport.KAFKA,
53
{ version: '1.0' }
54
)
55
createUser(@Payload() userData: CreateUserDto): Promise<User> {
56
return this.userService.create(userData);
57
}
58
}
59
```
60
61
### Event Pattern Decorator
62
63
Decorator for subscribing to incoming events using the fire-and-forget pattern, where no response is expected.
64
65
```typescript { .api }
66
/**
67
* Subscribes to incoming events with fire-and-forget pattern
68
* @param metadata - Event pattern metadata for routing
69
* @param transport - Specific transport to bind the pattern to
70
* @param extras - Additional configuration options
71
* @returns Method decorator
72
*/
73
function EventPattern<T = string>(
74
metadata?: T,
75
transport?: Transport | symbol,
76
extras?: Record<string, any>
77
): MethodDecorator;
78
```
79
80
**Usage Examples:**
81
82
```typescript
83
import { Controller } from '@nestjs/common';
84
import { EventPattern, Payload, Ctx } from '@nestjs/microservices';
85
86
@Controller()
87
export class EventController {
88
@EventPattern('user_created')
89
handleUserCreated(@Payload() data: User): void {
90
console.log('New user created:', data.email);
91
// Perform side effects like sending welcome email
92
this.emailService.sendWelcomeEmail(data.email);
93
}
94
95
@EventPattern({ type: 'order', action: 'completed' })
96
handleOrderCompleted(@Payload() order: Order): void {
97
this.analyticsService.trackOrderCompletion(order);
98
this.inventoryService.updateStock(order.items);
99
}
100
101
// Transport-specific event handling
102
@EventPattern('inventory.updated', Transport.KAFKA)
103
handleInventoryUpdate(@Payload() data: InventoryUpdate): void {
104
this.cacheService.invalidateProductCache(data.productId);
105
}
106
}
107
```
108
109
### Payload Decorator
110
111
Parameter decorator for extracting payload data from incoming messages with optional property selection and pipe transformations.
112
113
```typescript { .api }
114
/**
115
* Extracts payload from incoming microservice message
116
* @param property - Optional property name to extract from payload
117
* @param pipes - Optional transformation pipes
118
* @returns Parameter decorator
119
*/
120
function Payload(): ParameterDecorator;
121
function Payload(...pipes: (Type<PipeTransform> | PipeTransform)[]): ParameterDecorator;
121
function Payload(property?: string, ...pipes: (Type<PipeTransform> | PipeTransform)[]): ParameterDecorator;
123
```
124
125
**Usage Examples:**
126
127
```typescript
128
import { Controller, ParseIntPipe, ValidationPipe } from '@nestjs/common';
129
import { MessagePattern, Payload } from '@nestjs/microservices';
130
131
@Controller()
132
export class DataController {
133
// Extract entire payload
134
@MessagePattern('process_data')
135
processData(@Payload() data: ProcessDataDto): ProcessResult {
136
return this.dataService.process(data);
137
}
138
139
// Extract specific property from payload
140
@MessagePattern('get_user')
141
getUser(@Payload('id') userId: number): Promise<User> {
142
return this.userService.findById(userId);
143
}
144
145
// Apply validation pipe to payload
146
@MessagePattern('create_user')
147
createUser(@Payload(ValidationPipe) userData: CreateUserDto): Promise<User> {
148
return this.userService.create(userData);
149
}
150
151
// Extract property and apply transformation pipe
152
@MessagePattern('calculate')
153
calculate(@Payload('value', ParseIntPipe) value: number): number {
154
return value * 2;
155
}
156
157
// Multiple parameters with different extractions
158
@MessagePattern('update_user')
159
updateUser(
160
@Payload('id', ParseIntPipe) id: number,
161
@Payload('data', ValidationPipe) updateData: UpdateUserDto
162
): Promise<User> {
163
return this.userService.update(id, updateData);
164
}
165
}
166
```
167
168
### Context Decorator
169
170
Parameter decorator for injecting the transport-specific context object into message handler methods.
171
172
```typescript { .api }
173
/**
174
* Injects RPC context into handler method parameter
175
* @returns Parameter decorator
176
*/
177
function Ctx(): ParameterDecorator;
178
```
179
180
**Usage Examples:**
181
182
```typescript
183
import { Controller } from '@nestjs/common';
184
import { MessagePattern, EventPattern, Payload, Ctx } from '@nestjs/microservices';
185
import { KafkaContext, TcpContext, RedisContext } from '@nestjs/microservices';
186
187
@Controller()
188
export class ContextController {
189
@MessagePattern('kafka_message')
190
handleKafkaMessage(
191
@Payload() data: any,
192
@Ctx() context: KafkaContext
193
): any {
194
const topic = context.getTopic();
195
const partition = context.getPartition();
196
const message = context.getMessage();
197
198
console.log(`Processing message from topic: ${topic}, partition: ${partition}`);
199
return this.processMessage(data, { topic, partition });
200
}
201
202
@EventPattern('tcp_event')
203
handleTcpEvent(
204
@Payload() data: any,
205
@Ctx() context: TcpContext
206
): void {
207
const socket = context.getSocketRef();
208
const pattern = context.getPattern();
209
210
console.log(`TCP event received with pattern: ${pattern}`);
211
// Access raw socket for additional operations
212
socket.write('ACK');
213
}
214
215
@MessagePattern('redis_request')
216
handleRedisRequest(
217
@Payload() data: any,
218
@Ctx() context: RedisContext
219
): any {
220
const channel = context.getChannel();
221
console.log(`Redis request on channel: ${channel}`);
222
return this.processRedisData(data);
223
}
224
}
225
```
226
227
### Pattern Metadata Interface
228
229
Interfaces describing the pattern metadata structure used in message and event patterns, and the function signature of a message handler.
230
231
```typescript { .api }
232
/**
233
* Pattern metadata interface for message routing
234
*/
235
interface PatternMetadata {
236
[key: string]: any;
237
}
238
239
/**
240
* Message handler interface for custom implementations
241
*/
242
interface MessageHandler<TInput = any, TResult = any, TContext = any> {
243
(data: TInput, ctx?: TContext): Observable<TResult> | Promise<TResult> | TResult;
244
}
245
```
246
247
### Advanced Pattern Usage
248
249
**Pattern Matching Strategies:**
250
251
```typescript
252
// String patterns
253
@MessagePattern('user.get')
254
@EventPattern('user.created')
255
256
// Object patterns - exact match
257
@MessagePattern({ cmd: 'get', resource: 'user' })
258
@EventPattern({ type: 'user', action: 'created' })
259
260
// Nested object patterns
261
@MessagePattern({
262
service: 'user',
263
operation: { type: 'read', version: 'v1' }
264
})
265
266
// Array patterns (for some transports)
267
@MessagePattern(['user', 'get'])
268
@EventPattern(['events', 'user', 'created'])
269
```
270
271
**Transport-Specific Pattern Features:**
272
273
```typescript
274
// Kafka topic patterns
275
@MessagePattern('user.commands.create', Transport.KAFKA)
276
@EventPattern('user.events.created', Transport.KAFKA)
277
278
// MQTT topic patterns with wildcards (transport handles routing)
279
@EventPattern('sensors/+/temperature', Transport.MQTT)
280
@EventPattern('devices/+/status/#', Transport.MQTT)
281
282
// gRPC service method patterns
283
@MessagePattern({ service: 'UserService', method: 'GetUser' }, Transport.GRPC)
284
285
// RabbitMQ routing key patterns
286
@MessagePattern('user.commands.create', Transport.RMQ)
287
@EventPattern('user.events.*', Transport.RMQ)
288
```
289
290
**Error Handling in Message Handlers:**
291
292
```typescript
293
import { RpcException } from '@nestjs/microservices';
294
295
@Controller()
296
export class ErrorHandlingController {
297
@MessagePattern('risky_operation')
298
performRiskyOperation(@Payload() data: any): any {
299
try {
300
return this.processData(data);
301
} catch (error) {
302
throw new RpcException({
303
code: 'PROCESSING_ERROR',
304
message: error.message,
305
details: { input: data }
306
});
307
}
308
}
309
310
@EventPattern('cleanup_task')
311
async performCleanup(@Payload() data: any): Promise<void> {
312
try {
313
await this.cleanupService.cleanup(data);
314
} catch (error) {
315
// Log error but don't throw for fire-and-forget events
316
this.logger.error('Cleanup failed', error);
317
}
318
}
319
}
320
```