# Hadoop Input Formats

Comprehensive integration for reading data from Hadoop InputFormats in Flink applications. Supports both the legacy mapred API and the newer mapreduce API, with automatic conversion of key-value pairs to Flink Tuple2 objects.

## Capabilities

### File Input Format Reading (mapred API)

Creates Flink InputFormat wrappers for Hadoop FileInputFormats using the legacy mapred API.

```java { .api }
/**
 * Creates a Flink InputFormat that wraps the given Hadoop FileInputFormat (mapred API).
 *
 * @param mapredInputFormat The Hadoop FileInputFormat to wrap
 * @param key The class of the key type
 * @param value The class of the value type
 * @param inputPath The path to read input data from
 * @param job JobConf configuration for the Hadoop job
 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat
 */
public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath,
    JobConf job
);

/**
 * Creates a Flink InputFormat that wraps the given Hadoop FileInputFormat (mapred API) with a default JobConf.
 *
 * @param mapredInputFormat The Hadoop FileInputFormat to wrap
 * @param key The class of the key type
 * @param value The class of the value type
 * @param inputPath The path to read input data from
 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat
 */
public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath
);
```

**Usage Example:**

```java
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Reading text files using TextInputFormat
HadoopInputFormat<LongWritable, Text> textInput =
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://data/input.txt"
    );

// With a custom JobConf
JobConf jobConf = new JobConf();
jobConf.set("mapreduce.input.fileinputformat.split.minsize", "1048576");

HadoopInputFormat<LongWritable, Text> configuredInput =
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://data/input.txt",
        jobConf
    );
```

### File Input Format Reading (mapreduce API)

Creates Flink InputFormat wrappers for Hadoop FileInputFormats using the newer mapreduce API.

```java { .api }
/**
 * Creates a Flink InputFormat that wraps the given Hadoop FileInputFormat (mapreduce API).
 *
 * @param mapreduceInputFormat The Hadoop FileInputFormat to wrap
 * @param key The class of the key type
 * @param value The class of the value type
 * @param inputPath The path to read input data from
 * @param job Job configuration for the Hadoop job
 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat
 * @throws IOException if the Job configuration cannot be processed
 */
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath,
    Job job
) throws IOException;

/**
 * Creates a Flink InputFormat that wraps the given Hadoop FileInputFormat (mapreduce API) with a default Job.
 *
 * @param mapreduceInputFormat The Hadoop FileInputFormat to wrap
 * @param key The class of the key type
 * @param value The class of the value type
 * @param inputPath The path to read input data from
 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat
 * @throws IOException if the Job configuration cannot be created
 */
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath
) throws IOException;
```

**Usage Example:**

```java
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Reading text files using the mapreduce API
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<LongWritable, Text> mapreduceInput =
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://data/input.txt"
    );

// With a custom Job configuration
Job job = Job.getInstance();
job.getConfiguration().set("mapreduce.input.textinputformat.record.delimiter", "\n");

org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<LongWritable, Text> configuredMapreduceInput =
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://data/input.txt",
        job
    );
```

### Sequence File Reading

Specialized method for reading Hadoop sequence files with automatic format configuration.

```java { .api }
/**
 * Creates a Flink InputFormat to read a Hadoop sequence file for the given key and value classes.
 *
 * @param key The class of the key type
 * @param value The class of the value type
 * @param inputPath The path to the sequence file
 * @return A Flink InputFormat that wraps a Hadoop SequenceFileInputFormat
 * @throws IOException if the sequence file cannot be accessed
 */
public static <K, V> HadoopInputFormat<K, V> readSequenceFile(
    Class<K> key,
    Class<V> value,
    String inputPath
) throws IOException;
```

**Usage Example:**

```java
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Reading sequence files
HadoopInputFormat<IntWritable, Text> sequenceInput =
    HadoopInputs.readSequenceFile(
        IntWritable.class,
        Text.class,
        "hdfs://data/sequence.seq"
    );
```

### Generic Input Format Creation (mapred API)

Creates Flink InputFormat wrappers for any Hadoop InputFormat using the mapred API.

```java { .api }
/**
 * Creates a Flink InputFormat that wraps the given Hadoop InputFormat (mapred API).
 *
 * @param mapredInputFormat The Hadoop InputFormat to wrap
 * @param key The class of the key type
 * @param value The class of the value type
 * @param job JobConf configuration for the Hadoop job
 * @return A Flink InputFormat that wraps the Hadoop InputFormat
 */
public static <K, V> HadoopInputFormat<K, V> createHadoopInput(
    org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
    Class<K> key,
    Class<V> value,
    JobConf job
);
```

**Usage Example:**

```java
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.Text;

// Using KeyValueTextInputFormat
JobConf jobConf = new JobConf();
jobConf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");

HadoopInputFormat<Text, Text> keyValueInput =
    HadoopInputs.createHadoopInput(
        new KeyValueTextInputFormat(),
        Text.class,
        Text.class,
        jobConf
    );
```

### Generic Input Format Creation (mapreduce API)

Creates Flink InputFormat wrappers for any Hadoop InputFormat using the mapreduce API.

```java { .api }
/**
 * Creates a Flink InputFormat that wraps the given Hadoop InputFormat (mapreduce API).
 *
 * @param mapreduceInputFormat The Hadoop InputFormat to wrap
 * @param key The class of the key type
 * @param value The class of the value type
 * @param job Job configuration for the Hadoop job
 * @return A Flink InputFormat that wraps the Hadoop InputFormat
 */
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput(
    org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
    Class<K> key,
    Class<V> value,
    Job job
);
```

**Usage Example:**

```java
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.Text;

// Using the mapreduce KeyValueTextInputFormat
Job job = Job.getInstance();
job.getConfiguration().set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");

org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<Text, Text> mapreduceKeyValueInput =
    HadoopInputs.createHadoopInput(
        new KeyValueTextInputFormat(),
        Text.class,
        Text.class,
        job
    );
```

## Input Format Classes

### HadoopInputFormat (mapred API)

Wrapper class for Hadoop InputFormats using the mapred API.

```java { .api }
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>>
        implements ResultTypeQueryable<Tuple2<K, V>> {

    /**
     * Constructor with full configuration.
     *
     * @param mapredInputFormat The Hadoop InputFormat to wrap
     * @param key The class of the key type
     * @param value The class of the value type
     * @param job JobConf configuration
     */
    public HadoopInputFormat(
        org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
        Class<K> key,
        Class<V> value,
        JobConf job
    );

    /**
     * Constructor with default JobConf.
     *
     * @param mapredInputFormat The Hadoop InputFormat to wrap
     * @param key The class of the key type
     * @param value The class of the value type
     */
    public HadoopInputFormat(
        org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
        Class<K> key,
        Class<V> value
    );

    /**
     * Reads the next record from the Hadoop InputFormat.
     *
     * @param record Reusable record object
     * @return The next record as a Tuple2
     * @throws IOException if reading fails
     */
    public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException;

    /**
     * Gets type information for the produced Tuple2 type.
     *
     * @return TypeInformation for Tuple2<K, V>
     */
    public TypeInformation<Tuple2<K, V>> getProducedType();
}
```
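
The `nextRecord(Tuple2<K, V> record)` signature follows Flink's object-reuse convention: the caller passes in a record object that the format refills and returns, avoiding a fresh allocation per record. A minimal plain-Java sketch of that pattern (the `Pair` and `LineReader` classes here are hypothetical stand-ins, not part of the Flink or Hadoop APIs):

```java
import java.util.Iterator;
import java.util.List;

public class ReuseDemo {
    // Hypothetical mutable pair, standing in for Flink's Tuple2.
    static class Pair<K, V> {
        K f0;
        V f1;
    }

    // Hypothetical reader that refills the caller's record instead of allocating.
    static class LineReader {
        private final Iterator<String> lines;
        private long offset = 0;

        LineReader(List<String> input) { this.lines = input.iterator(); }

        // Mirrors the shape of nextRecord(Tuple2<K, V> record).
        Pair<Long, String> nextRecord(Pair<Long, String> record) {
            if (!lines.hasNext()) {
                return null; // input exhausted
            }
            String line = lines.next();
            record.f0 = offset;               // key: offset of the line
            record.f1 = line;                 // value: line contents
            offset += line.length() + 1;
            return record;                    // same object, refilled
        }
    }

    public static void main(String[] args) {
        LineReader reader = new LineReader(List.of("alpha", "beta"));
        Pair<Long, String> reusable = new Pair<>();
        for (Pair<Long, String> r = reader.nextRecord(reusable); r != null; r = reader.nextRecord(reusable)) {
            System.out.println(r.f0 + "\t" + r.f1);
        }
    }
}
```

The same `reusable` instance is returned on every call, which is why callers must copy a record before buffering it across iterations.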

### HadoopInputFormat (mapreduce API)

Wrapper class for Hadoop InputFormats using the mapreduce API.

```java { .api }
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>>
        implements ResultTypeQueryable<Tuple2<K, V>> {

    /**
     * Constructor with full configuration.
     *
     * @param mapreduceInputFormat The Hadoop InputFormat to wrap
     * @param key The class of the key type
     * @param value The class of the value type
     * @param job Job configuration
     */
    public HadoopInputFormat(
        org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
        Class<K> key,
        Class<V> value,
        Job job
    );

    /**
     * Constructor with default Job configuration.
     *
     * @param mapreduceInputFormat The Hadoop InputFormat to wrap
     * @param key The class of the key type
     * @param value The class of the value type
     * @throws IOException if the Job configuration cannot be created
     */
    public HadoopInputFormat(
        org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
        Class<K> key,
        Class<V> value
    ) throws IOException;

    /**
     * Reads the next record from the Hadoop InputFormat.
     *
     * @param record Reusable record object
     * @return The next record as a Tuple2
     * @throws IOException if reading fails
     */
    public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException;

    /**
     * Gets type information for the produced Tuple2 type.
     *
     * @return TypeInformation for Tuple2<K, V>
     */
    public TypeInformation<Tuple2<K, V>> getProducedType();
}
```

## Key Design Patterns

### Type Safety

All InputFormat wrappers are generically typed with `<K, V>` parameters, ensuring compile-time type safety and proper type inference in Flink applications.
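
The `Class<K>` and `Class<V>` arguments are what let the compiler pin down `K` and `V` at the call site. A plain-Java sketch of that mechanism (`TypedSource` and `wrap` are hypothetical illustrations, not the library's API):

```java
public class TypeSafetyDemo {
    // Hypothetical stand-in for a HadoopInputFormat-style wrapper.
    static class TypedSource<K, V> {
        final Class<K> keyClass;
        final Class<V> valueClass;

        TypedSource(Class<K> keyClass, Class<V> valueClass) {
            this.keyClass = keyClass;
            this.valueClass = valueClass;
        }
    }

    // Mirrors the shape of readHadoopFile(format, key, value, path):
    // the class tokens fix K and V at compile time.
    static <K, V> TypedSource<K, V> wrap(Class<K> key, Class<V> value) {
        return new TypedSource<>(key, value);
    }

    public static void main(String[] args) {
        // K and V are inferred as Long and String; no casts needed downstream.
        TypedSource<Long, String> source = wrap(Long.class, String.class);
        System.out.println(source.keyClass.getSimpleName() + ", " + source.valueClass.getSimpleName());
    }
}
```

The class tokens also survive erasure at runtime, which is why the wrappers can build Flink type information from them.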

### Tuple2 Convention

All input formats produce `Tuple2<K, V>` objects where:

- `f0` contains the key of type `K`
- `f1` contains the value of type `V`
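
For illustration, a simplified stand-in for Flink's `org.apache.flink.api.java.tuple.Tuple2` showing the `f0`/`f1` access convention (real code would import the Flink class rather than define its own):

```java
public class Tuple2Demo {
    // Simplified stand-in for org.apache.flink.api.java.tuple.Tuple2.
    static class Tuple2<K, V> {
        K f0; // key
        V f1; // value

        Tuple2(K f0, V f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    public static void main(String[] args) {
        // Each record from a wrapped Hadoop InputFormat arrives as a key-value pair.
        Tuple2<Long, String> record = new Tuple2<>(42L, "hello world");
        System.out.println("key=" + record.f0 + " value=" + record.f1);
    }
}
```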

### Configuration Flexibility

Both JobConf (mapred) and Job (mapreduce) configuration objects are supported, with convenience overloads supplying default configurations when none is specified.

### Exception Handling

IOException is thrown for I/O operations, consistent with Hadoop's exception-handling conventions.