or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

annotations.mdapplication-framework.mddataset-management.mdindex.mdmapreduce-programs.mdplugin-framework.mdscheduling.mdservice-programs.mdspark-programs.mdsystem-services.mdtransactions.mdworker-programs.mdworkflow-programs.md

dataset-management.mddocs/

# Dataset Management

CDAP's dataset management system provides an abstraction layer for storing and accessing data. It supports both built-in dataset types and custom implementations, with ACID transaction support.

## Core Dataset Interfaces

### Dataset

```java { .api }

/**
 * Root type of every CDAP dataset.
 *
 * <p>A dataset is {@link Closeable}. The {@code close()} declared here narrows the inherited
 * signature: it throws no checked exception, so callers need not handle
 * {@link java.io.IOException}.
 */
public interface Dataset extends Closeable {

  /** Releases whatever resources this dataset instance holds. */
  @Override
  void close();
}

```

Base interface for all datasets. Every dataset implementation must implement this interface, either directly or through a sub-interface or base class such as `Table` or `AbstractDataset`, and must release its resources in `close()`.

### DatasetDefinition

```java { .api }

public interface DatasetDefinition<D extends Dataset, A extends DatasetAdmin> {

21

String getName();

22

23

D getDataset(DatasetContext datasetContext, DatasetSpecification spec,

24

Map<String, String> arguments, ClassLoader classLoader) throws IOException;

25

26

A getAdmin(DatasetContext datasetContext, DatasetSpecification spec,

27

ClassLoader classLoader) throws IOException;

28

29

DatasetSpecification configure(String instanceName, DatasetProperties properties);

30

31

DatasetSpecification reconfigure(String instanceName, DatasetProperties newProperties,

32

DatasetSpecification currentSpec) throws IncompatibleUpdateException;

33

}

```

Defines how dataset instances are created, configured, and managed. Custom dataset types implement this interface, for example by extending `AbstractDatasetDefinition`.

### AbstractDatasetDefinition

```java { .api }

public abstract class AbstractDatasetDefinition<D extends Dataset, A extends DatasetAdmin>

42

implements DatasetDefinition<D, A> {

43

44

protected final String name;

45

46

protected AbstractDatasetDefinition(String name);

47

48

@Override

49

public final String getName();

50

51

@Override

52

public DatasetSpecification configure(String instanceName, DatasetProperties properties);

53

54

@Override

55

public DatasetSpecification reconfigure(String instanceName, DatasetProperties newProperties,

56

DatasetSpecification currentSpec) throws IncompatibleUpdateException;

57

}

```

Base implementation of `DatasetDefinition` that stores the definition name and supplies `configure` and `reconfigure`. Subclasses implement `getDataset` and `getAdmin`.

## Dataset Administration

### DatasetAdmin

```java { .api }

/**
 * Administrative operations on a dataset instance. Every operation declares
 * {@link IOException}.
 */
public interface DatasetAdmin {

  // ---- Inspection ----
  boolean exists() throws IOException;

  // ---- Creation and migration ----
  void create() throws IOException;

  void upgrade() throws IOException;

  // ---- Removal ----
  void truncate() throws IOException;

  void drop() throws IOException;
}

```

Administrative operations for managing a dataset's lifecycle: checking existence, creating, upgrading, truncating, and dropping.

### DatasetManager

```java { .api }

public interface DatasetManager {

82

<T extends Dataset> T getDataset(String name) throws DatasetInstantiationException;

83

<T extends Dataset> T getDataset(String name, Map<String, String> arguments)

84

throws DatasetInstantiationException;

85

void releaseDataset(Dataset dataset);

86

}

```

Obtains dataset instances by name, optionally with runtime arguments, and releases them when they are no longer needed.

## Dataset Configuration

### DatasetProperties

```java { .api }

public class DatasetProperties {

97

public static Builder builder();

98

99

public Map<String, String> getProperties();

100

public String get(String key);

101

public String get(String key, String defaultValue);

102

103

public static class Builder {

104

public Builder add(String key, String value);

105

public Builder addAll(Map<String, String> properties);

106

public DatasetProperties build();

107

}

108

}

```

String key/value configuration properties for a dataset instance, built with `DatasetProperties.builder()`.

### DatasetSpecification

```java { .api }

public class DatasetSpecification {

117

public String getName();

118

public String getType();

119

public Map<String, String> getProperties();

120

public Map<String, DatasetSpecification> getSpecifications();

121

122

public static Builder builder(String name, String type);

123

124

public static class Builder {

125

public Builder properties(Map<String, String> properties);

126

public Builder property(String key, String value);

127

public Builder datasets(DatasetSpecification... specifications);

128

public DatasetSpecification build();

129

}

130

}

```

Complete specification of a dataset instance: its name, type, properties, and the specifications of any embedded (nested) datasets.

## Built-in Dataset Types

### Key-Value Storage

```java { .api }

public class KeyValueTable extends AbstractDataset {

141

public void write(String key, String value);

142

public void write(byte[] key, byte[] value);

143

public void write(String key, byte[] value);

144

145

public String read(String key);

146

public byte[] read(byte[] key);

147

148

public void delete(String key);

149

public void delete(byte[] key);

150

151

public CloseableIterator<KeyValue<byte[], byte[]>> scan(byte[] startKey, byte[] stopKey);

152

}

```

Simple key-value storage supporting string and byte-array keys and values, plus range scans over byte-array keys.

### Table Storage

```java { .api }

public interface Table extends Dataset {

161

byte[] read(byte[] row, byte[] column);

162

byte[] read(byte[] row, String column);

163

Row get(byte[] row);

164

Row get(byte[] row, byte[][] columns);

165

Row get(byte[] row, String[] columns);

166

Row get(Get get);

167

168

void put(byte[] row, byte[] column, byte[] value);

169

void put(byte[] row, String column, byte[] value);

170

void put(byte[] row, String column, String value);

171

void put(Put put);

172

173

void delete(byte[] row);

174

void delete(byte[] row, byte[] column);

175

void delete(byte[] row, String column);

176

void delete(Delete delete);

177

178

Scanner scan(byte[] startRow, byte[] stopRow);

179

Scanner scan(Scan scan);

180

181

void increment(byte[] row, byte[] column, long amount);

182

void increment(Increment increment);

183

}

```

HBase-style table that stores values by row and column, with point reads and writes, deletes, range scans, and atomic increments.

### Object Storage

```java { .api }

public class ObjectStore<T> extends AbstractDataset {

192

public void write(byte[] key, T object);

193

public void write(String key, T object);

194

195

public T read(byte[] key);

196

public T read(String key);

197

198

public void delete(byte[] key);

199

public void delete(String key);

200

201

public CloseableIterator<KeyValue<byte[], T>> scan(byte[] startKey, byte[] stopKey);

202

}

```

Type-safe object storage with automatic serialization and deserialization.

### File Storage

```java { .api }

public interface FileSet extends Dataset {

211

Location getLocation(String relativePath);

212

Location getBaseLocation();

213

214

Iterable<Location> getInputLocations();

215

Location getOutputLocation();

216

217

Map<String, String> getInputArguments();

218

Map<String, String> getOutputArguments();

219

}

```

File-based dataset for storing and accessing files in distributed storage.

### Partitioned File Storage

```java { .api }

public interface PartitionedFileSet extends Dataset {

228

PartitionOutput getPartitionOutput(PartitionKey key);

229

PartitionOutput getPartitionOutput(PartitionKey key, DatasetArguments arguments);

230

231

Partition getPartition(PartitionKey key);

232

Set<Partition> getPartitions(PartitionFilter filter);

233

234

void addPartition(PartitionKey key, String path);

235

void addPartition(PartitionKey key, String path, Map<String, String> metadata);

236

237

void dropPartition(PartitionKey key);

238

239

PartitionConsumer getPartitionConsumer();

240

}

```

Partitioned file storage for large datasets organized by partition keys, supporting efficient querying and processing of individual partitions.

## Custom Dataset Implementation

### Abstract Base Classes

```java { .api }

public abstract class AbstractDataset implements Dataset {

251

protected final DatasetSpecification spec;

252

protected final Map<String, String> arguments;

253

254

protected AbstractDataset(DatasetSpecification spec, Map<String, String> arguments);

255

256

public final DatasetSpecification getSpec();

257

public final Map<String, String> getArguments();

258

259

@Override

260

public void close() {

261

// Default implementation - override if needed

262

}

263

}

```

Base class for custom dataset implementations. It holds the dataset specification and arguments, and its `close()` is a no-op that subclasses override to release resources.

## Dataset Context

### DatasetContext

```java { .api }

public interface DatasetContext {

274

<T extends Dataset> T getDataset(String name) throws DatasetInstantiationException;

275

<T extends Dataset> T getDataset(String name, Map<String, String> arguments)

276

throws DatasetInstantiationException;

277

void releaseDataset(Dataset dataset);

278

}

```

Context interface that provides dataset access within programs and services.

## Usage Examples

### Basic Dataset Operations

```java

public class DatasetExample extends AbstractMapReduce {

289

290

@Override

291

public void configure(MapReduceConfigurer configurer) {

292

configurer.useDataset("userProfiles");

293

configurer.useDataset("userScores");

294

}

295

296

@Override

297

public void initialize(MapReduceContext context) throws Exception {

298

// Access datasets in the context

299

KeyValueTable profiles = context.getDataset("userProfiles");

300

ObjectStore<UserScore> scores = context.getDataset("userScores");

301

302

// Read data

303

String profile = profiles.read("user123");

304

UserScore score = scores.read("user123");

305

306

// Write data

307

profiles.write("user456", "profile data");

308

scores.write("user456", new UserScore(100, "Gold"));

309

}

310

}

```

### Dataset Creation in Application

```java

public class DataApplication extends AbstractApplication<Config> {

317

318

@Override

319

public void configure() {

320

// Create simple key-value dataset

321

createDataset("userCache", KeyValueTable.class);

322

323

// Create table with properties

324

createDataset("userTable", Table.class,

325

DatasetProperties.builder()

326

.add("table.rowkey.ttl", "3600")

327

.build());

328

329

// Create partitioned file set

330

createDataset("logs", PartitionedFileSet.class,

331

DatasetProperties.builder()

332

.add("schema", logSchema)

333

.add("partitioning", "year/month/day")

334

.build());

335

336

addMapReduce(new DataProcessor());

337

}

338

}

```

### Custom Dataset Implementation

```java

public class CounterDataset extends AbstractDataset {

345

private final Table table;

346

347

public CounterDataset(DatasetSpecification spec, Map<String, String> arguments, Table table) {

348

super(spec, arguments);

349

this.table = table;

350

}

351

352

public void increment(String counter, long delta) {

353

table.increment(counter.getBytes(), "count".getBytes(), delta);

354

}

355

356

public long get(String counter) {

357

byte[] value = table.read(counter.getBytes(), "count".getBytes());

358

return value == null ? 0 : Bytes.toLong(value);

359

}

360

361

@Override

362

public void close() {

363

table.close();

364

}

365

}

366

367

public class CounterDatasetDefinition extends AbstractDatasetDefinition<CounterDataset, DatasetAdmin> {

368

369

public CounterDatasetDefinition(String name) {

370

super(name);

371

}

372

373

@Override

374

public CounterDataset getDataset(DatasetContext datasetContext, DatasetSpecification spec,

375

Map<String, String> arguments, ClassLoader classLoader) throws IOException {

376

Table table = datasetContext.getDataset("table");

377

return new CounterDataset(spec, arguments, table);

378

}

379

380

@Override

381

public DatasetAdmin getAdmin(DatasetContext datasetContext, DatasetSpecification spec,

382

ClassLoader classLoader) throws IOException {

383

return datasetContext.getDataset("table").getAdmin();

384

}

385

}

```

### Transaction Support

```java

public class TransactionalDatasetExample extends AbstractService {

392

393

@UseDataSet("userAccounts")

394

private Table accounts;

395

396

@UseDataSet("transactions")

397

private ObjectStore<Transaction> transactions;

398

399

public void transferFunds(String fromAccount, String toAccount, double amount) {

400

Transactionals.execute(this, new TxRunnable() {

401

@Override

402

public void run(DatasetContext context) throws Exception {

403

Table accounts = context.getDataset("userAccounts");

404

ObjectStore<Transaction> transactions = context.getDataset("transactions");

405

406

// Read current balances

407

double fromBalance = getBalance(accounts, fromAccount);

408

double toBalance = getBalance(accounts, toAccount);

409

410

// Validate and perform transfer

411

if (fromBalance >= amount) {

412

setBalance(accounts, fromAccount, fromBalance - amount);

413

setBalance(accounts, toAccount, toBalance + amount);

414

415

// Log transaction

416

Transaction tx = new Transaction(fromAccount, toAccount, amount, System.currentTimeMillis());

417

transactions.write(UUID.randomUUID().toString(), tx);

418

} else {

419

throw new InsufficientFundsException();

420

}

421

}

422

});

423

}

424

}

```