
# MapReduce Programs

MapReduce programs in CDAP provide distributed batch processing capabilities built on Apache Hadoop MapReduce, with integrated access to CDAP datasets, metrics, and services.

## Core MapReduce Interfaces

### MapReduce Program Interface

```java { .api }
public interface MapReduce extends ProgramLifecycle<MapReduceContext> {
  void configure(MapReduceConfigurer configurer);
}
```

Base interface for MapReduce programs. Implementations must provide configuration logic and can optionally implement lifecycle methods.
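
For illustration only, a minimal sketch of implementing the interface directly rather than extending `AbstractMapReduce` (the class name and body are hypothetical; imports are omitted, as in the examples below):

```java
public class DirectMapReduce implements MapReduce {

  @Override
  public void configure(MapReduceConfigurer configurer) {
    configurer.setName("DirectMapReduce");
    configurer.setDescription("Implements MapReduce directly instead of extending AbstractMapReduce");
  }

  @Override
  public void initialize(MapReduceContext context) throws Exception {
    // Set up the Hadoop job here, as shown in the Usage Examples section
  }

  @Override
  public void destroy() {
    // Optional cleanup
  }
}
```

In practice, extending `AbstractMapReduce` (below) is usually simpler, since it already supplies the lifecycle methods.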

### AbstractMapReduce

```java { .api }
public abstract class AbstractMapReduce implements MapReduce {
  public abstract void configure(MapReduceConfigurer configurer);

  @Override
  public void initialize(MapReduceContext context) throws Exception {
    // Optional initialization logic
  }

  @Override
  public void destroy() {
    // Optional cleanup logic
  }
}
```

Base implementation class for MapReduce programs providing default lifecycle behavior.

## Configuration

### MapReduceConfigurer

```java { .api }
public interface MapReduceConfigurer extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {
  void setMapperResources(Resources resources);
  void setReducerResources(Resources resources);
  void setDriverResources(Resources resources);
}
```

Interface for configuring MapReduce programs, including resource allocation and dataset usage.
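
As an illustration of these setters, a sketch of a `configure()` method that allocates memory for the map, reduce, and driver processes and declares the datasets the program uses (the program and dataset names are illustrative):

```java
@Override
public void configure(MapReduceConfigurer configurer) {
  configurer.setName("ResourceConfiguredJob");
  configurer.setDescription("Illustrates resource allocation and dataset declaration");

  // Memory in megabytes, as in the examples below (1024 = 1GB)
  configurer.setMapperResources(new Resources(1024));
  configurer.setReducerResources(new Resources(2048));
  configurer.setDriverResources(new Resources(512));

  // Datasets this program reads and writes
  configurer.useDataset("inputRecords");
  configurer.useDataset("outputRecords");
}
```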

### MapReduceSpecification

```java { .api }
public class MapReduceSpecification implements ProgramSpecification {
  public String getName();
  public String getDescription();
  public String getClassName();
  public Map<String, String> getProperties();
  public Resources getMapperResources();
  public Resources getReducerResources();
  public Resources getDriverResources();
  public Set<String> getDatasets();
}
```

Complete specification of a MapReduce program.
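
A small, hypothetical helper that summarizes a specification using only the getters listed above (how the `MapReduceSpecification` instance is obtained is out of scope here):

```java
void describeMapReduce(MapReduceSpecification spec) {
  System.out.println("Program:     " + spec.getName() + " (" + spec.getClassName() + ")");
  System.out.println("Description: " + spec.getDescription());
  System.out.println("Properties:  " + spec.getProperties());
  System.out.println("Datasets:    " + spec.getDatasets());
  System.out.println("Mapper/Reducer/Driver resources: "
      + spec.getMapperResources() + " / " + spec.getReducerResources() + " / " + spec.getDriverResources());
}
```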

## Runtime Context

### MapReduceContext

```java { .api }
public interface MapReduceContext extends RuntimeContext, DatasetContext, ServiceDiscoverer {
  <T> T getHadoopJob();
  void addInput(Input input);
  void addOutput(Output output);

  Map<String, String> getRuntimeArguments();
  WorkflowToken getWorkflowToken();

  void setMapperResources(Resources resources);
  void setReducerResources(Resources resources);
  void setNumReducers(int numReducers);
}
```

Runtime context available to MapReduce programs, providing access to the Hadoop job configuration, input/output specification, and CDAP services.
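
For example, `initialize()` can combine runtime arguments with the context's setters to tune a particular run; a sketch (the argument key, mapper/reducer class names, and dataset names are illustrative):

```java
@Override
public void initialize(MapReduceContext context) throws Exception {
  // Runtime arguments supplied when this run was started
  Map<String, String> args = context.getRuntimeArguments();
  int reducers = Integer.parseInt(args.getOrDefault("num.reducers", "4"));

  // Tune parallelism and resources for this run
  context.setNumReducers(reducers);
  context.setReducerResources(new Resources(2048));

  Job job = context.getHadoopJob();
  job.setMapperClass(EventMapper.class);
  job.setReducerClass(EventReducer.class);

  context.addInput(Input.ofDataset("events"));
  context.addOutput(Output.ofDataset("aggregates"));
}
```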

### MapReduceTaskContext

```java { .api }
public interface MapReduceTaskContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    extends TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>, DatasetContext {

  Metrics getMetrics();
  ServiceDiscoverer getServiceDiscoverer();
  PluginContext getPluginContext();

  WorkflowToken getWorkflowToken();

  String getNamespace();
  String getApplicationName();
  String getProgramName();
  String getRunId();
}
```

Task-level context available within mapper and reducer implementations.
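
The examples below use this context from the map side; the same cast-based pattern can be applied in a reducer, as in this hypothetical sketch that records a metric per group:

```java
public static class RegionCountReducer extends Reducer<Text, Customer, Text, IntWritable> {
  private Metrics metrics;

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    // View the Hadoop task context as the CDAP task context, as in the mapper examples below
    MapReduceTaskContext<Text, Customer, Text, IntWritable> cdapContext =
        (MapReduceTaskContext<Text, Customer, Text, IntWritable>) context;
    metrics = cdapContext.getMetrics();
  }

  @Override
  protected void reduce(Text region, Iterable<Customer> customers, Context context)
      throws IOException, InterruptedException {
    int count = 0;
    for (Customer ignored : customers) {
      count++;
    }
    metrics.count("customers.per.region", count);
    context.write(region, new IntWritable(count));
  }
}
```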

## Usage Examples

### Basic MapReduce Program

```java
public class WordCountMapReduce extends AbstractMapReduce {

  @Override
  public void configure(MapReduceConfigurer configurer) {
    configurer.setName("WordCount");
    configurer.setDescription("Counts words in text files");

    // Configure resources
    configurer.setMapperResources(new Resources(1024));  // 1GB for mappers
    configurer.setReducerResources(new Resources(2048)); // 2GB for reducers

    // Use datasets
    configurer.useDataset("textFiles");
    configurer.useDataset("wordCounts");
  }

  @Override
  public void initialize(MapReduceContext context) throws Exception {
    Job job = context.getHadoopJob();
    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    // Configure input and output
    context.addInput(Input.ofDataset("textFiles"));
    context.addOutput(Output.ofDataset("wordCounts"));
  }
}
```

### MapReduce with Dataset Access

```java
public class CustomerDataProcessor extends AbstractMapReduce {

  @Override
  public void configure(MapReduceConfigurer configurer) {
    configurer.setName("CustomerProcessor");
    configurer.useDataset("customers");
    configurer.useDataset("processedCustomers");
    configurer.useDataset("lookupTable"); // also read by the mapper at task level
  }

  @Override
  public void initialize(MapReduceContext context) throws Exception {
    Job job = context.getHadoopJob();
    job.setMapperClass(CustomerMapper.class);
    job.setReducerClass(CustomerReducer.class);

    context.addInput(Input.ofDataset("customers"));
    context.addOutput(Output.ofDataset("processedCustomers"));
  }

  public static class CustomerMapper extends Mapper<byte[], Customer, Text, Customer> {
    private Metrics metrics;
    private KeyValueTable lookupTable;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      MapReduceTaskContext<byte[], Customer, Text, Customer> cdapContext =
          (MapReduceTaskContext<byte[], Customer, Text, Customer>) context;

      metrics = cdapContext.getMetrics();
      lookupTable = cdapContext.getDataset("lookupTable");
    }

    @Override
    protected void map(byte[] key, Customer customer, Context context)
        throws IOException, InterruptedException {

      // Process customer data
      if (customer.isActive()) {
        metrics.count("active.customers", 1);

        // Look up additional data
        byte[] additionalData = lookupTable.read(customer.getId().getBytes());
        if (additionalData != null) {
          customer.setAdditionalInfo(new String(additionalData));
        }

        context.write(new Text(customer.getRegion()), customer);
      }
    }
  }
}
```

### MapReduce with Plugin Usage

```java
public class PluginBasedMapReduce extends AbstractMapReduce {

  @Override
  public void configure(MapReduceConfigurer configurer) {
    configurer.setName("PluginProcessor");
    configurer.usePlugin("transform", "customerTransform", "transform1",
        PluginProperties.builder()
            .add("field", "customerName")
            .add("operation", "uppercase")
            .build());
  }

  @Override
  public void initialize(MapReduceContext context) throws Exception {
    Job job = context.getHadoopJob();
    job.setMapperClass(PluginMapper.class);

    context.addInput(Input.ofDataset("rawData"));
    context.addOutput(Output.ofDataset("transformedData"));
  }

  public static class PluginMapper extends Mapper<byte[], Record, byte[], Record> {
    private CustomerTransform transformer;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      MapReduceTaskContext<byte[], Record, byte[], Record> cdapContext =
          (MapReduceTaskContext<byte[], Record, byte[], Record>) context;

      transformer = cdapContext.getPluginContext().newPluginInstance("transform1");
    }

    @Override
    protected void map(byte[] key, Record record, Context context)
        throws IOException, InterruptedException {

      Record transformedRecord = transformer.transform(record);
      context.write(key, transformedRecord);
    }
  }
}
```

### MapReduce with Workflow Integration

```java
public class WorkflowMapReduce extends AbstractMapReduce {

  @Override
  public void configure(MapReduceConfigurer configurer) {
    configurer.setName("WorkflowProcessor");
    configurer.useDataset("workflowData");
  }

  @Override
  public void initialize(MapReduceContext context) throws Exception {
    // Access the workflow token to get data from previous workflow nodes
    WorkflowToken token = context.getWorkflowToken();
    String inputPath = token.get("inputPath").toString();
    int batchSize = Integer.parseInt(token.get("batchSize").toString());

    Job job = context.getHadoopJob();
    job.setMapperClass(WorkflowAwareMapper.class);
    job.getConfiguration().set("input.path", inputPath);
    job.getConfiguration().setInt("batch.size", batchSize);

    context.addInput(Input.ofDataset("workflowData"));
    context.addOutput(Output.ofDataset("processedData"));

    // Write results back to the workflow token
    token.put("processed.records", "0"); // Will be updated by the mapper
  }
}
```

## Input/Output Configuration

### Input Sources

```java { .api }
public class Input {
  public static Input ofDataset(String datasetName);
  public static Input ofDataset(String datasetName, Map<String, String> arguments);
  public static Input ofDataset(String datasetName, DatasetStateSplitter splitter);
}
```

### Output Destinations

```java { .api }
public class Output {
  public static Output ofDataset(String datasetName);
  public static Output ofDataset(String datasetName, Map<String, String> arguments);
}
```

299

300

These classes provide fluent APIs for configuring MapReduce input sources and output destinations, supporting various CDAP datasets and external data sources.
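
For example, per-run dataset arguments can be passed through the two-argument `ofDataset` overloads; a brief sketch, assumed to run inside `initialize(MapReduceContext context)`, with illustrative dataset and argument names:

```java
// Narrow the input via dataset arguments (keys are illustrative and dataset-specific)
Map<String, String> inputArgs = new HashMap<>();
inputArgs.put("scan.start.time", "2017-01-01");
inputArgs.put("scan.end.time", "2017-01-31");
context.addInput(Input.ofDataset("events", inputArgs));

// Direct the output via dataset arguments as well
Map<String, String> outputArgs = new HashMap<>();
outputArgs.put("output.partition", "2017-01");
context.addOutput(Output.ofDataset("monthlyAggregates", outputArgs));
```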