0
# Client Services
1
2
HTTP-based messaging client for distributed environments with service discovery, comprehensive error handling, and transparent network communication. Enables messaging operations across distributed CDAP deployments.
3
4
## Capabilities
5
6
### Client Messaging Service
7
8
HTTP client implementation of MessagingService for distributed deployments with automatic service discovery.
9
10
```java { .api }
11
class ClientMessagingService implements MessagingService {
12
/**
13
* Creates a client messaging service with service discovery.
14
* @param discoveryServiceClient client for discovering messaging service instances
15
*/
16
ClientMessagingService(DiscoveryServiceClient discoveryServiceClient);
17
}
18
```
19
20
**Usage Examples:**
21
22
```java
23
import co.cask.cdap.messaging.client.ClientMessagingService;
24
import co.cask.cdap.messaging.MessagingService;
25
import org.apache.twill.discovery.DiscoveryServiceClient;
26
27
// Create client with service discovery
28
DiscoveryServiceClient discoveryClient = // obtain discovery client
29
MessagingService messagingService = new ClientMessagingService(discoveryClient);
30
31
// Use exactly like local MessagingService
32
TopicMetadata metadata = new TopicMetadata(topicId,
33
TopicMetadata.TTL_KEY, "3600");
34
messagingService.createTopic(metadata);
35
36
// All MessagingService operations work transparently
37
StoreRequest request = StoreRequestBuilder.of(topicId)
38
.addPayload("Remote message")
39
.build();
40
messagingService.publish(request);
41
```
42
43
### Dependency Injection Setup
44
45
Configure client services using Guice modules for clean dependency management.
46
47
```java { .api }
48
class MessagingClientModule extends AbstractModule {
49
/**
50
* Configures MessagingService binding to ClientMessagingService.
51
*/
52
protected void configure();
53
}
54
```
55
56
**Usage Examples:**
57
58
```java
59
import co.cask.cdap.messaging.guice.MessagingClientModule;
60
import com.google.inject.Guice;
61
import com.google.inject.Injector;
62
63
// Set up dependency injection
64
Injector injector = Guice.createInjector(
65
new MessagingClientModule(),
66
// other modules
67
);
68
69
// Inject messaging service
70
MessagingService messagingService = injector.getInstance(MessagingService.class);
71
// This will be a ClientMessagingService instance
72
73
// Use in application classes
74
public class MyService {
75
private final MessagingService messagingService;
76
77
@Inject
78
public MyService(MessagingService messagingService) {
79
this.messagingService = messagingService;
80
}
81
82
public void publishEvent(String event) throws IOException {
83
StoreRequest request = StoreRequestBuilder.of(topicId)
84
.addPayload(event)
85
.build();
86
messagingService.publish(request);
87
}
88
}
89
```
90
91
### Request Building
92
93
Enhanced StoreRequestBuilder for client-side request construction with fluent API.
94
95
```java { .api }
96
class StoreRequestBuilder {
97
/** Creates a new StoreRequestBuilder instance for the specified topic */
98
static StoreRequestBuilder of(TopicId topicId);
99
100
/** Adds a single payload string (UTF-8 encoded) */
101
StoreRequestBuilder addPayload(String payload);
102
103
/** Adds a single byte array payload */
104
StoreRequestBuilder addPayload(byte[] payload);
105
106
/** Adds multiple payloads from iterator */
107
StoreRequestBuilder addPayloads(Iterator<byte[]> payloads);
108
109
/** Adds multiple payloads from iterable */
110
StoreRequestBuilder addPayloads(Iterable<byte[]> payloads);
111
112
/** Sets transaction write pointer for transactional publish */
113
StoreRequestBuilder setTransaction(Long txWritePointer);
114
115
/** Returns true if builder contains payloads */
116
boolean hasPayload();
117
118
/** Creates StoreRequest from builder configuration */
119
StoreRequest build();
120
}
121
```
122
123
**Usage Examples:**
124
125
```java
126
// Build complex requests
127
List<String> events = Arrays.asList(
128
"user.login",
129
"page.view",
130
"button.click"
131
);
132
133
StoreRequest request = StoreRequestBuilder.of(topicId)
134
.addPayload("session.start")
135
.addPayloads(events.stream()
136
.map(String::getBytes)
137
.collect(Collectors.toList()))
138
.build();
139
140
// Conditional payload addition
141
StoreRequestBuilder builder = StoreRequestBuilder.of(topicId);
142
if (includeMetadata) {
143
builder.addPayload(generateMetadata());
144
}
145
builder.addPayload(mainContent);
146
StoreRequest request = builder.build();
147
148
// Transactional request building
149
Transaction tx = // obtain transaction
150
StoreRequest txRequest = StoreRequestBuilder.of(topicId)
151
.addPayload("transactional event")
152
.setTransaction(tx.getWritePointer())
153
.build();
154
```
155
156
### Client Rollback Detail
157
158
Client-side implementation of RollbackDetail with encoded rollback information.
159
160
```java { .api }
161
class ClientRollbackDetail implements RollbackDetail {
162
/**
163
* Creates rollback detail from encoded bytes
164
* (typically received from server response)
165
*/
166
ClientRollbackDetail(byte[] encoded);
167
168
/** Returns encoded rollback information */
169
byte[] getEncoded();
170
171
// Implements RollbackDetail interface methods
172
long getTransactionWritePointer();
173
long getStartTimestamp();
174
int getStartSequenceId();
175
long getEndTimestamp();
176
int getEndSequenceId();
177
}
178
```
179
180
## Network Communication
181
182
### HTTP Transport
183
184
Client uses HTTP/HTTPS for communication with messaging service endpoints:
185
186
- **Topic Operations**: RESTful API for topic management
187
- **Message Publishing**: Binary Avro encoding for efficiency
188
- **Message Consumption**: Streaming HTTP responses with Avro
189
- **Service Discovery**: Automatic endpoint discovery and failover
190
191
**Connection Configuration:**
192
193
```java
194
// HTTP configuration is handled internally
195
// Timeouts and connection settings use DefaultHttpRequestConfig
196
197
// Service discovery automatically finds messaging service instances
198
// No manual endpoint configuration required
199
```
200
201
### Error Handling and Retries
202
203
Comprehensive error handling with appropriate exception mapping:
204
205
```java
206
try {
207
messagingService.publish(request);
208
} catch (TopicNotFoundException e) {
209
// HTTP 404 mapped to TopicNotFoundException
210
System.out.println("Topic not found: " + e.getTopicName());
211
} catch (ServiceUnavailableException e) {
212
// HTTP 503 mapped to ServiceUnavailableException
213
System.out.println("Service unavailable: " + e.getServiceName());
214
} catch (IllegalArgumentException e) {
215
// HTTP 400 mapped to IllegalArgumentException
216
System.out.println("Bad request: " + e.getMessage());
217
} catch (IOException e) {
218
// Other HTTP errors mapped to IOException
219
System.out.println("Network error: " + e.getMessage());
220
}
221
```
222
223
### Content Type Handling
224
225
Client handles multiple content types for different operations:
226
227
- **JSON**: Topic metadata and configuration
228
- **Avro Binary**: Message payloads and requests
229
- **HTTP Streaming**: Large message consumption
230
231
```java
232
// Content types are handled automatically
233
// JSON for topic operations
234
TopicMetadata metadata = messagingService.getTopic(topicId);
235
236
// Avro binary for message operations
237
RollbackDetail rollback = messagingService.publish(request);
238
239
// Streaming for message consumption
240
MessageFetcher fetcher = messagingService.prepareFetch(topicId);
241
try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
242
// Streaming consumption
243
}
244
```
245
246
## Service Discovery Integration
247
248
### Discovery Service Client
249
250
Integration with CDAP service discovery for automatic endpoint resolution:
251
252
```java
253
import org.apache.twill.discovery.DiscoveryServiceClient;
254
import org.apache.twill.discovery.ServiceDiscovered;
255
256
// Service discovery handles endpoint resolution
257
DiscoveryServiceClient discoveryClient = // obtain from CDAP
258
ClientMessagingService messagingService = new ClientMessagingService(discoveryClient);
259
260
// Client automatically discovers and connects to messaging service instances
261
// Handles service failures and endpoint changes transparently
262
```
263
264
### Failover and Load Balancing
265
266
Client automatically handles service instance failures and load distribution:
267
268
- **Automatic Failover**: Switches to healthy service instances
269
- **Load Distribution**: Distributes requests across available instances
270
- **Health Checking**: Monitors service instance availability
271
- **Circuit Breaking**: Prevents cascading failures
272
273
## Configuration Examples
274
275
### Spring Configuration
276
277
```java
278
@Configuration
279
public class MessagingConfig {
280
281
@Bean
282
public MessagingService messagingService(DiscoveryServiceClient discoveryClient) {
283
return new ClientMessagingService(discoveryClient);
284
}
285
286
@Bean
287
public MessagePublisher messagePublisher(MessagingService messagingService) {
288
return new MessagePublisher() {
289
public void publish(TopicId topicId, String message) throws IOException {
290
StoreRequest request = StoreRequestBuilder.of(topicId)
291
.addPayload(message)
292
.build();
293
messagingService.publish(request);
294
}
295
};
296
}
297
}
298
```
299
300
### Application Integration
301
302
```java
303
public class EventPublisher {
304
private final MessagingService messagingService;
305
private final TopicId eventTopic;
306
307
public EventPublisher(MessagingService messagingService, String namespace, String topic) {
308
this.messagingService = messagingService;
309
this.eventTopic = new NamespaceId(namespace).topic(topic);
310
}
311
312
public void publishEvent(Object event) throws IOException {
313
String eventJson = gson.toJson(event);
314
StoreRequest request = StoreRequestBuilder.of(eventTopic)
315
.addPayload(eventJson)
316
.build();
317
318
messagingService.publish(request);
319
}
320
321
public void publishEvents(List<Object> events) throws IOException {
322
StoreRequestBuilder builder = StoreRequestBuilder.of(eventTopic);
323
for (Object event : events) {
324
builder.addPayload(gson.toJson(event));
325
}
326
327
messagingService.publish(builder.build());
328
}
329
}
330
```
331
332
## Performance Optimization
333
334
### Batch Operations
335
336
Optimize network usage with batch operations:
337
338
```java
339
// Batch multiple messages in single request
340
List<String> events = collectEvents();
341
StoreRequest batchRequest = StoreRequestBuilder.of(topicId)
342
.addPayloads(events.stream()
343
.map(String::getBytes)
344
.collect(Collectors.toList()))
345
.build();
346
347
messagingService.publish(batchRequest);
348
```
349
350
### Connection Reuse
351
352
Client automatically reuses HTTP connections for efficiency:
353
354
```java
355
// Same MessagingService instance reuses connections
356
MessagingService client = new ClientMessagingService(discoveryClient);
357
358
// Multiple operations reuse connections
359
client.createTopic(metadata1);
360
client.createTopic(metadata2);
361
client.publish(request1);
362
client.publish(request2);
363
```
364
365
### Async Patterns
366
367
While the client API is synchronous, you can implement async patterns:
368
369
```java
370
import java.util.concurrent.CompletableFuture;
371
import java.util.concurrent.ExecutorService;
372
373
public class AsyncEventPublisher {
374
private final MessagingService messagingService;
375
private final ExecutorService executor;
376
377
public CompletableFuture<Void> publishEventAsync(TopicId topicId, String event) {
378
return CompletableFuture.runAsync(() -> {
379
try {
380
StoreRequest request = StoreRequestBuilder.of(topicId)
381
.addPayload(event)
382
.build();
383
messagingService.publish(request);
384
} catch (IOException e) {
385
throw new RuntimeException("Failed to publish event", e);
386
}
387
}, executor);
388
}
389
}
390
```