0
# Client Proxies
1
2
Client-side communication interface for sending messages and events to microservices with support for multiple transport protocols including TCP, Redis, NATS, MQTT, gRPC, RabbitMQ, and Kafka.
3
4
## Capabilities
5
6
### ClientProxy Base Class
7
8
Abstract base class for all microservice client implementations providing core communication methods.
9
10
```typescript { .api }
11
/**
12
* Abstract base class for all microservice client implementations
13
*/
14
abstract class ClientProxy<EventsMap = any, Status = any> {
15
/** Current connection status as observable */
16
readonly status: Observable<Status>;
17
18
/** Establishes connection to the microservice server/broker */
19
connect(): Promise<any>;
20
21
/** Closes the underlying connection */
22
close(): any;
23
24
/**
25
* Send message with response expectation (request-response pattern)
26
* @param pattern - Message pattern for routing
27
* @param data - Payload data to send
28
* @returns Observable with response data
29
*/
30
send<TResult = any, TInput = any>(
31
pattern: any,
32
data: TInput
33
): Observable<TResult>;
34
35
/**
36
* Emit event without response expectation (fire-and-forget pattern)
37
* @param pattern - Event pattern for routing
38
* @param data - Event data to emit
39
* @returns Observable for completion tracking
40
*/
41
emit<TResult = any, TInput = any>(
42
pattern: any,
43
data: TInput
44
): Observable<TResult>;
45
46
/**
47
* Register event listener for client events
48
* @param event - Event name to listen for
49
* @param callback - Event callback function
50
*/
51
on<EventKey, EventCallback>(
52
event: EventKey,
53
callback: EventCallback
54
): void;
55
56
/**
57
* Returns the underlying server/broker instance
58
* @returns Unwrapped transport-specific instance
59
*/
60
unwrap<T = any>(): T;
61
}
62
```
63
64
**Usage Examples:**
65
66
```typescript
67
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';
68
69
// Create client
70
const client: ClientProxy = ClientProxyFactory.create({
71
transport: Transport.TCP,
72
options: { host: '127.0.0.1', port: 3001 }
73
});
74
75
// Connect to server
76
await client.connect();
77
78
// Send request-response message
79
const result = await client.send({ cmd: 'get_user' }, { id: 1 }).toPromise();
80
81
// Emit fire-and-forget event
82
client.emit('user_logged_in', { userId: 1, timestamp: Date.now() });
83
84
// Listen to connection events
85
client.on('connect', () => console.log('Connected to microservice'));
86
87
// Close connection
88
await client.close();
89
```
90
91
### ClientProxyFactory
92
93
Factory class for creating client proxy instances based on transport configuration.
94
95
```typescript { .api }
96
/**
97
* Factory for creating client proxy instances
98
*/
99
class ClientProxyFactory {
100
/**
101
* Create a client proxy instance based on transport options
102
* @param clientOptions - Configuration for the client transport
103
* @returns ClientProxy instance for the specified transport
104
*/
105
static create(clientOptions: ClientOptions): ClientProxy;
106
}
107
108
interface ClientOptions {
109
transport: Transport | CustomTransportStrategy;
110
options?: any;
111
}
112
```
113
114
**Usage Examples:**
115
116
```typescript
117
// TCP client
118
const tcpClient = ClientProxyFactory.create({
119
transport: Transport.TCP,
120
options: { host: '127.0.0.1', port: 3001 }
121
});
122
123
// Redis client
124
const redisClient = ClientProxyFactory.create({
125
transport: Transport.REDIS,
126
options: { host: 'localhost', port: 6379 }
127
});
128
129
// NATS client
130
const natsClient = ClientProxyFactory.create({
131
transport: Transport.NATS,
132
options: { servers: ['nats://localhost:4222'] }
133
});
134
```
135
136
### Transport-Specific Clients
137
138
#### ClientTCP
139
140
TCP transport client implementation with socket-based communication.
141
142
```typescript { .api }
143
class ClientTCP extends ClientProxy<TcpEvents, TcpStatus> {
144
constructor(options: TcpClientOptions);
145
}
146
147
interface TcpClientOptions {
148
host?: string;
149
port?: number;
150
socketClass?: any;
151
tlsOptions?: any;
152
}
153
154
enum TcpStatus {
155
DISCONNECTED = 0,
156
CONNECTED = 1
157
}
158
```
159
160
#### ClientRedis
161
162
Redis transport client implementation using pub/sub pattern.
163
164
```typescript { .api }
165
class ClientRedis extends ClientProxy<RedisEvents, RedisStatus> {
166
constructor(options: RedisOptions);
167
168
/** Get request pattern for Redis pub/sub */
169
getRequestPattern(pattern: string): string;
170
171
/** Get reply pattern for Redis pub/sub */
172
getReplyPattern(pattern: string): string;
173
}
174
175
enum RedisStatus {
176
DISCONNECTED = 0,
177
RECONNECTING = 1,
178
CONNECTED = 2
179
}
180
```
181
182
#### ClientNats
183
184
NATS transport client implementation with subject-based messaging.
185
186
```typescript { .api }
187
class ClientNats extends ClientProxy<NatsEvents, NatsStatus> {
188
constructor(options: NatsOptions);
189
190
/** Create NATS client instance */
191
createClient(): Promise<any>;
192
193
/** Handle connection status updates */
194
handleStatusUpdates(client: any): void;
195
}
196
197
enum NatsStatus {
198
DISCONNECTED = 0,
199
RECONNECTING = 1,
200
CONNECTED = 2
201
}
202
```
203
204
#### ClientMqtt
205
206
MQTT transport client implementation for IoT and lightweight messaging.
207
208
```typescript { .api }
209
class ClientMqtt extends ClientProxy<MqttEvents, MqttStatus> {
210
constructor(options: MqttClientOptions);
211
212
/** Get request pattern for MQTT topics */
213
getRequestPattern(pattern: string): string;
214
215
/** Get response pattern for MQTT topics */
216
getResponsePattern(pattern: string): string;
217
}
218
219
enum MqttStatus {
220
DISCONNECTED = 0,
221
RECONNECTING = 1,
222
CONNECTED = 2,
223
CLOSED = 3
224
}
225
```
226
227
#### ClientRMQ
228
229
RabbitMQ transport client implementation with queue-based messaging.
230
231
```typescript { .api }
232
class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
233
constructor(options: RmqOptions);
234
235
/** Create RabbitMQ channel */
236
createChannel(): Promise<void>;
237
238
/** Setup RabbitMQ channel with configuration */
239
setupChannel(channel: any, resolve: Function): void;
240
}
241
242
enum RmqStatus {
243
DISCONNECTED = 0,
244
CONNECTED = 1
245
}
246
```
247
248
#### ClientKafka
249
250
Kafka transport client implementation with high-throughput streaming support.
251
252
```typescript { .api }
253
class ClientKafka extends ClientProxy<any, KafkaStatus> {
254
constructor(options: KafkaOptions);
255
256
/** Subscribe to response topic for request-response pattern */
257
subscribeToResponseOf(pattern: unknown): void;
258
259
/**
260
* Emit batch of messages for high-throughput scenarios
261
* @param pattern - Message pattern
262
* @param data - Batch data with messages array
263
* @returns Observable for batch processing results
264
*/
265
emitBatch<TResult = any, TInput = any>(
266
pattern: any,
267
data: { messages: TInput[] }
268
): Observable<TResult>;
269
270
/**
271
* Commit Kafka consumer offsets manually
272
* @param topicPartitions - Topic partition offset data
273
*/
274
commitOffsets(topicPartitions: TopicPartitionOffsetAndMetadata[]): Promise<void>;
275
}
276
277
enum KafkaStatus {
278
DISCONNECTED = 0,
279
CONNECTED = 1,
280
CRASHED = 2,
281
STOPPED = 3,
282
REBALANCING = 4
283
}
284
```
285
286
#### ClientGrpcProxy
287
288
gRPC transport client implementation with protocol buffer support.
289
290
```typescript { .api }
291
class ClientGrpcProxy extends ClientProxy implements ClientGrpc {
292
constructor(options: GrpcOptions);
293
294
/**
295
* Get gRPC service by name
296
* @param name - Service name from proto definition
297
* @returns Typed service interface
298
*/
299
getService<T = any>(name: string): T;
300
301
/**
302
* Get gRPC client by service name
303
* @param name - Service name from proto definition
304
* @returns Typed client interface
305
*/
306
getClientByServiceName<T = any>(name: string): T;
307
308
/** Create unary gRPC service method */
309
createUnaryServiceMethod(client: any, methodName: string): Function;
310
311
/** Create streaming gRPC service method */
312
createStreamServiceMethod(client: unknown, methodName: string): Function;
313
}
314
315
interface ClientGrpc {
316
getService<T = any>(name: string): T;
317
getClientByServiceName<T = any>(name: string): T;
318
}
319
```
320
321
**Usage Examples:**
322
323
```typescript
324
// Kafka batch operations
325
const kafkaClient = ClientProxyFactory.create({
326
transport: Transport.KAFKA,
327
options: {
328
client: { clientId: 'my-client', brokers: ['localhost:9092'] }
329
}
330
});
331
332
await kafkaClient.emitBatch('user.events', {
333
messages: [
334
{ userId: 1, action: 'login' },
335
{ userId: 2, action: 'logout' },
336
{ userId: 3, action: 'signup' }
337
]
338
}).toPromise();
339
340
// gRPC service usage
341
const grpcClient = ClientProxyFactory.create({
342
transport: Transport.GRPC,
343
options: {
344
package: 'hero',
345
protoPath: path.join(__dirname, 'hero.proto'),
346
url: 'localhost:5000'
347
}
348
}) as ClientGrpc;
349
350
const heroService = grpcClient.getService<HeroService>('HeroService');
351
const hero = await heroService.findOne({ id: 1 }).toPromise();
352
```