or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

function-wrappers.md, index.md, input-output-formats.md, type-system.md, utility-classes.md

docs/input-output-formats.md

# Input and Output Formats

Comprehensive wrapper classes that enable Hadoop InputFormats and OutputFormats to work seamlessly with Flink DataSets. The library provides complete support for both the legacy MapRed API (org.apache.hadoop.mapred) and the modern MapReduce API (org.apache.hadoop.mapreduce).

## Capabilities

### HadoopInputs Factory Class

Central factory class providing convenient methods to create Flink InputFormat wrappers for various Hadoop InputFormat types.

```java { .api }
/**
 * Utility class to use Apache Hadoop InputFormats with Apache Flink
 */
public class HadoopInputs {

    /**
     * Creates a Flink InputFormat wrapper for Hadoop mapred FileInputFormat with JobConf
     * @param mapredInputFormat The Hadoop FileInputFormat to wrap
     * @param key The key class type
     * @param value The value class type
     * @param inputPath Path to input files
     * @param job JobConf for Hadoop configuration
     * @return HadoopInputFormat wrapper for use with Flink
     */
    public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
        org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
        Class<K> key, Class<V> value, String inputPath, JobConf job
    );

    /**
     * Creates a Flink InputFormat wrapper for Hadoop mapred FileInputFormat with default JobConf
     * @param mapredInputFormat The Hadoop FileInputFormat to wrap
     * @param key The key class type
     * @param value The value class type
     * @param inputPath Path to input files
     * @return HadoopInputFormat wrapper for use with Flink
     */
    public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
        org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
        Class<K> key, Class<V> value, String inputPath
    );

    /**
     * Creates a Flink InputFormat for reading Hadoop sequence files
     * @param key The key class type
     * @param value The value class type
     * @param inputPath Path to sequence files
     * @return HadoopInputFormat wrapper for reading sequence files
     * @throws IOException if sequence file access fails
     */
    public static <K, V> HadoopInputFormat<K, V> readSequenceFile(
        Class<K> key, Class<V> value, String inputPath
    ) throws IOException;

    /**
     * Creates a Flink InputFormat wrapper for any Hadoop mapred InputFormat
     * @param mapredInputFormat The Hadoop InputFormat to wrap
     * @param key The key class type
     * @param value The value class type
     * @param job JobConf for Hadoop configuration
     * @return HadoopInputFormat wrapper for use with Flink
     */
    public static <K, V> HadoopInputFormat<K, V> createHadoopInput(
        org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
        Class<K> key, Class<V> value, JobConf job
    );

    /**
     * Creates a Flink InputFormat wrapper for Hadoop mapreduce FileInputFormat with Job
     * @param mapreduceInputFormat The Hadoop FileInputFormat to wrap
     * @param key The key class type
     * @param value The value class type
     * @param inputPath Path to input files
     * @param job Job for Hadoop configuration
     * @return HadoopInputFormat wrapper for use with Flink
     * @throws IOException if file system access fails
     */
    public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
        org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
        Class<K> key, Class<V> value, String inputPath, Job job
    ) throws IOException;

    /**
     * Creates a Flink InputFormat wrapper for Hadoop mapreduce FileInputFormat with default Job
     * @param mapreduceInputFormat The Hadoop FileInputFormat to wrap
     * @param key The key class type
     * @param value The value class type
     * @param inputPath Path to input files
     * @return HadoopInputFormat wrapper for use with Flink
     * @throws IOException if file system access fails
     */
    public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
        org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
        Class<K> key, Class<V> value, String inputPath
    ) throws IOException;

    /**
     * Creates a Flink InputFormat wrapper for any Hadoop mapreduce InputFormat
     * @param mapreduceInputFormat The Hadoop InputFormat to wrap
     * @param key The key class type
     * @param value The value class type
     * @param job Job for Hadoop configuration
     * @return HadoopInputFormat wrapper for use with Flink
     */
    public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput(
        org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
        Class<K> key, Class<V> value, Job job
    );
}
```

**Usage Examples:**

```java
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.JobConf;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read text files using MapRed API
DataSet<Tuple2<LongWritable, Text>> textData = env.createInput(
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://path/to/input/*.txt"
    )
);

// Read sequence files
DataSet<Tuple2<Text, IntWritable>> sequenceData = env.createInput(
    HadoopInputs.readSequenceFile(
        Text.class,
        IntWritable.class,
        "hdfs://path/to/sequence/files"
    )
);

// Custom InputFormat with configuration
JobConf conf = new JobConf();
conf.set("custom.property", "value");
DataSet<Tuple2<Text, MyWritable>> customData = env.createInput(
    HadoopInputs.createHadoopInput(
        new MyCustomInputFormat(),
        Text.class,
        MyWritable.class,
        conf
    )
);
```

### MapReduce API InputFormat

InputFormat implementation for the modern Hadoop MapReduce API (org.apache.hadoop.mapreduce).

```java { .api }
/**
 * InputFormat implementation allowing to use Hadoop (mapreduce) InputFormats with Flink
 * @param <K> The key type
 * @param <V> The value type
 */
@Public
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, org.apache.hadoop.mapreduce.InputFormat<K, V>> {

    /**
     * Creates a HadoopInputFormat wrapper with Job configuration
     * @param mapreduceInputFormat The Hadoop InputFormat to wrap
     * @param key The key class type
     * @param value The value class type
     * @param job Job for Hadoop configuration
     */
    public HadoopInputFormat(
        org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
        Class<K> key, Class<V> value, Job job
    );

    /**
     * Creates a HadoopInputFormat wrapper with default configuration
     * @param mapreduceInputFormat The Hadoop InputFormat to wrap
     * @param key The key class type
     * @param value The value class type
     * @throws IOException if default Job creation fails
     */
    public HadoopInputFormat(
        org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
        Class<K> key, Class<V> value
    ) throws IOException;
}
```

### MapReduce API OutputFormat

OutputFormat implementation for the modern Hadoop MapReduce API (org.apache.hadoop.mapreduce).

```java { .api }
/**
 * OutputFormat implementation allowing to use Hadoop (mapreduce) OutputFormats with Flink
 * @param <K> The key type
 * @param <V> The value type
 */
@Public
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, org.apache.hadoop.mapreduce.OutputFormat<K, V>> {

    /**
     * Creates a HadoopOutputFormat wrapper with Job configuration
     * @param mapreduceOutputFormat The Hadoop OutputFormat to wrap
     * @param job Job for Hadoop configuration
     */
    public HadoopOutputFormat(
        org.apache.hadoop.mapreduce.OutputFormat<K, V> mapreduceOutputFormat,
        Job job
    );
}
```

### MapRed API InputFormat

InputFormat implementation for the legacy Hadoop MapRed API (org.apache.hadoop.mapred).

```java { .api }
/**
 * Wrapper for using HadoopInputFormats (mapred-variant) with Flink
 * @param <K> The key type
 * @param <V> The value type
 */
@Public
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, org.apache.hadoop.mapred.InputFormat<K, V>> {

    /**
     * Creates a HadoopInputFormat wrapper with JobConf configuration
     * @param mapredInputFormat The Hadoop InputFormat to wrap
     * @param key The key class type
     * @param value The value class type
     * @param job JobConf for Hadoop configuration
     */
    public HadoopInputFormat(
        org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
        Class<K> key, Class<V> value, JobConf job
    );

    /**
     * Creates a HadoopInputFormat wrapper with default JobConf
     * @param mapredInputFormat The Hadoop InputFormat to wrap
     * @param key The key class type
     * @param value The value class type
     */
    public HadoopInputFormat(
        org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
        Class<K> key, Class<V> value
    );
}
```

### MapRed API OutputFormat

OutputFormat implementation for the legacy Hadoop MapRed API (org.apache.hadoop.mapred).

```java { .api }
/**
 * Wrapper for using HadoopOutputFormats (mapred-variant) with Flink
 * @param <K> The key type
 * @param <V> The value type
 */
@Public
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, org.apache.hadoop.mapred.OutputFormat<K, V>> {

    /**
     * Creates a HadoopOutputFormat wrapper with JobConf configuration
     * @param mapredOutputFormat The Hadoop OutputFormat to wrap
     * @param job JobConf for Hadoop configuration
     */
    public HadoopOutputFormat(
        org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat,
        JobConf job
    );

    /**
     * Creates a HadoopOutputFormat wrapper with custom OutputCommitter and JobConf
     * @param mapredOutputFormat The Hadoop OutputFormat to wrap
     * @param outputCommitterClass Custom OutputCommitter class
     * @param job JobConf for Hadoop configuration
     */
    public HadoopOutputFormat(
        org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat,
        Class<OutputCommitter> outputCommitterClass, JobConf job
    );
}
```

**Usage Examples:**

```java
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;

// Write to text files using MapReduce API
Job outputJob = Job.getInstance();
outputJob.getConfiguration().set("mapreduce.output.textoutputformat.separator", "\t");

DataSet<Tuple2<NullWritable, Text>> results = // ... your data
results.output(new HadoopOutputFormat<>(
    new TextOutputFormat<NullWritable, Text>(),
    outputJob
));
```

## Key Design Patterns

### Tuple2 Integration

All data is exposed as Flink Tuple2<K,V> objects where f0 is the key and f1 is the value, providing seamless integration with Flink's DataSet API.

### Type Safety

All classes are parameterized with key (K) and value (V) types, ensuring compile-time type safety and preventing runtime ClassCastExceptions.

### Configuration Support

Full support for both Hadoop JobConf (MapRed API) and Job (MapReduce API) configurations, allowing fine-grained control over Hadoop behavior.

### Error Handling

Proper IOException propagation for file system and configuration errors, with detailed error messages for troubleshooting integration issues.