
# MapReduce Functions

The MapReduce Functions capability enables direct integration of Hadoop Mapper and Reducer functions into Flink workflows, allowing reuse of existing MapReduce logic within Flink's DataSet API while maintaining compatibility with Hadoop's programming model.

## Overview

Flink's Hadoop compatibility layer provides wrapper classes that adapt Hadoop MapReduce functions to work as Flink operators. This enables gradual migration from MapReduce to Flink by allowing existing Mapper and Reducer implementations to run within Flink pipelines without modification.
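Conceptually, these wrappers are thin adapters: a mapred-style `map(key, value, collector, reporter)` call is replayed inside a flatMap-style operator. The following stand-alone sketch illustrates that adapter idea using simplified stand-in interfaces (`MiniMapper`, `MiniCollector` are illustrative, not the real Flink or Hadoop types):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

public class AdapterSketch {
    // Simplified stand-ins for the Hadoop OutputCollector and Mapper contracts
    interface MiniCollector<K, V> { void collect(K key, V value); }
    interface MiniMapper<K1, V1, K2, V2> { void map(K1 key, V1 value, MiniCollector<K2, V2> out); }

    // The adapter: exposes a flatMap-style call and forwards each record to the wrapped mapper,
    // collecting whatever the mapper emits
    static <K1, V1, K2, V2> List<String> flatMapAdapter(
            MiniMapper<K1, V1, K2, V2> mapper, K1 key, V1 value) {
        List<String> collected = new ArrayList<>();
        mapper.map(key, value, (k, v) -> collected.add(k + "=" + v));
        return collected;
    }

    // Example wrapped mapper: tokenize a line into (word, 1) pairs
    static final MiniMapper<Long, String, String, Integer> tokenizer = (key, value, out) -> {
        StringTokenizer t = new StringTokenizer(value);
        while (t.hasMoreTokens()) {
            out.collect(t.nextToken().toLowerCase(), 1);
        }
    };

    public static void main(String[] args) {
        System.out.println(flatMapAdapter(tokenizer, 0L, "Hello Flink hello"));
        // [hello=1, flink=1, hello=1]
    }
}
```

The real wrappers additionally handle `JobConf` propagation, serialization of the wrapped function, and Flink type information, but the core forwarding pattern is the same.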

## HadoopMapFunction

Wrapper that adapts a Hadoop Mapper to a Flink FlatMapFunction.

```java { .api }
@Public
public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichFlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
        implements ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {

    // Constructor with Mapper only (uses default JobConf)
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);

    // Constructor with Mapper and custom JobConf
    public HadoopMapFunction(
            Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper,
            JobConf conf);

    // Flink lifecycle method
    public void open(Configuration parameters) throws Exception;

    // Main processing method
    public void flatMap(
            final Tuple2<KEYIN, VALUEIN> value,
            final Collector<Tuple2<KEYOUT, VALUEOUT>> out) throws Exception;

    // Type information method
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}
```

## HadoopReduceFunction

Wrapper that adapts a Hadoop Reducer to a non-combinable Flink GroupReduceFunction.

```java { .api }
@Public
public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichGroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
        implements ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {

    // Constructor with Reducer only (uses default JobConf)
    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer);

    // Constructor with Reducer and custom JobConf
    public HadoopReduceFunction(
            Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
            JobConf conf);

    // Flink lifecycle method
    public void open(Configuration parameters) throws Exception;

    // Main processing method
    public void reduce(
            final Iterable<Tuple2<KEYIN, VALUEIN>> values,
            final Collector<Tuple2<KEYOUT, VALUEOUT>> out) throws Exception;

    // Type information method
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}
```

## HadoopReduceCombineFunction

Wrapper that adapts both Hadoop Reducer and Combiner to a combinable Flink GroupReduceFunction.

```java { .api }
@Public
public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichGroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
        implements GroupCombineFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYIN, VALUEIN>>,
                ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {

    // Constructor with Reducer and Combiner (uses default JobConf)
    public HadoopReduceCombineFunction(
            Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
            Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner);

    // Constructor with Reducer, Combiner, and custom JobConf
    public HadoopReduceCombineFunction(
            Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
            Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner,
            JobConf conf);

    // Flink lifecycle method
    public void open(Configuration parameters) throws Exception;

    // Main reduce processing method
    public void reduce(
            final Iterable<Tuple2<KEYIN, VALUEIN>> values,
            final Collector<Tuple2<KEYOUT, VALUEOUT>> out) throws Exception;

    // Combine processing method for pre-aggregation
    public void combine(
            final Iterable<Tuple2<KEYIN, VALUEIN>> values,
            final Collector<Tuple2<KEYIN, VALUEIN>> out) throws Exception;

    // Type information method
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}
```

## Usage Examples

### Basic WordCount with Hadoop MapReduce Functions

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

// Hadoop Mapper implementation (in the mapred API, Mapper is an interface;
// MapReduceBase provides empty configure()/close() implementations)
public static class TokenizerMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,
            Reporter reporter) throws IOException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken().toLowerCase());
            output.collect(word, one);
        }
    }
}

// Hadoop Reducer implementation
public static class IntSumReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        result.set(sum);
        output.collect(key, result);
    }
}

// Use in Flink pipeline
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read input data
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
        HadoopInputs.readHadoopFile(/* input format configuration */));

// Apply Hadoop mapper
DataSet<Tuple2<Text, IntWritable>> mappedData = input
        .flatMap(new HadoopMapFunction<>(new TokenizerMapper()));

// Group by key and apply Hadoop reducer
DataSet<Tuple2<Text, IntWritable>> result = mappedData
        .groupBy(0)
        .reduceGroup(new HadoopReduceFunction<>(new IntSumReducer()));
```

### Using Combiner for Optimization

```java
// Combiner that performs partial aggregation (like the reducer, it implements
// the mapred Reducer interface)
public static class IntSumCombiner extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        result.set(sum);
        output.collect(key, result);
    }
}

// Use HadoopReduceCombineFunction for better performance
DataSet<Tuple2<Text, IntWritable>> optimizedResult = mappedData
        .groupBy(0)
        .reduceGroup(new HadoopReduceCombineFunction<>(
                new IntSumReducer(),    // Final reducer
                new IntSumCombiner())); // Combiner for pre-aggregation
```

### Custom Configuration

```java
import org.apache.hadoop.mapred.JobConf;

// Configure Hadoop MapReduce job settings
JobConf jobConf = new JobConf();
jobConf.set("mapreduce.job.name", "Flink-Hadoop Integration");
jobConf.set("custom.parameter", "custom-value");
jobConf.setInt("custom.int.parameter", 42);

// Use custom configuration with MapReduce functions
HadoopMapFunction<LongWritable, Text, Text, IntWritable> mapperWithConfig =
        new HadoopMapFunction<>(new TokenizerMapper(), jobConf);

HadoopReduceFunction<Text, IntWritable, Text, IntWritable> reducerWithConfig =
        new HadoopReduceFunction<>(new IntSumReducer(), jobConf);

// Apply in pipeline
DataSet<Tuple2<Text, IntWritable>> result = input
        .flatMap(mapperWithConfig)
        .groupBy(0)
        .reduceGroup(reducerWithConfig);
```
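Custom values like the ones set above only matter if the wrapped function reads them; in the mapred API that happens in `configure(JobConf)`, which the wrappers invoke before processing starts. The following stand-alone sketch shows the pattern with a simplified config stand-in (`MiniConf` and `ParameterizedFn` are illustrative; real code would override `MapReduceBase.configure`):

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigureSketch {
    // Simplified stand-in for JobConf's string key/value lookup
    static final class MiniConf {
        private final Map<String, String> values = new HashMap<>();
        void set(String key, String value) { values.put(key, value); }
        String get(String key, String fallback) { return values.getOrDefault(key, fallback); }
    }

    // A function that, like a mapred Mapper's configure(JobConf), caches config on setup
    static final class ParameterizedFn {
        private String parameter;
        void configure(MiniConf conf) { parameter = conf.get("custom.parameter", "default"); }
        String apply(String input) { return input + ":" + parameter; }
    }

    public static void main(String[] args) {
        MiniConf conf = new MiniConf();
        conf.set("custom.parameter", "custom-value");

        ParameterizedFn fn = new ParameterizedFn();
        fn.configure(conf); // called once before any records are processed
        System.out.println(fn.apply("record")); // record:custom-value
    }
}
```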

### Complex Data Processing

```java
// Example with custom Writable types
public static class DataRecord implements Writable {
    private String category;
    private double value;
    private long timestamp;

    // Getters, setters, and Writable implementation...
}

// The key type is used with groupBy, so it must be comparable:
// implement WritableComparable rather than plain Writable
public static class CategoryKey implements WritableComparable<CategoryKey> {
    private String category;
    private int hour;

    // Getters, setters, compareTo, and Writable implementation...
}

// Mapper that processes complex records
public static class DataProcessor extends MapReduceBase
        implements Mapper<LongWritable, DataRecord, CategoryKey, DataRecord> {
    private final CategoryKey outputKey = new CategoryKey();

    @Override
    public void map(LongWritable key, DataRecord value,
            OutputCollector<CategoryKey, DataRecord> output, Reporter reporter) throws IOException {
        // Extract hour of day from a millisecond timestamp
        int hour = (int) (value.getTimestamp() / 3600000) % 24;

        outputKey.setCategory(value.getCategory());
        outputKey.setHour(hour);

        output.collect(outputKey, value);
    }
}

// Reducer that aggregates by category and hour
public static class CategoryAggregator extends MapReduceBase
        implements Reducer<CategoryKey, DataRecord, CategoryKey, DataRecord> {
    private final DataRecord result = new DataRecord();

    @Override
    public void reduce(CategoryKey key, Iterator<DataRecord> values,
            OutputCollector<CategoryKey, DataRecord> output, Reporter reporter) throws IOException {
        double sum = 0.0;
        int count = 0;

        while (values.hasNext()) {
            sum += values.next().getValue();
            count++;
        }

        result.setCategory(key.getCategory());
        result.setValue(sum / count); // Average
        result.setTimestamp(System.currentTimeMillis());

        output.collect(key, result);
    }
}

// Use in Flink pipeline
DataSet<Tuple2<CategoryKey, DataRecord>> aggregatedData = rawData
        .flatMap(new HadoopMapFunction<>(new DataProcessor()))
        .groupBy(0)
        .reduceGroup(new HadoopReduceFunction<>(new CategoryAggregator()));
```
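The hour-of-day extraction in the mapper above is plain integer arithmetic (3,600,000 ms per hour, modulo 24). A quick stand-alone check of the formula:

```java
public class HourCheck {
    // Hour of day from a millisecond epoch timestamp, as in the mapper above
    static int hourOfDay(long timestampMillis) {
        return (int) (timestampMillis / 3600000) % 24;
    }

    public static void main(String[] args) {
        System.out.println(hourOfDay(0L));             // 0 -- epoch starts at 00:00 UTC
        System.out.println(hourOfDay(3600000L));       // 1 -- one hour in
        System.out.println(hourOfDay(25L * 3600000L)); // 1 -- wraps around the day
    }
}
```

Note the result is relative to UTC; applying a time zone would require offsetting the timestamp first.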

## Performance Considerations

### Object Reuse

```java
// Enable object reuse for better performance with Hadoop functions.
// This is particularly beneficial because Hadoop functions typically
// create many short-lived key/value objects.
env.getConfig().enableObjectReuse();
```
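Object reuse comes with a caveat: when the runtime reuses one mutable object across calls, any reference a function holds onto sees later values, so retained objects must be defensively copied. A stand-alone illustration of the pitfall (`MutableInt` is a hypothetical stand-in for a mutable Writable such as `IntWritable`):

```java
import java.util.ArrayList;
import java.util.List;

public class ReusePitfall {
    // Stand-in for a mutable Writable such as IntWritable
    static final class MutableInt {
        int value;
        MutableInt(int value) { this.value = value; }
    }

    // Returns {first stored value, first copied value} after three reused calls
    static int[] demo() {
        MutableInt reused = new MutableInt(0);
        List<MutableInt> stored = new ArrayList<>();
        List<MutableInt> copied = new ArrayList<>();

        for (int i = 1; i <= 3; i++) {
            reused.value = i;                         // the runtime reuses one object per record
            stored.add(reused);                       // WRONG: every entry aliases the same object
            copied.add(new MutableInt(reused.value)); // RIGHT: defensive copy
        }
        return new int[] { stored.get(0).value, copied.get(0).value };
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0]); // 3 -- the stored reference was overwritten
        System.out.println(r[1]); // 1 -- the copy kept its first value
    }
}
```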

### Combiner Usage

Use a combiner whenever the reduce operation is commutative and associative: the combiner pre-aggregates values before shuffling, which reduces network traffic and improves performance.

Good candidates for combiners:

- Sum operations
- Count operations
- Min/Max operations
- Set union operations

Bad candidates for combiners:

- Operations that need to see all values at once
- Non-associative operations
- Operations with side effects
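To see why associativity and commutativity matter, here is a small stand-alone check (plain Java, no Flink) that pre-aggregating partitions with a sum combiner yields the same result as one global reduce, while a naive "average of averages" does not:

```java
public class CombinerCheck {
    static double sum(double... values) {
        double s = 0;
        for (double v : values) s += v;
        return s;
    }

    static double avg(double... values) {
        return sum(values) / values.length;
    }

    public static void main(String[] args) {
        // Sum is associative and commutative: combining each partition first
        // changes nothing about the final result
        double combinedSum = sum(sum(1, 2, 3), sum(10)); // 16.0
        double globalSum = sum(1, 2, 3, 10);             // 16.0
        System.out.println(combinedSum == globalSum);    // true

        // Averaging per-partition averages is NOT the global average
        double avgOfAvgs = avg(avg(1, 2, 3), avg(10));   // avg(2.0, 10.0) = 6.0
        double globalAvg = avg(1, 2, 3, 10);             // 4.0
        System.out.println(avgOfAvgs == globalAvg);      // false
    }
}
```

This is why the averaging reducer in the complex-data example above must not be used as its own combiner; averages can be combined only by carrying (sum, count) pairs instead.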

### Configuration Tuning

```java
JobConf conf = new JobConf();

// Note: Flink does not launch MapReduce tasks, so task-level settings such as
// memory sizes and JVM options are not interpreted by Flink's runtime. The
// JobConf is only handed to the wrapped functions, which can read the values
// in configure().

// MapReduce memory settings (visible to the wrapped functions only)
conf.setInt("mapreduce.map.memory.mb", 1024);
conf.setInt("mapreduce.reduce.memory.mb", 2048);

// JVM options
conf.set("mapreduce.map.java.opts", "-Xmx800m");
conf.set("mapreduce.reduce.java.opts", "-Xmx1600m");

// Buffer sizes
conf.setInt("io.sort.mb", 256);
conf.setFloat("io.sort.spill.percent", 0.8f);
```

339

340

## Error Handling

341

342

```java
try {
    DataSet<Tuple2<Text, IntWritable>> result = input
            .flatMap(new HadoopMapFunction<>(new TokenizerMapper()))
            .groupBy(0)
            .reduceGroup(new HadoopReduceFunction<>(new IntSumReducer()));

    // print() triggers eager execution; a separate env.execute() is only
    // needed with lazy sinks such as writeAsText()
    result.print();

} catch (Exception e) {
    // Handle various exceptions
    if (e.getCause() instanceof IOException) {
        logger.error("I/O error in Hadoop function: " + e.getMessage());
    } else if (e.getCause() instanceof InterruptedException) {
        logger.error("Hadoop function was interrupted: " + e.getMessage());
    } else {
        logger.error("Unexpected error: " + e.getMessage());
    }
}
```

363

364

## Migration Best Practices

365

366

### Gradual Migration Strategy

367

368

1. **Start with Input/Output**: Use Hadoop InputFormats and OutputFormats with native Flink operations
2. **Migrate Logic Gradually**: Replace Hadoop functions one by one with native Flink operations
3. **Optimize Performance**: Prefer Flink-native operations where performance matters
4. **Maintain Compatibility**: Keep Hadoop functions for complex logic that's hard to rewrite

### Testing Hadoop Functions in Flink

```java
// Create test data
List<Tuple2<LongWritable, Text>> testInput = Arrays.asList(
        new Tuple2<>(new LongWritable(1), new Text("hello world")),
        new Tuple2<>(new LongWritable(2), new Text("hello flink")));

DataSet<Tuple2<LongWritable, Text>> input = env.fromCollection(testInput);

// Test mapper output
DataSet<Tuple2<Text, IntWritable>> mapperOutput = input
        .flatMap(new HadoopMapFunction<>(new TokenizerMapper()));

// Collect and verify results in tests
List<Tuple2<Text, IntWritable>> results = mapperOutput.collect();
assertEquals(4, results.size()); // "hello", "world", "hello", "flink"
```

### Common Migration Patterns

```java
// Replace Hadoop identity operations with a Flink map
// Before: Hadoop IdentityMapper
DataSet<Tuple2<K, V>> output = input.map(tuple -> tuple);

// Replace simple aggregations with a Flink reduce
// Before: Hadoop sum reducer
DataSet<Tuple2<Text, IntWritable>> sums = input
        .groupBy(0)
        .reduce((a, b) -> new Tuple2<>(a.f0, new IntWritable(a.f1.get() + b.f1.get())));

// Replace filtering with a Flink filter
// Before: Hadoop filtering mapper
DataSet<Tuple2<K, V>> filtered = input.filter(tuple -> someCondition(tuple.f1));
```