# Flink Hadoop Compatibility

Flink Hadoop Compatibility is an Apache Flink library that provides seamless integration between Apache Flink and the Apache Hadoop ecosystem. It enables Flink applications to use existing Hadoop InputFormats, OutputFormats, and MapReduce functions without modification, supporting both the legacy mapred API and the newer mapreduce API.

## Package Information

- **Package Name**: org.apache.flink:flink-hadoop-compatibility
- **Package Type**: maven
- **Language**: Java
- **Installation**:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-hadoop-compatibility</artifactId>
  <version>2.1.0</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.hadoopcompatibility.HadoopUtils;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
```

## Basic Usage

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;

// Create the execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Wrap a Hadoop TextInputFormat so Flink can read from it
HadoopInputFormat<LongWritable, Text> hadoopInput =
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://input/path");

// Use the wrapper as a Flink DataSet source
DataSet<Tuple2<LongWritable, Text>> dataset = env.createInput(hadoopInput);

// Process records with regular Flink operations
dataset.map(record -> record.f1.toString().toUpperCase())
    .print();
```

## Architecture

Flink Hadoop Compatibility is built around several key components:

- **Input/Output Format Wrappers**: Adapt Hadoop InputFormats and OutputFormats to Flink's data processing model
- **API Version Support**: Dual support for the legacy mapred API and the newer mapreduce API
- **Type System Integration**: Custom TypeInformation and serialization for Hadoop Writable types
- **Function Wrappers**: Adapters that convert Hadoop Mappers and Reducers into Flink functions
- **Utility Classes**: Helper methods for configuration parsing and format creation

All wrapped formats produce and consume `Tuple2<K, V>` objects, where `f0` is the key and `f1` is the value, preserving Hadoop's key-value paradigm.

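
For example, a record emitted by a wrapped input format can be unpacked as follows (a minimal sketch; the sample offset and line contents are illustrative):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// A record as emitted by a wrapped Hadoop input format:
// f0 carries the Hadoop key, f1 the Hadoop value.
Tuple2<LongWritable, Text> record =
    new Tuple2<>(new LongWritable(0L), new Text("first line"));

long byteOffset = record.f0.get();   // Hadoop key (byte offset for TextInputFormat)
String line = record.f1.toString();  // Hadoop value (the line contents)
```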
## Capabilities

### Hadoop Input Integration

Core functionality for reading data from Hadoop InputFormats, supporting both file-based and generic input sources with full type safety.

```java { .api }
public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath
);

public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath,
    JobConf job
);

public static <K, V> HadoopInputFormat<K, V> createHadoopInput(
    org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
    Class<K> key,
    Class<V> value,
    JobConf job
);
```

[Input Formats](./input-formats.md)
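
The `createHadoopInput` variant can be sketched like this, configuring the source entirely through a `JobConf` as plain MapReduce would (the path `hdfs://input/path` is a placeholder):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Configure the input through a JobConf, exactly as a MapReduce job would
JobConf jobConf = new JobConf();
FileInputFormat.addInputPath(jobConf, new Path("hdfs://input/path"));

HadoopInputFormat<LongWritable, Text> input =
    HadoopInputs.createHadoopInput(
        new TextInputFormat(), LongWritable.class, Text.class, jobConf);

DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(input);
```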
### Hadoop Output Integration

Functionality for writing data to Hadoop OutputFormats, enabling Flink applications to output data in Hadoop-compatible formats.

```java { .api }
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
    public HadoopOutputFormat(
        org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat,
        JobConf job
    );

    public void writeRecord(Tuple2<K, V> record) throws IOException;
}
```

[Output Formats](./output-formats.md)
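
A sketch of the write side, assuming a hypothetical upstream `DataSet<Tuple2<Text, IntWritable>> result` and a placeholder output path:

```java
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

// Configure the destination through a JobConf, as plain MapReduce would
JobConf jobConf = new JobConf();
FileOutputFormat.setOutputPath(jobConf, new Path("hdfs://output/path"));

// Wrap a Hadoop TextOutputFormat so Flink can write to it
HadoopOutputFormat<Text, IntWritable> hadoopOutput =
    new HadoopOutputFormat<>(new TextOutputFormat<Text, IntWritable>(), jobConf);

// Emit Tuple2 records (f0 = key, f1 = value) through the wrapper;
// `result` is an assumed upstream DataSet<Tuple2<Text, IntWritable>>
result.output(hadoopOutput);
```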

### MapReduce Function Integration

Wrappers that convert Hadoop Mappers and Reducers into Flink-compatible functions, enabling reuse of existing MapReduce logic.

```java { .api }
public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichFlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>> {

    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);

    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf);

    public void flatMap(
        Tuple2<KEYIN, VALUEIN> value,
        Collector<Tuple2<KEYOUT, VALUEOUT>> out
    ) throws Exception;
}
```

[MapReduce Functions](./mapreduce-functions.md)
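
For example, an existing mapred-API Mapper can be reused via `flatMap` (a sketch: `MyTokenizerMapper` is a hypothetical user-supplied `Mapper`, and `text` an assumed upstream `DataSet<Tuple2<LongWritable, Text>>`):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Wrap the existing Hadoop Mapper (MyTokenizerMapper is hypothetical);
// the wrapper drives its map() method from Flink's flatMap
DataSet<Tuple2<Text, IntWritable>> tokens =
    text.flatMap(
        new HadoopMapFunction<LongWritable, Text, Text, IntWritable>(
            new MyTokenizerMapper()));
```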

### Type System Integration

Custom type information and serialization support for Hadoop Writable types, ensuring seamless integration with Flink's type system.

```java { .api }
public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> {
    public WritableTypeInfo(Class<T> typeClass);

    static <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass);

    public TypeSerializer<T> createSerializer(SerializerConfig serializerConfig);
}
```

[Type System](./type-system.md)
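
In practice the type information is usually obtained through Flink's `TypeExtractor`, which resolves `Writable` classes to a `WritableTypeInfo` (a minimal sketch using Hadoop's `Text`):

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.hadoop.io.Text;

// TypeExtractor recognizes Writable subclasses and yields a WritableTypeInfo,
// which in turn drives serializer creation for Text fields
TypeInformation<Text> typeInfo = TypeExtractor.getForClass(Text.class);
```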

### Configuration Utilities

Utility functions for handling Hadoop configuration and command-line argument parsing.

```java { .api }
public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException;
```

[Utilities](./utilities.md)
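
A hedged sketch of typical use, forwarding Hadoop-style `-D` options into a job's global parameters (the sample argument is illustrative, and `ParameterTool`'s package may differ across Flink versions):

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.hadoopcompatibility.HadoopUtils;

// Parse Hadoop generic options (e.g. -D key=value) into a ParameterTool
String[] args = {"-D", "mapreduce.job.queuename=default"};  // example arguments
ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

// Make the parsed parameters visible to all rich functions in the job
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);
```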

## Common Types

```java { .api }
// Core Flink tuple type used throughout the API
import org.apache.flink.api.java.tuple.Tuple2;

// Hadoop configuration types
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;

// Flink type system integration
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
```