or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-proxies.md · context-objects.md · exceptions.md · grpc.md · index.md · message-patterns.md · modules.md · transports.md

docs/message-patterns.md

0

# Message Patterns & Event Handling

1

2

Decorators and patterns for defining message handlers with request-response and event-driven communication models, supporting pattern-based routing and payload extraction for distributed microservice architectures.

3

4

## Capabilities

5

6

### Message Pattern Decorator

7

8

Decorator for subscribing to incoming messages using the request-response pattern, where the caller expects a response.

9

10

```typescript { .api }

11

/**

12

* Subscribes to incoming messages with request-response pattern

13

* @param metadata - Pattern metadata for message routing

14

* @param transport - Specific transport to bind the pattern to

15

* @param extras - Additional configuration options

16

* @returns Method decorator

17

*/

18

function MessagePattern<T = string>(

19

metadata?: T,

20

transport?: Transport | symbol,

21

extras?: Record<string, any>

22

): MethodDecorator;

23

```

24

25

**Usage Examples:**

26

27

```typescript

28

import { Controller } from '@nestjs/common';

29

import { MessagePattern, Payload } from '@nestjs/microservices';

30

31

@Controller()

32

export class MathController {

33

@MessagePattern({ cmd: 'sum' })

34

accumulate(@Payload() data: number[]): number {

35

return (data || []).reduce((a, b) => a + b, 0);

36

}

37

38

@MessagePattern('get_user')

39

getUser(@Payload() id: number): Promise<User> {

40

return this.userService.findById(id);

41

}

42

43

// With transport-specific binding

44

@MessagePattern({ cmd: 'process' }, Transport.REDIS)

45

processOnRedis(@Payload() data: any): any {

46

return this.processData(data);

47

}

48

49

// With metadata and extras

50

@MessagePattern(

51

{ service: 'user', method: 'create' },

52

Transport.KAFKA,

53

{ version: '1.0' }

54

)

55

createUser(@Payload() userData: CreateUserDto): Promise<User> {

56

return this.userService.create(userData);

57

}

58

}

59

```

60

61

### Event Pattern Decorator

62

63

Decorator for subscribing to incoming events using the fire-and-forget pattern, where no response is expected.

64

65

```typescript { .api }

66

/**

67

* Subscribes to incoming events with fire-and-forget pattern

68

* @param metadata - Event pattern metadata for routing

69

* @param transport - Specific transport to bind the pattern to

70

* @param extras - Additional configuration options

71

* @returns Method decorator

72

*/

73

function EventPattern<T = string>(

74

metadata?: T,

75

transport?: Transport | symbol,

76

extras?: Record<string, any>

77

): MethodDecorator;

78

```

79

80

**Usage Examples:**

81

82

```typescript

83

import { Controller } from '@nestjs/common';

84

import { EventPattern, Payload, Ctx } from '@nestjs/microservices';

85

86

@Controller()

87

export class EventController {

88

@EventPattern('user_created')

89

handleUserCreated(@Payload() data: User): void {

90

console.log('New user created:', data.email);

91

// Perform side effects like sending welcome email

92

this.emailService.sendWelcomeEmail(data.email);

93

}

94

95

@EventPattern({ type: 'order', action: 'completed' })

96

handleOrderCompleted(@Payload() order: Order): void {

97

this.analyticsService.trackOrderCompletion(order);

98

this.inventoryService.updateStock(order.items);

99

}

100

101

// Transport-specific event handling

102

@EventPattern('inventory.updated', Transport.KAFKA)

103

handleInventoryUpdate(@Payload() data: InventoryUpdate): void {

104

this.cacheService.invalidateProductCache(data.productId);

105

}

106

}

107

```

108

109

### Payload Decorator

110

111

Parameter decorator for extracting payload data from incoming messages with optional property selection and pipe transformations.

112

113

```typescript { .api }

114

/**

115

* Extracts payload from incoming microservice message

116

* @param property - Optional property name to extract from payload

117

* @param pipes - Optional transformation pipes

118

* @returns Parameter decorator

119

*/

120

function Payload(): ParameterDecorator;

121

function Payload(...pipes: PipeTransform[]): ParameterDecorator;

122

function Payload(property?: string, ...pipes: PipeTransform[]): ParameterDecorator;

123

```

124

125

**Usage Examples:**

126

127

```typescript

128

import { Controller, ParseIntPipe, ValidationPipe } from '@nestjs/common';

129

import { MessagePattern, Payload } from '@nestjs/microservices';

130

131

@Controller()

132

export class DataController {

133

// Extract entire payload

134

@MessagePattern('process_data')

135

processData(@Payload() data: ProcessDataDto): ProcessResult {

136

return this.dataService.process(data);

137

}

138

139

// Extract specific property from payload

140

@MessagePattern('get_user')

141

getUser(@Payload('id') userId: number): Promise<User> {

142

return this.userService.findById(userId);

143

}

144

145

// Apply validation pipe to payload

146

@MessagePattern('create_user')

147

createUser(@Payload(ValidationPipe) userData: CreateUserDto): Promise<User> {

148

return this.userService.create(userData);

149

}

150

151

// Extract property and apply transformation pipe

152

@MessagePattern('calculate')

153

calculate(@Payload('value', ParseIntPipe) value: number): number {

154

return value * 2;

155

}

156

157

// Multiple parameters with different extractions

158

@MessagePattern('update_user')

159

updateUser(

160

@Payload('id', ParseIntPipe) id: number,

161

@Payload('data', ValidationPipe) updateData: UpdateUserDto

162

): Promise<User> {

163

return this.userService.update(id, updateData);

164

}

165

}

166

```

167

168

### Context Decorator

169

170

Parameter decorator for injecting the transport-specific context object into message handler methods.

171

172

```typescript { .api }

173

/**

174

* Injects RPC context into handler method parameter

175

* @returns Parameter decorator

176

*/

177

function Ctx(): ParameterDecorator;

178

```

179

180

**Usage Examples:**

181

182

```typescript

183

import { Controller } from '@nestjs/common';

184

import { MessagePattern, EventPattern, Payload, Ctx } from '@nestjs/microservices';

185

import { KafkaContext, TcpContext, RedisContext } from '@nestjs/microservices';

186

187

@Controller()

188

export class ContextController {

189

@MessagePattern('kafka_message')

190

handleKafkaMessage(

191

@Payload() data: any,

192

@Ctx() context: KafkaContext

193

): any {

194

const topic = context.getTopic();

195

const partition = context.getPartition();

196

const message = context.getMessage();

197

198

console.log(`Processing message from topic: ${topic}, partition: ${partition}`);

199

return this.processMessage(data, { topic, partition });

200

}

201

202

@EventPattern('tcp_event')

203

handleTcpEvent(

204

@Payload() data: any,

205

@Ctx() context: TcpContext

206

): void {

207

const socket = context.getSocketRef();

208

const pattern = context.getPattern();

209

210

console.log(`TCP event received with pattern: ${pattern}`);

211

// Access raw socket for additional operations

212

socket.write('ACK');

213

}

214

215

@MessagePattern('redis_request')

216

handleRedisRequest(

217

@Payload() data: any,

218

@Ctx() context: RedisContext

219

): any {

220

const channel = context.getChannel();

221

console.log(`Redis request on channel: ${channel}`);

222

return this.processRedisData(data);

223

}

224

}

225

```

226

227

### Pattern Metadata Interface

228

229

Interfaces describing the pattern metadata structure used in message and event patterns, and the handler signature for custom message handler implementations.

230

231

```typescript { .api }

232

/**

233

* Pattern metadata interface for message routing

234

*/

235

interface PatternMetadata {

236

[key: string]: any;

237

}

238

239

/**

240

* Message handler interface for custom implementations

241

*/

242

interface MessageHandler<TInput = any, TResult = any, TContext = any> {

243

(data: TInput, ctx?: TContext): Observable<TResult> | Promise<TResult> | TResult;

244

}

245

```

246

247

### Advanced Pattern Usage

248

249

**Pattern Matching Strategies:**

250

251

```typescript

252

// String patterns

253

@MessagePattern('user.get')

254

@EventPattern('user.created')

255

256

// Object patterns - exact match

257

@MessagePattern({ cmd: 'get', resource: 'user' })

258

@EventPattern({ type: 'user', action: 'created' })

259

260

// Nested object patterns

261

@MessagePattern({

262

service: 'user',

263

operation: { type: 'read', version: 'v1' }

264

})

265

266

// Array patterns (for some transports)

267

@MessagePattern(['user', 'get'])

268

@EventPattern(['events', 'user', 'created'])

269

```

270

271

**Transport-Specific Pattern Features:**

272

273

```typescript

274

// Kafka topic patterns

275

@MessagePattern('user.commands.create', Transport.KAFKA)

276

@EventPattern('user.events.created', Transport.KAFKA)

277

278

// MQTT topic patterns with wildcards (transport handles routing)

279

@EventPattern('sensors/+/temperature', Transport.MQTT)

280

@EventPattern('devices/+/status/#', Transport.MQTT)

281

282

// gRPC service method patterns

283

@MessagePattern({ service: 'UserService', method: 'GetUser' }, Transport.GRPC)

284

285

// RabbitMQ routing key patterns

286

@MessagePattern('user.commands.create', Transport.RMQ)

287

@EventPattern('user.events.*', Transport.RMQ)

288

```

289

290

**Error Handling in Message Handlers:**

291

292

```typescript

293

import { RpcException } from '@nestjs/microservices';

294

295

@Controller()

296

export class ErrorHandlingController {

297

@MessagePattern('risky_operation')

298

performRiskyOperation(@Payload() data: any): any {

299

try {

300

return this.processData(data);

301

} catch (error) {

302

throw new RpcException({

303

code: 'PROCESSING_ERROR',

304

message: error.message,

305

details: { input: data }

306

});

307

}

308

}

309

310

@EventPattern('cleanup_task')

311

async performCleanup(@Payload() data: any): Promise<void> {

312

try {

313

await this.cleanupService.cleanup(data);

314

} catch (error) {

315

// Log error but don't throw for fire-and-forget events

316

this.logger.error('Cleanup failed', error);

317

}

318

}

319

}

320

```