# Dataset Management

Enable and disable exploration on CDAP datasets, manage partitions, handle dataset lifecycle operations, and integrate with stream processing. The ExploreClient and ExploreFacade APIs provide comprehensive exploration management for both simple datasets and complex partitioned data structures.

## Capabilities

### Dataset Exploration Control

Enable and disable exploration for CDAP datasets with various configuration options.

```java { .api }
/**
 * Dataset exploration management via ExploreClient
 */
interface ExploreClient {
  /**
   * Enable exploration for a dataset (basic)
   * @param datasetInstance dataset to enable exploration for
   * @return future completing when operation finishes
   */
  ListenableFuture<Void> enableExploreDataset(DatasetId datasetInstance);

  /**
   * Enable exploration with detailed specification
   * @param datasetInstance dataset to enable exploration for
   * @param spec dataset specification containing schema and properties
   * @param truncating whether to truncate existing data during enable
   * @return future completing when operation finishes
   */
  ListenableFuture<Void> enableExploreDataset(DatasetId datasetInstance,
                                              DatasetSpecification spec, boolean truncating);

  /**
   * Update exploration configuration for existing dataset
   * @param datasetInstance dataset to update
   * @param oldSpec previous dataset specification
   * @param newSpec new dataset specification
   * @return future completing when operation finishes
   */
  ListenableFuture<Void> updateExploreDataset(DatasetId datasetInstance,
                                              DatasetSpecification oldSpec,
                                              DatasetSpecification newSpec);

  /**
   * Disable exploration for a dataset (basic)
   * @param datasetInstance dataset to disable exploration for
   * @return future completing when operation finishes
   */
  ListenableFuture<Void> disableExploreDataset(DatasetId datasetInstance);

  /**
   * Disable exploration with specification
   * @param datasetInstance dataset to disable exploration for
   * @param spec dataset specification
   * @return future completing when operation finishes
   */
  ListenableFuture<Void> disableExploreDataset(DatasetId datasetInstance,
                                               DatasetSpecification spec);
}

/**
 * Simplified dataset exploration via ExploreFacade
 */
class ExploreFacade {
  /**
   * Enable exploration with automatic configuration checking
   * @param datasetInstance dataset to enable exploration for
   * @throws ExploreException if operation fails
   */
  void enableExploreDataset(DatasetId datasetInstance) throws ExploreException;

  /**
   * Enable exploration with specification and options
   * @param datasetInstance dataset to enable
   * @param spec dataset specification
   * @param truncating whether to truncate existing data
   * @throws ExploreException if operation fails
   */
  void enableExploreDataset(DatasetId datasetInstance, DatasetSpecification spec,
                            boolean truncating) throws ExploreException;

  /**
   * Update dataset exploration configuration
   * @param datasetInstance dataset to update
   * @param oldSpec old specification
   * @param newSpec new specification
   * @throws ExploreException if operation fails
   */
  void updateExploreDataset(DatasetId datasetInstance, DatasetSpecification oldSpec,
                            DatasetSpecification newSpec) throws ExploreException;

  /**
   * Disable exploration for a dataset
   * @param datasetInstance dataset to disable exploration for
   * @throws ExploreException if operation fails
   */
  void disableExploreDataset(DatasetId datasetInstance) throws ExploreException;

  /**
   * Disable exploration with specification
   * @param datasetInstance dataset to disable
   * @param spec dataset specification
   * @throws ExploreException if operation fails
   */
  void disableExploreDataset(DatasetId datasetInstance, DatasetSpecification spec)
      throws ExploreException;
}
```
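
The Usage Examples at the end of this section cover enable and disable; the update path is not shown there, so here is a minimal sketch, assuming an `ExploreFacade` obtained via dependency injection and that the old and new `DatasetSpecification`s were retrieved elsewhere (for example from the dataset framework):

```java
// Sketch: re-sync the explore table after a dataset's properties change.
// Assumes `facade`, `oldSpec`, and `newSpec` are already available.
DatasetId dataset = new DatasetId("default", "user_events");

try {
  // Update the exploration configuration so it matches the new specification
  facade.updateExploreDataset(dataset, oldSpec, newSpec);
} catch (ExploreException e) {
  // The explore table may now be stale relative to the dataset; surface and retry
  System.err.println("Explore update failed for " + dataset + ": " + e.getMessage());
}
```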

### Partition Management

Add, drop, and maintain partitions of partitioned datasets, including concatenating partition files.

```java { .api }
/**
 * Partition management operations
 */
interface ExploreClient {
  /**
   * Add a partition to a partitioned dataset
   * @param datasetInstance dataset to add partition to
   * @param spec dataset specification
   * @param key partition key identifying the partition
   * @param path file system path for the partition data
   * @return future completing when partition is added
   */
  ListenableFuture<Void> addPartition(DatasetId datasetInstance, DatasetSpecification spec,
                                      PartitionKey key, String path);

  /**
   * Drop a partition from a partitioned dataset
   * @param datasetInstance dataset to drop partition from
   * @param spec dataset specification
   * @param key partition key identifying the partition to drop
   * @return future completing when partition is dropped
   */
  ListenableFuture<Void> dropPartition(DatasetId datasetInstance, DatasetSpecification spec,
                                       PartitionKey key);

  /**
   * Concatenate partition files for optimization
   * @param datasetInstance dataset containing the partition
   * @param spec dataset specification
   * @param key partition key identifying the partition
   * @return future completing when concatenation finishes
   */
  ListenableFuture<Void> concatenatePartition(DatasetId datasetInstance, DatasetSpecification spec,
                                              PartitionKey key);
}

/**
 * Partition management via ExploreFacade
 */
class ExploreFacade {
  /**
   * Add partition with automatic configuration
   * @param datasetInstance dataset to add partition to
   * @param spec dataset specification
   * @param key partition key
   * @param location partition data location
   * @throws ExploreException if operation fails
   */
  void addPartition(DatasetId datasetInstance, DatasetSpecification spec,
                    PartitionKey key, String location) throws ExploreException;

  /**
   * Drop partition with automatic configuration
   * @param datasetInstance dataset to drop partition from
   * @param spec dataset specification
   * @param key partition key
   * @throws ExploreException if operation fails
   */
  void dropPartition(DatasetId datasetInstance, DatasetSpecification spec,
                     PartitionKey key) throws ExploreException;

  /**
   * Concatenate partition files
   * @param datasetInstance dataset containing partition
   * @param spec dataset specification
   * @param key partition key
   * @return future for async operation
   * @throws ExploreException if operation fails
   */
  ListenableFuture<Void> concatenatePartition(DatasetId datasetInstance, DatasetSpecification spec,
                                              PartitionKey key) throws ExploreException;
}
```
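
Adding and dropping partitions is demonstrated in the Usage Examples below; concatenation is not, so here is a minimal sketch of compacting one partition's files, assuming an injected `ExploreFacade`, an existing partition of a `daily_logs` dataset, and a `spec` already looked up from dataset metadata:

```java
// Sketch: concatenate (compact) the files of one existing partition of "daily_logs".
// Assumes `facade` and `spec` are already available.
DatasetId logs = new DatasetId("default", "daily_logs");
PartitionKey key = PartitionKey.builder()
  .addStringField("year", "2023")
  .addStringField("month", "09")
  .addStringField("day", "07")
  .build();

try {
  // The facade returns a future for the underlying concatenation operation
  ListenableFuture<Void> concatFuture = facade.concatenatePartition(logs, spec, key);
  concatFuture.get();
} catch (Exception e) {
  System.err.println("Partition concatenation failed: " + e.getMessage());
}
```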

### Stream Integration

Enable and manage exploration for CDAP streams with format specifications.

```java { .api }
/**
 * Stream exploration management
 */
interface ExploreClient {
  /**
   * Enable exploration for a stream with format specification
   * @param stream stream to enable exploration for
   * @param tableName table name to create for the stream
   * @param format format specification for parsing stream data
   * @return future completing when stream exploration is enabled
   */
  ListenableFuture<Void> enableExploreStream(StreamId stream, String tableName,
                                             FormatSpecification format);

  /**
   * Disable exploration for a stream
   * @param stream stream to disable exploration for
   * @param tableName table name for the stream
   * @return future completing when stream exploration is disabled
   */
  ListenableFuture<Void> disableExploreStream(StreamId stream, String tableName);
}

/**
 * Stream exploration via ExploreFacade
 */
class ExploreFacade {
  /**
   * Enable stream exploration with format
   * @param stream stream to enable exploration for
   * @param tableName table name for the stream
   * @param format format specification
   * @throws ExploreException if operation fails
   */
  void enableExploreStream(StreamId stream, String tableName,
                           FormatSpecification format) throws ExploreException;

  /**
   * Disable stream exploration
   * @param stream stream to disable exploration for
   * @param tableName table name for the stream
   * @throws ExploreException if operation fails
   */
  void disableExploreStream(StreamId stream, String tableName) throws ExploreException;
}
```

### Namespace Management

Create and manage namespaces for dataset organization and access control.

```java { .api }
/**
 * Namespace management operations
 */
interface Explore {
  /**
   * Create a new namespace for exploration
   * @param namespaceMeta namespace metadata and configuration
   * @return query handle for the creation operation
   * @throws ExploreException if namespace creation fails
   */
  QueryHandle createNamespace(NamespaceMeta namespaceMeta) throws ExploreException;

  /**
   * Delete a namespace from exploration
   * @param namespace namespace to delete
   * @return query handle for the deletion operation
   * @throws ExploreException if namespace deletion fails
   */
  QueryHandle deleteNamespace(NamespaceId namespace) throws ExploreException;
}

/**
 * Asynchronous namespace operations
 */
interface ExploreClient {
  /**
   * Add namespace asynchronously
   * @param namespaceMeta namespace metadata
   * @return future containing namespace creation results
   */
  ListenableFuture<ExploreExecutionResult> addNamespace(NamespaceMeta namespaceMeta);

  /**
   * Remove namespace asynchronously
   * @param namespace namespace to remove
   * @return future containing namespace removal results
   */
  ListenableFuture<ExploreExecutionResult> removeNamespace(NamespaceId namespace);
}

/**
 * Namespace operations via ExploreFacade
 */
class ExploreFacade {
  /**
   * Create namespace with automatic configuration
   * @param namespace namespace metadata
   * @throws ExploreException if creation fails
   */
  void createNamespace(NamespaceMeta namespace) throws ExploreException;

  /**
   * Remove namespace with cleanup
   * @param namespace namespace to remove
   * @throws ExploreException if removal fails
   */
  void removeNamespace(NamespaceId namespace) throws ExploreException;
}
```
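
Namespace operations are not covered in the Usage Examples below, so here is a minimal sketch, assuming an injected `ExploreFacade` and that the CDAP namespace itself already exists; the `analytics` name and description are only illustrative:

```java
// Sketch: create and later remove the explore database backing a namespace.
NamespaceMeta meta = new NamespaceMeta.Builder()
  .setName("analytics")
  .setDescription("Datasets for the analytics team")
  .build();

try {
  facade.createNamespace(meta);                          // create the explore database
  // ... enable datasets and run queries in the namespace ...
  facade.removeNamespace(new NamespaceId("analytics"));  // clean up when finished
} catch (ExploreException e) {
  System.err.println("Namespace operation failed: " + e.getMessage());
}
```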

### Configuration Classes

Parameter classes for dataset operations with detailed configuration options.

```java { .api }
/**
 * Parameters for enabling dataset exploration
 */
class EnableExploreParameters {
  /**
   * Get dataset instance identifier
   * @return dataset ID
   */
  public DatasetId getDatasetInstance();

  /**
   * Get dataset specification
   * @return dataset specification
   */
  public DatasetSpecification getSpec();

  /**
   * Check if truncating is enabled
   * @return true if data should be truncated
   */
  public boolean isTruncating();
}

/**
 * Parameters for disabling dataset exploration
 */
class DisableExploreParameters {
  /**
   * Get dataset instance identifier
   * @return dataset ID
   */
  public DatasetId getDatasetInstance();

  /**
   * Get dataset specification
   * @return dataset specification
   */
  public DatasetSpecification getSpec();
}

/**
 * Parameters for updating dataset exploration
 */
class UpdateExploreParameters {
  /**
   * Get dataset instance identifier
   * @return dataset ID
   */
  public DatasetId getDatasetInstance();

  /**
   * Get old dataset specification
   * @return old specification
   */
  public DatasetSpecification getOldSpec();

  /**
   * Get new dataset specification
   * @return new specification
   */
  public DatasetSpecification getNewSpec();
}
```
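
These parameter classes are plain value carriers; a receiving handler would typically unpack them and delegate to the facade. A minimal, hypothetical sketch (the `handle*` method names are not part of the API) using only the getters listed above:

```java
// Hypothetical handler fragments: unpack the parameter objects (however they were
// received or deserialized) and delegate to an injected ExploreFacade.
void handleEnable(EnableExploreParameters params, ExploreFacade facade) throws ExploreException {
  facade.enableExploreDataset(params.getDatasetInstance(), params.getSpec(), params.isTruncating());
}

void handleUpdate(UpdateExploreParameters params, ExploreFacade facade) throws ExploreException {
  facade.updateExploreDataset(params.getDatasetInstance(), params.getOldSpec(), params.getNewSpec());
}

void handleDisable(DisableExploreParameters params, ExploreFacade facade) throws ExploreException {
  facade.disableExploreDataset(params.getDatasetInstance(), params.getSpec());
}
```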

**Usage Examples:**

```java
import co.cask.cdap.api.data.format.FormatSpecification;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.DatasetSpecification;
import co.cask.cdap.api.dataset.lib.PartitionKey;
import co.cask.cdap.explore.client.ExploreClient;
import co.cask.cdap.explore.client.ExploreExecutionResult;
import co.cask.cdap.explore.client.ExploreFacade;
import co.cask.cdap.explore.service.ExploreException;
import co.cask.cdap.proto.QueryResult;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.StreamId;
import com.google.common.util.concurrent.ListenableFuture;

import java.util.Collections;

// Basic dataset exploration management
ExploreClient client = // obtained via dependency injection

try {
  DatasetId dataset = new DatasetId("default", "user_events");

  // Enable exploration for dataset
  ListenableFuture<Void> enableFuture = client.enableExploreDataset(dataset);
  enableFuture.get(); // Wait for completion

  System.out.println("Dataset exploration enabled");

  // Query the dataset
  ListenableFuture<ExploreExecutionResult> queryFuture =
    client.submit(new NamespaceId("default"),
                  "SELECT COUNT(*) FROM dataset_user_events");

  ExploreExecutionResult result = queryFuture.get();
  if (result.hasNext()) {
    QueryResult row = result.next();
    System.out.println("Row count: " + row.getColumns().get(0));
  }
  result.close();

  // Disable exploration when done
  ListenableFuture<Void> disableFuture = client.disableExploreDataset(dataset);
  disableFuture.get();

} catch (Exception e) {
  System.err.println("Dataset operation failed: " + e.getMessage());
}

// Using ExploreFacade for simplified operations
ExploreFacade facade = // obtained via dependency injection

try {
  DatasetId dataset = new DatasetId("default", "product_catalog");

  // Enable with automatic configuration checking
  facade.enableExploreDataset(dataset);
  System.out.println("Explore enabled via facade");

  // Disable when done
  facade.disableExploreDataset(dataset);

} catch (ExploreException e) {
  System.err.println("Facade operation failed: " + e.getMessage());
}

// Partition management example
try {
  DatasetId partitionedDataset = new DatasetId("default", "daily_logs");
  DatasetSpecification spec = // obtained from dataset metadata

  // Add a new partition
  PartitionKey partitionKey = PartitionKey.builder()
    .addStringField("year", "2023")
    .addStringField("month", "09")
    .addStringField("day", "07")
    .build();

  String partitionPath = "/data/daily_logs/2023/09/07";

  ListenableFuture<Void> addFuture =
    client.addPartition(partitionedDataset, spec, partitionKey, partitionPath);
  addFuture.get();

  System.out.println("Partition added successfully");

  // Query the specific partition
  ListenableFuture<ExploreExecutionResult> partitionQuery =
    client.submit(new NamespaceId("default"),
                  "SELECT * FROM dataset_daily_logs WHERE year='2023' AND month='09' AND day='07'");

  ExploreExecutionResult partitionResult = partitionQuery.get();
  // Process partition results...
  partitionResult.close();

  // Drop partition when no longer needed
  ListenableFuture<Void> dropFuture =
    client.dropPartition(partitionedDataset, spec, partitionKey);
  dropFuture.get();

} catch (Exception e) {
  System.err.println("Partition operation failed: " + e.getMessage());
}

// Stream exploration example
try {
  StreamId stream = new StreamId("default", "access_logs");
  String tableName = "stream_access_logs";

  // Define the schema and format used to parse stream events
  Schema logSchema = Schema.recordOf("access_log",
    Schema.Field.of("ip", Schema.of(Schema.Type.STRING)),
    Schema.Field.of("timestamp", Schema.of(Schema.Type.LONG)),
    Schema.Field.of("method", Schema.of(Schema.Type.STRING)),
    Schema.Field.of("url", Schema.of(Schema.Type.STRING)),
    Schema.Field.of("status", Schema.of(Schema.Type.INT)));
  FormatSpecification format =
    new FormatSpecification("clf", logSchema, Collections.<String, String>emptyMap());

  // Enable stream exploration
  ListenableFuture<Void> streamEnableFuture =
    client.enableExploreStream(stream, tableName, format);
  streamEnableFuture.get();

  System.out.println("Stream exploration enabled");

  // Query stream data
  ListenableFuture<ExploreExecutionResult> streamQuery =
    client.submit(new NamespaceId("default"),
                  "SELECT method, COUNT(*) FROM stream_access_logs GROUP BY method");

  ExploreExecutionResult streamResult = streamQuery.get();
  while (streamResult.hasNext()) {
    QueryResult row = streamResult.next();
    System.out.println("Method: " + row.getColumns().get(0) +
                       ", Count: " + row.getColumns().get(1));
  }
  streamResult.close();

  // Disable stream exploration
  ListenableFuture<Void> streamDisableFuture =
    client.disableExploreStream(stream, tableName);
  streamDisableFuture.get();

} catch (Exception e) {
  System.err.println("Stream operation failed: " + e.getMessage());
}
```