# Apache Flink Hadoop Compatibility

Apache Flink Hadoop Compatibility is an integration layer between Apache Flink and the Hadoop MapReduce ecosystem. It lets existing Hadoop InputFormats and OutputFormats be used directly in Flink applications, supports both the legacy `mapred` API and the newer `mapreduce` API, and provides Java and Scala bindings.

## Package Information

- **Package Name**: flink-hadoop-compatibility_2.11
- **Package Type**: Maven
- **Language**: Java/Scala
- **Installation**:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>1.14.6</version>
</dependency>
```

## Core Imports

**Java:**
```java
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.hadoopcompatibility.HadoopUtils;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;

// The mapred and mapreduce wrappers share the simple name HadoopInputFormat,
// so import only the one you need (and refer to the other fully qualified):
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
// import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
```

**Scala:**
```scala
import org.apache.flink.hadoopcompatibility.scala.HadoopInputs

// As in Java, pick one of the two HadoopInputFormat imports per file:
import org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat
// import org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat
```

## Basic Usage

**Reading Hadoop Files with Java:**
```java
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Create Hadoop InputFormat for reading text files
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://path/to/input"
    )
);

// Process the data
DataSet<String> lines = input.map(tuple -> tuple.f1.toString());
```

**Reading Hadoop Files with Scala:**
```scala
import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
import org.apache.flink.api.scala._
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

val env = ExecutionEnvironment.getExecutionEnvironment

// Create Hadoop InputFormat for reading text files
val input: DataSet[(LongWritable, Text)] = env.createInput(
  HadoopInputs.readHadoopFile(
    new TextInputFormat(),
    classOf[LongWritable],
    classOf[Text],
    "hdfs://path/to/input"
  )
)

// Process the data
val lines = input.map(_._2.toString)
```

## Architecture

The Hadoop Compatibility library is structured around several key architectural components:

- **Entry Point Utilities**: `HadoopInputs` classes provide convenient factory methods for creating Flink wrappers
- **Input/Output Format Wrappers**: Bridge classes that adapt Hadoop formats to Flink's InputFormat and OutputFormat interfaces
- **Type System Integration**: `WritableTypeInfo` and related classes ensure proper serialization of Hadoop types within Flink
- **Dual API Support**: Complete coverage of both the legacy `mapred` API and the newer `mapreduce` API
- **Language Bindings**: Native Java and Scala APIs with appropriate language conventions
- **MapReduce Function Compatibility**: Wrappers to use Hadoop Mapper and Reducer functions directly in Flink

## Capabilities

### Input Format Integration

Primary utilities for reading data from Hadoop InputFormats into Flink DataSets, supporting both file-based and custom InputFormats with automatic type conversion.

```java { .api }
// Java MapRed API
public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
    Class<K> key, Class<V> value, String inputPath, JobConf job);

public static <K, V> HadoopInputFormat<K, V> createHadoopInput(
    org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
    Class<K> key, Class<V> value, JobConf job);

// Java MapReduce API
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
    Class<K> key, Class<V> value, String inputPath, Job job) throws IOException;
```

[Input Format Integration](./input-formats.md)

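The `mapreduce`-API variant mirrors the Basic Usage example above but is configured through a `Job` rather than a `JobConf`. A minimal sketch (the HDFS path is a placeholder):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// mapreduce-API formats carry a Job instead of a JobConf
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://path/to/input",
        Job.getInstance()
    )
);
```
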
### Output Format Integration

Complete support for writing Flink DataSets to Hadoop OutputFormats, enabling integration with existing Hadoop data storage systems and custom output processing.

```java { .api }
// Java MapRed OutputFormat wrapper
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
    public HadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, JobConf job);
    public void writeRecord(Tuple2<K, V> record) throws IOException;
}

// Java MapReduce OutputFormat wrapper
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
    public HadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K, V> mapreduceOutputFormat, Job job);
    public void writeRecord(Tuple2<K, V> record) throws IOException;
}
```

[Output Format Integration](./output-formats.md)

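As a sketch of the `mapred`-API wrapper in use, a `Tuple2` DataSet can be written through a Hadoop `TextOutputFormat` (the output path is a placeholder):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<Text, IntWritable>> result =
    env.fromElements(new Tuple2<>(new Text("flink"), new IntWritable(1)));

// The output path is configured on the Hadoop JobConf, not on the Flink wrapper
JobConf jobConf = new JobConf();
FileOutputFormat.setOutputPath(jobConf, new Path("hdfs://path/to/output"));

result.output(new HadoopOutputFormat<Text, IntWritable>(
    new TextOutputFormat<Text, IntWritable>(), jobConf));
```
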
### Type System and Serialization

Advanced type system integration allowing Hadoop Writable types to work seamlessly within Flink's type system, with optimized serialization and comparison.

```java { .api }
public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> {
    public WritableTypeInfo(Class<T> typeClass);
    public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig);
    public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig);
}
```

[Type System Integration](./type-system.md)

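Flink normally creates this type information automatically for `Writable` types, but it can also be constructed directly, e.g. to obtain a serializer for a Hadoop `Text` value. A small sketch:

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.hadoop.io.Text;

// Build type information for a Hadoop Writable and derive a serializer from it
WritableTypeInfo<Text> textType = new WritableTypeInfo<>(Text.class);
TypeSerializer<Text> serializer = textType.createSerializer(new ExecutionConfig());
```
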
### MapReduce Function Compatibility

Direct integration of Hadoop Mapper and Reducer functions into Flink workflows, supporting both simple and complex MapReduce patterns with combine functionality.

```java { .api }
public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichFlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>> {
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf);
}

public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichGroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>> {
    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer);
    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf);
}
```

[MapReduce Functions](./mapreduce-functions.md)

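A minimal sketch of plugging a Hadoop `Mapper` into a Flink `flatMap`; `Tokenizer` here is a hypothetical user class implementing `org.apache.hadoop.mapred.Mapper<LongWritable, Text, Text, LongWritable>`:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// text is a DataSet<Tuple2<LongWritable, Text>> read via a Hadoop InputFormat, as above
DataSet<Tuple2<Text, LongWritable>> words = text.flatMap(
    new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
        new Tokenizer()  // hypothetical org.apache.hadoop.mapred.Mapper implementation
    )
);
```
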
### Scala API Bindings

Native Scala API providing idiomatic interfaces with implicit type information, tuple syntax, and functional programming patterns for Hadoop integration.

```scala { .api }
object HadoopInputs {
  def readHadoopFile[K, V](
      mapredInputFormat: MapredFileInputFormat[K, V],
      key: Class[K], value: Class[V], inputPath: String, job: JobConf)
      (implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V]

  def createHadoopInput[K, V](
      mapredInputFormat: MapredInputFormat[K, V],
      key: Class[K], value: Class[V], job: JobConf)
      (implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V]
}
```

[Scala API](./scala-api.md)

### Configuration and Utilities

Utility functions for Hadoop configuration management, command-line argument parsing, and seamless integration with existing Hadoop tooling and workflows.

```java { .api }
public class HadoopUtils {
    public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException;
}
```

[Configuration Utilities](./configuration.md)

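A sketch of turning Hadoop-style generic options (e.g. `-D key=value`) into a Flink `ParameterTool` inside a job's `main` method:

```java
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.hadoopcompatibility.HadoopUtils;

public class ConfigExample {
    public static void main(String[] args) throws Exception {
        // Recognizes GenericOptionsParser flags such as -D mapreduce.job.queuename=default
        ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
        System.out.println(params.toMap());
    }
}
```
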
## Common Type Definitions

```java { .api }
// Core Flink types used throughout the API
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;

// Hadoop configuration types
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;

// Common Hadoop Writable types
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
```