# MapReduce Programs

MapReduce programs in CDAP provide distributed batch processing capabilities built on Apache Hadoop MapReduce, with integrated access to CDAP datasets, metrics, and services.

## Core MapReduce Interfaces
### MapReduce Program Interface

```java { .api }
public interface MapReduce extends ProgramLifecycle<MapReduceContext> {
  void configure(MapReduceConfigurer configurer);
}
```

Base interface for MapReduce programs. Implementations must provide configuration logic and can optionally implement lifecycle methods.
### AbstractMapReduce

```java { .api }
public abstract class AbstractMapReduce implements MapReduce {
  public abstract void configure(MapReduceConfigurer configurer);

  @Override
  public void initialize(MapReduceContext context) throws Exception {
    // Optional initialization logic
  }

  @Override
  public void destroy() {
    // Optional cleanup logic
  }
}
```

Base implementation class for MapReduce programs providing default lifecycle behavior.
## Configuration

### MapReduceConfigurer

```java { .api }
public interface MapReduceConfigurer extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {
  void setMapperResources(Resources resources);
  void setReducerResources(Resources resources);
  void setDriverResources(Resources resources);
}
```

Interface for configuring MapReduce programs including resource allocation and dataset usage.
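As a sketch, a configure() implementation might combine these resource settings with the name and description calls inherited from the parent configurers; the program name and memory values below are illustrative:

```java
public class ResourceTunedMapReduce extends AbstractMapReduce {

  @Override
  public void configure(MapReduceConfigurer configurer) {
    // Program identification
    configurer.setName("ResourceTunedJob");         // illustrative name
    configurer.setDescription("Shows per-component resource settings");

    // Container memory in megabytes for each component
    configurer.setMapperResources(new Resources(1024));
    configurer.setReducerResources(new Resources(2048));
    configurer.setDriverResources(new Resources(512));
  }
}
```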
### MapReduceSpecification

```java { .api }
public class MapReduceSpecification implements ProgramSpecification {
  public String getName();
  public String getDescription();
  public String getClassName();
  public Map<String, String> getProperties();
  public Resources getMapperResources();
  public Resources getReducerResources();
  public Resources getDriverResources();
  public Set<String> getDatasets();
}
```

Complete specification of a MapReduce program.
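As a hedged sketch, these accessors could be used to log or validate a program's configuration; how the specification instance is obtained is left open here:

```java
// Hypothetical helper; assumes a MapReduceSpecification instance is already available.
void describeProgram(MapReduceSpecification spec) {
  System.out.println("Program: " + spec.getName() + " (" + spec.getClassName() + ")");
  System.out.println("Description: " + spec.getDescription());
  System.out.println("Mapper resources: " + spec.getMapperResources());
  System.out.println("Reducer resources: " + spec.getReducerResources());
  System.out.println("Datasets used: " + spec.getDatasets());
}
```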
## Runtime Context

### MapReduceContext

```java { .api }
public interface MapReduceContext extends RuntimeContext, DatasetContext, ServiceDiscoverer {
  <T> T getHadoopJob();
  void addInput(Input input);
  void addOutput(Output output);

  Map<String, String> getRuntimeArguments();
  WorkflowToken getWorkflowToken();

  void setMapperResources(Resources resources);
  void setReducerResources(Resources resources);
  void setNumReducers(int numReducers);
}
```

Runtime context available to MapReduce programs providing access to Hadoop job configuration, input/output specification, and CDAP services.
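The sketch below shows how these methods might be combined in initialize() to tune a job from runtime arguments; the mapper, reducer, and dataset names are placeholders:

```java
public class TunableMapReduce extends AbstractMapReduce {

  @Override
  public void configure(MapReduceConfigurer configurer) {
    configurer.setName("TunableJob"); // illustrative name
  }

  @Override
  public void initialize(MapReduceContext context) throws Exception {
    Job job = context.getHadoopJob();
    job.setMapperClass(SomeMapper.class);   // placeholder mapper class
    job.setReducerClass(SomeReducer.class); // placeholder reducer class

    // Tune parallelism and reducer memory from runtime arguments, with defaults
    Map<String, String> args = context.getRuntimeArguments();
    context.setNumReducers(Integer.parseInt(args.getOrDefault("num.reducers", "4")));
    context.setReducerResources(
        new Resources(Integer.parseInt(args.getOrDefault("reducer.memory.mb", "2048"))));

    // Illustrative dataset names
    context.addInput(Input.ofDataset("inputRecords"));
    context.addOutput(Output.ofDataset("outputRecords"));
  }
}
```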
### MapReduceTaskContext

```java { .api }
public interface MapReduceTaskContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    extends TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>, DatasetContext {

  Metrics getMetrics();
  ServiceDiscoverer getServiceDiscoverer();
  PluginContext getPluginContext();

  WorkflowToken getWorkflowToken();

  String getNamespace();
  String getApplicationName();
  String getProgramName();
  String getRunId();
}
```

Task-level context available within mapper and reducer implementations.
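For example, a reducer might obtain this context the same way the mappers in the examples below do, then use the metrics and run-information accessors; the class and metric names here are illustrative:

```java
public static class CountingReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  private Metrics metrics;
  private String runId;

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    // Follows the cast pattern used in the mapper examples below
    MapReduceTaskContext<Text, IntWritable, Text, IntWritable> cdapContext =
        (MapReduceTaskContext<Text, IntWritable, Text, IntWritable>) context;
    metrics = cdapContext.getMetrics();
    runId = cdapContext.getRunId(); // useful for tagging logs
  }

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) {
      sum += value.get();
    }
    metrics.count("reduced.keys", 1);
    context.write(key, new IntWritable(sum));
  }
}
```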
## Usage Examples

### Basic MapReduce Program

```java
public class WordCountMapReduce extends AbstractMapReduce {

  @Override
  public void configure(MapReduceConfigurer configurer) {
    configurer.setName("WordCount");
    configurer.setDescription("Counts words in text files");

    // Configure resources
    configurer.setMapperResources(new Resources(1024));  // 1GB for mappers
    configurer.setReducerResources(new Resources(2048)); // 2GB for reducers

    // Use datasets
    configurer.useDataset("textFiles");
    configurer.useDataset("wordCounts");
  }

  @Override
  public void initialize(MapReduceContext context) throws Exception {
    Job job = context.getHadoopJob();
    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    // Configure input and output
    context.addInput(Input.ofDataset("textFiles"));
    context.addOutput(Output.ofDataset("wordCounts"));
  }
}
```
### MapReduce with Dataset Access

```java
public class CustomerDataProcessor extends AbstractMapReduce {

  @Override
  public void configure(MapReduceConfigurer configurer) {
    configurer.setName("CustomerProcessor");
    configurer.useDataset("customers");
    configurer.useDataset("processedCustomers");
  }

  @Override
  public void initialize(MapReduceContext context) throws Exception {
    Job job = context.getHadoopJob();
    job.setMapperClass(CustomerMapper.class);
    job.setReducerClass(CustomerReducer.class);

    context.addInput(Input.ofDataset("customers"));
    context.addOutput(Output.ofDataset("processedCustomers"));
  }

  public static class CustomerMapper extends Mapper<byte[], Customer, Text, Customer> {
    private Metrics metrics;
    private KeyValueTable lookupTable;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      MapReduceTaskContext<byte[], Customer, Text, Customer> cdapContext =
          (MapReduceTaskContext<byte[], Customer, Text, Customer>) context;

      metrics = cdapContext.getMetrics();
      lookupTable = cdapContext.getDataset("lookupTable");
    }

    @Override
    protected void map(byte[] key, Customer customer, Context context)
        throws IOException, InterruptedException {

      // Process customer data
      if (customer.isActive()) {
        metrics.count("active.customers", 1);

        // Lookup additional data
        byte[] additionalData = lookupTable.read(customer.getId().getBytes());
        if (additionalData != null) {
          customer.setAdditionalInfo(new String(additionalData));
        }

        context.write(new Text(customer.getRegion()), customer);
      }
    }
  }
}
```
### MapReduce with Plugin Usage

```java
public class PluginBasedMapReduce extends AbstractMapReduce {

  @Override
  public void configure(MapReduceConfigurer configurer) {
    configurer.setName("PluginProcessor");
    configurer.usePlugin("transform", "customerTransform", "transform1",
        PluginProperties.builder()
            .add("field", "customerName")
            .add("operation", "uppercase")
            .build());
  }

  @Override
  public void initialize(MapReduceContext context) throws Exception {
    Job job = context.getHadoopJob();
    job.setMapperClass(PluginMapper.class);

    context.addInput(Input.ofDataset("rawData"));
    context.addOutput(Output.ofDataset("transformedData"));
  }

  public static class PluginMapper extends Mapper<byte[], Record, byte[], Record> {
    private CustomerTransform transformer;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      MapReduceTaskContext<byte[], Record, byte[], Record> cdapContext =
          (MapReduceTaskContext<byte[], Record, byte[], Record>) context;

      transformer = cdapContext.getPluginContext().newPluginInstance("transform1");
    }

    @Override
    protected void map(byte[] key, Record record, Context context)
        throws IOException, InterruptedException {

      Record transformedRecord = transformer.transform(record);
      context.write(key, transformedRecord);
    }
  }
}
```
### MapReduce with Workflow Integration

```java
public class WorkflowMapReduce extends AbstractMapReduce {

  @Override
  public void configure(MapReduceConfigurer configurer) {
    configurer.setName("WorkflowProcessor");
    configurer.useDataset("workflowData");
  }

  @Override
  public void initialize(MapReduceContext context) throws Exception {
    // Access workflow token to get data from previous workflow nodes
    WorkflowToken token = context.getWorkflowToken();
    String inputPath = token.get("inputPath").toString();
    int batchSize = Integer.parseInt(token.get("batchSize").toString());

    Job job = context.getHadoopJob();
    job.setMapperClass(WorkflowAwareMapper.class);
    job.getConfiguration().set("input.path", inputPath);
    job.getConfiguration().setInt("batch.size", batchSize);

    context.addInput(Input.ofDataset("workflowData"));
    context.addOutput(Output.ofDataset("processedData"));

    // Write results back to workflow token
    token.put("processed.records", "0"); // Will be updated by mapper
  }
}
```
## Input/Output Configuration

### Input Sources

```java { .api }
public class Input {
  public static Input ofDataset(String datasetName);
  public static Input ofDataset(String datasetName, Map<String, String> arguments);
  public static Input ofDataset(String datasetName, Iterable<? extends Split> splits);
}
```

### Output Destinations

```java { .api }
public class Output {
  public static Output ofDataset(String datasetName);
  public static Output ofDataset(String datasetName, Map<String, String> arguments);
}
```

These classes provide fluent APIs for configuring MapReduce input sources and output destinations, supporting various CDAP datasets and external data sources.
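As a sketch, dataset-specific runtime arguments can be passed alongside the dataset name from within a program's initialize() method; the dataset names and argument keys below are illustrative only:

```java
// Inside a MapReduce program's initialize(); dataset names and argument keys are examples.
@Override
public void initialize(MapReduceContext context) throws Exception {
  Map<String, String> inputArgs = new HashMap<>();
  inputArgs.put("scan.start.time", "2024-01-01");   // hypothetical dataset argument

  Map<String, String> outputArgs = new HashMap<>();
  outputArgs.put("write.batch.size", "500");        // hypothetical dataset argument

  // Arguments are passed to the dataset alongside its name
  context.addInput(Input.ofDataset("events", inputArgs));
  context.addOutput(Output.ofDataset("eventSummaries", outputArgs));
}
```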