or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-services.md, high-level-consumers.md, index.md, message-consumption.md, message-publishing.md, topic-management.md

docs/high-level-consumers.md

0

# High-Level Consumers

1

2

An abstract subscriber service that provides automatic message processing, failure handling, transaction management, and message ID persistence. It simplifies building robust, long-running message consumers by handling retries and state management automatically.

3

4

## Capabilities

5

6

### Abstract Messaging Subscriber Service

7

8

Base class for implementing reliable message consumers with built-in failure handling, transaction support, and progress tracking.

9

10

```java { .api }

11

abstract class AbstractMessagingSubscriberService<T> extends AbstractRetryableScheduledService {

12

/**

13

* Constructor for messaging subscriber service.

14

* @param topicId the topic to consume from

15

* @param transactionalFetch true to perform fetching inside transaction

16

* @param fetchSize number of messages to fetch in each batch

17

* @param txTimeoutSeconds transaction timeout in seconds

18

* @param maxTxTimeoutSeconds max transaction timeout in seconds

19

* @param emptyFetchDelayMillis milliseconds to sleep after empty fetch

20

* @param retryStrategy strategy for retrying on failures

21

* @param metricsContext metrics context for monitoring

22

*/

23

protected AbstractMessagingSubscriberService(TopicId topicId, boolean transactionalFetch,

24

int fetchSize, int txTimeoutSeconds, int maxTxTimeoutSeconds,

25

long emptyFetchDelayMillis, RetryStrategy retryStrategy, MetricsContext metricsContext);

26

27

/** Returns the TopicId that this service is fetching from */

28

protected final TopicId getTopicId();

29

30

/** Returns the MessagingContext for interacting with TMS */

31

protected abstract MessagingContext getMessagingContext();

32

33

/** Returns the Transactional for executing tasks in transaction */

34

protected abstract Transactional getTransactional();

35

36

/** Loads last persisted message id (called from transaction) */

37

protected abstract String loadMessageId(DatasetContext datasetContext) throws Exception;

38

39

/** Persists message id (called from same transaction as processMessages) */

40

protected abstract void storeMessageId(DatasetContext datasetContext, String messageId) throws Exception;

41

42

/** Decodes raw Message into object of type T */

43

protected abstract T decodeMessage(Message message) throws Exception;

44

45

/** Processes decoded messages (called from transaction) */

46

protected abstract void processMessages(DatasetContext datasetContext,

47

Iterator<ImmutablePair<String, T>> messages) throws Exception;

48

49

/** Whether message should run in separate transaction (expensive operations) */

50

protected boolean shouldRunInSeparateTx(T message);

51

52

/** Post processing after batch completion (outside transaction) */

53

protected void postProcess();

54

}

55

```

56

57

**Usage Examples:**

58

59

```java

60

import co.cask.cdap.messaging.subscriber.AbstractMessagingSubscriberService;

61

import co.cask.cdap.api.messaging.MessagingContext;

62

import co.cask.cdap.api.Transactional;

63

import co.cask.cdap.api.data.DatasetContext;

64

import co.cask.cdap.common.utils.ImmutablePair;

65

66

public class UserEventProcessor extends AbstractMessagingSubscriberService<UserEvent> {

67

68

private final MessagingContext messagingContext;

69

private final Transactional transactional;

70

private final UserEventDataset eventDataset;

71

72

public UserEventProcessor(TopicId topicId, MessagingContext messagingContext,

73

Transactional transactional, UserEventDataset eventDataset) {

74

super(topicId,

75

true, // transactional fetch

76

100, // fetch size

77

30, // tx timeout seconds

78

300, // max tx timeout seconds

79

1000, // empty fetch delay ms

80

RetryStrategies.exponentialDelay(1000, 60000),

81

metricsContext);

82

83

this.messagingContext = messagingContext;

84

this.transactional = transactional;

85

this.eventDataset = eventDataset;

86

}

87

88

@Override

89

protected MessagingContext getMessagingContext() {

90

return messagingContext;

91

}

92

93

@Override

94

protected Transactional getTransactional() {

95

return transactional;

96

}

97

98

@Override

99

protected String loadMessageId(DatasetContext datasetContext) throws Exception {

100

return eventDataset.getLastProcessedMessageId();

101

}

102

103

@Override

104

protected void storeMessageId(DatasetContext datasetContext, String messageId) throws Exception {

105

eventDataset.setLastProcessedMessageId(messageId);

106

}

107

108

@Override

109

protected UserEvent decodeMessage(Message message) throws Exception {

110

return gson.fromJson(message.getPayloadAsString(), UserEvent.class);

111

}

112

113

@Override

114

protected void processMessages(DatasetContext datasetContext,

115

Iterator<ImmutablePair<String, UserEvent>> messages) throws Exception {

116

while (messages.hasNext()) {

117

ImmutablePair<String, UserEvent> pair = messages.next();

118

String messageId = pair.getFirst();

119

UserEvent event = pair.getSecond();

120

121

// Process the event

122

processUserEvent(event);

123

124

// Event is automatically committed with message ID

125

}

126

}

127

128

private void processUserEvent(UserEvent event) {

129

// Business logic for processing user events

130

System.out.println("Processing event: " + event.getType() + " for user " + event.getUserId());

131

}

132

}

133

```

134

135

### Service Lifecycle Management

136

137

The subscriber service extends `AbstractRetryableScheduledService`, so it is started and stopped through the standard service lifecycle methods:

138

139

```java

140

// Start the subscriber service

141

UserEventProcessor processor = new UserEventProcessor(topicId, messagingContext,

142

transactional, eventDataset);

143

144

// Start service (begins consuming messages)

145

processor.startAsync();

146

147

// Wait for service to be running

148

processor.awaitRunning();

149

150

// Stop service gracefully

151

processor.stopAsync();

152

processor.awaitTerminated();

153

```

154

155

### Advanced Processing Patterns

156

157

#### Separate Transaction for Expensive Operations

158

159

Handle expensive operations in separate transactions to avoid timeouts:

160

161

```java

162

public class HeavyProcessingSubscriber extends AbstractMessagingSubscriberService<HeavyTask> {

163

164

@Override

165

protected boolean shouldRunInSeparateTx(HeavyTask task) {

166

// Run expensive tasks in separate transactions

167

return task.getEstimatedProcessingTimeMs() > 10000;

168

}

169

170

@Override

171

protected void processMessages(DatasetContext datasetContext,

172

Iterator<ImmutablePair<String, HeavyTask>> messages) throws Exception {

173

while (messages.hasNext()) {

174

ImmutablePair<String, HeavyTask> pair = messages.next();

175

HeavyTask task = pair.getSecond();

176

177

if (shouldRunInSeparateTx(task)) {

178

// This will be processed in its own transaction

179

processHeavyTask(task);

180

} else {

181

// Process normally in current transaction

182

processLightTask(task);

183

}

184

}

185

}

186

}

187

```

188

189

#### Post-Processing Hook

190

191

Handle post-processing operations outside of transactions:

192

193

```java

194

public class EventAggregatorSubscriber extends AbstractMessagingSubscriberService<Event> {

195

196

private final List<Event> processedEvents = new ArrayList<>();

197

198

@Override

199

protected void processMessages(DatasetContext datasetContext,

200

Iterator<ImmutablePair<String, Event>> messages) throws Exception {

201

while (messages.hasNext()) {

202

Event event = messages.next().getSecond();

203

processedEvents.add(event);

204

205

// Store event in dataset (transactional)

206

eventDataset.addEvent(event);

207

}

208

}

209

210

@Override

211

protected void postProcess() {

212

try {

213

// Send aggregated metrics (non-transactional)

214

if (!processedEvents.isEmpty()) {

215

sendAggregatedMetrics(processedEvents);

216

processedEvents.clear();

217

}

218

} catch (Exception e) {

219

LOG.warn("Failed to send aggregated metrics", e);

220

}

221

}

222

}

223

```

224

225

### Error Handling and Retry

226

227

Built-in error handling with configurable retry strategies:

228

229

```java

230

import co.cask.cdap.common.service.RetryStrategy;

231

import co.cask.cdap.common.service.RetryStrategies;

232

233

public class RobustEventProcessor extends AbstractMessagingSubscriberService<Event> {

234

235

public RobustEventProcessor() {

236

super(topicId,

237

true, // transactional fetch

238

50, // smaller batch size for reliability

239

60, // 1 minute tx timeout

240

600, // 10 minute max tx timeout

241

5000, // 5 second delay on empty fetch

242

RetryStrategies.exponentialDelay(1000, 300000), // 1s to 5min backoff

243

metricsContext);

244

}

245

246

@Override

247

protected Event decodeMessage(Message message) throws Exception {

248

try {

249

return gson.fromJson(message.getPayloadAsString(), Event.class);

250

} catch (JsonSyntaxException e) {

251

// Log and skip malformed messages

252

LOG.warn("Skipping malformed message: {}", message.getId(), e);

253

throw e; // Will cause message to be skipped

254

}

255

}

256

257

@Override

258

protected void processMessages(DatasetContext datasetContext,

259

Iterator<ImmutablePair<String, Event>> messages) throws Exception {

260

int processed = 0;

261

while (messages.hasNext()) {

262

try {

263

Event event = messages.next().getSecond();

264

processEvent(event);

265

processed++;

266

} catch (Exception e) {

267

LOG.error("Failed to process event, aborting batch", e);

268

throw e; // Will trigger retry of entire batch

269

}

270

}

271

272

LOG.info("Successfully processed {} events", processed);

273

}

274

}

275

```

276

277

### Metrics and Monitoring

278

279

Built-in metrics collection for monitoring subscriber health:

280

281

```java

282

import co.cask.cdap.api.metrics.MetricsContext;

283

284

public class MonitoredSubscriber extends AbstractMessagingSubscriberService<Event> {

285

286

public MonitoredSubscriber(MetricsContext metricsContext) {

287

super(topicId, true, 100, 30, 300, 1000,

288

RetryStrategies.exponentialDelay(1000, 60000),

289

metricsContext); // Metrics context provided to parent

290

}

291

292

@Override

293

protected void processMessages(DatasetContext datasetContext,

294

Iterator<ImmutablePair<String, Event>> messages) throws Exception {

295

int processedCount = 0;

296

long startTime = System.currentTimeMillis();

297

298

while (messages.hasNext()) {

299

Event event = messages.next().getSecond();

300

processEvent(event);

301

processedCount++;

302

}

303

304

// Custom metrics (parent also emits built-in metrics)

305

long processingTime = System.currentTimeMillis() - startTime;

306

MetricsContext metrics = getMetricsContext();

307

metrics.increment("events.processed", processedCount);

308

metrics.gauge("processing.time.ms", processingTime);

309

}

310

}

311

```

312

313

## Integration Patterns

314

315

### Dataset Integration

316

317

Integrate with CDAP datasets for state persistence:

318

319

```java

320

public class DatasetIntegratedSubscriber extends AbstractMessagingSubscriberService<Event> {

321

322

private final String datasetName;

323

324

@Override

325

protected String loadMessageId(DatasetContext datasetContext) throws Exception {

326

KeyValueTable stateTable = datasetContext.getDataset(datasetName);

327

byte[] messageIdBytes = stateTable.read("last.message.id");

328

return messageIdBytes != null ? Bytes.toString(messageIdBytes) : null;

329

}

330

331

@Override

332

protected void storeMessageId(DatasetContext datasetContext, String messageId) throws Exception {

333

KeyValueTable stateTable = datasetContext.getDataset(datasetName);

334

stateTable.write("last.message.id", Bytes.toBytes(messageId));

335

}

336

337

@Override

338

protected void processMessages(DatasetContext datasetContext,

339

Iterator<ImmutablePair<String, Event>> messages) throws Exception {

340

Table eventTable = datasetContext.getDataset("events");

341

342

while (messages.hasNext()) {

343

Event event = messages.next().getSecond();

344

345

// Store event in dataset

346

Put put = new Put(Bytes.toBytes(event.getId()));

347

put.add("data", "payload", Bytes.toBytes(gson.toJson(event)));

348

put.add("data", "timestamp", Bytes.toBytes(System.currentTimeMillis()));

349

eventTable.put(put);

350

}

351

}

352

}

353

```

354

355

### Service Discovery Integration

356

357

Use with CDAP service discovery and dependency injection:

358

359

```java

360

public class ServiceDiscoverySubscriber extends AbstractMessagingSubscriberService<Event> {

361

362

private final MessagingContext messagingContext;

363

private final Transactional transactional;

364

365

@Inject

366

public ServiceDiscoverySubscriber(@Named("event.topic") TopicId topicId,

367

MessagingContext messagingContext,

368

Transactional transactional,

369

MetricsContext metricsContext) {

370

super(topicId, true, 100, 30, 300, 1000,

371

RetryStrategies.exponentialDelay(1000, 60000),

372

metricsContext);

373

374

this.messagingContext = messagingContext;

375

this.transactional = transactional;

376

}

377

378

// Implementation methods...

379

}

380

```

381

382

### Multi-Topic Processing

383

384

Handle multiple topics with separate subscriber instances:

385

386

```java

387

public class MultiTopicProcessor {

388

389

private final List<AbstractMessagingSubscriberService<? extends Event>> subscribers;

390

391

public MultiTopicProcessor() {

392

this.subscribers = Arrays.asList(

393

new UserEventProcessor(userEventTopic, messagingContext, transactional, datasets),

394

new SystemEventProcessor(systemEventTopic, messagingContext, transactional, datasets),

395

new AuditEventProcessor(auditEventTopic, messagingContext, transactional, datasets)

396

);

397

}

398

399

public void start() {

400

for (AbstractMessagingSubscriberService<?> subscriber : subscribers) {

401

subscriber.startAsync();

402

}

403

404

// Wait for all to be running

405

for (AbstractMessagingSubscriberService<?> subscriber : subscribers) {

406

subscriber.awaitRunning();

407

}

408

}

409

410

public void stop() {

411

for (AbstractMessagingSubscriberService<?> subscriber : subscribers) {

412

subscriber.stopAsync();

413

}

414

415

// Wait for all to stop

416

for (AbstractMessagingSubscriberService<?> subscriber : subscribers) {

417

subscriber.awaitTerminated();

418

}

419

}

420

}

421

```

422

423

## Performance Tuning

424

425

### Batch Size Optimization

426

427

```java

428

// For high-throughput topics

429

public class HighThroughputSubscriber extends AbstractMessagingSubscriberService<Event> {

430

public HighThroughputSubscriber() {

431

super(topicId,

432

true, // transactional

433

1000, // large batch size

434

120, // longer timeout

435

600, // max timeout

436

100, // short empty delay

437

retryStrategy, metricsContext);

438

}

439

}

440

441

// For low-latency processing

442

public class LowLatencySubscriber extends AbstractMessagingSubscriberService<Event> {

443

public LowLatencySubscriber() {

444

super(topicId,

445

false, // non-transactional for speed

446

10, // small batch size

447

10, // short timeout

448

30, // short max timeout

449

100, // quick retry on empty

450

retryStrategy, metricsContext);

451

}

452

}

453

```

454

455

### Transaction Timeout Handling

456

457

The service automatically increases the transaction timeout, up to `maxTxTimeoutSeconds`, when a transaction times out:

458

459

```java

460

// Initial timeout: 30 seconds

461

// On TransactionNotInProgressException:

462

// - Retry with 60 seconds

463

// - Then 120 seconds

464

// - Up to maxTxTimeoutSeconds (300)

465

// Then fails if still timing out

466

```

467

468

This automatic timeout scaling helps handle variable processing loads without manual intervention.