tessl/maven-org-apache-flink--flink-hadoop-compatibility

Apache Flink compatibility layer for integrating Hadoop InputFormats, OutputFormats, and MapReduce functions with Flink streaming and batch processing.

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: mavenpkg:maven/org.apache.flink/flink-hadoop-compatibility@2.1.x

To install, run:

```
npx @tessl/cli install tessl/maven-org-apache-flink--flink-hadoop-compatibility@2.1.0
```
# Flink Hadoop Compatibility

Flink Hadoop Compatibility is an Apache Flink library that provides seamless integration between Apache Flink and the Apache Hadoop ecosystem. It enables Flink applications to use existing Hadoop InputFormats, OutputFormats, and MapReduce functions without modification, supporting both the legacy `mapred` API and the newer `mapreduce` API.

## Package Information

- **Package Name**: org.apache.flink:flink-hadoop-compatibility
- **Package Type**: maven
- **Language**: Java
- **Installation**:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility</artifactId>
    <version>2.1.0</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.hadoopcompatibility.HadoopUtils;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReducerWrappedFunction;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
```

## Basic Usage

```java
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;

// Create the batch execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Wrap a Hadoop TextInputFormat so Flink can read from it
HadoopInputFormat<LongWritable, Text> hadoopInput =
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://input/path");

// Use the wrapped format as a Flink DataSet source
DataSet<Tuple2<LongWritable, Text>> dataset = env.createInput(hadoopInput);

// Process the records with regular Flink operations
dataset.map(record -> record.f1.toString().toUpperCase())
    .print();
```

## Architecture

Flink Hadoop Compatibility is built around several key components:

- **Input/Output Format Wrappers**: Adapt Hadoop InputFormats and OutputFormats to Flink's data processing model
- **API Version Support**: Dual support for both the legacy `mapred` API and the newer `mapreduce` API
- **Type System Integration**: Custom `TypeInformation` and serialization for Hadoop `Writable` types
- **Function Wrappers**: Adapters that convert Hadoop Mappers and Reducers into Flink functions
- **Utility Classes**: Helper methods for configuration parsing and format creation

All wrapped formats produce and consume `Tuple2<K, V>` objects, where `f0` is the key and `f1` is the value, preserving Hadoop's key-value paradigm.
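Because every wrapped record is a regular Flink tuple, Flink's positional keys apply directly: field 0 is the Hadoop key, field 1 is the Hadoop value. A minimal sketch of a word-count-style aggregation over wrapped records (the `sumPerKey` helper and its input DataSet are illustrative, not part of the library):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class TupleConventionExample {

    // Groups wrapped Hadoop records by their key (f0) and sums the values (f1).
    // A ReduceFunction is used rather than sum(1) because IntWritable is not a
    // numeric basic type that Flink's built-in aggregations understand.
    public static DataSet<Tuple2<Text, IntWritable>> sumPerKey(
            DataSet<Tuple2<Text, IntWritable>> wordCounts) {
        return wordCounts
                .groupBy(0)  // group by the Hadoop key (f0)
                .reduce((a, b) ->
                        Tuple2.of(a.f0, new IntWritable(a.f1.get() + b.f1.get())));
    }
}
```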

## Capabilities

### Hadoop Input Integration

Core functionality for reading data from Hadoop InputFormats, supporting both file-based and generic input sources with full type safety.

```java { .api }
public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath
);

public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath,
    JobConf job
);

public static <K, V> HadoopInputFormat<K, V> createHadoopInput(
    org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
    Class<K> key,
    Class<V> value,
    JobConf job
);
```

[Input Formats](./input-formats.md)
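The `createHadoopInput` variant covers InputFormats that are not tied to a file path argument; the `JobConf` carries the input location instead. A sketch reading a SequenceFile (the path and key/value types are illustrative):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;

public class SequenceFileRead {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The JobConf carries the input path for the generic variant
        JobConf jobConf = new JobConf();
        FileInputFormat.addInputPath(jobConf, new Path("hdfs://input/seqfiles"));

        HadoopInputFormat<Text, IntWritable> input =
                HadoopInputs.createHadoopInput(
                        new SequenceFileInputFormat<Text, IntWritable>(),
                        Text.class,
                        IntWritable.class,
                        jobConf);

        DataSet<Tuple2<Text, IntWritable>> records = env.createInput(input);
        records.first(10).print();
    }
}
```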

### Hadoop Output Integration

Functionality for writing data to Hadoop OutputFormats, enabling Flink applications to emit data in Hadoop-compatible formats.

```java { .api }
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
    public HadoopOutputFormat(
        org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat,
        JobConf job
    );

    public void writeRecord(Tuple2<K, V> record) throws IOException;
}
```

[Output Formats](./output-formats.md)
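A sketch of wiring a `DataSet` of tuples into a Hadoop `mapred` TextOutputFormat (the output path and key/value types are illustrative):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

public class HadoopSink {

    // Writes Tuple2<Text, IntWritable> records through a Hadoop TextOutputFormat
    public static void writeResults(DataSet<Tuple2<Text, IntWritable>> results) {
        // The JobConf carries the output path, as in a regular Hadoop job
        JobConf jobConf = new JobConf();
        FileOutputFormat.setOutputPath(jobConf, new Path("hdfs://output/path"));

        HadoopOutputFormat<Text, IntWritable> hadoopOF =
                new HadoopOutputFormat<>(
                        new TextOutputFormat<Text, IntWritable>(), jobConf);

        // Each Tuple2 is written as a Hadoop key-value pair
        results.output(hadoopOF);
    }
}
```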

### MapReduce Function Integration

Wrappers that convert Hadoop Mappers and Reducers into Flink-compatible functions, enabling reuse of existing MapReduce logic.

```java { .api }
public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichFlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>> {

    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);

    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf);

    public void flatMap(
        Tuple2<KEYIN, VALUEIN> value,
        Collector<Tuple2<KEYOUT, VALUEOUT>> out
    ) throws Exception;
}
```

[MapReduce Functions](./mapreduce-functions.md)
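As an illustration, a hypothetical Hadoop `mapred` Mapper that emits `(word, 1)` pairs can be reused unchanged (the `Tokenizer` class below is an example, not part of the library):

```java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;

// Hypothetical Hadoop mapred Mapper: splits each line into (word, 1) pairs
public class Tokenizer implements Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, LongWritable> output, Reporter reporter)
            throws IOException {
        for (String word : value.toString().toLowerCase().split("\\W+")) {
            if (!word.isEmpty()) {
                output.collect(new Text(word), new LongWritable(1L));
            }
        }
    }

    @Override
    public void configure(JobConf job) {
        // no configuration needed for this example
    }

    @Override
    public void close() {
        // no resources to release
    }
}
```

It then plugs into a Flink pipeline as `text.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()))`, where `text` is a `DataSet<Tuple2<LongWritable, Text>>`.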

### Type System Integration

Custom type information and serialization support for Hadoop Writable types, ensuring seamless integration with Flink's type system.

```java { .api }
public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> {
    public WritableTypeInfo(Class<T> typeClass);

    static <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass);

    public TypeSerializer<T> createSerializer(SerializerConfig serializerConfig);
}
```

[Type System](./type-system.md)
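Flink's type extraction normally creates `WritableTypeInfo` automatically for `Writable` types, but it can also be supplied explicitly as a type hint, for instance when lambda type erasure gets in the way. A sketch (the helper and pipeline are illustrative):

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.hadoop.io.Text;

public class TypeHintExample {

    // Converts plain strings into Hadoop Text values, with an explicit
    // WritableTypeInfo hint because the lambda's return type is erased
    public static DataSet<Text> toText(DataSet<String> lines) {
        TypeInformation<Text> textType = new WritableTypeInfo<>(Text.class);
        return lines
                .map(Text::new)
                .returns(textType);
    }
}
```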

### Configuration Utilities

Utility functions for handling Hadoop configuration and command-line argument parsing.

```java { .api }
public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException;
```

[Utilities](./utilities.md)
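A minimal sketch of parsing Hadoop-style command-line arguments into a Flink `ParameterTool` (the `ParameterTool` package and the `input` key are assumptions for illustration):

```java
import org.apache.flink.hadoopcompatibility.HadoopUtils;
import org.apache.flink.util.ParameterTool;

public class ArgsExample {
    public static void main(String[] args) throws Exception {
        // Parses GenericOptionsParser-style arguments, e.g. "-D key=value"
        ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

        // Read a parameter with a fallback default (key name is illustrative)
        String inputPath = params.get("input", "hdfs://input/path");
        System.out.println("input = " + inputPath);
    }
}
```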

## Common Types

```java { .api }
// Core Flink tuple type used throughout the API
import org.apache.flink.api.java.tuple.Tuple2;

// Hadoop configuration types
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;

// Flink type system integration
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
```