0
# Dataset Management
1
2
CDAP's Dataset Management system provides an abstraction layer for storing and accessing data. It supports both built-in dataset types and custom dataset implementations, and dataset operations can run with ACID transaction support.
3
4
## Core Dataset Interfaces
5
6
### Dataset
7
8
```java { .api }
9
/**
 * Root type of every dataset.
 *
 * <p>Datasets are {@link Closeable}; {@link #close()} is redeclared here without a checked
 * exception, so callers do not need to handle {@code IOException} when closing a dataset.
 */
public interface Dataset extends Closeable {

  /** Releases any resources held by this dataset. */
  @Override
  void close();
}
13
```
14
15
Base interface for all datasets. Every dataset implementation must implement this interface (directly or through a base class such as `AbstractDataset`) and release any resources it holds in `close()`.
16
17
### DatasetDefinition
18
19
```java { .api }
20
public interface DatasetDefinition<D extends Dataset, A extends DatasetAdmin> {
21
String getName();
22
23
D getDataset(DatasetContext datasetContext, DatasetSpecification spec,
24
Map<String, String> arguments, ClassLoader classLoader) throws IOException;
25
26
A getAdmin(DatasetContext datasetContext, DatasetSpecification spec,
27
ClassLoader classLoader) throws IOException;
28
29
DatasetSpecification configure(String instanceName, DatasetProperties properties);
30
31
DatasetSpecification reconfigure(String instanceName, DatasetProperties newProperties,
32
DatasetSpecification currentSpec) throws IncompatibleUpdateException;
33
}
34
```
35
36
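Defines a dataset type:
- how instances are configured (`configure`, `reconfigure`);
- how dataset objects are obtained for an instance (`getDataset`);
- how the instance is administered (`getAdmin`).

Custom dataset types implement this interface.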
37
38
### AbstractDatasetDefinition
39
40
```java { .api }
41
public abstract class AbstractDatasetDefinition<D extends Dataset, A extends DatasetAdmin>
42
implements DatasetDefinition<D, A> {
43
44
protected final String name;
45
46
protected AbstractDatasetDefinition(String name);
47
48
@Override
49
public final String getName();
50
51
@Override
52
public DatasetSpecification configure(String instanceName, DatasetProperties properties);
53
54
@Override
55
public DatasetSpecification reconfigure(String instanceName, DatasetProperties newProperties,
56
DatasetSpecification currentSpec) throws IncompatibleUpdateException;
57
}
58
```
59
60
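Base class for dataset definitions. It holds the type name passed to its constructor, which the `final` `getName()` method returns. It also provides `configure` and `reconfigure`; subclasses must still implement `getDataset` and `getAdmin`.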
61
62
## Dataset Administration
63
64
### DatasetAdmin
65
66
```java { .api }
67
/**
 * Administrative lifecycle operations for a dataset instance.
 *
 * <p>Every operation may fail with an {@link IOException}.
 */
public interface DatasetAdmin {

  /** Returns whether the dataset instance exists. */
  boolean exists() throws IOException;

  /** Creates the dataset instance. */
  void create() throws IOException;

  /** Drops the dataset instance. */
  void drop() throws IOException;

  /** Truncates the dataset instance. */
  void truncate() throws IOException;

  /** Upgrades the dataset instance. */
  void upgrade() throws IOException;
}
74
```
75
76
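Administrative operations for a dataset's lifecycle: checking whether it exists, creating, dropping, truncating, and upgrading it. Each operation may throw `IOException`.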
77
78
### DatasetManager
79
80
```java { .api }
81
public interface DatasetManager {
82
<T extends Dataset> T getDataset(String name) throws DatasetInstantiationException;
83
<T extends Dataset> T getDataset(String name, Map<String, String> arguments)
84
throws DatasetInstantiationException;
85
void releaseDataset(Dataset dataset);
86
}
87
```
88
89
Obtains dataset instances by name, optionally with runtime arguments, and releases them when they are no longer needed.
90
91
## Dataset Configuration
92
93
### DatasetProperties
94
95
```java { .api }
96
public class DatasetProperties {
97
public static Builder builder();
98
99
public Map<String, String> getProperties();
100
public String get(String key);
101
public String get(String key, String defaultValue);
102
103
public static class Builder {
104
public Builder add(String key, String value);
105
public Builder addAll(Map<String, String> properties);
106
public DatasetProperties build();
107
}
108
}
109
```
110
111
String key-value configuration properties for a dataset instance, created with `DatasetProperties.builder()`.
112
113
### DatasetSpecification
114
115
```java { .api }
116
public class DatasetSpecification {
117
public String getName();
118
public String getType();
119
public Map<String, String> getProperties();
120
public Map<String, DatasetSpecification> getSpecifications();
121
122
public static Builder builder(String name, String type);
123
124
public static class Builder {
125
public Builder properties(Map<String, String> properties);
126
public Builder property(String key, String value);
127
public Builder datasets(DatasetSpecification... specifications);
128
public DatasetSpecification build();
129
}
130
}
131
```
132
133
Complete specification of a dataset including type, properties, and nested datasets.
134
135
## Built-in Dataset Types
136
137
### Key-Value Storage
138
139
```java { .api }
140
public class KeyValueTable extends AbstractDataset {
141
public void write(String key, String value);
142
public void write(byte[] key, byte[] value);
143
public void write(String key, byte[] value);
144
145
public String read(String key);
146
public byte[] read(byte[] key);
147
148
public void delete(String key);
149
public void delete(byte[] key);
150
151
public CloseableIterator<KeyValue<byte[], byte[]>> scan(byte[] startKey, byte[] stopKey);
152
}
153
```
154
155
Simple key-value storage with `String` or `byte[]` keys and values, plus range scans over `byte[]` keys.
156
157
### Table Storage
158
159
```java { .api }
160
public interface Table extends Dataset {
161
byte[] read(byte[] row, byte[] column);
162
byte[] read(byte[] row, String column);
163
Row get(byte[] row);
164
Row get(byte[] row, byte[][] columns);
165
Row get(byte[] row, String[] columns);
166
Row get(Get get);
167
168
void put(byte[] row, byte[] column, byte[] value);
169
void put(byte[] row, String column, byte[] value);
170
void put(byte[] row, String column, String value);
171
void put(Put put);
172
173
void delete(byte[] row);
174
void delete(byte[] row, byte[] column);
175
void delete(byte[] row, String column);
176
void delete(Delete delete);
177
178
Scanner scan(byte[] startRow, byte[] stopRow);
179
Scanner scan(Scan scan);
180
181
void increment(byte[] row, byte[] column, long amount);
182
void increment(Increment increment);
183
}
184
```
185
186
HBase-style table with row/column storage and atomic operations.
187
188
### Object Storage
189
190
```java { .api }
191
public class ObjectStore<T> extends AbstractDataset {
192
public void write(byte[] key, T object);
193
public void write(String key, T object);
194
195
public T read(byte[] key);
196
public T read(String key);
197
198
public void delete(byte[] key);
199
public void delete(String key);
200
201
public CloseableIterator<KeyValue<byte[], T>> scan(byte[] startKey, byte[] stopKey);
202
}
203
```
204
205
Type-safe object storage with automatic serialization/deserialization.
206
207
### File Storage
208
209
```java { .api }
210
public interface FileSet extends Dataset {
211
Location getLocation(String relativePath);
212
Location getBaseLocation();
213
214
Iterable<Location> getInputLocations();
215
Location getOutputLocation();
216
217
Map<String, String> getInputArguments();
218
Map<String, String> getOutputArguments();
219
}
220
```
221
222
File-based dataset for storing and accessing files in distributed storage.
223
224
### Partitioned File Storage
225
226
```java { .api }
227
public interface PartitionedFileSet extends Dataset {
228
PartitionOutput getPartitionOutput(PartitionKey key);
229
PartitionOutput getPartitionOutput(PartitionKey key, DatasetArguments arguments);
230
231
Partition getPartition(PartitionKey key);
232
Set<Partition> getPartitions(PartitionFilter filter);
233
234
void addPartition(PartitionKey key, String path);
235
void addPartition(PartitionKey key, String path, Map<String, String> metadata);
236
237
void dropPartition(PartitionKey key);
238
239
PartitionConsumer getPartitionConsumer();
240
}
241
```
242
243
Partitioned file storage supporting efficient querying and processing of large datasets organized by partition keys.
244
245
## Custom Dataset Implementation
246
247
### Abstract Base Classes
248
249
```java { .api }
250
public abstract class AbstractDataset implements Dataset {
251
protected final DatasetSpecification spec;
252
protected final Map<String, String> arguments;
253
254
protected AbstractDataset(DatasetSpecification spec, Map<String, String> arguments);
255
256
public final DatasetSpecification getSpec();
257
public final Map<String, String> getArguments();
258
259
@Override
260
public void close() {
261
// Default implementation - override if needed
262
}
263
}
264
```
265
266
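Base class for custom dataset implementations. It exposes the dataset's specification and runtime arguments. Its `close()` does nothing by default; override it if your dataset holds resources.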
267
268
## Dataset Context
269
270
### DatasetContext
271
272
```java { .api }
273
public interface DatasetContext {
274
<T extends Dataset> T getDataset(String name) throws DatasetInstantiationException;
275
<T extends Dataset> T getDataset(String name, Map<String, String> arguments)
276
throws DatasetInstantiationException;
277
void releaseDataset(Dataset dataset);
278
}
279
```
280
281
Context interface providing dataset access within programs and services.
282
283
## Usage Examples
284
285
### Basic Dataset Operations
286
287
```java
288
public class DatasetExample extends AbstractMapReduce {
289
290
@Override
291
public void configure(MapReduceConfigurer configurer) {
292
configurer.useDataset("userProfiles");
293
configurer.useDataset("userScores");
294
}
295
296
@Override
297
public void initialize(MapReduceContext context) throws Exception {
298
// Access datasets in the context
299
KeyValueTable profiles = context.getDataset("userProfiles");
300
ObjectStore<UserScore> scores = context.getDataset("userScores");
301
302
// Read data
303
String profile = profiles.read("user123");
304
UserScore score = scores.read("user123");
305
306
// Write data
307
profiles.write("user456", "profile data");
308
scores.write("user456", new UserScore(100, "Gold"));
309
}
310
}
311
```
312
313
### Dataset Creation in Application
314
315
```java
316
public class DataApplication extends AbstractApplication<Config> {
317
318
@Override
319
public void configure() {
320
// Create simple key-value dataset
321
createDataset("userCache", KeyValueTable.class);
322
323
// Create table with properties
324
createDataset("userTable", Table.class,
325
DatasetProperties.builder()
326
.add("table.rowkey.ttl", "3600")
327
.build());
328
329
// Create partitioned file set
330
createDataset("logs", PartitionedFileSet.class,
331
DatasetProperties.builder()
332
.add("schema", logSchema)
333
.add("partitioning", "year/month/day")
334
.build());
335
336
addMapReduce(new DataProcessor());
337
}
338
}
339
```
340
341
### Custom Dataset Implementation
342
343
```java
344
public class CounterDataset extends AbstractDataset {
345
private final Table table;
346
347
public CounterDataset(DatasetSpecification spec, Map<String, String> arguments, Table table) {
348
super(spec, arguments);
349
this.table = table;
350
}
351
352
public void increment(String counter, long delta) {
353
table.increment(counter.getBytes(), "count".getBytes(), delta);
354
}
355
356
public long get(String counter) {
357
byte[] value = table.read(counter.getBytes(), "count".getBytes());
358
return value == null ? 0 : Bytes.toLong(value);
359
}
360
361
@Override
362
public void close() {
363
table.close();
364
}
365
}
366
367
public class CounterDatasetDefinition extends AbstractDatasetDefinition<CounterDataset, DatasetAdmin> {
368
369
public CounterDatasetDefinition(String name) {
370
super(name);
371
}
372
373
@Override
374
public CounterDataset getDataset(DatasetContext datasetContext, DatasetSpecification spec,
375
Map<String, String> arguments, ClassLoader classLoader) throws IOException {
376
Table table = datasetContext.getDataset("table");
377
return new CounterDataset(spec, arguments, table);
378
}
379
380
@Override
381
public DatasetAdmin getAdmin(DatasetContext datasetContext, DatasetSpecification spec,
382
ClassLoader classLoader) throws IOException {
383
return datasetContext.getDataset("table").getAdmin();
384
}
385
}
386
```
387
388
### Transaction Support
389
390
```java
391
public class TransactionalDatasetExample extends AbstractService {
392
393
@UseDataSet("userAccounts")
394
private Table accounts;
395
396
@UseDataSet("transactions")
397
private ObjectStore<Transaction> transactions;
398
399
public void transferFunds(String fromAccount, String toAccount, double amount) {
400
Transactionals.execute(this, new TxRunnable() {
401
@Override
402
public void run(DatasetContext context) throws Exception {
403
Table accounts = context.getDataset("userAccounts");
404
ObjectStore<Transaction> transactions = context.getDataset("transactions");
405
406
// Read current balances
407
double fromBalance = getBalance(accounts, fromAccount);
408
double toBalance = getBalance(accounts, toAccount);
409
410
// Validate and perform transfer
411
if (fromBalance >= amount) {
412
setBalance(accounts, fromAccount, fromBalance - amount);
413
setBalance(accounts, toAccount, toBalance + amount);
414
415
// Log transaction
416
Transaction tx = new Transaction(fromAccount, toAccount, amount, System.currentTimeMillis());
417
transactions.write(UUID.randomUUID().toString(), tx);
418
} else {
419
throw new InsufficientFundsException();
420
}
421
}
422
});
423
}
424
}
425
```