# Hadoop Compatibility

Hadoop MapReduce API compatibility layer for Flink, enabling seamless reuse of existing Hadoop Mapper and Reducer implementations within Flink batch programs.

## Capabilities

### HadoopMapFunction

Wraps Hadoop Mapper (mapred API) as a Flink FlatMapFunction for seamless integration.

```java { .api }
/**
 * Wraps Hadoop Mapper (mapred API) to Flink FlatMapFunction
 * @param <KEYIN> Input key type for the Hadoop mapper
 * @param <VALUEIN> Input value type for the Hadoop mapper
 * @param <KEYOUT> Output key type from the Hadoop mapper
 * @param <VALUEOUT> Output value type from the Hadoop mapper
 */
@Public
public class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichFlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
        implements ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {

    /**
     * Creates a HadoopMapFunction with a Hadoop mapper
     * @param hadoopMapper The Hadoop Mapper instance to wrap
     */
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);

    /**
     * Creates a HadoopMapFunction with a Hadoop mapper and job configuration
     * @param hadoopMapper The Hadoop Mapper instance to wrap
     * @param conf Hadoop JobConf with configuration parameters
     */
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf);

    /**
     * Returns the type information for the output tuples
     * @return TypeInformation for Tuple2<KEYOUT, VALUEOUT>
     */
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}
```

**Usage Example:**

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;

// Your existing Hadoop Mapper
public class WordCountMapper implements Mapper<LongWritable, Text, Text, LongWritable> {
    private final static LongWritable one = new LongWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output,
                    Reporter reporter) throws IOException {
        String[] words = value.toString().split("\\s+");
        for (String w : words) {
            word.set(w);
            output.collect(word, one);
        }
    }

    public void configure(JobConf job) {}

    public void close() throws IOException {}
}

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Wrap Hadoop mapper in Flink function
HadoopMapFunction<LongWritable, Text, Text, LongWritable> mapFunction =
        new HadoopMapFunction<>(new WordCountMapper());

// Use in Flink program
DataSet<Tuple2<LongWritable, Text>> input = // ... your input data
DataSet<Tuple2<Text, LongWritable>> words = input.flatMap(mapFunction);
```
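
The wrapped mapper consumes a `DataSet<Tuple2<K, V>>` whose key and value types match the mapper's input types. If your data comes from a Hadoop `InputFormat`, the input utilities in the same compatibility module (e.g. `HadoopInputs.readHadoopFile`; check the exact entry point for your Flink version) can produce such tuple datasets directly.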

### HadoopReduceFunction

Wraps Hadoop Reducer (mapred API) as a non-combinable Flink GroupReduceFunction.

```java { .api }
/**
 * Wraps Hadoop Reducer (mapred API) to non-combinable Flink GroupReduceFunction
 * @param <KEYIN> Input key type for the Hadoop reducer
 * @param <VALUEIN> Input value type for the Hadoop reducer
 * @param <KEYOUT> Output key type from the Hadoop reducer
 * @param <VALUEOUT> Output value type from the Hadoop reducer
 */
@Public
public class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichGroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
        implements ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {

    /**
     * Creates a HadoopReduceFunction with a Hadoop reducer
     * @param hadoopReducer The Hadoop Reducer instance to wrap
     */
    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer);

    /**
     * Creates a HadoopReduceFunction with a Hadoop reducer and job configuration
     * @param hadoopReducer The Hadoop Reducer instance to wrap
     * @param conf Hadoop JobConf with configuration parameters
     */
    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf);

    /**
     * Returns the type information for the output tuples
     * @return TypeInformation for Tuple2<KEYOUT, VALUEOUT>
     */
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}
```

**Usage Example:**

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.util.Iterator;

// Your existing Hadoop Reducer
public class WordCountReducer implements Reducer<Text, LongWritable, Text, LongWritable> {
    public void reduce(Text key, Iterator<LongWritable> values,
                       OutputCollector<Text, LongWritable> output, Reporter reporter)
            throws IOException {
        long count = 0;
        while (values.hasNext()) {
            count += values.next().get();
        }
        output.collect(key, new LongWritable(count));
    }

    public void configure(JobConf job) {}

    public void close() throws IOException {}
}

// Wrap Hadoop reducer in Flink function
HadoopReduceFunction<Text, LongWritable, Text, LongWritable> reduceFunction =
        new HadoopReduceFunction<>(new WordCountReducer());

// Use in Flink program (after grouping by key)
DataSet<Tuple2<Text, LongWritable>> wordCounts = words
        .groupBy(0) // Group by key (position 0 in tuple)
        .reduceGroup(reduceFunction);
```
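
Because this wrapper is non-combinable, Flink ships every value for a key across the network to the reducer with no local pre-aggregation. If the reduce logic is associative and commutative, as in word count, the combinable `HadoopReduceCombineFunction` below is usually the better choice.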

### HadoopReduceCombineFunction

Wraps both Hadoop Reducer and Combiner (mapred API) as a combinable Flink GroupReduceFunction for optimized processing.

```java { .api }
/**
 * Wraps Hadoop Reducer and Combiner (mapred API) to combinable Flink GroupReduceFunction
 * @param <KEYIN> Input key type for the Hadoop reducer
 * @param <VALUEIN> Input value type for the Hadoop reducer
 * @param <KEYOUT> Output key type from the Hadoop reducer
 * @param <VALUEOUT> Output value type from the Hadoop reducer
 */
@Public
public class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichGroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
        implements GroupCombineFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYIN, VALUEIN>>,
                   ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {

    /**
     * Creates a HadoopReduceCombineFunction with separate reducer and combiner
     * @param hadoopReducer The Hadoop Reducer instance to wrap
     * @param hadoopCombiner The Hadoop Combiner instance to wrap
     */
    public HadoopReduceCombineFunction(
            Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
            Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner);

    /**
     * Creates a HadoopReduceCombineFunction with reducer, combiner, and job configuration
     * @param hadoopReducer The Hadoop Reducer instance to wrap
     * @param hadoopCombiner The Hadoop Combiner instance to wrap
     * @param conf Hadoop JobConf with configuration parameters
     */
    public HadoopReduceCombineFunction(
            Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
            Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner,
            JobConf conf);

    /**
     * Returns the type information for the output tuples
     * @return TypeInformation for Tuple2<KEYOUT, VALUEOUT>
     */
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}
```

**Usage Example:**

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Reuse the same reducer logic for both the combine and reduce phases
WordCountReducer reducer = new WordCountReducer();
WordCountReducer combiner = new WordCountReducer();

// Wrap with combine functionality
HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable> reduceCombineFunction =
        new HadoopReduceCombineFunction<>(reducer, combiner);

// Use in Flink program with automatic combining
DataSet<Tuple2<Text, LongWritable>> wordCounts = words
        .groupBy(0)
        .reduceGroup(reduceCombineFunction);
```
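
Note the combiner's type constraint in the signature above: it must map `<KEYIN, VALUEIN>` pairs back to `<KEYIN, VALUEIN>`. `WordCountReducer` satisfies it because it reduces `(Text, LongWritable)` pairs to `(Text, LongWritable)`, which is also why one class can serve both roles here.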

### HadoopOutputCollector

Wraps a Flink Collector as a Hadoop OutputCollector for compatibility with Hadoop mapper/reducer implementations.

```java { .api }
/**
 * Wraps a Flink Collector as a Hadoop OutputCollector
 * @param <KEY> Key type for collected output
 * @param <VALUE> Value type for collected output
 */
public class HadoopOutputCollector<KEY, VALUE> implements OutputCollector<KEY, VALUE> {

    /**
     * Creates a new HadoopOutputCollector
     */
    public HadoopOutputCollector();

    /**
     * Sets the wrapped Flink collector
     * @param flinkCollector The Flink collector to wrap
     */
    public void setFlinkCollector(Collector<Tuple2<KEY, VALUE>> flinkCollector);

    /**
     * Collects a key-value pair (implementation of the Hadoop OutputCollector interface)
     * @param key The key to collect
     * @param val The value to collect
     */
    public void collect(final KEY key, final VALUE val) throws IOException;
}
```
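
`HadoopOutputCollector` is plumbing used internally by the wrapper functions above, so most programs never construct one directly. The following minimal sketch shows the wiring; the `wire` method and the `...mapred.wrapper` package path are illustrative assumptions, not documented API:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.util.Collector;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

public class CollectorWiringSketch {
    // `flinkCollector` stands in for the Collector that Flink passes to flatMap()/reduceGroup()
    public static void wire(Collector<Tuple2<Text, LongWritable>> flinkCollector) throws IOException {
        HadoopOutputCollector<Text, LongWritable> hadoopCollector = new HadoopOutputCollector<>();
        hadoopCollector.setFlinkCollector(flinkCollector);

        // A Hadoop Mapper/Reducer writes through the Hadoop OutputCollector interface;
        // each pair is forwarded to the Flink collector as a Tuple2<Text, LongWritable>
        hadoopCollector.collect(new Text("hello"), new LongWritable(1));
    }
}
```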

### HadoopTupleUnwrappingIterator

Wraps Flink Tuple2 iterator to provide an iterator over values only, compatible with Hadoop reducer input format.

```java { .api }
/**
 * Wraps Flink Tuple2 iterator into iterator over values only
 * @param <KEY> Key type of the tuples
 * @param <VALUE> Value type of the tuples
 */
public class HadoopTupleUnwrappingIterator<KEY, VALUE>
        extends TupleUnwrappingIterator<VALUE, KEY>
        implements java.io.Serializable {

    /**
     * Creates a new HadoopTupleUnwrappingIterator
     * @param keySerializer Serializer for the key type
     */
    public HadoopTupleUnwrappingIterator(TypeSerializer<KEY> keySerializer);

    /**
     * Sets the Flink iterator to wrap
     * @param iterator The Flink Tuple2 iterator
     */
    public void set(final Iterator<Tuple2<KEY, VALUE>> iterator);

    /**
     * Checks if more elements are available
     * @return true if more elements exist, false otherwise
     */
    public boolean hasNext();

    /**
     * Returns the next value in the iteration
     * @return The next value
     */
    public VALUE next();

    /**
     * Returns the current key associated with the last returned value
     * @return The current key
     */
    public KEY getCurrentKey();

    /**
     * Remove operation is not supported
     * @throws UnsupportedOperationException Always thrown
     */
    public void remove();
}
```
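
As with `HadoopOutputCollector`, the reduce wrappers set this iterator up internally. Here is a standalone sketch of its behavior; the `...mapred.wrapper` package path and the sample data are assumptions for illustration:

```java
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;

import java.util.Arrays;
import java.util.Iterator;

public class UnwrapSketch {
    public static void main(String[] args) {
        // A grouped Flink input: three (key, value) tuples sharing the key "hello"
        Iterator<Tuple2<String, Integer>> grouped = Arrays.asList(
                Tuple2.of("hello", 1), Tuple2.of("hello", 2), Tuple2.of("hello", 3)).iterator();

        HadoopTupleUnwrappingIterator<String, Integer> values =
                new HadoopTupleUnwrappingIterator<>(StringSerializer.INSTANCE);
        values.set(grouped);

        // A Hadoop Reducer iterates over the values only; the key is exposed separately
        while (values.hasNext()) {
            Integer v = values.next();
            System.out.println(values.getCurrentKey() + " -> " + v); // hello -> 1, 2, 3
        }
    }
}
```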

## Complete Word Count Example

Here's a complete example showing how to use Hadoop MapReduce components in a Flink program; it reuses the `WordCountMapper` and `WordCountReducer` classes defined above:

```java
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class FlinkHadoopWordCount {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read input text
        DataSet<String> text = env.fromElements(
                "Hello World",
                "Hello Flink",
                "Hello Hadoop"
        );

        // Convert to Hadoop input format (line number, text); the type hint is
        // needed because Flink cannot infer generic tuple types from a lambda
        DataSet<Tuple2<LongWritable, Text>> hadoopInput = text
                .map((String line) -> new Tuple2<>(
                        new LongWritable(0),
                        new Text(line)
                ))
                .returns(new TypeHint<Tuple2<LongWritable, Text>>() {});

        // Use Hadoop Mapper
        HadoopMapFunction<LongWritable, Text, Text, LongWritable> mapFunction =
                new HadoopMapFunction<>(new WordCountMapper());

        DataSet<Tuple2<Text, LongWritable>> words = hadoopInput.flatMap(mapFunction);

        // Use Hadoop Reducer with Combiner
        HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable> reduceFunction =
                new HadoopReduceCombineFunction<>(
                        new WordCountReducer(), // reducer
                        new WordCountReducer()  // combiner (same logic)
                );

        DataSet<Tuple2<Text, LongWritable>> wordCounts = words
                .groupBy(0)
                .reduceGroup(reduceFunction);

        // print() triggers execution of the batch program, so no separate
        // env.execute() call is needed (calling both would fail)
        wordCounts.print();
    }
}
```
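
For this input the program prints one tuple per distinct word, (Hello,3), (World,1), (Flink,1), and (Hadoop,1), though the order of the printed tuples is not guaranteed.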

## Common Types

```java { .api }
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.JobConf;
import java.io.Serializable;
import java.util.Iterator;
```