# Function Wrappers

Wrapper classes that adapt Hadoop Mapper and Reducer implementations to work as Flink functions, enabling reuse of existing MapReduce logic within Flink applications. These wrappers bridge the gap between Hadoop's MapReduce programming model and Flink's functional API.
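The bridge these wrappers build can be illustrated with a small, dependency-free sketch. The interface and class names below (`SimpleMapper`, `SimpleMapAdapter`) are illustrative stand-ins, not the actual Flink or Hadoop types: a mapred-style mapper pushes results into a collector callback, while a functional API expects a `flatMap`-style call, and the adapter simply forwards one to the other.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Illustrative stand-in for the mapred-style Mapper interface.
interface SimpleMapper<KI, VI, KO, VO> {
    void map(KI key, VI value, BiConsumer<KO, VO> output);
}

// Adapter: exposes the mapred-style mapper through a flatMap-style call,
// analogous to how HadoopMapFunction adapts a Mapper to a FlatMapFunction.
class SimpleMapAdapter<KI, VI, KO, VO> {
    private final SimpleMapper<KI, VI, KO, VO> mapper;

    SimpleMapAdapter(SimpleMapper<KI, VI, KO, VO> mapper) {
        this.mapper = mapper;
    }

    // flatMap emits zero or more (key, value) pairs per input record.
    void flatMap(KI key, VI value, List<KO> keysOut, List<VO> valuesOut) {
        mapper.map(key, value, (k, v) -> {
            keysOut.add(k);
            valuesOut.add(v);
        });
    }
}

public class AdapterSketch {
    public static void main(String[] args) {
        // A tokenizer written against the mapred-style interface...
        SimpleMapper<Long, String, String, Integer> tokenizer = (offset, line, out) -> {
            for (String w : line.toLowerCase().split("\\s+")) {
                if (!w.isEmpty()) out.accept(w, 1);
            }
        };
        // ...reused unchanged through the flatMap-style adapter.
        SimpleMapAdapter<Long, String, String, Integer> adapter =
            new SimpleMapAdapter<>(tokenizer);
        List<String> keys = new ArrayList<>();
        List<Integer> vals = new ArrayList<>();
        adapter.flatMap(0L, "To be or not to be", keys, vals);
        System.out.println(keys);  // [to, be, or, not, to, be]
    }
}
```

The real wrappers add Flink lifecycle handling (`open`), configuration (`JobConf`), and type information on top of this basic forwarding idea.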

## Capabilities

### HadoopMapFunction

Wrapper that maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction, allowing existing Hadoop Mapper implementations to be used directly in Flink transformations.

```java { .api }
/**
 * Wrapper that maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction
 * @param <KEYIN> Input key type
 * @param <VALUEIN> Input value type
 * @param <KEYOUT> Output key type
 * @param <VALUEOUT> Output value type
 */
public class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        implements FlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>,
                   ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>> {

    /**
     * Creates a HadoopMapFunction wrapper with a default JobConf
     * @param hadoopMapper The Hadoop Mapper to wrap
     */
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);

    /**
     * Creates a HadoopMapFunction wrapper with a custom JobConf
     * @param hadoopMapper The Hadoop Mapper to wrap
     * @param conf JobConf for Hadoop configuration
     */
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf);

    /**
     * Opens and configures the mapper for processing
     * @param openContext Flink's open context for initialization
     * @throws Exception if mapper configuration fails
     */
    public void open(OpenContext openContext) throws Exception;

    /**
     * Processes input records using the wrapped Hadoop Mapper
     * @param value Input tuple containing a key-value pair
     * @param out Collector for output tuples
     * @throws Exception if the mapping operation fails
     */
    public void flatMap(
        Tuple2<KEYIN, VALUEIN> value,
        Collector<Tuple2<KEYOUT, VALUEOUT>> out
    ) throws Exception;

    /**
     * Returns output type information for type safety
     * @return TypeInformation for output tuples
     */
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}
```

**Usage Examples:**

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;

// Custom Hadoop Mapper
public class WordTokenizer implements Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String[] words = value.toString().toLowerCase().split("\\s+");
        for (String w : words) {
            if (!w.isEmpty()) {
                word.set(w);
                output.collect(word, one);
            }
        }
    }

    @Override
    public void configure(JobConf job) {}

    @Override
    public void close() {}
}

// Use in a Flink application
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<LongWritable, Text>> lines = // ... input data

// Apply the Hadoop Mapper as a Flink function
DataSet<Tuple2<Text, IntWritable>> words = lines.flatMap(
    new HadoopMapFunction<>(new WordTokenizer())
);

// Continue with Flink operations
DataSet<Tuple2<Text, IntWritable>> wordCounts = words
    .groupBy(0)
    .sum(1);
```

### HadoopReduceFunction

Wrapper that maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction, enabling existing Hadoop Reducer logic to work with Flink's grouped operations.

```java { .api }
/**
 * Wrapper that maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction
 * @param <KEYIN> Input key type
 * @param <VALUEIN> Input value type
 * @param <KEYOUT> Output key type
 * @param <VALUEOUT> Output value type
 */
public class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        implements GroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>,
                   ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>> {

    /**
     * Creates a HadoopReduceFunction wrapper with a default JobConf
     * @param hadoopReducer The Hadoop Reducer to wrap
     */
    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer);

    /**
     * Creates a HadoopReduceFunction wrapper with a custom JobConf
     * @param hadoopReducer The Hadoop Reducer to wrap
     * @param conf JobConf for Hadoop configuration
     */
    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf);

    /**
     * Opens and configures the reducer for processing
     * @param openContext Flink's open context for initialization
     * @throws Exception if reducer configuration fails
     */
    public void open(OpenContext openContext) throws Exception;

    /**
     * Reduces input records using the wrapped Hadoop Reducer
     * @param values Iterable of input tuples with the same key
     * @param out Collector for output tuples
     * @throws Exception if the reduce operation fails
     */
    public void reduce(
        Iterable<Tuple2<KEYIN, VALUEIN>> values,
        Collector<Tuple2<KEYOUT, VALUEOUT>> out
    ) throws Exception;

    /**
     * Returns output type information for type safety
     * @return TypeInformation for output tuples
     */
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}
```

**Usage Examples:**

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.util.Iterator;

// Custom Hadoop Reducer
public class WordCountReducer implements Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        result.set(sum);
        output.collect(key, result);
    }

    @Override
    public void configure(JobConf job) {}

    @Override
    public void close() {}
}

// Use in a Flink application
DataSet<Tuple2<Text, IntWritable>> words = // ... word data

DataSet<Tuple2<Text, IntWritable>> wordCounts = words
    .groupBy(0)
    .reduceGroup(new HadoopReduceFunction<>(new WordCountReducer()));
```

### HadoopReduceCombineFunction

Wrapper that maps both a Hadoop Reducer and Combiner (mapred API) to a combinable Flink GroupReduceFunction, providing optimal performance through pre-aggregation.

```java { .api }
/**
 * Wrapper that maps a Hadoop Reducer and Combiner (mapred API) to a combinable Flink GroupReduceFunction
 * @param <KEYIN> Input key type
 * @param <VALUEIN> Input value type
 * @param <KEYOUT> Output key type
 * @param <VALUEOUT> Output value type
 */
public class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        implements GroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>,
                   GroupCombineFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYIN, VALUEIN>>,
                   ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>> {

    /**
     * Creates a HadoopReduceCombineFunction wrapper with a default JobConf
     * @param hadoopReducer The Hadoop Reducer to wrap
     * @param hadoopCombiner The Hadoop Combiner to wrap
     */
    public HadoopReduceCombineFunction(
        Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
        Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner
    );

    /**
     * Creates a HadoopReduceCombineFunction wrapper with a custom JobConf
     * @param hadoopReducer The Hadoop Reducer to wrap
     * @param hadoopCombiner The Hadoop Combiner to wrap
     * @param conf JobConf for Hadoop configuration
     */
    public HadoopReduceCombineFunction(
        Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
        Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner,
        JobConf conf
    );

    /**
     * Opens and configures the reducer and combiner for processing
     * @param openContext Flink's open context for initialization
     * @throws Exception if configuration fails
     */
    public void open(OpenContext openContext) throws Exception;

    /**
     * Reduces input records using the wrapped Hadoop Reducer
     * @param values Iterable of input tuples with the same key
     * @param out Collector for output tuples
     * @throws Exception if the reduce operation fails
     */
    public void reduce(
        Iterable<Tuple2<KEYIN, VALUEIN>> values,
        Collector<Tuple2<KEYOUT, VALUEOUT>> out
    ) throws Exception;

    /**
     * Combines input records using the wrapped Hadoop Combiner for pre-aggregation
     * @param values Iterable of input tuples to combine
     * @param out Collector for combined tuples
     * @throws Exception if the combine operation fails
     */
    public void combine(
        Iterable<Tuple2<KEYIN, VALUEIN>> values,
        Collector<Tuple2<KEYIN, VALUEIN>> out
    ) throws Exception;

    /**
     * Returns output type information for type safety
     * @return TypeInformation for output tuples
     */
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}
```

**Usage Examples:**

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.util.Iterator;

// Hadoop Combiner (same logic as the reducer in this case)
public class WordCountCombiner implements Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        result.set(sum);
        output.collect(key, result);
    }

    @Override
    public void configure(JobConf job) {}

    @Override
    public void close() {}
}

// Use the combinable reduce function (WordCountReducer is defined in the previous example)
DataSet<Tuple2<Text, IntWritable>> words = // ... word data

DataSet<Tuple2<Text, IntWritable>> wordCounts = words
    .groupBy(0)
    .reduceGroup(new HadoopReduceCombineFunction<>(
        new WordCountReducer(), // Final reducer
        new WordCountCombiner() // Pre-aggregation combiner
    ));
```

## Integration Patterns

### Configuration Management

All function wrappers support Hadoop configuration through JobConf:

```java
JobConf conf = new JobConf();
conf.set("mapred.textoutputformat.separator", "\t");
conf.setInt("mapred.max.split.size", 1024 * 1024);

HadoopMapFunction<LongWritable, Text, Text, IntWritable> mapFunc =
    new HadoopMapFunction<>(new WordTokenizer(), conf);
```

### Error Handling

Function wrappers properly propagate Hadoop exceptions:

- **Configuration errors**: invalid JobConf settings are reported during `open()`
- **Processing errors**: Mapper/Reducer exceptions are propagated to Flink
- **Resource cleanup**: Hadoop resources are released properly on failure or completion
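The "processing errors" point can be sketched without Flink dependencies. The names below (`ThrowingMapper`, `PropagatingAdapter`) are illustrative, not the real wrapper classes: a checked `IOException` thrown by the wrapped user code surfaces to the caller as a failure rather than being swallowed, which is what lets the surrounding runtime fail the job.

```java
import java.io.IOException;
import java.util.function.BiConsumer;

// Illustrative stand-in: a mapred-style mapper that may throw a checked IOException.
interface ThrowingMapper<K, V> {
    void map(K key, V value, BiConsumer<K, V> out) throws IOException;
}

class PropagatingAdapter<K, V> {
    private final ThrowingMapper<K, V> mapper;

    PropagatingAdapter(ThrowingMapper<K, V> mapper) {
        this.mapper = mapper;
    }

    // The checked Hadoop exception surfaces to the caller instead of being swallowed.
    void flatMap(K key, V value, BiConsumer<K, V> out) {
        try {
            mapper.map(key, value, out);
        } catch (IOException e) {
            throw new RuntimeException("wrapped mapper failed", e);
        }
    }
}

public class ErrorPropagationSketch {
    public static void main(String[] args) {
        PropagatingAdapter<String, String> adapter = new PropagatingAdapter<>(
            (k, v, out) -> { throw new IOException("bad record: " + k); });
        try {
            adapter.flatMap("r1", "data", (k, v) -> {});
        } catch (RuntimeException e) {
            // The original cause is preserved for diagnostics.
            System.out.println(e.getCause().getMessage());  // bad record: r1
        }
    }
}
```

The actual wrappers declare `throws Exception` on `flatMap`/`reduce`, so Flink's own failure handling takes over directly; the wrapping shown here is only one way to model the propagation.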

### Performance Optimization

Best practices for optimal performance:

1. **Use combiners**: use HadoopReduceCombineFunction for pre-aggregation
2. **Configure parallelism**: adjust Flink parallelism based on data size
3. **Memory management**: configure appropriate heap sizes for Hadoop operations
4. **Reuse objects**: Hadoop's object-reuse patterns are preserved
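Why the combiner in point 1 pays off can be seen with plain collections. This is a schematic of the idea, not the Flink runtime: combining records partition-locally shrinks the volume of data that must cross the network before the final reduce.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CombinerSketch {
    // The combiner and reducer share the same logic here: sum counts per word.
    static Map<String, Integer> sumByKey(List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> out = new HashMap<>();
        for (Map.Entry<String, Integer> r : records) {
            out.merge(r.getKey(), r.getValue(), Integer::sum);
        }
        return out;
    }

    public static void main(String[] args) {
        // One partition's raw map output: many duplicate keys.
        List<Map.Entry<String, Integer>> partition = List.of(
            Map.entry("to", 1), Map.entry("be", 1), Map.entry("or", 1),
            Map.entry("not", 1), Map.entry("to", 1), Map.entry("be", 1));

        // Local pre-aggregation: 6 records shrink to 4 before the shuffle,
        // and the final reduce then merges the already-combined partials.
        Map<String, Integer> combined = sumByKey(partition);
        System.out.println(combined.size());     // 4
        System.out.println(combined.get("to"));  // 2
    }
}
```

This only works because the operation is associative and produces the same key/value types it consumes, which is exactly the constraint encoded in HadoopReduceCombineFunction's `Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN>` combiner signature.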

## Migration from MapReduce

### Direct Translation

Existing MapReduce jobs can be directly translated to Flink:

```java
// Original MapReduce job structure
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(WordTokenizer.class);
job.setCombinerClass(WordCountReducer.class);
job.setReducerClass(WordCountReducer.class);

// Equivalent Flink application
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<LongWritable, Text>> input = // ... input
DataSet<Tuple2<Text, IntWritable>> result = input
    .flatMap(new HadoopMapFunction<>(new WordTokenizer()))
    .groupBy(0)
    .reduceGroup(new HadoopReduceCombineFunction<>(
        new WordCountReducer(),
        new WordCountReducer() // The same reducer serves as the combiner
    ));
```

### Advantages over MapReduce

1. **Iterative algorithms**: Flink's cyclic data flows vs. MapReduce's acyclic model
2. **Memory management**: Flink's managed memory vs. Hadoop's disk-based shuffling
3. **Real-time processing**: stream processing capabilities alongside batch
4. **Lower latency**: reduced job startup and coordination overhead