0
# Data Management
1
2
CDAP provides a comprehensive data management framework with support for various dataset types, messaging systems, and data access patterns. The framework abstracts underlying storage technologies while providing consistent APIs for data operations across different storage systems.
3
4
## Dataset Framework
5
6
The dataset framework is the foundation for data storage and access in CDAP, providing a unified abstraction layer over different storage systems.
7
8
### Core Dataset Interfaces
9
10
```java { .api }
11
import io.cdap.cdap.api.dataset.*;
12
import io.cdap.cdap.api.dataset.table.*;
13
14
/**
 * Base interface implemented by every CDAP dataset.
 */
public interface Dataset {

  /**
   * Closes this dataset instance.
   *
   * @throws IOException if closing fails
   */
  void close() throws IOException;
}
18
19
// Dataset context for accessing datasets
20
public interface DatasetContext {
21
<T extends Dataset> T getDataset(String name) throws DataSetException;
22
<T extends Dataset> T getDataset(String namespace, String name) throws DataSetException;
23
void releaseDataset(Dataset dataset);
24
void discardDataset(Dataset dataset);
25
}
26
27
// Dataset management interface
28
public interface DatasetManager {
29
boolean datasetExists(String name) throws DataSetException;
30
DatasetProperties getDatasetProperties(String name) throws DataSetException;
31
void createDataset(String name, String type, DatasetProperties properties) throws DataSetException;
32
void updateDataset(String name, DatasetProperties properties) throws DataSetException;
33
void dropDataset(String name) throws DataSetException;
34
void truncateDataset(String name) throws DataSetException;
35
}
36
37
// Dataset configurer for application setup
38
public interface DatasetConfigurer {
39
void createDataset(String datasetName, String typeName, DatasetProperties properties);
40
void createDataset(String datasetName, String typeName);
41
void createDataset(String datasetName, Class<? extends Dataset> datasetClass, DatasetProperties props);
42
void createDataset(String datasetName, Class<? extends Dataset> datasetClass);
43
void addDatasetModule(String moduleName, Class<? extends DatasetModule> moduleClass);
44
void addDatasetType(Class<? extends Dataset> datasetClass);
45
}
46
```
47
48
### Dataset Properties and Configuration
49
50
```java { .api }
51
/**
 * Immutable set of string properties used to configure a dataset instance.
 *
 * <p>Create instances with {@link #builder()}. A built instance is a snapshot: later changes to
 * the builder do not affect it.
 */
public class DatasetProperties {

  private final Map<String, String> properties;

  /** Creates an instance with no properties. Prefer {@link #builder()}. */
  public DatasetProperties() {
    this(java.util.Collections.<String, String>emptyMap());
  }

  private DatasetProperties(Map<String, String> properties) {
    // Defensive copy so the instance cannot change after it has been built.
    this.properties =
        java.util.Collections.unmodifiableMap(new java.util.LinkedHashMap<>(properties));
  }

  /** Returns a new, empty builder. */
  public static Builder builder() {
    return new Builder();
  }

  /** Returns the properties as an unmodifiable map, in insertion order. */
  public Map<String, String> getProperties() {
    return properties;
  }

  /** Accumulates properties for a {@link DatasetProperties} instance. */
  public static class Builder {

    private final Map<String, String> entries = new java.util.LinkedHashMap<>();

    /** Adds the property {@code key}, replacing any value previously added for it. */
    public Builder add(String key, String value) {
      entries.put(key, value);
      return this;
    }

    /** Adds every entry of {@code properties}, replacing values previously added for the same keys. */
    public Builder addAll(Map<String, String> properties) {
      entries.putAll(properties);
      return this;
    }

    /** Returns an immutable snapshot of the properties added so far. */
    public DatasetProperties build() {
      return new DatasetProperties(entries);
    }
  }
}
62
63
// Dataset specification
64
public final class DatasetSpecification {
65
public String getName() { /* returns dataset name */ }
66
public String getType() { /* returns dataset type */ }
67
public Map<String, String> getProperties() { /* returns properties */ }
68
public Map<String, DatasetSpecification> getSpecifications() { /* returns nested specs */ }
69
}
70
71
/**
 * Unchecked exception signalling that a dataset instance could not be instantiated.
 */
public class DatasetInstantiationException extends RuntimeException {

  /**
   * @param message description of the failure
   */
  public DatasetInstantiationException(String message) {
    super(message);
  }

  /**
   * @param message description of the failure
   * @param cause the underlying error
   */
  public DatasetInstantiationException(String message, Throwable cause) {
    super(message, cause);
  }
}
76
```
77
78
## Table Dataset
79
80
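The Table dataset provides a flexible, schema-free NoSQL storage abstraction. Rows are addressed by a byte-array key and hold arbitrary columns. Table supports point reads, range scans, increments, compare-and-swap, and batch reads and writes. Note that `Table` is deprecated: table-based datasets will be removed in a future version.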
81
82
### Table Interface and Operations
83
84
```java { .api }
85
import io.cdap.cdap.api.dataset.table.*;
86
import io.cdap.cdap.api.data.batch.*;
87
88
// Table interface - core NoSQL storage
89
@Deprecated // Note: table based datasets will be removed in a future version
90
public interface Table extends BatchReadable<byte[], Row>, BatchWritable<byte[], Put>,
91
Dataset, RecordScannable<StructuredRecord>, RecordWritable<StructuredRecord> {
92
93
String TYPE = "table";
94
95
// Table properties
96
String PROPERTY_TTL = "dataset.table.ttl";
97
String PROPERTY_READLESS_INCREMENT = "dataset.table.readless.increment";
98
String PROPERTY_CONFLICT_DETECTION = "dataset.table.conflict.detection";
99
100
// Basic operations
101
Row get(Get get);
102
Row get(byte[] row);
103
Row get(byte[] row, byte[][] columns);
104
Row get(byte[] row, byte[] startColumn, byte[] stopColumn, int limit);
105
106
void put(Put put);
107
void put(byte[] row, byte[] column, byte[] value);
108
void put(byte[] row, byte[][] columns, byte[][] values);
109
110
boolean delete(Delete delete);
111
void delete(byte[] row);
112
void delete(byte[] row, byte[] column);
113
void delete(byte[] row, byte[][] columns);
114
115
// Scanning operations
116
Scanner scan(Scan scan);
117
Scanner scan(byte[] startRow, byte[] stopRow);
118
119
// Increment operations
120
Row increment(Increment increment);
121
long increment(byte[] row, byte[] column, long amount);
122
Row increment(byte[] row, byte[][] columns, long[] amounts);
123
124
// Batch operations
125
void write(byte[] key, Put value) throws IOException;
126
127
// Compare and swap operations
128
boolean compareAndSwap(byte[] row, byte[] column, byte[] expectedValue, byte[] newValue);
129
}
130
131
/**
 * A table row as returned by reads: the row key plus its columns.
 *
 * <p>Columns can be addressed by {@code byte[]} or {@code String} name. The typed getters return
 * boxed values, so they can return {@code null}, presumably when the column is absent — confirm.
 */
public interface Row {

  /** Returns the row key. */
  byte[] getRow();

  // ---- Column access ----

  /** Returns whether this row has no columns. */
  boolean isEmpty();

  /** Returns the number of columns in this row. */
  int size();

  /**
   * Returns all columns of this row.
   *
   * <p>NOTE(review): {@code byte[]} keys use identity {@code equals}/{@code hashCode}; lookups
   * with a freshly built array only work if the returned map uses a byte-array comparator — confirm.
   */
  Map<byte[], byte[]> getColumns();

  /** Returns the raw value of {@code column}. */
  byte[] get(byte[] column);

  /** Returns the raw value of {@code column}. */
  byte[] get(String column);

  // ---- Typed access methods ----

  Boolean getBoolean(byte[] column);

  Boolean getBoolean(String column);

  Integer getInt(byte[] column);

  Integer getInt(String column);

  Long getLong(byte[] column);

  Long getLong(String column);

  Double getDouble(byte[] column);

  Double getDouble(String column);

  String getString(byte[] column);

  String getString(String column);
}
154
```
155
156
### Table Operations Examples
157
158
```java { .api }
159
// Basic table operations
160
public class UserProfileService extends AbstractHttpServiceHandler {
161
162
@UseDataSet("user_profiles")
163
private Table userProfiles;
164
165
@GET
166
@Path("/user/{id}")
167
public void getUser(HttpServiceRequest request, HttpServiceResponder responder,
168
@PathParam("id") String userId) {
169
try {
170
Row row = userProfiles.get(Bytes.toBytes(userId));
171
if (row.isEmpty()) {
172
responder.sendError(404, "User not found");
173
return;
174
}
175
176
// Build user profile JSON
177
JsonObject profile = new JsonObject();
178
profile.addProperty("id", userId);
179
profile.addProperty("name", row.getString("name"));
180
profile.addProperty("email", row.getString("email"));
181
profile.addProperty("created", row.getLong("created"));
182
profile.addProperty("lastLogin", row.getLong("lastLogin"));
183
184
responder.sendJson(200, profile);
185
} catch (Exception e) {
186
responder.sendError(500, "Error retrieving user: " + e.getMessage());
187
}
188
}
189
190
@POST
191
@Path("/user")
192
public void createUser(HttpServiceRequest request, HttpServiceResponder responder) {
193
try {
194
String content = Charset.forName("UTF-8").decode(
195
ByteBuffer.wrap(request.getContent())).toString();
196
JsonObject userJson = new JsonParser().parse(content).getAsJsonObject();
197
198
String userId = userJson.get("id").getAsString();
199
String name = userJson.get("name").getAsString();
200
String email = userJson.get("email").getAsString();
201
202
// Create put operation
203
Put put = new Put(Bytes.toBytes(userId));
204
put.add("name", name);
205
put.add("email", email);
206
put.add("created", System.currentTimeMillis());
207
put.add("lastLogin", 0L);
208
put.add("status", "active");
209
210
userProfiles.put(put);
211
responder.sendString(201, "User created successfully", "text/plain");
212
213
} catch (Exception e) {
214
responder.sendError(400, "Error creating user: " + e.getMessage());
215
}
216
}
217
218
@PUT
219
@Path("/user/{id}/login")
220
public void recordLogin(HttpServiceRequest request, HttpServiceResponder responder,
221
@PathParam("id") String userId) {
222
try {
223
// Use increment for login count and update last login time
224
userProfiles.increment(Bytes.toBytes(userId), Bytes.toBytes("loginCount"), 1L);
225
userProfiles.put(Bytes.toBytes(userId), Bytes.toBytes("lastLogin"),
226
Bytes.toBytes(System.currentTimeMillis()));
227
228
responder.sendString(200, "Login recorded", "text/plain");
229
} catch (Exception e) {
230
responder.sendError(500, "Error recording login: " + e.getMessage());
231
}
232
}
233
}
234
235
// Complex table scanning and filtering
236
public class UserAnalyticsMapReduce extends AbstractMapReduce {
237
238
public static class UserStatsMapper extends Mapper<byte[], Row, Text, UserStats> {
239
240
@Override
241
protected void map(byte[] key, Row row, Context context)
242
throws IOException, InterruptedException {
243
244
String userId = Bytes.toString(key);
245
String status = row.getString("status");
246
Long created = row.getLong("created");
247
Long lastLogin = row.getLong("lastLogin");
248
Long loginCount = row.getLong("loginCount");
249
250
if (status != null && status.equals("active")) {
251
UserStats stats = new UserStats();
252
stats.setUserId(userId);
253
stats.setCreated(created != null ? created : 0L);
254
stats.setLastLogin(lastLogin != null ? lastLogin : 0L);
255
stats.setLoginCount(loginCount != null ? loginCount : 0L);
256
257
// Calculate activity metrics
258
long daysSinceCreation = (System.currentTimeMillis() - stats.getCreated()) / (24 * 60 * 60 * 1000);
259
long daysSinceLogin = (System.currentTimeMillis() - stats.getLastLogin()) / (24 * 60 * 60 * 1000);
260
261
stats.setDaysSinceCreation(daysSinceCreation);
262
stats.setDaysSinceLogin(daysSinceLogin);
263
264
// Categorize user activity level
265
String activityLevel;
266
if (daysSinceLogin <= 7) {
267
activityLevel = "highly_active";
268
} else if (daysSinceLogin <= 30) {
269
activityLevel = "moderately_active";
270
} else if (daysSinceLogin <= 90) {
271
activityLevel = "low_activity";
272
} else {
273
activityLevel = "inactive";
274
}
275
276
context.write(new Text(activityLevel), stats);
277
}
278
}
279
}
280
}
281
```
282
283
## Key-Value Table
284
285
A simplified key-value storage interface built on top of Table. Like `Table`, it is deprecated and will be removed in a future version:
286
287
```java { .api }
288
import io.cdap.cdap.api.dataset.lib.*;
289
290
// Key-Value table interface
291
@Deprecated // table based datasets will be removed in a future version
292
public interface KeyValueTable extends BatchReadable<byte[], KeyValue<byte[], byte[]>>,
293
BatchWritable<byte[], byte[]>, Dataset {
294
295
String TYPE = "keyValueTable";
296
297
// Basic operations
298
@ReadOnly
299
@Nullable
300
byte[] read(String key);
301
302
@ReadOnly
303
@Nullable
304
byte[] read(byte[] key);
305
306
@WriteOnly
307
void write(String key, String value);
308
309
@WriteOnly
310
void write(String key, byte[] value);
311
312
@WriteOnly
313
void write(byte[] key, byte[] value);
314
315
@WriteOnly
316
void increment(byte[] key, long amount);
317
318
@WriteOnly
319
void increment(String key, long amount);
320
321
@WriteOnly
322
void delete(String key);
323
324
@WriteOnly
325
void delete(byte[] key);
326
327
// Batch operations
328
@ReadOnly
329
Map<byte[], byte[]> readAll(byte[][] keys);
330
331
@WriteOnly
332
void writeAll(Map<byte[], byte[]> entries);
333
334
@WriteOnly
335
void deleteAll(byte[][] keys);
336
}
337
338
/**
 * Immutable key/value pair, e.g. one entry read from a {@code KeyValueTable} in batch.
 *
 * @param <K> the key type
 * @param <V> the value type
 */
public class KeyValue<K, V> {

  private final K key;
  private final V value;

  /**
   * @param key the key
   * @param value the value
   */
  public KeyValue(K key, V value) {
    this.key = key;
    this.value = value;
  }

  /** Returns the key. */
  public K getKey() {
    return key;
  }

  /** Returns the value. */
  public V getValue() {
    return value;
  }
}
344
345
// Usage example
346
public class ConfigurationStore {
347
private KeyValueTable configTable;
348
349
public void storeConfiguration(String key, String value) {
350
configTable.write(key, value);
351
}
352
353
public String getConfiguration(String key) {
354
byte[] value = configTable.read(key);
355
return value != null ? Bytes.toString(value) : null;
356
}
357
358
public void updateCounter(String counterName, long increment) {
359
configTable.increment(counterName, increment);
360
}
361
362
public Map<String, String> getAllConfigurations(String[] keys) {
363
byte[][] keyBytes = Arrays.stream(keys)
364
.map(Bytes::toBytes)
365
.toArray(byte[][]::new);
366
367
Map<byte[], byte[]> results = configTable.readAll(keyBytes);
368
369
return results.entrySet().stream()
370
.collect(Collectors.toMap(
371
entry -> Bytes.toString(entry.getKey()),
372
entry -> Bytes.toString(entry.getValue())
373
));
374
}
375
}
376
```
377
378
## File-Based Datasets
379
380
CDAP provides several file-based dataset types for working with HDFS and other file systems:
381
382
### FileSet Dataset
383
384
```java { .api }
385
import io.cdap.cdap.api.dataset.lib.*;
386
import org.apache.hadoop.fs.Path;
387
import java.io.IOException;
388
389
// FileSet interface for file-based operations
390
public interface FileSet extends Dataset, BatchReadable<Void, Location>, BatchWritable<Void, Location> {
391
392
String TYPE = "fileSet";
393
394
// File operations
395
Location getLocation(String relativePath) throws IOException;
396
Location getBaseLocation() throws IOException;
397
398
// Input/Output format configuration
399
Map<String, String> getInputFormatConfiguration();
400
Map<String, String> getOutputFormatConfiguration();
401
402
// Runtime arguments access
403
Map<String, String> getRuntimeArguments();
404
}
405
406
// File location abstraction
407
public interface Location {
408
String getName();
409
URI toURI();
410
boolean exists() throws IOException;
411
boolean isDirectory() throws IOException;
412
long lastModified() throws IOException;
413
long length() throws IOException;
414
415
// Stream operations
416
InputStream getInputStream() throws IOException;
417
OutputStream getOutputStream() throws IOException;
418
OutputStream getOutputStream(String permission) throws IOException;
419
420
// Directory operations
421
boolean mkdirs() throws IOException;
422
List<Location> list() throws IOException;
423
boolean delete() throws IOException;
424
boolean delete(boolean recursive) throws IOException;
425
426
// Path operations
427
Location append(String child) throws IOException;
428
Location append(Path child) throws IOException;
429
}
430
431
// FileSet properties and arguments
432
public final class FileSetProperties {
433
public static final String INPUT_FORMAT = "input.format";
434
public static final String OUTPUT_FORMAT = "output.format";
435
public static final String INPUT_PROPERTIES_PREFIX = "input.properties.";
436
public static final String OUTPUT_PROPERTIES_PREFIX = "output.properties.";
437
438
public static Builder builder() { return new Builder(); }
439
440
public static class Builder {
441
public Builder setInputFormat(Class<? extends InputFormat> inputFormat) { /* set input format */ return this; }
442
public Builder setOutputFormat(Class<? extends OutputFormat> outputFormat) { /* set output format */ return this; }
443
public Builder setInputProperty(String key, String value) { /* set input property */ return this; }
444
public Builder setOutputProperty(String key, String value) { /* set output property */ return this; }
445
public DatasetProperties build() { /* build properties */ }
446
}
447
}
448
```
449
450
### PartitionedFileSet Dataset
451
452
```java { .api }
453
// Partitioned FileSet for organizing files by partitions
454
public interface PartitionedFileSet extends Dataset,
455
BatchReadable<PartitionKey, PartitionDetail>,
456
BatchWritable<PartitionKey, PartitionOutput> {
457
458
String TYPE = "partitionedFileSet";
459
460
// Partition operations
461
PartitionDetail getPartition(PartitionKey key);
462
Set<PartitionDetail> getPartitions(PartitionFilter filter);
463
void addPartition(PartitionKey key, String path);
464
void addPartition(PartitionKey key, String path, Map<String, String> metadata);
465
void dropPartition(PartitionKey key);
466
467
// Output operations
468
PartitionOutput getPartitionOutput(PartitionKey key);
469
Location getLocation(PartitionKey key);
470
471
// FileSet operations
472
FileSet getEmbeddedFileSet();
473
}
474
475
// Partition key for organizing data
476
public class PartitionKey {
477
public static Builder builder() { return new Builder(); }
478
public Map<String, Comparable<?>> getFields() { /* returns partition fields */ }
479
480
public static class Builder {
481
public Builder addField(String name, Comparable<?> value) { /* add partition field */ return this; }
482
public Builder addStringField(String name, String value) { /* add string field */ return this; }
483
public Builder addIntField(String name, int value) { /* add int field */ return this; }
484
public Builder addLongField(String name, long value) { /* add long field */ return this; }
485
public PartitionKey build() { /* build partition key */ }
486
}
487
}
488
489
// Partition metadata and details
490
public interface PartitionDetail {
491
PartitionKey getPartitionKey();
492
String getRelativePath();
493
Location getLocation() throws IOException;
494
Map<String, String> getMetadata();
495
long getLastModified();
496
}
497
498
// Partitioning strategy
499
public abstract class Partitioning {
500
public static Builder builder() { return new Builder(); }
501
502
public static class Builder {
503
public Builder addField(String name, Partitioning.FieldType type) { /* add field */ return this; }
504
public Builder addStringField(String name) { /* add string field */ return this; }
505
public Builder addIntField(String name) { /* add int field */ return this; }
506
public Builder addLongField(String name) { /* add long field */ return this; }
507
public Partitioning build() { /* build partitioning */ }
508
}
509
510
public enum FieldType {
511
STRING, INT, LONG
512
}
513
}
514
```
515
516
### Time-Partitioned FileSet
517
518
```java { .api }
519
// Time-based partitioning for time-series data
520
public interface TimePartitionedFileSet extends Dataset,
521
BatchReadable<Long, TimePartitionDetail>,
522
BatchWritable<Long, TimePartitionOutput> {
523
524
String TYPE = "timePartitionedFileSet";
525
526
// Time partition operations
527
TimePartitionDetail getPartitionByTime(long time);
528
Set<TimePartitionDetail> getPartitionsByTime(long startTime, long endTime);
529
TimePartitionOutput getPartitionOutput(long time);
530
531
// Partition management
532
void addPartition(long time, String path);
533
void addPartition(long time, String path, Map<String, String> metadata);
534
void dropPartition(long time);
535
536
// FileSet operations
537
PartitionedFileSet getEmbeddedFileSet();
538
}
539
540
// Time partition representation
541
public interface TimePartition {
542
long getTime();
543
String getRelativePath();
544
Location getLocation() throws IOException;
545
}
546
547
// Usage example for ETL processing
548
public class DailyETLWorkflow extends AbstractWorkflow {
549
550
@Override
551
public void configure(WorkflowConfigurer configurer) {
552
configurer.setName("DailyETLWorkflow");
553
554
// Add time-partitioned datasets for daily processing
555
configurer.addAction(new DataIngestionAction());
556
configurer.addMapReduce("DataTransformationMapReduce");
557
configurer.addAction(new PartitionCleanupAction());
558
}
559
560
public static class DataIngestionAction extends AbstractCustomAction {
561
562
@Override
563
public void run(CustomActionContext context) throws Exception {
564
TimePartitionedFileSet rawData = context.getDataset("raw_data");
565
566
// Get today's partition
567
long today = DateUtils.truncateToDay(System.currentTimeMillis());
568
TimePartitionOutput output = rawData.getPartitionOutput(today);
569
570
// Ingest data for today's partition
571
Location outputLocation = output.getLocation();
572
try (OutputStream os = outputLocation.getOutputStream()) {
573
// Write ingested data to partition
574
ingestDailyData(os);
575
}
576
577
// Add partition with metadata
578
Map<String, String> metadata = new HashMap<>();
579
metadata.put("ingestion.timestamp", String.valueOf(System.currentTimeMillis()));
580
metadata.put("source", "daily-feed");
581
582
output.addPartition(metadata);
583
}
584
585
private void ingestDailyData(OutputStream outputStream) throws IOException {
586
// Implementation for data ingestion
587
}
588
}
589
}
590
```
591
592
## Messaging System
593
594
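CDAP provides a transactional messaging system for reliable message passing and stream processing. The publisher, fetcher, and administration interfaces below are annotated `@Beta`, so their signatures may change between releases: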
595
596
### Messaging Interfaces
597
598
```java { .api }
599
import io.cdap.cdap.api.messaging.*;
600
import io.cdap.cdap.api.*;
601
import java.nio.charset.StandardCharsets;
602
603
// Message publisher interface
604
@Beta
605
public interface MessagePublisher {
606
607
// Publish single message
608
void publish(String namespace, String topic, String payload) throws TopicNotFoundException, IOException, AccessException;
609
void publish(String namespace, String topic, String payload, Charset charset) throws TopicNotFoundException, IOException, AccessException;
610
void publish(String namespace, String topic, byte[] payload) throws TopicNotFoundException, IOException, AccessException;
611
612
// Publish multiple messages
613
void publish(String namespace, String topic, String charset, String... payloads) throws TopicNotFoundException, IOException, AccessException;
614
void publish(String namespace, String topic, Charset charset, String... payloads) throws TopicNotFoundException, IOException, AccessException;
615
void publish(String namespace, String topic, byte[]... payloads) throws TopicNotFoundException, IOException, AccessException;
616
void publish(String namespace, String topic, Iterator<byte[]> payloads) throws TopicNotFoundException, IOException, AccessException;
617
}
618
619
// Message fetcher interface
620
@Beta
621
public interface MessageFetcher {
622
623
// Fetch messages with limit
624
CloseableIterator<Message> fetch(String namespace, String topic, int limit, long afterMessageId)
625
throws TopicNotFoundException, IOException, AccessException;
626
627
// Fetch messages with time range
628
CloseableIterator<Message> fetch(String namespace, String topic, int limit, long startTime, long endTime)
629
throws TopicNotFoundException, IOException, AccessException;
630
}
631
632
/**
 * A message read from a topic.
 */
public interface Message {

  /** Returns the id of this message. */
  String getId();

  /** Returns the raw payload. */
  byte[] getPayload();

  /** Returns the publish time; the unit is not specified here (presumably epoch millis). */
  long getPublishTimestamp();

  /** Returns the message headers. */
  Map<String, String> getHeaders();
}
639
640
// Messaging administration
641
@Beta
642
public interface MessagingAdmin {
643
void createTopic(TopicMetadata topicMetadata) throws TopicAlreadyExistsException, IOException, AccessException;
644
void updateTopic(TopicMetadata topicMetadata) throws TopicNotFoundException, IOException, AccessException;
645
void deleteTopic(String namespace, String topic) throws TopicNotFoundException, IOException, AccessException;
646
List<TopicMetadata> listTopics(String namespace) throws IOException, AccessException;
647
TopicMetadata getTopic(String namespace, String topic) throws TopicNotFoundException, IOException, AccessException;
648
}
649
650
// Topic metadata
651
public class TopicMetadata {
652
public static Builder builder(String topic) { return new Builder(topic); }
653
654
public String getTopic() { /* returns topic name */ }
655
public String getNamespace() { /* returns namespace */ }
656
public Map<String, String> getProperties() { /* returns properties */ }
657
public int getGeneration() { /* returns generation */ }
658
659
public static class Builder {
660
public Builder setNamespace(String namespace) { /* set namespace */ return this; }
661
public Builder setDescription(String description) { /* set description */ return this; }
662
public Builder setProperty(String key, String value) { /* set property */ return this; }
663
public Builder setProperties(Map<String, String> properties) { /* set properties */ return this; }
664
public TopicMetadata build() { /* build metadata */ }
665
}
666
}
667
```
668
669
### Messaging Context and Usage
670
671
```java { .api }
672
// Messaging context for accessing messaging APIs
673
public interface MessagingContext {
674
MessagePublisher getMessagePublisher();
675
MessageFetcher getMessageFetcher();
676
MessagingAdmin getMessagingAdmin();
677
}
678
679
// Usage in worker programs
680
public class MessageProcessingWorker extends AbstractWorker {
681
682
@Override
683
public void configure(WorkerConfigurer configurer) {
684
configurer.setName("MessageProcessor");
685
configurer.setDescription("Processes messages from topic");
686
}
687
688
@Override
689
public void run() throws Exception {
690
WorkerContext context = getContext();
691
MessagingContext messagingContext = context.getMessagingContext();
692
693
MessageFetcher fetcher = messagingContext.getMessageFetcher();
694
MessagePublisher publisher = messagingContext.getMessagePublisher();
695
696
String namespace = context.getNamespace();
697
long lastProcessedId = getLastProcessedMessageId();
698
699
while (context.getState().equals(ProgramRunStatus.RUNNING)) {
700
try (CloseableIterator<Message> messages =
701
fetcher.fetch(namespace, "input-topic", 100, lastProcessedId)) {
702
703
while (messages.hasNext()) {
704
Message message = messages.next();
705
706
// Process message
707
ProcessedMessage processed = processMessage(message);
708
709
// Publish result
710
if (processed != null) {
711
publisher.publish(namespace, "output-topic", processed.toJson());
712
}
713
714
lastProcessedId = Long.parseLong(message.getId());
715
}
716
}
717
718
// Save checkpoint
719
saveLastProcessedMessageId(lastProcessedId);
720
721
// Sleep before next fetch
722
Thread.sleep(1000);
723
}
724
}
725
726
private ProcessedMessage processMessage(Message message) {
727
// Implementation for message processing
728
return new ProcessedMessage(new String(message.getPayload(), StandardCharsets.UTF_8));
729
}
730
731
private long getLastProcessedMessageId() {
732
// Implementation to retrieve last processed message ID
733
return 0L;
734
}
735
736
private void saveLastProcessedMessageId(long messageId) {
737
// Implementation to save checkpoint
738
}
739
}
740
741
// Usage in MapReduce for batch message processing
742
public class MessageBatchProcessor extends AbstractMapReduce {
743
744
@Override
745
public void initialize(MapReduceContext context) throws Exception {
746
Job job = context.getHadoopJob();
747
748
// Configure to read from messaging system
749
MessagingUtils.configureInput(job, context.getNamespace(), "batch-topic");
750
751
// Configure output dataset
752
context.setOutput(Output.ofDataset("processed_messages"));
753
754
job.setMapperClass(MessageMapper.class);
755
job.setReducerClass(MessageAggregator.class);
756
}
757
758
public static class MessageMapper extends Mapper<LongWritable, Message, Text, IntWritable> {
759
760
@Override
761
protected void map(LongWritable key, Message message, Context context)
762
throws IOException, InterruptedException {
763
764
String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
765
JsonObject json = new JsonParser().parse(payload).getAsJsonObject();
766
767
String eventType = json.get("eventType").getAsString();
768
context.write(new Text(eventType), new IntWritable(1));
769
}
770
}
771
772
public static class MessageAggregator extends Reducer<Text, IntWritable, byte[], Put> {
773
774
@Override
775
protected void reduce(Text eventType, Iterable<IntWritable> counts, Context context)
776
throws IOException, InterruptedException {
777
778
int total = 0;
779
for (IntWritable count : counts) {
780
total += count.get();
781
}
782
783
Put put = new Put(Bytes.toBytes(eventType.toString()));
784
put.add("stats", "count", total);
785
put.add("stats", "timestamp", System.currentTimeMillis());
786
787
context.write(Bytes.toBytes(eventType.toString()), put);
788
}
789
}
790
}
791
```
792
793
## Advanced Dataset Patterns
794
795
### Custom Dataset Implementation
796
797
```java { .api }
798
// Custom dataset definition
799
public abstract class AbstractDatasetDefinition<T extends Dataset>
800
implements DatasetDefinition<T> {
801
802
private final String name;
803
804
public AbstractDatasetDefinition(String name) {
805
this.name = name;
806
}
807
808
@Override
809
public String getName() {
810
return name;
811
}
812
813
@Override
814
public abstract DatasetSpecification configure(String instanceName, DatasetProperties properties);
815
816
@Override
817
public abstract T getDataset(DatasetContext datasetContext, DatasetSpecification spec,
818
Map<String, String> arguments, ClassLoader classLoader) throws IOException;
819
}
820
821
/**
 * Stores, reads, and deletes named blobs of dataset state.
 */
public interface DatasetStatePersistor {

  /** Stores {@code state} under {@code name}. */
  void persistState(String name, byte[] state) throws IOException;

  /** Reads the state stored under {@code name}; the result for an unknown name is not specified here. */
  byte[] readState(String name) throws IOException;

  /** Deletes the state stored under {@code name}. */
  void deleteState(String name) throws IOException;
}
827
828
// Composite dataset for combining multiple datasets
829
public abstract class CompositeDatasetDefinition<T extends Dataset>
830
extends AbstractDatasetDefinition<T> {
831
832
protected CompositeDatasetDefinition(String name) {
833
super(name);
834
}
835
836
// Methods for managing constituent datasets
837
protected abstract Map<String, DatasetSpecification> getConstituentDatasets(DatasetProperties properties);
838
}
839
```
840
841
In summary, the CDAP data management framework gives applications consistent APIs over a variety of underlying storage systems: datasets, tables, key-value tables, file sets, and messaging. It also provides platform features such as transactions, security, and operational control.