or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-services.md high-level-consumers.md index.md message-consumption.md message-publishing.md topic-management.md

client-services.md docs/

0

# Client Services

1

2

HTTP-based messaging client for distributed environments with service discovery, comprehensive error handling, and transparent network communication. Enables messaging operations across distributed CDAP deployments.

3

4

## Capabilities

5

6

### Client Messaging Service

7

8

HTTP client implementation of MessagingService for distributed deployments with automatic service discovery.

9

10

```java { .api }

11

class ClientMessagingService implements MessagingService {

12

/**

13

* Creates a client messaging service with service discovery.

14

* @param discoveryServiceClient client for discovering messaging service instances

15

*/

16

ClientMessagingService(DiscoveryServiceClient discoveryServiceClient);

17

}

18

```

19

20

**Usage Examples:**

21

22

```java

23

import co.cask.cdap.messaging.client.ClientMessagingService;

24

import co.cask.cdap.messaging.MessagingService;

25

import org.apache.twill.discovery.DiscoveryServiceClient;

26

27

// Create client with service discovery

28

DiscoveryServiceClient discoveryClient = // obtain discovery client

29

MessagingService messagingService = new ClientMessagingService(discoveryClient);

30

31

// Use exactly like local MessagingService

32

TopicMetadata metadata = new TopicMetadata(topicId,

33

TopicMetadata.TTL_KEY, "3600");

34

messagingService.createTopic(metadata);

35

36

// All MessagingService operations work transparently

37

StoreRequest request = StoreRequestBuilder.of(topicId)

38

.addPayload("Remote message")

39

.build();

40

messagingService.publish(request);

41

```

42

43

### Dependency Injection Setup

44

45

Configure client services using Guice modules for clean dependency management.

46

47

```java { .api }

48

class MessagingClientModule extends AbstractModule {

49

/**

50

* Configures MessagingService binding to ClientMessagingService.

51

*/

52

protected void configure();

53

}

54

```

55

56

**Usage Examples:**

57

58

```java

59

import co.cask.cdap.messaging.guice.MessagingClientModule;

60

import com.google.inject.Guice;

61

import com.google.inject.Injector;

62

63

// Set up dependency injection

64

Injector injector = Guice.createInjector(

65

new MessagingClientModule(),

66

// other modules

67

);

68

69

// Inject messaging service

70

MessagingService messagingService = injector.getInstance(MessagingService.class);

71

// This will be a ClientMessagingService instance

72

73

// Use in application classes

74

public class MyService {

75

private final MessagingService messagingService;

76

77

@Inject

78

public MyService(MessagingService messagingService) {

79

this.messagingService = messagingService;

80

}

81

82

public void publishEvent(String event) throws IOException {

83

StoreRequest request = StoreRequestBuilder.of(topicId)

84

.addPayload(event)

85

.build();

86

messagingService.publish(request);

87

}

88

}

89

```

90

91

### Request Building

92

93

Enhanced StoreRequestBuilder for client-side request construction with a fluent API.

94

95

```java { .api }

96

class StoreRequestBuilder {

97

/** Creates a new StoreRequestBuilder instance for the specified topic */

98

static StoreRequestBuilder of(TopicId topicId);

99

100

/** Adds a single payload string (UTF-8 encoded) */

101

StoreRequestBuilder addPayload(String payload);

102

103

/** Adds a single byte array payload */

104

StoreRequestBuilder addPayload(byte[] payload);

105

106

/** Adds multiple payloads from iterator */

107

StoreRequestBuilder addPayloads(Iterator<byte[]> payloads);

108

109

/** Adds multiple payloads from iterable */

110

StoreRequestBuilder addPayloads(Iterable<byte[]> payloads);

111

112

/** Sets transaction write pointer for transactional publish */

113

StoreRequestBuilder setTransaction(Long txWritePointer);

114

115

/** Returns true if builder contains payloads */

116

boolean hasPayload();

117

118

/** Creates StoreRequest from builder configuration */

119

StoreRequest build();

120

}

121

```

122

123

**Usage Examples:**

124

125

```java

126

// Build complex requests

127

List<String> events = Arrays.asList(

128

"user.login",

129

"page.view",

130

"button.click"

131

);

132

133

StoreRequest request = StoreRequestBuilder.of(topicId)

134

.addPayload("session.start")

135

.addPayloads(events.stream()

136

.map(String::getBytes)

137

.collect(Collectors.toList()))

138

.build();

139

140

// Conditional payload addition

141

StoreRequestBuilder builder = StoreRequestBuilder.of(topicId);

142

if (includeMetadata) {

143

builder.addPayload(generateMetadata());

144

}

145

builder.addPayload(mainContent);

146

StoreRequest request = builder.build();

147

148

// Transactional request building

149

Transaction tx = // obtain transaction

150

StoreRequest txRequest = StoreRequestBuilder.of(topicId)

151

.addPayload("transactional event")

152

.setTransaction(tx.getWritePointer())

153

.build();

154

```

155

156

### Client Rollback Detail

157

158

Client-side implementation of RollbackDetail with encoded rollback information.

159

160

```java { .api }

161

class ClientRollbackDetail implements RollbackDetail {

162

/**

163

* Creates rollback detail from encoded bytes

164

* (typically received from server response)

165

*/

166

ClientRollbackDetail(byte[] encoded);

167

168

/** Returns encoded rollback information */

169

byte[] getEncoded();

170

171

// Implements RollbackDetail interface methods

172

long getTransactionWritePointer();

173

long getStartTimestamp();

174

int getStartSequenceId();

175

long getEndTimestamp();

176

int getEndSequenceId();

177

}

178

```

179

180

## Network Communication

181

182

### HTTP Transport

183

184

The client uses HTTP/HTTPS for communication with messaging service endpoints:

185

186

- **Topic Operations**: RESTful API for topic management

187

- **Message Publishing**: Binary Avro encoding for efficiency

188

- **Message Consumption**: Streaming HTTP responses with Avro

189

- **Service Discovery**: Automatic endpoint discovery and failover

190

191

**Connection Configuration:**

192

193

```java

194

// HTTP configuration is handled internally

195

// Timeouts and connection settings use DefaultHttpRequestConfig

196

197

// Service discovery automatically finds messaging service instances

198

// No manual endpoint configuration required

199

```

200

201

### Error Handling and Retries

202

203

Comprehensive error handling with appropriate exception mapping:

204

205

```java

206

try {

207

messagingService.publish(request);

208

} catch (TopicNotFoundException e) {

209

// HTTP 404 mapped to TopicNotFoundException

210

System.out.println("Topic not found: " + e.getTopicName());

211

} catch (ServiceUnavailableException e) {

212

// HTTP 503 mapped to ServiceUnavailableException

213

System.out.println("Service unavailable: " + e.getServiceName());

214

} catch (IllegalArgumentException e) {

215

// HTTP 400 mapped to IllegalArgumentException

216

System.out.println("Bad request: " + e.getMessage());

217

} catch (IOException e) {

218

// Other HTTP errors mapped to IOException

219

System.out.println("Network error: " + e.getMessage());

220

}

221

```

222

223

### Content Type Handling

224

225

The client handles multiple content types for different operations:

226

227

- **JSON**: Topic metadata and configuration

228

- **Avro Binary**: Message payloads and requests

229

- **HTTP Streaming**: Large message consumption

230

231

```java

232

// Content types are handled automatically

233

// JSON for topic operations

234

TopicMetadata metadata = messagingService.getTopic(topicId);

235

236

// Avro binary for message operations

237

RollbackDetail rollback = messagingService.publish(request);

238

239

// Streaming for message consumption

240

MessageFetcher fetcher = messagingService.prepareFetch(topicId);

241

try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {

242

// Streaming consumption

243

}

244

```

245

246

## Service Discovery Integration

247

248

### Discovery Service Client

249

250

Integration with CDAP service discovery for automatic endpoint resolution:

251

252

```java

253

import org.apache.twill.discovery.DiscoveryServiceClient;

254

import org.apache.twill.discovery.ServiceDiscovered;

255

256

// Service discovery handles endpoint resolution

257

DiscoveryServiceClient discoveryClient = // obtain from CDAP

258

ClientMessagingService messagingService = new ClientMessagingService(discoveryClient);

259

260

// Client automatically discovers and connects to messaging service instances

261

// Handles service failures and endpoint changes transparently

262

```

263

264

### Failover and Load Balancing

265

266

The client automatically handles service instance failures and load distribution:

267

268

- **Automatic Failover**: Switches to healthy service instances

269

- **Load Distribution**: Distributes requests across available instances

270

- **Health Checking**: Monitors service instance availability

271

- **Circuit Breaking**: Prevents cascading failures

272

273

## Configuration Examples

274

275

### Spring Configuration

276

277

```java

278

@Configuration

279

public class MessagingConfig {

280

281

@Bean

282

public MessagingService messagingService(DiscoveryServiceClient discoveryClient) {

283

return new ClientMessagingService(discoveryClient);

284

}

285

286

@Bean

287

public MessagePublisher messagePublisher(MessagingService messagingService) {

288

return new MessagePublisher() {

289

public void publish(TopicId topicId, String message) throws IOException {

290

StoreRequest request = StoreRequestBuilder.of(topicId)

291

.addPayload(message)

292

.build();

293

messagingService.publish(request);

294

}

295

};

296

}

297

}

298

```

299

300

### Application Integration

301

302

```java

303

public class EventPublisher {

304

private final MessagingService messagingService;

305

private final TopicId eventTopic;

306

307

public EventPublisher(MessagingService messagingService, String namespace, String topic) {

308

this.messagingService = messagingService;

309

this.eventTopic = new NamespaceId(namespace).topic(topic);

310

}

311

312

public void publishEvent(Object event) throws IOException {

313

String eventJson = gson.toJson(event);

314

StoreRequest request = StoreRequestBuilder.of(eventTopic)

315

.addPayload(eventJson)

316

.build();

317

318

messagingService.publish(request);

319

}

320

321

public void publishEvents(List<Object> events) throws IOException {

322

StoreRequestBuilder builder = StoreRequestBuilder.of(eventTopic);

323

for (Object event : events) {

324

builder.addPayload(gson.toJson(event));

325

}

326

327

messagingService.publish(builder.build());

328

}

329

}

330

```

331

332

## Performance Optimization

333

334

### Batch Operations

335

336

Optimize network usage with batch operations:

337

338

```java

339

// Batch multiple messages in single request

340

List<String> events = collectEvents();

341

StoreRequest batchRequest = StoreRequestBuilder.of(topicId)

342

.addPayloads(events.stream()

343

.map(String::getBytes)

344

.collect(Collectors.toList()))

345

.build();

346

347

messagingService.publish(batchRequest);

348

```

349

350

### Connection Reuse

351

352

The client automatically reuses HTTP connections for efficiency:

353

354

```java

355

// Same MessagingService instance reuses connections

356

MessagingService client = new ClientMessagingService(discoveryClient);

357

358

// Multiple operations reuse connections

359

client.createTopic(metadata1);

360

client.createTopic(metadata2);

361

client.publish(request1);

362

client.publish(request2);

363

```

364

365

### Async Patterns

366

367

While the client API is synchronous, you can implement async patterns:

368

369

```java

370

import java.util.concurrent.CompletableFuture;

371

import java.util.concurrent.ExecutorService;

372

373

public class AsyncEventPublisher {

374

private final MessagingService messagingService;

375

private final ExecutorService executor;

376

377

public CompletableFuture<Void> publishEventAsync(TopicId topicId, String event) {

378

return CompletableFuture.runAsync(() -> {

379

try {

380

StoreRequest request = StoreRequestBuilder.of(topicId)

381

.addPayload(event)

382

.build();

383

messagingService.publish(request);

384

} catch (IOException e) {

385

throw new RuntimeException("Failed to publish event", e);

386

}

387

}, executor);

388

}

389

}

390

```