Apache Flink Hadoop Compatibility library that enables interoperability between Apache Flink and Apache Hadoop MapReduce
npx @tessl/cli install tessl/maven-org-apache-flink--flink-hadoop-compatibility_2.12@1.20.2
# Apache Flink Hadoop Compatibility

Apache Flink Hadoop Compatibility provides comprehensive compatibility layers that enable Flink applications to seamlessly integrate with Apache Hadoop MapReduce ecosystem components. It offers adapter classes and utilities for using Hadoop InputFormats and OutputFormats within Flink jobs, supporting both the legacy MapRed API and the newer MapReduce API.

## Package Information

- **Package Name**: flink-hadoop-compatibility_2.12
- **Package Type**: Maven
- **Language**: Java/Scala
- **Installation**: `<dependency><groupId>org.apache.flink</groupId><artifactId>flink-hadoop-compatibility_2.12</artifactId><version>1.20.2</version></dependency>`

## Core Imports

```java
// Factory classes and utilities
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.hadoopcompatibility.HadoopUtils;

// MapReduce API (modern) - InputFormat and OutputFormat
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;

// MapRed API (legacy) - InputFormat and OutputFormat
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;

// Function wrappers for MapRed API
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;

// Type system integration
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
```

## Basic Usage

```java
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;

// Create Flink execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read Hadoop text files using legacy MapRed API
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://path/to/input"
    )
);

// Process data
DataSet<String> words = input
    .flatMap((Tuple2<LongWritable, Text> line, Collector<String> out) -> {
        for (String word : line.f1.toString().split("\\s+")) {
            if (!word.isEmpty()) {
                out.collect(word.toLowerCase());
            }
        }
    })
    .returns(String.class);
```
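
Writing back through a Hadoop OutputFormat is symmetric. The following is a minimal sketch continuing from the `words` DataSet above; the output path and job name are placeholders, not part of the library's API.

```java
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

// Pair each word with a count of 1, using Hadoop Writable types
DataSet<Tuple2<Text, IntWritable>> pairs = words
    .map(w -> new Tuple2<>(new Text(w), new IntWritable(1)))
    .returns(new TypeHint<Tuple2<Text, IntWritable>>() {});

// Configure a legacy MapRed TextOutputFormat through a JobConf
JobConf jobConf = new JobConf();
FileOutputFormat.setOutputPath(jobConf, new Path("hdfs://path/to/output"));

// Wrap it as a Flink output format and use it as a DataSet sink
pairs.output(new HadoopOutputFormat<>(new TextOutputFormat<Text, IntWritable>(), jobConf));

env.execute("write via Hadoop OutputFormat"); // placeholder job name
```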

## Architecture

Apache Flink Hadoop Compatibility is built around several key components:

- **Input/Output Format Wrappers**: Bridge classes that adapt Hadoop InputFormats and OutputFormats to work with Flink's execution engine
- **Type System Integration**: WritableTypeInfo and related classes that enable Hadoop Writable types to work seamlessly with Flink's type system
- **Function Wrappers**: Adapters that allow Hadoop Mapper and Reducer implementations to be used as Flink functions
- **Utility Classes**: Helper methods and factory functions for common Hadoop integration scenarios
- **Dual API Support**: Complete support for both the legacy MapRed API (org.apache.hadoop.mapred) and the newer MapReduce API (org.apache.hadoop.mapreduce)

## Capabilities

### Input and Output Formats

Comprehensive wrapper classes that enable Hadoop InputFormats and OutputFormats to work seamlessly with Flink DataSets. Supports both the legacy MapRed and modern MapReduce APIs.

```java { .api }
// Factory methods for creating input format wrappers
class HadoopInputs {
    static <K, V> HadoopInputFormat<K, V> readHadoopFile(
        org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
        Class<K> key, Class<V> value, String inputPath
    );

    static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
        org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
        Class<K> key, Class<V> value, String inputPath
    ) throws IOException;
}
```
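
The Basic Usage section above uses the mapred overload; the mapreduce overload works the same way but accepts the modern `org.apache.hadoop.mapreduce` input format and declares `IOException`. A minimal sketch (the input path is a placeholder):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Same factory method, but with the modern MapReduce TextInputFormat;
// this overload may throw IOException while setting up the Hadoop Job.
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://path/to/input"
    )
);
```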

[Input and Output Formats](./input-output-formats.md)

### Type System Integration

Type information and serialization support for Hadoop Writable types, enabling seamless integration with Flink's type system and runtime.

```java { .api }
class WritableTypeInfo<T extends Writable> extends TypeInformation<T> {
    WritableTypeInfo(Class<T> typeClass);
    static <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass);
}
```
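
As a sketch of how this fits together: a `WritableTypeInfo` can be constructed for any Writable class and passed wherever Flink expects explicit type information, for example as a `.returns(...)` hint on a transformation whose output type Flink cannot infer.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.hadoop.io.Text;

// Obtain Flink type information for a Hadoop Writable type
TypeInformation<Text> textType = new WritableTypeInfo<>(Text.class);

// Example use as an explicit return-type hint on a lambda transformation,
// assuming `lines` is a DataSet<String> defined elsewhere:
// DataSet<Text> texts = lines.map(Text::new).returns(textType);
```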

[Type System Integration](./type-system.md)

### Function Wrappers

Wrapper classes that adapt Hadoop Mapper and Reducer implementations to work as Flink functions, enabling reuse of existing MapReduce logic.

```java { .api }
class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        implements FlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>> {

    HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);
    HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf);
}

class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        implements GroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>> {

    HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer);
    HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf);
}
```
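
A sketch of reusing an existing MapRed Mapper inside a Flink job. The `IdentityMapper` class is a hypothetical stand-in for legacy MapReduce code, and `input` is assumed to be a `DataSet<Tuple2<LongWritable, Text>>` produced by a Hadoop InputFormat as shown earlier.

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical pre-existing MapRed Mapper that forwards each record unchanged
public class IdentityMapper implements Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    public void map(LongWritable key, Text value,
                    OutputCollector<LongWritable, Text> out, Reporter reporter)
            throws java.io.IOException {
        out.collect(key, value);
    }

    @Override public void configure(JobConf conf) {}
    @Override public void close() {}
}

// Wrap the Mapper so it runs as a Flink FlatMapFunction
DataSet<Tuple2<LongWritable, Text>> mapped =
    input.flatMap(new HadoopMapFunction<>(new IdentityMapper()));
```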

[Function Wrappers](./function-wrappers.md)

### Utility Classes

Helper methods and utility functions that simplify common Hadoop integration tasks, including parameter parsing and configuration management.

```java { .api }
class HadoopUtils {
    static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException;
}
```
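
For example, a job's `main` method can pass its command-line arguments through Hadoop's GenericOptionsParser and receive them as a Flink `ParameterTool`. The `input` parameter name below is a hypothetical example, not something the library defines.

```java
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.hadoopcompatibility.HadoopUtils;

public class JobEntryPoint {
    public static void main(String[] args) throws Exception {
        // Parse Hadoop-style generic options (e.g. -D key=value) into a ParameterTool
        ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

        // Read a job-specific parameter (hypothetical name)
        String inputPath = params.get("input");
    }
}
```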

[Utility Classes](./utility-classes.md)

## Error Handling

The library handles various integration scenarios and errors:

- **IOException**: Thrown by factory methods when Hadoop configuration or file system access fails
- **Configuration Errors**: Proper error propagation when Hadoop JobConf or Job configuration is invalid
- **Type Mismatches**: Clear error messages when key/value types don't match Hadoop InputFormat expectations
- **Serialization Issues**: Detailed error reporting for Writable serialization problems

## Migration Notes

**Scala API Deprecation**: All Scala APIs (org.apache.flink.api.scala.hadoop.*) are deprecated as of Flink 1.20.2 per FLIP-265 and will be removed in a future major version. Users should migrate to the Java APIs, which provide equivalent functionality with better long-term support.