
# Apache Flink Hadoop Compatibility


Apache Flink Hadoop Compatibility is an integration layer between Apache Flink and the Hadoop MapReduce ecosystem. It lets existing Hadoop InputFormats and OutputFormats be used unchanged in Flink applications, supports both the legacy MapRed API and the newer MapReduce API, and provides Java and Scala bindings.


## Package Information

- **Package Name**: flink-hadoop-compatibility_2.11
- **Package Type**: Maven
- **Language**: Java/Scala
- **Installation**:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-hadoop-compatibility_2.11</artifactId>
  <version>1.14.6</version>
</dependency>
```


## Core Imports

**Java:**

```java
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.hadoopcompatibility.HadoopUtils;
// The MapRed and MapReduce wrappers share the simple name HadoopInputFormat;
// import only one of the two per file, or refer to the other fully qualified.
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
// import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
```

**Scala:**

```scala
import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
// The mapred and mapreduce wrappers share the name HadoopInputFormat as well.
import org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat
// import org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat
```


## Basic Usage

**Reading Hadoop Files with Java:**

```java
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Create a Hadoop InputFormat for reading text files
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://path/to/input"
    )
);

// Process the data
DataSet<String> lines = input.map(tuple -> tuple.f1.toString());
```

**Reading Hadoop Files with Scala:**

```scala
import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
import org.apache.flink.api.scala._
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

val env = ExecutionEnvironment.getExecutionEnvironment

// Create a Hadoop InputFormat for reading text files
val input: DataSet[(LongWritable, Text)] = env.createInput(
  HadoopInputs.readHadoopFile(
    new TextInputFormat(),
    classOf[LongWritable],
    classOf[Text],
    "hdfs://path/to/input"
  )
)

// Process the data
val lines = input.map(_._2.toString)
```


## Architecture

The Hadoop Compatibility library is structured around several key architectural components:

- **Entry Point Utilities**: `HadoopInputs` classes provide convenient factory methods for creating Flink wrappers
- **Input/Output Format Wrappers**: Bridge classes that adapt Hadoop formats to Flink's InputFormat and OutputFormat interfaces
- **Type System Integration**: `WritableTypeInfo` and related classes ensure proper serialization of Hadoop types within Flink
- **Dual API Support**: Coverage of both the legacy MapRed API and the newer MapReduce API
- **Language Bindings**: Native Java and Scala APIs following each language's conventions
- **MapReduce Function Compatibility**: Wrappers for using Hadoop Mapper and Reducer functions directly in Flink


## Capabilities

### Input Format Integration

Primary utilities for reading data from Hadoop InputFormats into Flink DataSets, supporting both file-based and custom InputFormats with automatic type conversion.

```java { .api }
// Java MapRed API
public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
    Class<K> key, Class<V> value, String inputPath, JobConf job);

public static <K, V> HadoopInputFormat<K, V> createHadoopInput(
    org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
    Class<K> key, Class<V> value, JobConf job);

// Java MapReduce API
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
    Class<K> key, Class<V> value, String inputPath, Job job) throws IOException;
```

[Input Format Integration](./input-formats.md)
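For InputFormats that are not tied to a single input path, `createHadoopInput` can be used instead of `readHadoopFile`; the input is then configured entirely through the `JobConf`. A minimal sketch (class name and paths are illustrative, not part of the library):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class CreateHadoopInputExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Configure the input on the JobConf instead of passing a path
        // to readHadoopFile.
        JobConf conf = new JobConf();
        FileInputFormat.addInputPath(conf, new Path("hdfs://path/to/input"));

        DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
            HadoopInputs.createHadoopInput(
                new TextInputFormat(), LongWritable.class, Text.class, conf));

        input.first(10).print();
    }
}
```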

### Output Format Integration

Complete support for writing Flink DataSets to Hadoop OutputFormats, enabling integration with existing Hadoop data storage systems and custom output processing.

```java { .api }
// Java MapRed OutputFormat wrapper
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
    public HadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, JobConf job);
    public void writeRecord(Tuple2<K, V> record) throws IOException;
}

// Java MapReduce OutputFormat wrapper
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
    public HadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K, V> mapreduceOutputFormat, Job job);
    public void writeRecord(Tuple2<K, V> record) throws IOException;
}
```

[Output Format Integration](./output-formats.md)
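The wrapper plugs into `DataSet.output(...)` like any Flink OutputFormat. A sketch using the MapRed `TextOutputFormat` (class name and paths are illustrative); note that the output path is set on the `JobConf` before the wrapper is constructed, exactly as in a plain Hadoop job:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

public class HadoopOutputExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Text, IntWritable>> result = env.fromElements(
            Tuple2.of(new Text("hello"), new IntWritable(1)),
            Tuple2.of(new Text("world"), new IntWritable(2)));

        // The output path is configured on the JobConf, as in plain Hadoop.
        JobConf conf = new JobConf();
        FileOutputFormat.setOutputPath(conf, new Path("hdfs://path/to/output"));

        result.output(new HadoopOutputFormat<Text, IntWritable>(
            new TextOutputFormat<Text, IntWritable>(), conf));

        env.execute("write via Hadoop OutputFormat");
    }
}
```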

### Type System and Serialization

Advanced type system integration allowing Hadoop Writable types to work seamlessly within Flink's type system, with optimized serialization and comparison.

```java { .api }
public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> {
    public WritableTypeInfo(Class<T> typeClass);
    public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig);
    public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig);
}
```

[Type System Integration](./type-system.md)
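In practice this integration is transparent: when Flink's type extraction sees a `Writable` subclass, it is expected to produce a `WritableTypeInfo` for it, so Writable types can be used in DataSets and keyed operations directly. A sketch, assuming flink-hadoop-compatibility is on the classpath (the class name is illustrative):

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.hadoop.io.Text;

public class WritableTypeInfoExample {
    public static void main(String[] args) {
        // Flink's type extraction maps Writable subclasses to WritableTypeInfo,
        // which supplies the serializer and comparator used at runtime.
        TypeInformation<Text> info = TypeExtractor.getForClass(Text.class);
        // Expected to be a WritableTypeInfo when flink-hadoop-compatibility
        // is on the classpath.
        System.out.println(info);
    }
}
```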

### MapReduce Function Compatibility

Direct integration of Hadoop Mapper and Reducer functions into Flink workflows, supporting both simple and complex MapReduce patterns with combine functionality.

```java { .api }
public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichFlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>> {
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf);
}

public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichGroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>> {
    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer);
    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf);
}
```

[MapReduce Functions](./mapreduce-functions.md)
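A sketch of plugging an unmodified Hadoop `Mapper` into a Flink pipeline via `HadoopMapFunction`; the `Tokenizer` mapper below is a hypothetical example (a classic word-count tokenizer), not part of the library:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.util.StringTokenizer;

public class HadoopMapFunctionExample {

    // A plain Hadoop (MapRed API) Mapper: splits lines into (word, 1) pairs.
    public static final class Tokenizer
            implements Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> out, Reporter reporter)
                throws IOException {
            StringTokenizer st = new StringTokenizer(value.toString());
            while (st.hasMoreTokens()) {
                out.collect(new Text(st.nextToken()), new IntWritable(1));
            }
        }

        @Override
        public void configure(JobConf job) {}

        @Override
        public void close() {}
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<LongWritable, Text>> input = env.fromElements(
            Tuple2.of(new LongWritable(0), new Text("to be or not to be")));

        // Run the unmodified Hadoop Mapper inside Flink.
        DataSet<Tuple2<Text, IntWritable>> words =
            input.flatMap(new HadoopMapFunction<>(new Tokenizer()));

        words.print();
    }
}
```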

### Scala API Bindings

Native Scala API providing idiomatic interfaces with implicit type information, tuple syntax, and functional programming patterns for Hadoop integration.

```scala { .api }
object HadoopInputs {
  def readHadoopFile[K, V](
      mapredInputFormat: MapredFileInputFormat[K, V],
      key: Class[K], value: Class[V], inputPath: String, job: JobConf)
      (implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V]

  def createHadoopInput[K, V](
      mapredInputFormat: MapredInputFormat[K, V],
      key: Class[K], value: Class[V], job: JobConf)
      (implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V]
}
```

[Scala API](./scala-api.md)

### Configuration and Utilities

Utility functions for Hadoop configuration management, command-line argument parsing, and seamless integration with existing Hadoop tooling and workflows.

```java { .api }
public class HadoopUtils {
    public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException;
}
```

[Configuration Utilities](./configuration.md)
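A sketch of feeding Hadoop-style `GenericOptionsParser` arguments (e.g. `-D key=value`) into Flink's `ParameterTool` (the class name and property key below are illustrative):

```java
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.hadoopcompatibility.HadoopUtils;

public class HadoopUtilsExample {
    public static void main(String[] args) throws Exception {
        // Accepts Hadoop-style arguments such as:
        //   -D fs.defaultFS=hdfs://namenode:8020
        ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
        System.out.println(params.get("fs.defaultFS", "not set"));
    }
}
```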

## Common Type Definitions

```java { .api }
// Core Flink types used throughout the API
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;

// Hadoop configuration types
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;

// Common Hadoop Writable types
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
```