# High-Level Consumers

`AbstractMessagingSubscriberService` is an abstract subscriber service that handles message fetching and processing, failure handling, transaction management, and message ID persistence. It simplifies building robust, long-running message consumers with automatic retries and state management.

## Capabilities

### Abstract Messaging Subscriber Service

Base class for implementing reliable message consumers with built-in failure handling, transaction support, and progress tracking. Subclasses supply the messaging and transaction contexts, decode each message, process batches of decoded messages, and load/store the id of the last processed message; the base class drives the fetch loop.

```java { .api }
abstract class AbstractMessagingSubscriberService<T> extends AbstractRetryableScheduledService {
12
/**
13
* Constructor for messaging subscriber service.
14
* @param topicId the topic to consume from
15
* @param transactionalFetch true to perform fetching inside transaction
16
* @param fetchSize number of messages to fetch in each batch
17
* @param txTimeoutSeconds transaction timeout in seconds
18
* @param maxTxTimeoutSeconds max transaction timeout in seconds
19
* @param emptyFetchDelayMillis milliseconds to sleep after empty fetch
20
* @param retryStrategy strategy for retrying on failures
21
* @param metricsContext metrics context for monitoring
22
*/
23
protected AbstractMessagingSubscriberService(TopicId topicId, boolean transactionalFetch,
24
int fetchSize, int txTimeoutSeconds, int maxTxTimeoutSeconds,
25
long emptyFetchDelayMillis, RetryStrategy retryStrategy, MetricsContext metricsContext);
26
27
/** Returns the TopicId that this service is fetching from */
28
protected final TopicId getTopicId();
29
30
/** Returns the MessagingContext for interacting with TMS */
31
protected abstract MessagingContext getMessagingContext();
32
33
/** Returns the Transactional for executing tasks in transaction */
34
protected abstract Transactional getTransactional();
35
36
/** Loads last persisted message id (called from transaction) */
37
protected abstract String loadMessageId(DatasetContext datasetContext) throws Exception;
38
39
/** Persists message id (called from same transaction as processMessages) */
40
protected abstract void storeMessageId(DatasetContext datasetContext, String messageId) throws Exception;
41
42
/** Decodes raw Message into object of type T */
43
protected abstract T decodeMessage(Message message) throws Exception;
44
45
/** Processes decoded messages (called from transaction) */
46
protected abstract void processMessages(DatasetContext datasetContext,
47
Iterator<ImmutablePair<String, T>> messages) throws Exception;
48
49
/** Whether message should run in separate transaction (expensive operations) */
50
protected boolean shouldRunInSeparateTx(T message);
51
52
/** Post processing after batch completion (outside transaction) */
53
protected void postProcess();
54
}
```

**Usage Examples:**

```java
import co.cask.cdap.messaging.subscriber.AbstractMessagingSubscriberService;
61
import co.cask.cdap.api.messaging.MessagingContext;
62
import co.cask.cdap.api.Transactional;
63
import co.cask.cdap.api.data.DatasetContext;
64
import co.cask.cdap.common.utils.ImmutablePair;
65
66
public class UserEventProcessor extends AbstractMessagingSubscriberService<UserEvent> {
67
68
private final MessagingContext messagingContext;
69
private final Transactional transactional;
70
private final UserEventDataset eventDataset;
71
72
public UserEventProcessor(TopicId topicId, MessagingContext messagingContext,
73
Transactional transactional, UserEventDataset eventDataset) {
74
super(topicId,
75
true, // transactional fetch
76
100, // fetch size
77
30, // tx timeout seconds
78
300, // max tx timeout seconds
79
1000, // empty fetch delay ms
80
RetryStrategies.exponentialDelay(1000, 60000),
81
metricsContext);
82
83
this.messagingContext = messagingContext;
84
this.transactional = transactional;
85
this.eventDataset = eventDataset;
86
}
87
88
@Override
89
protected MessagingContext getMessagingContext() {
90
return messagingContext;
91
}
92
93
@Override
94
protected Transactional getTransactional() {
95
return transactional;
96
}
97
98
@Override
99
protected String loadMessageId(DatasetContext datasetContext) throws Exception {
100
return eventDataset.getLastProcessedMessageId();
101
}
102
103
@Override
104
protected void storeMessageId(DatasetContext datasetContext, String messageId) throws Exception {
105
eventDataset.setLastProcessedMessageId(messageId);
106
}
107
108
@Override
109
protected UserEvent decodeMessage(Message message) throws Exception {
110
return gson.fromJson(message.getPayloadAsString(), UserEvent.class);
111
}
112
113
@Override
114
protected void processMessages(DatasetContext datasetContext,
115
Iterator<ImmutablePair<String, UserEvent>> messages) throws Exception {
116
while (messages.hasNext()) {
117
ImmutablePair<String, UserEvent> pair = messages.next();
118
String messageId = pair.getFirst();
119
UserEvent event = pair.getSecond();
120
121
// Process the event
122
processUserEvent(event);
123
124
// Event is automatically committed with message ID
125
}
126
}
127
128
private void processUserEvent(UserEvent event) {
129
// Business logic for processing user events
130
System.out.println("Processing event: " + event.getType() + " for user " + event.getUserId());
131
}
132
}
```

### Service Lifecycle Management

The subscriber service extends `AbstractRetryableScheduledService`, so it is started and stopped with the standard service lifecycle methods:

```java
// Start the subscriber service
141
UserEventProcessor processor = new UserEventProcessor(topicId, messagingContext,
142
transactional, eventDataset);
143
144
// Start service (begins consuming messages)
145
processor.startAsync();
146
147
// Wait for service to be running
148
processor.awaitRunning();
149
150
// Stop service gracefully
151
processor.stopAsync();
152
processor.awaitTerminated();
```

### Advanced Processing Patterns

#### Separate Transaction for Expensive Operations

Override `shouldRunInSeparateTx` so that expensive messages are processed in their own transactions and do not push a whole batch past the transaction timeout:

```java
public class HeavyProcessingSubscriber extends AbstractMessagingSubscriberService<HeavyTask> {
163
164
@Override
165
protected boolean shouldRunInSeparateTx(HeavyTask task) {
166
// Run expensive tasks in separate transactions
167
return task.getEstimatedProcessingTimeMs() > 10000;
168
}
169
170
@Override
171
protected void processMessages(DatasetContext datasetContext,
172
Iterator<ImmutablePair<String, HeavyTask>> messages) throws Exception {
173
while (messages.hasNext()) {
174
ImmutablePair<String, HeavyTask> pair = messages.next();
175
HeavyTask task = pair.getSecond();
176
177
if (shouldRunInSeparateTx(task)) {
178
// This will be processed in its own transaction
179
processHeavyTask(task);
180
} else {
181
// Process normally in current transaction
182
processLightTask(task);
183
}
184
}
185
}
186
}
```

#### Post-Processing Hook

Override `postProcess` to perform work outside of any transaction after a batch has been processed:

```java
public class EventAggregatorSubscriber extends AbstractMessagingSubscriberService<Event> {
195
196
private final List<Event> processedEvents = new ArrayList<>();
197
198
@Override
199
protected void processMessages(DatasetContext datasetContext,
200
Iterator<ImmutablePair<String, Event>> messages) throws Exception {
201
while (messages.hasNext()) {
202
Event event = messages.next().getSecond();
203
processedEvents.add(event);
204
205
// Store event in dataset (transactional)
206
eventDataset.addEvent(event);
207
}
208
}
209
210
@Override
211
protected void postProcess() {
212
try {
213
// Send aggregated metrics (non-transactional)
214
if (!processedEvents.isEmpty()) {
215
sendAggregatedMetrics(processedEvents);
216
processedEvents.clear();
217
}
218
} catch (Exception e) {
219
LOG.warn("Failed to send aggregated metrics", e);
220
}
221
}
222
}
```

### Error Handling and Retry

Failures are retried according to the `RetryStrategy` passed to the constructor:

```java
import co.cask.cdap.common.service.RetryStrategy;
231
import co.cask.cdap.common.service.RetryStrategies;
232
233
public class RobustEventProcessor extends AbstractMessagingSubscriberService<Event> {
234
235
public RobustEventProcessor() {
236
super(topicId,
237
true, // transactional fetch
238
50, // smaller batch size for reliability
239
60, // 1 minute tx timeout
240
600, // 10 minute max tx timeout
241
5000, // 5 second delay on empty fetch
242
RetryStrategies.exponentialDelay(1000, 300000), // 1s to 5min backoff
243
metricsContext);
244
}
245
246
@Override
247
protected Event decodeMessage(Message message) throws Exception {
248
try {
249
return gson.fromJson(message.getPayloadAsString(), Event.class);
250
} catch (JsonSyntaxException e) {
251
// Log and skip malformed messages
252
LOG.warn("Skipping malformed message: {}", message.getId(), e);
253
throw e; // Will cause message to be skipped
254
}
255
}
256
257
@Override
258
protected void processMessages(DatasetContext datasetContext,
259
Iterator<ImmutablePair<String, Event>> messages) throws Exception {
260
int processed = 0;
261
while (messages.hasNext()) {
262
try {
263
Event event = messages.next().getSecond();
264
processEvent(event);
265
processed++;
266
} catch (Exception e) {
267
LOG.error("Failed to process event, aborting batch", e);
268
throw e; // Will trigger retry of entire batch
269
}
270
}
271
272
LOG.info("Successfully processed {} events", processed);
273
}
274
}
```

### Metrics and Monitoring

The `MetricsContext` passed to the constructor is used by the base service for its built-in metrics; subclasses can emit their own metrics as well:

```java
import co.cask.cdap.api.metrics.MetricsContext;
283
284
public class MonitoredSubscriber extends AbstractMessagingSubscriberService<Event> {
285
286
public MonitoredSubscriber(MetricsContext metricsContext) {
287
super(topicId, true, 100, 30, 300, 1000,
288
RetryStrategies.exponentialDelay(1000, 60000),
289
metricsContext); // Metrics context provided to parent
290
}
291
292
@Override
293
protected void processMessages(DatasetContext datasetContext,
294
Iterator<ImmutablePair<String, Event>> messages) throws Exception {
295
int processedCount = 0;
296
long startTime = System.currentTimeMillis();
297
298
while (messages.hasNext()) {
299
Event event = messages.next().getSecond();
300
processEvent(event);
301
processedCount++;
302
}
303
304
// Custom metrics (parent also emits built-in metrics)
305
long processingTime = System.currentTimeMillis() - startTime;
306
MetricsContext metrics = getMetricsContext();
307
metrics.increment("events.processed", processedCount);
308
metrics.gauge("processing.time.ms", processingTime);
309
}
310
}
```

## Integration Patterns

### Dataset Integration

Use the `DatasetContext` passed to each callback to access CDAP datasets for state persistence:

```java
public class DatasetIntegratedSubscriber extends AbstractMessagingSubscriberService<Event> {
321
322
private final String datasetName;
323
324
@Override
325
protected String loadMessageId(DatasetContext datasetContext) throws Exception {
326
KeyValueTable stateTable = datasetContext.getDataset(datasetName);
327
byte[] messageIdBytes = stateTable.read("last.message.id");
328
return messageIdBytes != null ? Bytes.toString(messageIdBytes) : null;
329
}
330
331
@Override
332
protected void storeMessageId(DatasetContext datasetContext, String messageId) throws Exception {
333
KeyValueTable stateTable = datasetContext.getDataset(datasetName);
334
stateTable.write("last.message.id", Bytes.toBytes(messageId));
335
}
336
337
@Override
338
protected void processMessages(DatasetContext datasetContext,
339
Iterator<ImmutablePair<String, Event>> messages) throws Exception {
340
Table eventTable = datasetContext.getDataset("events");
341
342
while (messages.hasNext()) {
343
Event event = messages.next().getSecond();
344
345
// Store event in dataset
346
Put put = new Put(Bytes.toBytes(event.getId()));
347
put.add("data", "payload", Bytes.toBytes(gson.toJson(event)));
348
put.add("data", "timestamp", Bytes.toBytes(System.currentTimeMillis()));
349
eventTable.put(put);
350
}
351
}
352
}
```

### Dependency Injection Integration

Construct the subscriber through dependency injection, binding the topic by name with `@Named`:

```java
public class ServiceDiscoverySubscriber extends AbstractMessagingSubscriberService<Event> {
361
362
private final MessagingContext messagingContext;
363
private final Transactional transactional;
364
365
@Inject
366
public ServiceDiscoverySubscriber(@Named("event.topic") TopicId topicId,
367
MessagingContext messagingContext,
368
Transactional transactional,
369
MetricsContext metricsContext) {
370
super(topicId, true, 100, 30, 300, 1000,
371
RetryStrategies.exponentialDelay(1000, 60000),
372
metricsContext);
373
374
this.messagingContext = messagingContext;
375
this.transactional = transactional;
376
}
377
378
// Implementation methods...
379
}
```

### Multi-Topic Processing

Each subscriber consumes a single topic. To handle multiple topics, run one subscriber instance per topic and manage their lifecycles together:

```java
public class MultiTopicProcessor {
388
389
private final List<AbstractMessagingSubscriberService<? extends Event>> subscribers;
390
391
public MultiTopicProcessor() {
392
this.subscribers = Arrays.asList(
393
new UserEventProcessor(userEventTopic, messagingContext, transactional, datasets),
394
new SystemEventProcessor(systemEventTopic, messagingContext, transactional, datasets),
395
new AuditEventProcessor(auditEventTopic, messagingContext, transactional, datasets)
396
);
397
}
398
399
public void start() {
400
for (AbstractMessagingSubscriberService<?> subscriber : subscribers) {
401
subscriber.startAsync();
402
}
403
404
// Wait for all to be running
405
for (AbstractMessagingSubscriberService<?> subscriber : subscribers) {
406
subscriber.awaitRunning();
407
}
408
}
409
410
public void stop() {
411
for (AbstractMessagingSubscriberService<?> subscriber : subscribers) {
412
subscriber.stopAsync();
413
}
414
415
// Wait for all to stop
416
for (AbstractMessagingSubscriberService<?> subscriber : subscribers) {
417
subscriber.awaitTerminated();
418
}
419
}
420
}
```

## Performance Tuning

### Batch Size Optimization

Choose the fetch size, transaction timeouts, and empty-fetch delay to match the workload:

```java
// For high-throughput topics
429
public class HighThroughputSubscriber extends AbstractMessagingSubscriberService<Event> {
430
public HighThroughputSubscriber() {
431
super(topicId,
432
true, // transactional
433
1000, // large batch size
434
120, // longer timeout
435
600, // max timeout
436
100, // short empty delay
437
retryStrategy, metricsContext);
438
}
439
}
440
441
// For low-latency processing
442
public class LowLatencySubscriber extends AbstractMessagingSubscriberService<Event> {
443
public LowLatencySubscriber() {
444
super(topicId,
445
false, // non-transactional for speed
446
10, // small batch size
447
10, // short timeout
448
30, // short max timeout
449
100, // quick retry on empty
450
retryStrategy, metricsContext);
451
}
452
}
```

### Transaction Timeout Handling

When processing exceeds the transaction timeout, the service retries with a progressively longer timeout, capped at `maxTxTimeoutSeconds`:

```java
// Example with txTimeoutSeconds = 30 and maxTxTimeoutSeconds = 300
// Initial timeout: 30 seconds
// On TransactionNotInProgressException:
// - Retry with 60 seconds
// - Then 120 seconds
// - Up to maxTxTimeoutSeconds (300)
// Then fails if still timing out
```

This automatic timeout scaling helps handle variable processing loads without manual intervention.