or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md input-formats.md mapreduce-functions.md output-formats.md type-system.md utilities.md

docs/mapreduce-functions.md

0

# MapReduce Function Integration

1

2

Wrappers that convert Hadoop Mappers and Reducers into Flink-compatible functions, enabling reuse of existing MapReduce logic within Flink applications. Supports the legacy `org.apache.hadoop.mapred` API, with automatic type conversion between Hadoop and Flink data types.

3

4

## Capabilities

5

6

### Hadoop Mapper Integration

7

8

Wrapper that converts a Hadoop Mapper into a Flink FlatMapFunction.

9

10

```java { .api }

11

public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

12

extends RichFlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>

13

implements ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {

14

15

/**

16

* Constructor with Hadoop Mapper

17

* @param hadoopMapper The Hadoop Mapper to wrap

18

*/

19

public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);

20

21

/**

22

* Constructor with Hadoop Mapper and configuration

23

* @param hadoopMapper The Hadoop Mapper to wrap

24

* @param conf JobConf configuration for the mapper

25

*/

26

public HadoopMapFunction(

27

Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper,

28

JobConf conf

29

);

30

31

/**

32

* Open method called before processing starts

33

* @param openContext Runtime context for the function

34

* @throws Exception if initialization fails

35

*/

36

public void open(OpenContext openContext) throws Exception;

37

38

/**

39

* Process a single input record through the Hadoop Mapper

40

* @param value Input record as Tuple2<KEYIN, VALUEIN>

41

* @param out Collector for output records

42

* @throws Exception if processing fails

43

*/

44

public void flatMap(

45

final Tuple2<KEYIN, VALUEIN> value,

46

final Collector<Tuple2<KEYOUT, VALUEOUT>> out

47

) throws Exception;

48

49

/**

50

* Get type information for the produced output type

51

* @return TypeInformation for Tuple2<KEYOUT, VALUEOUT>

52

*/

53

public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();

54

}

55

```

56

57

**Usage Example:**

58

59

```java

60

import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;

61

import org.apache.flink.api.java.DataSet;

62

import org.apache.flink.api.java.tuple.Tuple2;

63

import org.apache.hadoop.mapred.Mapper;

64

import org.apache.hadoop.mapred.JobConf;

65

import org.apache.hadoop.io.LongWritable;

66

import org.apache.hadoop.io.Text;

67

import org.apache.hadoop.io.IntWritable;

68

69

// Example Hadoop Mapper that extracts word lengths

70

public static class WordLengthMapper implements Mapper<LongWritable, Text, Text, IntWritable> {

71

public void map(LongWritable key, Text value,

72

OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

73

String[] words = value.toString().split("\\s+");

74

for (String word : words) {

75

output.collect(new Text(word), new IntWritable(word.length()));

76

}

77

}

78

79

public void configure(JobConf job) {}

80

public void close() throws IOException {}

81

}

82

83

// Wrap the Hadoop Mapper for use in Flink

84

HadoopMapFunction<LongWritable, Text, Text, IntWritable> mapFunction =

85

new HadoopMapFunction<>(new WordLengthMapper());

86

87

// Use in Flink DataSet API

88

DataSet<Tuple2<LongWritable, Text>> input = // ... your input dataset

89

DataSet<Tuple2<Text, IntWritable>> mapped = input.flatMap(mapFunction);

90

```

91

92

### Hadoop Reducer Integration

93

94

Wrapper that converts a Hadoop Reducer into a Flink window function. It can be applied to keyed streams (as a `WindowFunction`) and to non-keyed streams (as an `AllWindowFunction`).

95

96

```java { .api }

97

public final class HadoopReducerWrappedFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

98

extends RichWindowFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>, KEYIN, GlobalWindow>

99

implements AllWindowFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>, GlobalWindow>,

100

ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {

101

102

/**

103

* Constructor with Hadoop Reducer

104

* @param hadoopReducer The Hadoop Reducer to wrap

105

*/

106

public HadoopReducerWrappedFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer);

107

108

/**

109

* Constructor with Hadoop Reducer and configuration

110

* @param hadoopReducer The Hadoop Reducer to wrap

111

* @param conf JobConf configuration for the reducer

112

*/

113

public HadoopReducerWrappedFunction(

114

Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,

115

JobConf conf

116

);

117

118

/**

119

* Open method called before processing starts

120

* @param openContext Runtime context for the function

121

* @throws Exception if initialization fails

122

*/

123

public void open(OpenContext openContext) throws Exception;

124

125

/**

126

* Get type information for the produced output type

127

* @return TypeInformation for Tuple2<KEYOUT, VALUEOUT>

128

*/

129

public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();

130

131

/**

132

* Apply function for keyed windows

133

* @param key The key for this window

134

* @param globalWindow The window (always GlobalWindow)

135

* @param iterable Input records for this key

136

* @param collector Collector for output records

137

* @throws Exception if processing fails

138

*/

139

public void apply(

140

KEYIN key,

141

GlobalWindow globalWindow,

142

Iterable<Tuple2<KEYIN, VALUEIN>> iterable,

143

Collector<Tuple2<KEYOUT, VALUEOUT>> collector

144

) throws Exception;

145

146

/**

147

* Apply function for non-keyed windows (all data in single partition)

148

* @param globalWindow The window (always GlobalWindow)

149

* @param iterable All input records

150

* @param collector Collector for output records

151

* @throws Exception if processing fails

152

*/

153

public void apply(

154

GlobalWindow globalWindow,

155

Iterable<Tuple2<KEYIN, VALUEIN>> iterable,

156

Collector<Tuple2<KEYOUT, VALUEOUT>> collector

157

) throws Exception;

158

}

159

```

160

161

**Usage Example:**

162

163

```java

164

import org.apache.flink.hadoopcompatibility.mapred.HadoopReducerWrappedFunction;

165

import org.apache.flink.api.java.DataSet;

166

import org.apache.flink.api.java.tuple.Tuple2;

167

import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;

168

import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

169

import org.apache.hadoop.mapred.Reducer;

170

import org.apache.hadoop.io.Text;

171

import org.apache.hadoop.io.IntWritable;

172

173

// Example Hadoop Reducer that sums values by key

174

public static class SumReducer implements Reducer<Text, IntWritable, Text, IntWritable> {

175

public void reduce(Text key, Iterator<IntWritable> values,

176

OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

177

int sum = 0;

178

while (values.hasNext()) {

179

sum += values.next().get();

180

}

181

output.collect(key, new IntWritable(sum));

182

}

183

184

public void configure(JobConf job) {}

185

public void close() throws IOException {}

186

}

187

188

// Wrap the Hadoop Reducer for use in Flink

189

HadoopReducerWrappedFunction<Text, IntWritable, Text, IntWritable> reduceFunction =

190

new HadoopReducerWrappedFunction<>(new SumReducer());

191

192

// Use in Flink streaming API with keyed windows

193

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

194

DataStream<Tuple2<Text, IntWritable>> stream = // ... your input stream

195

196

stream

197

.keyBy(tuple -> tuple.f0) // Key by the Text field

198

.window(GlobalWindows.create())

199

.trigger(CountTrigger.of(100)) // Trigger every 100 elements

200

.apply(reduceFunction)

201

.print();

202

203

// Use in batch API (DataSet)

204

DataSet<Tuple2<Text, IntWritable>> dataset = // ... your input dataset

205

DataSet<Tuple2<Text, IntWritable>> reduced = dataset

206

.groupBy(0) // Group by key (f0)

207

.reduceGroup(new GroupReduceFunction<Tuple2<Text, IntWritable>, Tuple2<Text, IntWritable>>() {

208

public void reduce(Iterable<Tuple2<Text, IntWritable>> values,

209

Collector<Tuple2<Text, IntWritable>> out) throws Exception {

210

// Delegate to the non-keyed apply(GlobalWindow, Iterable, Collector) overload of HadoopReducerWrappedFunction

211

reduceFunction.apply(GlobalWindow.get(), values, out);

212

}

213

});

214

```

215

216

## Advanced Usage Patterns

217

218

### Configuration Integration

219

220

Passing JobConf configuration to wrapped MapReduce functions.

221

222

```java

223

import org.apache.hadoop.mapred.JobConf;

224

225

// Configure Hadoop job parameters

226

JobConf jobConf = new JobConf();

227

jobConf.set("mapreduce.map.memory.mb", "2048");

228

jobConf.set("custom.parameter", "value");

229

230

// Pass configuration to mapper

231

HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapFunction =

232

new HadoopMapFunction<>(new MyHadoopMapper(), jobConf);

233

234

// Pass configuration to reducer

235

HadoopReducerWrappedFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceFunction =

236

new HadoopReducerWrappedFunction<>(new MyHadoopReducer(), jobConf);

237

```

238

239

### Chain Map-Reduce Operations

240

241

Combining multiple Hadoop operations in a Flink pipeline.

242

243

```java

244

// Chain mapper and reducer operations

245

DataSet<Tuple2<LongWritable, Text>> input = // ... input dataset

246

247

DataSet<Tuple2<Text, IntWritable>> result = input

248

.flatMap(mapFunction1) // First Hadoop mapper

249

.flatMap(mapFunction2) // Second Hadoop mapper

250

.groupBy(0) // Group by key

251

.reduceGroup(new GroupReduceFunction<Tuple2<Text, IntWritable>, Tuple2<Text, IntWritable>>() {

252

public void reduce(Iterable<Tuple2<Text, IntWritable>> values,

253

Collector<Tuple2<Text, IntWritable>> out) throws Exception {

254

reduceFunction.apply(GlobalWindow.get(), values, out);

255

}

256

});

257

```

258

259

### Type Safety and Conversion

260

261

Handling type conversion between Flink and Hadoop types.

262

263

```java

264

// Example with custom Writable types

265

public static class CustomWritable implements Writable {

266

private String data;

267

268

public void write(DataOutput out) throws IOException {

269

out.writeUTF(data);

270

}

271

272

public void readFields(DataInput in) throws IOException {

273

data = in.readUTF();

274

}

275

276

// getters/setters...

277

}

278

279

// Mapper using custom types

280

public static class CustomMapper implements Mapper<LongWritable, Text, Text, CustomWritable> {

281

public void map(LongWritable key, Text value,

282

OutputCollector<Text, CustomWritable> output, Reporter reporter) throws IOException {

283

CustomWritable custom = new CustomWritable();

284

custom.setData(value.toString().toUpperCase());

285

output.collect(new Text("processed"), custom);

286

}

287

288

public void configure(JobConf job) {}

289

public void close() throws IOException {}

290

}

291

292

// Use with proper type information

293

HadoopMapFunction<LongWritable, Text, Text, CustomWritable> customMapFunction =

294

new HadoopMapFunction<>(new CustomMapper());

295

296

// Flink will automatically handle Writable serialization

297

DataSet<Tuple2<Text, CustomWritable>> output = input.flatMap(customMapFunction);

298

```

299

300

### Error Handling and Monitoring

301

302

Handling errors and monitoring MapReduce function execution.

303

304

```java

305

public static class MonitoredMapper implements Mapper<LongWritable, Text, Text, IntWritable> {

306

private Counter recordCounter;

307

private Counter errorCounter;

308

309

public void configure(JobConf job) {

310

// Access counters from configuration if needed

311

}

312

313

public void map(LongWritable key, Text value,

314

OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

315

try {

316

// Processing logic

317

String[] words = value.toString().split("\\s+");

318

for (String word : words) {

319

output.collect(new Text(word), new IntWritable(1));

320

reporter.progress(); // Report progress

321

}

322

reporter.incrCounter("Records", "Processed", 1);

323

} catch (Exception e) {

324

reporter.incrCounter("Records", "Errors", 1);

325

throw new IOException("Processing failed", e);

326

}

327

}

328

329

public void close() throws IOException {}

330

}

331

```

332

333

## Key Design Patterns

334

335

### Tuple2 Convention

336

- Input to mappers: `Tuple2<KEYIN, VALUEIN>` where f0=key, f1=value

337

- Output from mappers: `Tuple2<KEYOUT, VALUEOUT>` where f0=key, f1=value

338

- Input to reducers: `Iterable<Tuple2<KEYIN, VALUEIN>>` grouped by key

339

- Output from reducers: `Tuple2<KEYOUT, VALUEOUT>` where f0=key, f1=value

340

341

### Configuration Inheritance

342

JobConf configuration is passed through to the wrapped Hadoop functions, maintaining compatibility with existing MapReduce code.

343

344

### Type Information

345

Automatic type information extraction ensures proper serialization and type safety within Flink's type system.

346

347

### Progress Reporting

348

Hadoop Reporter interface is supported for progress reporting and counter updates, though some functionality may be limited in the Flink execution environment.