Core application programming interface for the Cask Data Application Platform (CDAP), enabling development of scalable data processing applications on the Hadoop ecosystem.
MapReduce programs in CDAP provide distributed batch processing capabilities built on Apache Hadoop MapReduce, with integrated access to CDAP datasets, metrics, and services.
public interface MapReduce extends ProgramLifecycle<MapReduceContext> {
void configure(MapReduceConfigurer configurer);
}

Base interface for MapReduce programs. Implementations must provide configuration logic and can optionally implement lifecycle methods.
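As an illustrative sketch (the class, program, and description names below are placeholders, not part of the API), a program can implement the interface directly and supply its own lifecycle methods:

public class LogCleanupMapReduce implements MapReduce {

  @Override
  public void configure(MapReduceConfigurer configurer) {
    configurer.setName("LogCleanup");
    configurer.setDescription("Removes expired log entries");
  }

  @Override
  public void initialize(MapReduceContext context) throws Exception {
    // Set up the Hadoop job, inputs, and outputs here before the job is submitted
  }

  @Override
  public void destroy() {
    // Release any resources acquired in initialize()
  }
}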
public abstract class AbstractMapReduce implements MapReduce {
public abstract void configure(MapReduceConfigurer configurer);
@Override
public void initialize(MapReduceContext context) throws Exception {
// Optional initialization logic
}
@Override
public void destroy() {
// Optional cleanup logic
}
}

Base implementation class for MapReduce programs, providing default lifecycle behavior.
public interface MapReduceConfigurer extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {
void setMapperResources(Resources resources);
void setReducerResources(Resources resources);
void setDriverResources(Resources resources);
}

Interface for configuring MapReduce programs, including resource allocation and dataset usage.
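A minimal configure() sketch showing resource allocation; the program name and memory values are illustrative, not recommendations:

@Override
public void configure(MapReduceConfigurer configurer) {
  configurer.setName("ResourceTunedJob");
  configurer.setDescription("Batch job with explicit container sizing");
  configurer.setMapperResources(new Resources(2048));   // 2 GB per mapper container
  configurer.setReducerResources(new Resources(4096));  // 4 GB per reducer container
  configurer.setDriverResources(new Resources(1024));   // 1 GB for the job driver
}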
public class MapReduceSpecification implements ProgramSpecification {
public String getName();
public String getDescription();
public String getClassName();
public Map<String, String> getProperties();
public Resources getMapperResources();
public Resources getReducerResources();
public Resources getDriverResources();
public Set<String> getDatasets();
}

Complete specification of a MapReduce program.
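A sketch of reading a specification's fields, assuming a MapReduceSpecification instance is available (for example, through application inspection tooling) and that Resources exposes a getMemoryMB() accessor:

void printSpecification(MapReduceSpecification spec) {
  System.out.println("Program:     " + spec.getName() + " (" + spec.getClassName() + ")");
  System.out.println("Description: " + spec.getDescription());
  System.out.println("Datasets:    " + spec.getDatasets());
  System.out.println("Properties:  " + spec.getProperties());
  Resources mapperResources = spec.getMapperResources();
  if (mapperResources != null) {
    System.out.println("Mapper memory (MB): " + mapperResources.getMemoryMB());
  }
}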
public interface MapReduceContext extends RuntimeContext, DatasetContext, ServiceDiscoverer {
<T> T getHadoopJob();
void addInput(Input input);
void addOutput(Output output);
Map<String, String> getRuntimeArguments();
WorkflowToken getWorkflowToken();
void setMapperResources(Resources resources);
void setReducerResources(Resources resources);
void setNumReducers(int numReducers);
}

Runtime context available to MapReduce programs, providing access to Hadoop job configuration, input/output specification, and CDAP services.
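A sketch of an initialize() method that tunes the job from runtime arguments; the argument keys, dataset names, and TuningMapper class are placeholders:

@Override
public void initialize(MapReduceContext context) throws Exception {
  Map<String, String> args = context.getRuntimeArguments();

  // Allow parallelism and container memory to be tuned per run
  int numReducers = Integer.parseInt(args.getOrDefault("num.reducers", "4"));
  int mapperMemoryMb = Integer.parseInt(args.getOrDefault("mapper.memory.mb", "1024"));
  context.setNumReducers(numReducers);
  context.setMapperResources(new Resources(mapperMemoryMb));

  Job job = context.getHadoopJob();
  job.setMapperClass(TuningMapper.class);

  context.addInput(Input.ofDataset("events"));
  context.addOutput(Output.ofDataset("eventSummaries"));
}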
public interface MapReduceTaskContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
extends TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>, DatasetContext {
Metrics getMetrics();
ServiceDiscoverer getServiceDiscoverer();
PluginContext getPluginContext();
WorkflowToken getWorkflowToken();
String getNamespace();
String getApplicationName();
String getProgramName();
String getRunId();
}

Task-level context available within mapper and reducer implementations.
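A reducer sketch that obtains the task context for metrics, using the same casting pattern as the mapper examples below; the class, metric, and key/value type choices are illustrative:

public static class RegionSumReducer extends Reducer<Text, IntWritable, Text, LongWritable> {

  private Metrics metrics;

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    MapReduceTaskContext<Text, IntWritable, Text, LongWritable> cdapContext =
      (MapReduceTaskContext<Text, IntWritable, Text, LongWritable>) context;
    metrics = cdapContext.getMetrics();
  }

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    long sum = 0;
    for (IntWritable value : values) {
      sum += value.get();
    }
    metrics.count("regions.reduced", 1);
    context.write(key, new LongWritable(sum));
  }
}

The complete examples that follow show these APIs used together.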
public class WordCountMapReduce extends AbstractMapReduce {
@Override
public void configure(MapReduceConfigurer configurer) {
configurer.setName("WordCount");
configurer.setDescription("Counts words in text files");
// Configure resources
configurer.setMapperResources(new Resources(1024)); // 1GB for mappers
configurer.setReducerResources(new Resources(2048)); // 2GB for reducers
// Use datasets
configurer.useDataset("textFiles");
configurer.useDataset("wordCounts");
}
@Override
public void initialize(MapReduceContext context) throws Exception {
Job job = context.getHadoopJob();
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// Configure input and output
context.addInput(Input.ofDataset("textFiles"));
context.addOutput(Output.ofDataset("wordCounts"));
}
}

public class CustomerDataProcessor extends AbstractMapReduce {
@Override
public void configure(MapReduceConfigurer configurer) {
configurer.setName("CustomerProcessor");
configurer.useDataset("customers");
configurer.useDataset("processedCustomers");
}
@Override
public void initialize(MapReduceContext context) throws Exception {
Job job = context.getHadoopJob();
job.setMapperClass(CustomerMapper.class);
job.setReducerClass(CustomerReducer.class);
context.addInput(Input.ofDataset("customers"));
context.addOutput(Output.ofDataset("processedCustomers"));
}
public static class CustomerMapper extends Mapper<byte[], Customer, Text, Customer> {
private Metrics metrics;
private KeyValueTable lookupTable;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
MapReduceTaskContext<byte[], Customer, Text, Customer> cdapContext =
(MapReduceTaskContext<byte[], Customer, Text, Customer>) context;
metrics = cdapContext.getMetrics();
lookupTable = cdapContext.getDataset("lookupTable");
}
@Override
protected void map(byte[] key, Customer customer, Context context)
throws IOException, InterruptedException {
// Process customer data
if (customer.isActive()) {
metrics.count("active.customers", 1);
// Lookup additional data
byte[] additionalData = lookupTable.read(customer.getId().getBytes());
if (additionalData != null) {
customer.setAdditionalInfo(new String(additionalData));
}
context.write(new Text(customer.getRegion()), customer);
}
}
}
}

public class PluginBasedMapReduce extends AbstractMapReduce {
@Override
public void configure(MapReduceConfigurer configurer) {
configurer.setName("PluginProcessor");
configurer.usePlugin("transform", "customerTransform", "transform1",
PluginProperties.builder()
.add("field", "customerName")
.add("operation", "uppercase")
.build());
}
@Override
public void initialize(MapReduceContext context) throws Exception {
Job job = context.getHadoopJob();
job.setMapperClass(PluginMapper.class);
context.addInput(Input.ofDataset("rawData"));
context.addOutput(Output.ofDataset("transformedData"));
}
public static class PluginMapper extends Mapper<byte[], Record, byte[], Record> {
private CustomerTransform transformer;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
MapReduceTaskContext<byte[], Record, byte[], Record> cdapContext =
(MapReduceTaskContext<byte[], Record, byte[], Record>) context;
try {
  transformer = cdapContext.getPluginContext().newPluginInstance("transform1");
} catch (Exception e) {
  // Plugin instantiation can fail; surface it as an IOException from setup()
  throw new IOException("Failed to instantiate plugin 'transform1'", e);
}
}
@Override
protected void map(byte[] key, Record record, Context context)
throws IOException, InterruptedException {
Record transformedRecord = transformer.transform(record);
context.write(key, transformedRecord);
}
}
}

public class WorkflowMapReduce extends AbstractMapReduce {
@Override
public void configure(MapReduceConfigurer configurer) {
configurer.setName("WorkflowProcessor");
configurer.useDataset("workflowData");
}
@Override
public void initialize(MapReduceContext context) throws Exception {
// Access workflow token to get data from previous workflow nodes
WorkflowToken token = context.getWorkflowToken();
String inputPath = token.get("inputPath").toString();
int batchSize = Integer.parseInt(token.get("batchSize").toString());
Job job = context.getHadoopJob();
job.setMapperClass(WorkflowAwareMapper.class);
job.getConfiguration().set("input.path", inputPath);
job.getConfiguration().setInt("batch.size", batchSize);
context.addInput(Input.ofDataset("workflowData"));
context.addOutput(Output.ofDataset("processedData"));
// Write values into the workflow token for downstream workflow nodes.
// The token can only be updated here in the driver (initialize/destroy), not from mapper or reducer tasks.
token.put("processed.records", "0");
}
}

public class Input {
public static Input ofDataset(String datasetName);
public static Input ofDataset(String datasetName, Map<String, String> arguments);
public static Input ofDataset(String datasetName, Iterable<? extends Split> splits);
}

public class Output {
public static Output ofDataset(String datasetName);
public static Output ofDataset(String datasetName, Map<String, String> arguments);
}

These classes provide fluent APIs for configuring MapReduce input sources and output destinations, supporting various CDAP datasets and external data sources.
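For example, dataset runtime arguments can be attached to an input inside initialize(); the argument keys shown here are illustrative and depend on the dataset type:

Map<String, String> scanArguments = new HashMap<>();
scanArguments.put("scan.start.time", "1420070400000");
scanArguments.put("scan.end.time", "1422748800000");

context.addInput(Input.ofDataset("purchases", scanArguments));
context.addOutput(Output.ofDataset("purchaseSummaries"));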
Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-api