MapReduce Programs

MapReduce programs in CDAP provide distributed batch processing capabilities built on Apache Hadoop MapReduce, with integrated access to CDAP datasets, metrics, and services.

Core MapReduce Interfaces

MapReduce Program Interface

public interface MapReduce extends ProgramLifecycle<MapReduceContext> {
    void configure(MapReduceConfigurer configurer);
}

Base interface for MapReduce programs. Implementations must provide configuration logic and can optionally implement lifecycle methods.

AbstractMapReduce

public abstract class AbstractMapReduce implements MapReduce {
    public abstract void configure(MapReduceConfigurer configurer);
    
    @Override
    public void initialize(MapReduceContext context) throws Exception {
        // Optional initialization logic
    }
    
    @Override
    public void destroy() {
        // Optional cleanup logic
    }
}

Base implementation class for MapReduce programs. Its no-op initialize() and destroy() methods mean subclasses only override the lifecycle hooks they need.

Configuration

MapReduceConfigurer

public interface MapReduceConfigurer extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {
    void setMapperResources(Resources resources);
    void setReducerResources(Resources resources);
    void setDriverResources(Resources resources);
}

Interface for configuring MapReduce programs including resource allocation and dataset usage.
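
For instance, per-component resource requirements can be declared at configuration time. A minimal sketch, assuming the two-argument Resources(memoryMB, virtualCores) constructor in addition to the single-argument form used later in this document:

public class ResourceTunedMapReduce extends AbstractMapReduce {

    @Override
    public void configure(MapReduceConfigurer configurer) {
        configurer.setName("ResourceTuned");
        // 2 GB and 2 virtual cores per mapper task
        configurer.setMapperResources(new Resources(2048, 2));
        // 4 GB per reducer task
        configurer.setReducerResources(new Resources(4096));
        // 1 GB for the driver that submits and monitors the job
        configurer.setDriverResources(new Resources(1024));
    }
}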

MapReduceSpecification

public class MapReduceSpecification implements ProgramSpecification {
    public String getName();
    public String getDescription();
    public String getClassName();
    public Map<String, String> getProperties();
    public Resources getMapperResources();
    public Resources getReducerResources();
    public Resources getDriverResources();
    public Set<String> getDatasets();
}

Immutable specification of a MapReduce program, capturing the name, resources, and datasets declared at configuration time.
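
The specification is assembled from the configure() call and can be inspected at runtime. A hedged sketch, assuming the context exposes it through a getSpecification() accessor; the "program.name" configuration key is illustrative only:

@Override
public void initialize(MapReduceContext context) throws Exception {
    MapReduceSpecification spec = context.getSpecification();
    Job job = context.getHadoopJob();
    // Echo a configuration-time setting into the Hadoop job for task-side use
    job.getConfiguration().set("program.name", spec.getName());
}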

Runtime Context

MapReduceContext

public interface MapReduceContext extends RuntimeContext, DatasetContext, ServiceDiscoverer {
    <T> T getHadoopJob();
    void addInput(Input input);
    void addOutput(Output output);
    
    Map<String, String> getRuntimeArguments();
    WorkflowToken getWorkflowToken();
    
    void setMapperResources(Resources resources);
    void setReducerResources(Resources resources);
    void setNumReducers(int numReducers);
}

Runtime context available to MapReduce programs providing access to Hadoop job configuration, input/output specification, and CDAP services.
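
Runtime arguments make per-run tuning possible without redeploying the program. A minimal sketch using only the methods declared above; the "num.reducers" argument name is hypothetical, not a CDAP convention:

@Override
public void initialize(MapReduceContext context) throws Exception {
    Map<String, String> args = context.getRuntimeArguments();
    // Let callers override the reducer count for a particular run
    if (args.containsKey("num.reducers")) {
        context.setNumReducers(Integer.parseInt(args.get("num.reducers")));
    }
}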

MapReduceTaskContext

public interface MapReduceTaskContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
    extends TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>, DatasetContext {
    
    Metrics getMetrics();
    ServiceDiscoverer getServiceDiscoverer();
    PluginContext getPluginContext();
    
    WorkflowToken getWorkflowToken();
    
    String getNamespace();
    String getApplicationName();
    String getProgramName();
    String getRunId();
}

Task-level context available within mapper and reducer implementations.

Usage Examples

Basic MapReduce Program

public class WordCountMapReduce extends AbstractMapReduce {
    
    @Override
    public void configure(MapReduceConfigurer configurer) {
        configurer.setName("WordCount");
        configurer.setDescription("Counts words in text files");
        
        // Configure resources
        configurer.setMapperResources(new Resources(1024));  // 1GB for mappers
        configurer.setReducerResources(new Resources(2048)); // 2GB for reducers
        
        // Use datasets
        configurer.useDataset("textFiles");
        configurer.useDataset("wordCounts");
    }
    
    @Override
    public void initialize(MapReduceContext context) throws Exception {
        Job job = context.getHadoopJob();
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        
        // Configure input and output
        context.addInput(Input.ofDataset("textFiles"));
        context.addOutput(Output.ofDataset("wordCounts"));
    }
}
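
WordCountMapper and WordCountReducer above are plain Hadoop classes. A hedged sketch of the reducer side (a hypothetical implementation, not part of the CDAP API):

public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
        throws IOException, InterruptedException {
        // Sum the per-mapper counts for each word
        int sum = 0;
        for (IntWritable count : counts) {
            sum += count.get();
        }
        context.write(word, new IntWritable(sum));
    }
}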

MapReduce with Dataset Access

public class CustomerDataProcessor extends AbstractMapReduce {
    
    @Override
    public void configure(MapReduceConfigurer configurer) {
        configurer.setName("CustomerProcessor");
        configurer.useDataset("customers");
        configurer.useDataset("processedCustomers");
    }
    
    @Override
    public void initialize(MapReduceContext context) throws Exception {
        Job job = context.getHadoopJob();
        job.setMapperClass(CustomerMapper.class);
        job.setReducerClass(CustomerReducer.class);
        
        context.addInput(Input.ofDataset("customers"));
        context.addOutput(Output.ofDataset("processedCustomers"));
    }
    
    public static class CustomerMapper extends Mapper<byte[], Customer, Text, Customer> {
        private Metrics metrics;
        private KeyValueTable lookupTable;
        
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            MapReduceTaskContext<byte[], Customer, Text, Customer> cdapContext = 
                (MapReduceTaskContext<byte[], Customer, Text, Customer>) context;
            
            metrics = cdapContext.getMetrics();
            lookupTable = cdapContext.getDataset("lookupTable");
        }
        
        @Override
        protected void map(byte[] key, Customer customer, Context context) 
            throws IOException, InterruptedException {
            
            // Process customer data
            if (customer.isActive()) {
                metrics.count("active.customers", 1);
                
                // Lookup additional data
                byte[] additionalData = lookupTable.read(customer.getId().getBytes());
                if (additionalData != null) {
                    customer.setAdditionalInfo(new String(additionalData));
                }
                
                context.write(new Text(customer.getRegion()), customer);
            }
        }
    }
}

MapReduce with Plugin Usage

public class PluginBasedMapReduce extends AbstractMapReduce {
    
    @Override
    public void configure(MapReduceConfigurer configurer) {
        configurer.setName("PluginProcessor");
        configurer.usePlugin("transform", "customerTransform", "transform1", 
                           PluginProperties.builder()
                               .add("field", "customerName")
                               .add("operation", "uppercase")
                               .build());
    }
    
    @Override
    public void initialize(MapReduceContext context) throws Exception {
        Job job = context.getHadoopJob();
        job.setMapperClass(PluginMapper.class);
        
        context.addInput(Input.ofDataset("rawData"));
        context.addOutput(Output.ofDataset("transformedData"));
    }
    
    public static class PluginMapper extends Mapper<byte[], Record, byte[], Record> {
        private CustomerTransform transformer;
        
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            MapReduceTaskContext<byte[], Record, byte[], Record> cdapContext = 
                (MapReduceTaskContext<byte[], Record, byte[], Record>) context;
            
            transformer = cdapContext.getPluginContext().newPluginInstance("transform1");
        }
        
        @Override
        protected void map(byte[] key, Record record, Context context) 
            throws IOException, InterruptedException {
            
            Record transformedRecord = transformer.transform(record);
            context.write(key, transformedRecord);
        }
    }
}

MapReduce with Workflow Integration

public class WorkflowMapReduce extends AbstractMapReduce {
    
    @Override
    public void configure(MapReduceConfigurer configurer) {
        configurer.setName("WorkflowProcessor");
        configurer.useDataset("workflowData");
    }
    
    @Override
    public void initialize(MapReduceContext context) throws Exception {
        // Access workflow token to get data from previous workflow nodes
        WorkflowToken token = context.getWorkflowToken();
        String inputPath = token.get("inputPath").toString();
        int batchSize = Integer.parseInt(token.get("batchSize").toString());
        
        Job job = context.getHadoopJob();
        job.setMapperClass(WorkflowAwareMapper.class);
        job.getConfiguration().set("input.path", inputPath);
        job.getConfiguration().setInt("batch.size", batchSize);
        
        context.addInput(Input.ofDataset("workflowData"));
        context.addOutput(Output.ofDataset("processedData"));
        
        // Seed a token value for downstream workflow nodes; the final count is
        // written after the job completes (see the destroy() sketch below)
        token.put("processed.records", "0");
    }
}
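
Since task code reads the token rather than writing it, the final count is typically recorded once the job has finished. A hedged sketch of a destroy() implementation, assuming AbstractMapReduce exposes the runtime context through getContext() and using Hadoop's org.apache.hadoop.mapreduce.TaskCounter:

    @Override
    public void destroy() {
        MapReduceContext context = getContext();
        WorkflowToken token = context.getWorkflowToken();
        if (token != null) {
            try {
                Job job = context.getHadoopJob();
                // Copy a standard Hadoop counter into the token for downstream nodes
                long records = job.getCounters()
                    .findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
                token.put("processed.records", String.valueOf(records));
            } catch (IOException e) {
                throw new RuntimeException("Failed to read job counters", e);
            }
        }
    }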

Input/Output Configuration

Input Sources

public class Input {
    public static Input ofDataset(String datasetName);
    public static Input ofDataset(String datasetName, Map<String, String> arguments);
    public static Input ofDataset(String datasetName, Iterable<? extends Split> splits);
}

Output Destinations

public class Output {
    public static Output ofDataset(String datasetName);
    public static Output ofDataset(String datasetName, Map<String, String> arguments);
}

These classes expose static factory methods for declaring MapReduce input sources and output destinations, supporting CDAP datasets as well as external data sources.
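
Dataset runtime arguments allow the same dataset to be read or written differently on each run. A minimal sketch using the overloads above; the "scan.start" and "scan.stop" keys are hypothetical, dataset-specific argument names:

@Override
public void initialize(MapReduceContext context) throws Exception {
    Map<String, String> scanArgs = new HashMap<>();
    // Hypothetical keys: the accepted arguments depend on the dataset type
    scanArgs.put("scan.start", "2024-01-01");
    scanArgs.put("scan.stop", "2024-02-01");
    context.addInput(Input.ofDataset("events", scanArgs));
    context.addOutput(Output.ofDataset("results"));
}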
