# Data Management

CDAP provides a comprehensive data management framework with support for various dataset types, a messaging system, and common data access patterns. The framework abstracts the underlying storage technologies while providing consistent APIs for data operations across different storage systems.

## Dataset Framework

The dataset framework is the foundation for data storage and access in CDAP, providing a unified abstraction layer over different storage systems.

### Core Dataset Interfaces

```java { .api }

11

import io.cdap.cdap.api.dataset.*;

12

import io.cdap.cdap.api.dataset.table.*;

13

14

// Base dataset interface

15

public interface Dataset {

16

void close() throws IOException;

17

}

18

19

// Dataset context for accessing datasets

20

public interface DatasetContext {

21

<T extends Dataset> T getDataset(String name) throws DataSetException;

22

<T extends Dataset> T getDataset(String namespace, String name) throws DataSetException;

23

void releaseDataset(Dataset dataset);

24

void discardDataset(Dataset dataset);

25

}

26

27

// Dataset management interface

28

public interface DatasetManager {

29

boolean datasetExists(String name) throws DataSetException;

30

DatasetProperties getDatasetProperties(String name) throws DataSetException;

31

void createDataset(String name, String type, DatasetProperties properties) throws DataSetException;

32

void updateDataset(String name, DatasetProperties properties) throws DataSetException;

33

void dropDataset(String name) throws DataSetException;

34

void truncateDataset(String name) throws DataSetException;

35

}

36

37

// Dataset configurer for application setup

38

public interface DatasetConfigurer {

39

void createDataset(String datasetName, String typeName, DatasetProperties properties);

40

void createDataset(String datasetName, String typeName);

41

void createDataset(String datasetName, Class<? extends Dataset> datasetClass, DatasetProperties props);

42

void createDataset(String datasetName, Class<? extends Dataset> datasetClass);

43

void addDatasetModule(String moduleName, Class<? extends DatasetModule> moduleClass);

44

void addDatasetType(Class<? extends Dataset> datasetClass);

45

}

46

```

### Dataset Properties and Configuration

```java { .api }

51

// Dataset properties container

52

public class DatasetProperties {

53

public static Builder builder() { return new Builder(); }

54

public Map<String, String> getProperties() { /* returns properties map */ }

55

56

public static class Builder {

57

public Builder add(String key, String value) { /* add property */ return this; }

58

public Builder addAll(Map<String, String> properties) { /* add all properties */ return this; }

59

public DatasetProperties build() { /* build properties */ }

60

}

61

}

62

63

// Dataset specification

64

public final class DatasetSpecification {

65

public String getName() { /* returns dataset name */ }

66

public String getType() { /* returns dataset type */ }

67

public Map<String, String> getProperties() { /* returns properties */ }

68

public Map<String, DatasetSpecification> getSpecifications() { /* returns nested specs */ }

69

}

70

71

/**
 * Dataset instantiation exception. It is unchecked, so it propagates without being declared.
 */
public class DatasetInstantiationException extends RuntimeException {

  private static final long serialVersionUID = 1L;

  /**
   * Creates an exception with the given detail message.
   *
   * @param message the detail message
   */
  public DatasetInstantiationException(String message) {
    super(message);
  }

  /**
   * Creates an exception with the given detail message and underlying cause.
   *
   * @param message the detail message
   * @param cause the underlying failure, kept so its stack trace is not lost
   */
  public DatasetInstantiationException(String message, Throwable cause) {
    super(message, cause);
  }
}

```

## Table Dataset

The Table dataset provides a flexible, schema-free NoSQL storage abstraction with row and column reads, row-range scans, increments, compare-and-swap, and batch read/write support.

### Table Interface and Operations

```java { .api }

85

import io.cdap.cdap.api.dataset.table.*;

86

import io.cdap.cdap.api.data.batch.*;

87

88

// Table interface - core NoSQL storage

89

@Deprecated // Note: table based datasets will be removed in a future version

90

public interface Table extends BatchReadable<byte[], Row>, BatchWritable<byte[], Put>,

91

Dataset, RecordScannable<StructuredRecord>, RecordWritable<StructuredRecord> {

92

93

String TYPE = "table";

94

95

// Table properties

96

String PROPERTY_TTL = "dataset.table.ttl";

97

String PROPERTY_READLESS_INCREMENT = "dataset.table.readless.increment";

98

String PROPERTY_CONFLICT_DETECTION = "dataset.table.conflict.detection";

99

100

// Basic operations

101

Row get(Get get);

102

Row get(byte[] row);

103

Row get(byte[] row, byte[][] columns);

104

Row get(byte[] row, byte[] startColumn, byte[] stopColumn, int limit);

105

106

void put(Put put);

107

void put(byte[] row, byte[] column, byte[] value);

108

void put(byte[] row, byte[][] columns, byte[][] values);

109

110

boolean delete(Delete delete);

111

void delete(byte[] row);

112

void delete(byte[] row, byte[] column);

113

void delete(byte[] row, byte[][] columns);

114

115

// Scanning operations

116

Scanner scan(Scan scan);

117

Scanner scan(byte[] startRow, byte[] stopRow);

118

119

// Increment operations

120

Row increment(Increment increment);

121

long increment(byte[] row, byte[] column, long amount);

122

Row increment(byte[] row, byte[][] columns, long[] amounts);

123

124

// Batch operations

125

void write(byte[] key, Put value) throws IOException;

126

127

// Compare and swap operations

128

boolean compareAndSwap(byte[] row, byte[] column, byte[] expectedValue, byte[] newValue);

129

}

130

131

/**
 * Row representation: a row key plus the columns read from it.
 *
 * <p>Every column accessor has a {@code byte[]} and a {@code String} overload for the column
 * name. The typed getters return boxed values. A value is presumably {@code null} when the column
 * is absent, so null-check it before unboxing.
 */
public interface Row {

  /** Returns the row key. */
  byte[] getRow();

  // Column access
  /** Returns whether this row holds no columns (used as a "not found" check). */
  boolean isEmpty();
  int size();

  /**
   * Returns all columns of this row.
   *
   * <p>NOTE: {@code byte[]} has identity-based {@code equals}/{@code hashCode}. Look up a single
   * column with {@link #get(byte[])} rather than {@code getColumns().get(...)}, unless the
   * returned map orders its keys with a byte-array comparator.
   */
  Map<byte[], byte[]> getColumns();
  byte[] get(byte[] column);
  byte[] get(String column);

  // Typed access methods
  Boolean getBoolean(byte[] column);
  Boolean getBoolean(String column);
  Integer getInt(byte[] column);
  Integer getInt(String column);
  Long getLong(byte[] column);
  Long getLong(String column);
  Double getDouble(byte[] column);
  Double getDouble(String column);
  String getString(byte[] column);
  String getString(String column);
}

```

### Table Operations Examples

```java { .api }

159

// Basic table operations

160

public class UserProfileService extends AbstractHttpServiceHandler {

161

162

@UseDataSet("user_profiles")

163

private Table userProfiles;

164

165

@GET

166

@Path("/user/{id}")

167

public void getUser(HttpServiceRequest request, HttpServiceResponder responder,

168

@PathParam("id") String userId) {

169

try {

170

Row row = userProfiles.get(Bytes.toBytes(userId));

171

if (row.isEmpty()) {

172

responder.sendError(404, "User not found");

173

return;

174

}

175

176

// Build user profile JSON

177

JsonObject profile = new JsonObject();

178

profile.addProperty("id", userId);

179

profile.addProperty("name", row.getString("name"));

180

profile.addProperty("email", row.getString("email"));

181

profile.addProperty("created", row.getLong("created"));

182

profile.addProperty("lastLogin", row.getLong("lastLogin"));

183

184

responder.sendJson(200, profile);

185

} catch (Exception e) {

186

responder.sendError(500, "Error retrieving user: " + e.getMessage());

187

}

188

}

189

190

@POST

191

@Path("/user")

192

public void createUser(HttpServiceRequest request, HttpServiceResponder responder) {

193

try {

194

String content = Charset.forName("UTF-8").decode(

195

ByteBuffer.wrap(request.getContent())).toString();

196

JsonObject userJson = new JsonParser().parse(content).getAsJsonObject();

197

198

String userId = userJson.get("id").getAsString();

199

String name = userJson.get("name").getAsString();

200

String email = userJson.get("email").getAsString();

201

202

// Create put operation

203

Put put = new Put(Bytes.toBytes(userId));

204

put.add("name", name);

205

put.add("email", email);

206

put.add("created", System.currentTimeMillis());

207

put.add("lastLogin", 0L);

208

put.add("status", "active");

209

210

userProfiles.put(put);

211

responder.sendString(201, "User created successfully", "text/plain");

212

213

} catch (Exception e) {

214

responder.sendError(400, "Error creating user: " + e.getMessage());

215

}

216

}

217

218

@PUT

219

@Path("/user/{id}/login")

220

public void recordLogin(HttpServiceRequest request, HttpServiceResponder responder,

221

@PathParam("id") String userId) {

222

try {

223

// Use increment for login count and update last login time

224

userProfiles.increment(Bytes.toBytes(userId), Bytes.toBytes("loginCount"), 1L);

225

userProfiles.put(Bytes.toBytes(userId), Bytes.toBytes("lastLogin"),

226

Bytes.toBytes(System.currentTimeMillis()));

227

228

responder.sendString(200, "Login recorded", "text/plain");

229

} catch (Exception e) {

230

responder.sendError(500, "Error recording login: " + e.getMessage());

231

}

232

}

233

}

234

235

// Complex table scanning and filtering

236

public class UserAnalyticsMapReduce extends AbstractMapReduce {

237

238

public static class UserStatsMapper extends Mapper<byte[], Row, Text, UserStats> {

239

240

@Override

241

protected void map(byte[] key, Row row, Context context)

242

throws IOException, InterruptedException {

243

244

String userId = Bytes.toString(key);

245

String status = row.getString("status");

246

Long created = row.getLong("created");

247

Long lastLogin = row.getLong("lastLogin");

248

Long loginCount = row.getLong("loginCount");

249

250

if (status != null && status.equals("active")) {

251

UserStats stats = new UserStats();

252

stats.setUserId(userId);

253

stats.setCreated(created != null ? created : 0L);

254

stats.setLastLogin(lastLogin != null ? lastLogin : 0L);

255

stats.setLoginCount(loginCount != null ? loginCount : 0L);

256

257

// Calculate activity metrics

258

long daysSinceCreation = (System.currentTimeMillis() - stats.getCreated()) / (24 * 60 * 60 * 1000);

259

long daysSinceLogin = (System.currentTimeMillis() - stats.getLastLogin()) / (24 * 60 * 60 * 1000);

260

261

stats.setDaysSinceCreation(daysSinceCreation);

262

stats.setDaysSinceLogin(daysSinceLogin);

263

264

// Categorize user activity level

265

String activityLevel;

266

if (daysSinceLogin <= 7) {

267

activityLevel = "highly_active";

268

} else if (daysSinceLogin <= 30) {

269

activityLevel = "moderately_active";

270

} else if (daysSinceLogin <= 90) {

271

activityLevel = "low_activity";

272

} else {

273

activityLevel = "inactive";

274

}

275

276

context.write(new Text(activityLevel), stats);

277

}

278

}

279

}

280

}

281

```

## Key-Value Table

A simplified key-value storage interface built on top of Table:

```java { .api }

288

import io.cdap.cdap.api.dataset.lib.*;

289

290

// Key-Value table interface

291

@Deprecated // table based datasets will be removed in a future version

292

public interface KeyValueTable extends BatchReadable<byte[], KeyValue<byte[], byte[]>>,

293

BatchWritable<byte[], byte[]>, Dataset {

294

295

String TYPE = "keyValueTable";

296

297

// Basic operations

298

@ReadOnly

299

@Nullable

300

byte[] read(String key);

301

302

@ReadOnly

303

@Nullable

304

byte[] read(byte[] key);

305

306

@WriteOnly

307

void write(String key, String value);

308

309

@WriteOnly

310

void write(String key, byte[] value);

311

312

@WriteOnly

313

void write(byte[] key, byte[] value);

314

315

@WriteOnly

316

void increment(byte[] key, long amount);

317

318

@WriteOnly

319

void increment(String key, long amount);

320

321

@WriteOnly

322

void delete(String key);

323

324

@WriteOnly

325

void delete(byte[] key);

326

327

// Batch operations

328

@ReadOnly

329

Map<byte[], byte[]> readAll(byte[][] keys);

330

331

@WriteOnly

332

void writeAll(Map<byte[], byte[]> entries);

333

334

@WriteOnly

335

void deleteAll(byte[][] keys);

336

}

337

338

/**
 * Key-value pair representation. It is the record type produced when a key-value table is read
 * in batch.
 *
 * @param <K> the key type
 * @param <V> the value type
 */
public class KeyValue<K, V> {

  private final K key;
  private final V value;

  /** Creates a pair of {@code key} and {@code value}; either may be null. */
  public KeyValue(K key, V value) {
    this.key = key;
    this.value = value;
  }

  /** Returns the key. */
  public K getKey() {
    return key;
  }

  /** Returns the value. */
  public V getValue() {
    return value;
  }
}

// Usage example

346

public class ConfigurationStore {

347

private KeyValueTable configTable;

348

349

public void storeConfiguration(String key, String value) {

350

configTable.write(key, value);

351

}

352

353

public String getConfiguration(String key) {

354

byte[] value = configTable.read(key);

355

return value != null ? Bytes.toString(value) : null;

356

}

357

358

public void updateCounter(String counterName, long increment) {

359

configTable.increment(counterName, increment);

360

}

361

362

public Map<String, String> getAllConfigurations(String[] keys) {

363

byte[][] keyBytes = Arrays.stream(keys)

364

.map(Bytes::toBytes)

365

.toArray(byte[][]::new);

366

367

Map<byte[], byte[]> results = configTable.readAll(keyBytes);

368

369

return results.entrySet().stream()

370

.collect(Collectors.toMap(

371

entry -> Bytes.toString(entry.getKey()),

372

entry -> Bytes.toString(entry.getValue())

373

));

374

}

375

}

376

```

## File-Based Datasets

CDAP provides several file-based dataset types for working with HDFS and other file systems:

### FileSet Dataset

```java { .api }

385

import io.cdap.cdap.api.dataset.lib.*;

386

import org.apache.hadoop.fs.Path;

387

import java.io.IOException;

388

389

// FileSet interface for file-based operations

390

public interface FileSet extends Dataset, BatchReadable<Void, Location>, BatchWritable<Void, Location> {

391

392

String TYPE = "fileSet";

393

394

// File operations

395

Location getLocation(String relativePath) throws IOException;

396

Location getBaseLocation() throws IOException;

397

398

// Input/Output format configuration

399

Map<String, String> getInputFormatConfiguration();

400

Map<String, String> getOutputFormatConfiguration();

401

402

// Runtime arguments access

403

Map<String, String> getRuntimeArguments();

404

}

405

406

// File location abstraction

407

public interface Location {

408

String getName();

409

URI toURI();

410

boolean exists() throws IOException;

411

boolean isDirectory() throws IOException;

412

long lastModified() throws IOException;

413

long length() throws IOException;

414

415

// Stream operations

416

InputStream getInputStream() throws IOException;

417

OutputStream getOutputStream() throws IOException;

418

OutputStream getOutputStream(String permission) throws IOException;

419

420

// Directory operations

421

boolean mkdirs() throws IOException;

422

List<Location> list() throws IOException;

423

boolean delete() throws IOException;

424

boolean delete(boolean recursive) throws IOException;

425

426

// Path operations

427

Location append(String child) throws IOException;

428

Location append(Path child) throws IOException;

429

}

430

431

// FileSet properties and arguments

432

public final class FileSetProperties {

433

public static final String INPUT_FORMAT = "input.format";

434

public static final String OUTPUT_FORMAT = "output.format";

435

public static final String INPUT_PROPERTIES_PREFIX = "input.properties.";

436

public static final String OUTPUT_PROPERTIES_PREFIX = "output.properties.";

437

438

public static Builder builder() { return new Builder(); }

439

440

public static class Builder {

441

public Builder setInputFormat(Class<? extends InputFormat> inputFormat) { /* set input format */ return this; }

442

public Builder setOutputFormat(Class<? extends OutputFormat> outputFormat) { /* set output format */ return this; }

443

public Builder setInputProperty(String key, String value) { /* set input property */ return this; }

444

public Builder setOutputProperty(String key, String value) { /* set output property */ return this; }

445

public DatasetProperties build() { /* build properties */ }

446

}

447

}

448

```

### PartitionedFileSet Dataset

```java { .api }

453

// Partitioned FileSet for organizing files by partitions

454

public interface PartitionedFileSet extends Dataset,

455

BatchReadable<PartitionKey, PartitionDetail>,

456

BatchWritable<PartitionKey, PartitionOutput> {

457

458

String TYPE = "partitionedFileSet";

459

460

// Partition operations

461

PartitionDetail getPartition(PartitionKey key);

462

Set<PartitionDetail> getPartitions(PartitionFilter filter);

463

void addPartition(PartitionKey key, String path);

464

void addPartition(PartitionKey key, String path, Map<String, String> metadata);

465

void dropPartition(PartitionKey key);

466

467

// Output operations

468

PartitionOutput getPartitionOutput(PartitionKey key);

469

Location getLocation(PartitionKey key);

470

471

// FileSet operations

472

FileSet getEmbeddedFileSet();

473

}

474

475

// Partition key for organizing data

476

public class PartitionKey {

477

public static Builder builder() { return new Builder(); }

478

public Map<String, Comparable<?>> getFields() { /* returns partition fields */ }

479

480

public static class Builder {

481

public Builder addField(String name, Comparable<?> value) { /* add partition field */ return this; }

482

public Builder addStringField(String name, String value) { /* add string field */ return this; }

483

public Builder addIntField(String name, int value) { /* add int field */ return this; }

484

public Builder addLongField(String name, long value) { /* add long field */ return this; }

485

public PartitionKey build() { /* build partition key */ }

486

}

487

}

488

489

// Partition metadata and details

490

public interface PartitionDetail {

491

PartitionKey getPartitionKey();

492

String getRelativePath();

493

Location getLocation() throws IOException;

494

Map<String, String> getMetadata();

495

long getLastModified();

496

}

497

498

// Partitioning strategy

499

public abstract class Partitioning {

500

public static Builder builder() { return new Builder(); }

501

502

public static class Builder {

503

public Builder addField(String name, Partitioning.FieldType type) { /* add field */ return this; }

504

public Builder addStringField(String name) { /* add string field */ return this; }

505

public Builder addIntField(String name) { /* add int field */ return this; }

506

public Builder addLongField(String name) { /* add long field */ return this; }

507

public Partitioning build() { /* build partitioning */ }

508

}

509

510

public enum FieldType {

511

STRING, INT, LONG

512

}

513

}

514

```

### Time-Partitioned FileSet

```java { .api }

519

// Time-based partitioning for time-series data

520

public interface TimePartitionedFileSet extends Dataset,

521

BatchReadable<Long, TimePartitionDetail>,

522

BatchWritable<Long, TimePartitionOutput> {

523

524

String TYPE = "timePartitionedFileSet";

525

526

// Time partition operations

527

TimePartitionDetail getPartitionByTime(long time);

528

Set<TimePartitionDetail> getPartitionsByTime(long startTime, long endTime);

529

TimePartitionOutput getPartitionOutput(long time);

530

531

// Partition management

532

void addPartition(long time, String path);

533

void addPartition(long time, String path, Map<String, String> metadata);

534

void dropPartition(long time);

535

536

// FileSet operations

537

PartitionedFileSet getEmbeddedFileSet();

538

}

539

540

// Time partition representation

541

public interface TimePartition {

542

long getTime();

543

String getRelativePath();

544

Location getLocation() throws IOException;

545

}

546

547

// Usage example for ETL processing

548

public class DailyETLWorkflow extends AbstractWorkflow {

549

550

@Override

551

public void configure(WorkflowConfigurer configurer) {

552

configurer.setName("DailyETLWorkflow");

553

554

// Add time-partitioned datasets for daily processing

555

configurer.addAction(new DataIngestionAction());

556

configurer.addMapReduce("DataTransformationMapReduce");

557

configurer.addAction(new PartitionCleanupAction());

558

}

559

560

public static class DataIngestionAction extends AbstractCustomAction {

561

562

@Override

563

public void run(CustomActionContext context) throws Exception {

564

TimePartitionedFileSet rawData = context.getDataset("raw_data");

565

566

// Get today's partition

567

long today = DateUtils.truncateToDay(System.currentTimeMillis());

568

TimePartitionOutput output = rawData.getPartitionOutput(today);

569

570

// Ingest data for today's partition

571

Location outputLocation = output.getLocation();

572

try (OutputStream os = outputLocation.getOutputStream()) {

573

// Write ingested data to partition

574

ingestDailyData(os);

575

}

576

577

// Add partition with metadata

578

Map<String, String> metadata = new HashMap<>();

579

metadata.put("ingestion.timestamp", String.valueOf(System.currentTimeMillis()));

580

metadata.put("source", "daily-feed");

581

582

output.addPartition(metadata);

583

}

584

585

private void ingestDailyData(OutputStream outputStream) throws IOException {

586

// Implementation for data ingestion

587

}

588

}

589

}

590

```

## Messaging System

CDAP provides a transactional messaging system for reliable message passing and stream processing:

### Messaging Interfaces

```java { .api }

599

import io.cdap.cdap.api.messaging.*;

600

import io.cdap.cdap.api.*;

601

import java.nio.charset.StandardCharsets;

602

603

// Message publisher interface

604

@Beta

605

public interface MessagePublisher {

606

607

// Publish single message

608

void publish(String namespace, String topic, String payload) throws TopicNotFoundException, IOException, AccessException;

609

void publish(String namespace, String topic, String payload, Charset charset) throws TopicNotFoundException, IOException, AccessException;

610

void publish(String namespace, String topic, byte[] payload) throws TopicNotFoundException, IOException, AccessException;

611

612

// Publish multiple messages

613

void publish(String namespace, String topic, String charset, String... payloads) throws TopicNotFoundException, IOException, AccessException;

614

void publish(String namespace, String topic, Charset charset, String... payloads) throws TopicNotFoundException, IOException, AccessException;

615

void publish(String namespace, String topic, byte[]... payloads) throws TopicNotFoundException, IOException, AccessException;

616

void publish(String namespace, String topic, Iterator<byte[]> payloads) throws TopicNotFoundException, IOException, AccessException;

617

}

618

619

// Message fetcher interface

620

@Beta

621

public interface MessageFetcher {

622

623

// Fetch messages with limit

624

CloseableIterator<Message> fetch(String namespace, String topic, int limit, long afterMessageId)

625

throws TopicNotFoundException, IOException, AccessException;

626

627

// Fetch messages with time range

628

CloseableIterator<Message> fetch(String namespace, String topic, int limit, long startTime, long endTime)

629

throws TopicNotFoundException, IOException, AccessException;

630

}

631

632

// Message representation
/**
 * Message representation: id, raw payload bytes, publish timestamp and headers. Decode the
 * payload with an explicit charset (e.g. {@code StandardCharsets.UTF_8}).
 */
public interface Message {
  String getId();
  byte[] getPayload();
  long getPublishTimestamp();
  Map<String, String> getHeaders();
}

// Messaging administration

641

@Beta

642

public interface MessagingAdmin {

643

void createTopic(TopicMetadata topicMetadata) throws TopicAlreadyExistsException, IOException, AccessException;

644

void updateTopic(TopicMetadata topicMetadata) throws TopicNotFoundException, IOException, AccessException;

645

void deleteTopic(String namespace, String topic) throws TopicNotFoundException, IOException, AccessException;

646

List<TopicMetadata> listTopics(String namespace) throws IOException, AccessException;

647

TopicMetadata getTopic(String namespace, String topic) throws TopicNotFoundException, IOException, AccessException;

648

}

649

650

// Topic metadata

651

public class TopicMetadata {

652

public static Builder builder(String topic) { return new Builder(topic); }

653

654

public String getTopic() { /* returns topic name */ }

655

public String getNamespace() { /* returns namespace */ }

656

public Map<String, String> getProperties() { /* returns properties */ }

657

public int getGeneration() { /* returns generation */ }

658

659

public static class Builder {

660

public Builder setNamespace(String namespace) { /* set namespace */ return this; }

661

public Builder setDescription(String description) { /* set description */ return this; }

662

public Builder setProperty(String key, String value) { /* set property */ return this; }

663

public Builder setProperties(Map<String, String> properties) { /* set properties */ return this; }

664

public TopicMetadata build() { /* build metadata */ }

665

}

666

}

667

```

### Messaging Context and Usage

```java { .api }

672

// Messaging context for accessing messaging APIs

673

public interface MessagingContext {

674

MessagePublisher getMessagePublisher();

675

MessageFetcher getMessageFetcher();

676

MessagingAdmin getMessagingAdmin();

677

}

678

679

// Usage in worker programs

680

public class MessageProcessingWorker extends AbstractWorker {

681

682

@Override

683

public void configure(WorkerConfigurer configurer) {

684

configurer.setName("MessageProcessor");

685

configurer.setDescription("Processes messages from topic");

686

}

687

688

@Override

689

public void run() throws Exception {

690

WorkerContext context = getContext();

691

MessagingContext messagingContext = context.getMessagingContext();

692

693

MessageFetcher fetcher = messagingContext.getMessageFetcher();

694

MessagePublisher publisher = messagingContext.getMessagePublisher();

695

696

String namespace = context.getNamespace();

697

long lastProcessedId = getLastProcessedMessageId();

698

699

while (context.getState().equals(ProgramRunStatus.RUNNING)) {

700

try (CloseableIterator<Message> messages =

701

fetcher.fetch(namespace, "input-topic", 100, lastProcessedId)) {

702

703

while (messages.hasNext()) {

704

Message message = messages.next();

705

706

// Process message

707

ProcessedMessage processed = processMessage(message);

708

709

// Publish result

710

if (processed != null) {

711

publisher.publish(namespace, "output-topic", processed.toJson());

712

}

713

714

lastProcessedId = Long.parseLong(message.getId());

715

}

716

}

717

718

// Save checkpoint

719

saveLastProcessedMessageId(lastProcessedId);

720

721

// Sleep before next fetch

722

Thread.sleep(1000);

723

}

724

}

725

726

private ProcessedMessage processMessage(Message message) {

727

// Implementation for message processing

728

return new ProcessedMessage(new String(message.getPayload(), StandardCharsets.UTF_8));

729

}

730

731

private long getLastProcessedMessageId() {

732

// Implementation to retrieve last processed message ID

733

return 0L;

734

}

735

736

private void saveLastProcessedMessageId(long messageId) {

737

// Implementation to save checkpoint

738

}

739

}

740

741

// Usage in MapReduce for batch message processing

742

public class MessageBatchProcessor extends AbstractMapReduce {

743

744

@Override

745

public void initialize(MapReduceContext context) throws Exception {

746

Job job = context.getHadoopJob();

747

748

// Configure to read from messaging system

749

MessagingUtils.configureInput(job, context.getNamespace(), "batch-topic");

750

751

// Configure output dataset

752

context.setOutput(Output.ofDataset("processed_messages"));

753

754

job.setMapperClass(MessageMapper.class);

755

job.setReducerClass(MessageAggregator.class);

756

}

757

758

public static class MessageMapper extends Mapper<LongWritable, Message, Text, IntWritable> {

759

760

@Override

761

protected void map(LongWritable key, Message message, Context context)

762

throws IOException, InterruptedException {

763

764

String payload = new String(message.getPayload(), StandardCharsets.UTF_8);

765

JsonObject json = new JsonParser().parse(payload).getAsJsonObject();

766

767

String eventType = json.get("eventType").getAsString();

768

context.write(new Text(eventType), new IntWritable(1));

769

}

770

}

771

772

public static class MessageAggregator extends Reducer<Text, IntWritable, byte[], Put> {

773

774

@Override

775

protected void reduce(Text eventType, Iterable<IntWritable> counts, Context context)

776

throws IOException, InterruptedException {

777

778

int total = 0;

779

for (IntWritable count : counts) {

780

total += count.get();

781

}

782

783

Put put = new Put(Bytes.toBytes(eventType.toString()));

784

put.add("stats", "count", total);

785

put.add("stats", "timestamp", System.currentTimeMillis());

786

787

context.write(Bytes.toBytes(eventType.toString()), put);

788

}

789

}

790

}

791

```

## Advanced Dataset Patterns

### Custom Dataset Implementation

```java { .api }

798

// Custom dataset definition

799

public abstract class AbstractDatasetDefinition<T extends Dataset>

800

implements DatasetDefinition<T> {

801

802

private final String name;

803

804

public AbstractDatasetDefinition(String name) {

805

this.name = name;

806

}

807

808

@Override

809

public String getName() {

810

return name;

811

}

812

813

@Override

814

public abstract DatasetSpecification configure(String instanceName, DatasetProperties properties);

815

816

@Override

817

public abstract T getDataset(DatasetContext datasetContext, DatasetSpecification spec,

818

Map<String, String> arguments, ClassLoader classLoader) throws IOException;

819

}

820

821

// Dataset state persistence
/**
 * Dataset state persistence: stores, reads and deletes opaque state blobs by name.
 */
public interface DatasetStatePersistor {

  /** Stores {@code state} under {@code name}. */
  void persistState(String name, byte[] state) throws IOException;

  /**
   * Reads the state stored under {@code name}.
   *
   * <p>NOTE(review): the result for an unknown name is not specified in this document.
   */
  byte[] readState(String name) throws IOException;

  /** Deletes the state stored under {@code name}. */
  void deleteState(String name) throws IOException;
}

// Composite dataset for combining multiple datasets

829

public abstract class CompositeDatasetDefinition<T extends Dataset>

830

extends AbstractDatasetDefinition<T> {

831

832

protected CompositeDatasetDefinition(String name) {

833

super(name);

834

}

835

836

// Methods for managing constituent datasets

837

protected abstract Map<String, DatasetSpecification> getConstituentDatasets(DatasetProperties properties);

838

}

839

```

The CDAP data management framework provides a comprehensive, abstracted approach to data storage and access. Applications can work with various storage systems through consistent APIs while retaining enterprise-grade features such as transactions, security, and operational control.