
# Input Format Integration

The Input Format Integration capability provides comprehensive support for using Hadoop InputFormats within Flink applications. This enables reading data from various Hadoop-compatible sources, including HDFS files, HBase tables, and custom data sources.

## Overview

Flink's Hadoop compatibility layer wraps Hadoop InputFormats so they work seamlessly with Flink's DataSet API. The integration supports both the legacy MapRed API and the modern MapReduce API, automatically converting Hadoop key-value pairs to Flink Tuple2 objects or Scala tuples.

## HadoopInputs Utility Class (Java)

`HadoopInputs` is the primary entry point for creating Hadoop InputFormat wrappers in Java.

### MapRed API Methods

```java { .api }
// Read a Hadoop FileInputFormat with a custom JobConf
public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath,
    JobConf job);

// Read a Hadoop FileInputFormat with a default JobConf
public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath);

// Read a Hadoop SequenceFile
public static <K, V> HadoopInputFormat<K, V> readSequenceFile(
    Class<K> key,
    Class<V> value,
    String inputPath) throws IOException;

// Create a wrapper for any Hadoop InputFormat
public static <K, V> HadoopInputFormat<K, V> createHadoopInput(
    org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
    Class<K> key,
    Class<V> value,
    JobConf job);
```

### MapReduce API Methods

```java { .api }
// Read a Hadoop FileInputFormat with a custom Job
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath,
    Job job) throws IOException;

// Read a Hadoop FileInputFormat with a default Job
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath) throws IOException;

// Create a wrapper for any MapReduce InputFormat
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput(
    org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
    Class<K> key,
    Class<V> value,
    Job job);
```
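All of the usage examples later in this page go through the legacy MapRed API. For the MapReduce (new) API the shape is the same; the following is a minimal sketch based on the signatures above, assuming the `flink-hadoop-compatibility` dependency is on the classpath (the HDFS path is a placeholder):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// A Job instance carries the Hadoop configuration for the new API
Job job = Job.getInstance();

// readHadoopFile registers the path on the Job and wraps the InputFormat;
// this overload declares IOException, so callers must handle it
DataSet<Tuple2<LongWritable, Text>> textData = env.createInput(
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://namenode:port/path/to/textfiles",
        job
    )
);
```

The resulting DataSet has the same `Tuple2<LongWritable, Text>` shape as the MapRed variant; only the wrapped InputFormat differs.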

## HadoopInputFormat Classes

### MapRed HadoopInputFormat

```java { .api }
@Public
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>>
        implements ResultTypeQueryable<Tuple2<K, V>> {

    // Constructor with JobConf
    public HadoopInputFormat(
        org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
        Class<K> key,
        Class<V> value,
        JobConf job);

    // Constructor with default JobConf
    public HadoopInputFormat(
        org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
        Class<K> key,
        Class<V> value);

    // Read the next record from the input
    public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException;

    // Get type information for the produced tuples
    public TypeInformation<Tuple2<K, V>> getProducedType();
}
```
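The `HadoopInputs.readHadoopFile` factory shown earlier is a thin convenience over this constructor. A sketch of the equivalent direct construction, assuming the wrapper lives in `org.apache.flink.api.java.hadoop.mapred` as in upstream Flink (the input path is registered on the JobConf via Hadoop's own helper, which is what the factory does internally; the path is a placeholder):

```java
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

JobConf conf = new JobConf();
// Register the input path directly on the JobConf
FileInputFormat.addInputPath(conf, new Path("hdfs://namenode:port/path/to/textfiles"));

// Construct the wrapper explicitly instead of going through HadoopInputs
HadoopInputFormat<LongWritable, Text> format =
    new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, conf);
```

Direct construction is useful when the same JobConf must carry additional configuration beyond the input path.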

### MapReduce HadoopInputFormat

```java { .api }
@Public
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>>
        implements ResultTypeQueryable<Tuple2<K, V>> {

    // Constructor with Job
    public HadoopInputFormat(
        org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
        Class<K> key,
        Class<V> value,
        Job job);

    // Constructor with default Job
    public HadoopInputFormat(
        org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
        Class<K> key,
        Class<V> value) throws IOException;

    // Read the next record from the input
    public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException;

    // Get type information for the produced tuples
    public TypeInformation<Tuple2<K, V>> getProducedType();
}
```

## Scala Input Formats

### Scala HadoopInputs Object

```scala { .api }
object HadoopInputs {
  // MapRed API methods
  def readHadoopFile[K, V](
      mapredInputFormat: MapredFileInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      inputPath: String,
      job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V]

  def readHadoopFile[K, V](
      mapredInputFormat: MapredFileInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V]

  def readSequenceFile[K, V](
      key: Class[K],
      value: Class[V],
      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V]

  def createHadoopInput[K, V](
      mapredInputFormat: MapredInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V]

  // MapReduce API methods
  def readHadoopFile[K, V](
      mapreduceInputFormat: MapreduceFileInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      inputPath: String,
      job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V]

  def createHadoopInput[K, V](
      mapreduceInputFormat: MapreduceInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V]
}
```

### Scala HadoopInputFormat Classes

```scala { .api }
// MapRed Scala InputFormat
@Public
class HadoopInputFormat[K, V] extends HadoopInputFormatBase[K, V, (K, V)] {
  def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V], job: JobConf)
  def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V])
  def nextRecord(reuse: (K, V)): (K, V)
}

// MapReduce Scala InputFormat
@Public
class HadoopInputFormat[K, V] extends HadoopInputFormatBase[K, V, (K, V)] {
  def this(mapreduceInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V], job: Job)
  def this(mapreduceInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V])
  def nextRecord(reuse: (K, V)): (K, V)
}
```

## Usage Examples

### Reading Text Files

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Create an input format for text files
DataSet<Tuple2<LongWritable, Text>> textData = env.createInput(
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://namenode:port/path/to/textfiles"
    )
);
```
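Continuing the snippet above, and mirroring the Scala example later in this page: often only the value side of each pair matters. `Tuple2` exposes its key as `f0` and its value as `f1`.

```java
import org.apache.flink.api.java.DataSet;

// textData is the DataSet<Tuple2<LongWritable, Text>> built above;
// keep only the line text, dropping the byte-offset keys
DataSet<String> lines = textData.map(tuple -> tuple.f1.toString());
```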

### Reading Sequence Files

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Read sequence files with specific key-value types
DataSet<Tuple2<IntWritable, Text>> sequenceData = env.createInput(
    HadoopInputs.readSequenceFile(
        IntWritable.class,
        Text.class,
        "hdfs://namenode:port/path/to/sequence/files"
    )
);
```

### Using Custom InputFormats

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.mapred.JobConf;
import com.example.CustomInputFormat;
import com.example.CustomKey;
import com.example.CustomValue;

// Configure the custom input format
JobConf conf = new JobConf();
conf.setInputFormat(CustomInputFormat.class);
conf.set("custom.property", "value");

// Create a wrapper for the custom InputFormat
DataSet<Tuple2<CustomKey, CustomValue>> customData = env.createInput(
    HadoopInputs.createHadoopInput(
        new CustomInputFormat(),
        CustomKey.class,
        CustomValue.class,
        conf
    )
);
```

### Scala Usage

```scala
import org.apache.flink.api.scala._ // brings the required implicit TypeInformation into scope
import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

val env = ExecutionEnvironment.getExecutionEnvironment

// Read text files with Scala
val textData: DataSet[(LongWritable, Text)] = env.createInput(
  HadoopInputs.readHadoopFile(
    new TextInputFormat(),
    classOf[LongWritable],
    classOf[Text],
    "hdfs://namenode:port/path/to/textfiles"
  )
)

// Extract just the text content
val lines = textData.map(_._2.toString)
```

## Input Split Handling

The Hadoop compatibility layer automatically handles input split distribution across Flink's parallel execution environment.

```java { .api }
// Input split wrapper classes (used internally)
@PublicEvolving
public class HadoopInputSplit {
    // MapRed input split wrapper
    // Used internally by HadoopInputFormat
}

@PublicEvolving
public class HadoopInputSplit {
    // MapReduce input split wrapper
    // Used internally by HadoopInputFormat
}
```
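Because split computation is delegated to the wrapped Hadoop InputFormat, the standard Hadoop split-size properties can be used to influence how many splits (and hence parallel work units) Flink sees. A sketch with illustrative values:

```java
import org.apache.hadoop.mapred.JobConf;

JobConf conf = new JobConf();
// Ask Hadoop for splits between 64 MB and 256 MB; each split
// becomes one unit of parallel work for Flink's source tasks
conf.setLong("mapreduce.input.fileinputformat.split.minsize", 64L * 1024 * 1024);
conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 256L * 1024 * 1024);
```

Pass this conf to `readHadoopFile` or `createHadoopInput` exactly as in the custom-InputFormat example above.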

## Error Handling

Input format operations may throw the following exceptions:

- `IOException` - when reading from the input fails or the configuration is invalid
- `ClassNotFoundException` - when the specified key/value classes cannot be found
- `IllegalArgumentException` - when invalid parameters are provided
- `RuntimeException` - for various Hadoop-related runtime errors

Always handle these exceptions appropriately in your Flink programs:

```java
// Note: the MapReduce-API readHadoopFile overloads declare IOException
// (the MapRed overload without a JobConf does not), so the checked
// exception must be caught here
try {
    DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
        HadoopInputs.readHadoopFile(
            new org.apache.hadoop.mapreduce.lib.input.TextInputFormat(),
            LongWritable.class,
            Text.class,
            inputPath
        )
    );
} catch (IOException e) {
    // Handle input/output errors
    logger.error("Failed to create Hadoop input: " + e.getMessage());
}
```