or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md · input-formats.md · mapreduce-functions.md · output-formats.md · type-system.md · utilities.md

docs/output-formats.md

0

# Hadoop Output Formats

1

2

Integration for writing data to Hadoop OutputFormats from Flink applications. Supports both legacy mapred API and newer mapreduce API with automatic conversion from Flink Tuple2 objects to Hadoop key-value pairs.

3

4

## Capabilities

5

6

### Output Format Wrapper (mapred API)

7

8

Wrapper class for Hadoop OutputFormats using the legacy mapred API.

9

10

```java { .api }

11

public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {

12

13

/**

14

* Constructor with basic configuration

15

* @param mapredOutputFormat The Hadoop OutputFormat to wrap

16

* @param job JobConf configuration for the Hadoop job

17

*/

18

public HadoopOutputFormat(

19

org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat,

20

JobConf job

21

);

22

23

/**

24

* Constructor with custom OutputCommitter

25

* @param mapredOutputFormat The Hadoop OutputFormat to wrap

26

* @param outputCommitterClass Class of the OutputCommitter to use

27

* @param job JobConf configuration for the Hadoop job

28

*/

29

public HadoopOutputFormat(

30

org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat,

31

Class<OutputCommitter> outputCommitterClass,

32

JobConf job

33

);

34

35

/**

36

* Write a record to the Hadoop OutputFormat

37

* @param record The record to write as a Tuple2<K, V>

38

* @throws IOException if writing fails

39

*/

40

public void writeRecord(Tuple2<K, V> record) throws IOException;

41

}

42

```

43

44

**Usage Example:**

45

46

```java

47

import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;

48

import org.apache.flink.api.java.DataSet;

49

import org.apache.flink.api.java.tuple.Tuple2;

50

import org.apache.hadoop.mapred.TextOutputFormat;

51

import org.apache.hadoop.mapred.JobConf;

52

import org.apache.hadoop.io.LongWritable;

53

import org.apache.hadoop.io.Text;

54

import org.apache.hadoop.fs.Path;

55

56

// Configure JobConf for output

57

JobConf jobConf = new JobConf();

58

jobConf.set("mapred.output.dir", "hdfs://output/path");

59

jobConf.setOutputFormat(TextOutputFormat.class);

60

jobConf.setOutputKeyClass(LongWritable.class);

61

jobConf.setOutputValueClass(Text.class);

62

63

// Create Hadoop output format wrapper

64

HadoopOutputFormat<LongWritable, Text> hadoopOutput =

65

new HadoopOutputFormat<>(

66

new TextOutputFormat<LongWritable, Text>(),

67

jobConf

68

);

69

70

// Use with Flink DataSet

71

DataSet<Tuple2<LongWritable, Text>> dataset = // ... your dataset

72

dataset.output(hadoopOutput);

73

```

74

75

### Output Format Wrapper (mapreduce API)

76

77

Wrapper class for Hadoop OutputFormats using the newer mapreduce API.

78

79

```java { .api }

80

public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {

81

82

/**

83

* Constructor for mapreduce OutputFormat

84

* @param mapreduceOutputFormat The Hadoop OutputFormat to wrap

85

* @param job Job configuration for the Hadoop job

86

*/

87

public HadoopOutputFormat(

88

org.apache.hadoop.mapreduce.OutputFormat<K, V> mapreduceOutputFormat,

89

Job job

90

);

91

92

/**

93

* Write a record to the Hadoop OutputFormat

94

* @param record The record to write as a Tuple2<K, V>

95

* @throws IOException if writing fails

96

*/

97

public void writeRecord(Tuple2<K, V> record) throws IOException;

98

}

99

```

100

101

**Usage Example:**

102

103

```java

104

import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;

105

import org.apache.flink.api.java.DataSet;

106

import org.apache.flink.api.java.tuple.Tuple2;

107

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

108

import org.apache.hadoop.mapreduce.Job;

109

import org.apache.hadoop.io.LongWritable;

110

import org.apache.hadoop.io.Text;

111

import org.apache.hadoop.fs.Path;

112

113

// Configure Job for output

114

Job job = Job.getInstance();

115

job.setOutputFormatClass(TextOutputFormat.class);

116

job.setOutputKeyClass(LongWritable.class);

117

job.setOutputValueClass(Text.class);

118

TextOutputFormat.setOutputPath(job, new Path("hdfs://output/path"));

119

120

// Create Hadoop output format wrapper

121

org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat<LongWritable, Text> mapreduceOutput =

122

new org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat<>(

123

new TextOutputFormat<LongWritable, Text>(),

124

job

125

);

126

127

// Use with Flink DataSet

128

DataSet<Tuple2<LongWritable, Text>> dataset = // ... your dataset

129

dataset.output(mapreduceOutput);

130

```

131

132

## Common Output Format Examples

133

134

### Text File Output

135

136

Writing plain text files using TextOutputFormat.

137

138

**mapred API:**

139

140

```java

141

import org.apache.hadoop.mapred.TextOutputFormat;

142

import org.apache.hadoop.mapred.JobConf;

143

import org.apache.hadoop.io.NullWritable;

144

import org.apache.hadoop.io.Text;

145

146

JobConf jobConf = new JobConf();

147

jobConf.set("mapred.output.dir", "hdfs://output/text");

148

jobConf.setOutputFormat(TextOutputFormat.class);

149

150

HadoopOutputFormat<NullWritable, Text> textOutput =

151

new HadoopOutputFormat<>(

152

new TextOutputFormat<NullWritable, Text>(),

153

jobConf

154

);

155

156

// Convert strings to Tuple2<NullWritable, Text>

157

DataSet<String> stringDataset = // ... your string dataset

158

DataSet<Tuple2<NullWritable, Text>> tuples = stringDataset.map(

159

s -> new Tuple2<>(NullWritable.get(), new Text(s))

160

);

161

tuples.output(textOutput);

162

```

163

164

**mapreduce API:**

165

166

```java

167

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

168

import org.apache.hadoop.mapreduce.Job;

169

import org.apache.hadoop.io.NullWritable;

170

import org.apache.hadoop.io.Text;

171

172

Job job = Job.getInstance();

173

job.setOutputFormatClass(TextOutputFormat.class);

174

TextOutputFormat.setOutputPath(job, new Path("hdfs://output/text"));

175

176

org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat<NullWritable, Text> textOutput =

177

new org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat<>(

178

new TextOutputFormat<NullWritable, Text>(),

179

job

180

);

181

```

182

183

### Sequence File Output

184

185

Writing Hadoop sequence files with key-value pairs.

186

187

**mapred API:**

188

189

```java

190

import org.apache.hadoop.mapred.SequenceFileOutputFormat;

191

import org.apache.hadoop.mapred.JobConf;

192

import org.apache.hadoop.io.IntWritable;

193

import org.apache.hadoop.io.Text;

194

195

JobConf jobConf = new JobConf();

196

jobConf.set("mapred.output.dir", "hdfs://output/sequence");

197

jobConf.setOutputFormat(SequenceFileOutputFormat.class);

198

jobConf.setOutputKeyClass(IntWritable.class);

199

jobConf.setOutputValueClass(Text.class);

200

201

HadoopOutputFormat<IntWritable, Text> sequenceOutput =

202

new HadoopOutputFormat<>(

203

new SequenceFileOutputFormat<IntWritable, Text>(),

204

jobConf

205

);

206

207

DataSet<Tuple2<IntWritable, Text>> keyValuePairs = // ... your dataset

208

keyValuePairs.output(sequenceOutput);

209

```

210

211

### Multiple Output Files

212

213

Using MultipleTextOutputFormat to write to multiple files based on keys.

214

215

```java

216

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

217

import org.apache.hadoop.mapred.JobConf;

218

import org.apache.hadoop.io.Text;

219

220

// Custom MultipleTextOutputFormat that partitions by key prefix

221

public static class KeyPartitionedOutput extends MultipleTextOutputFormat<Text, Text> {

222

@Override

223

protected String generateFileNameForKeyValue(Text key, Text value, String name) {

224

return key.toString().substring(0, 1) + "/" + name;

225

}

226

}

227

228

JobConf jobConf = new JobConf();

229

jobConf.set("mapred.output.dir", "hdfs://output/partitioned");

230

jobConf.setOutputFormat(KeyPartitionedOutput.class);

231

232

HadoopOutputFormat<Text, Text> partitionedOutput =

233

new HadoopOutputFormat<>(

234

new KeyPartitionedOutput(),

235

jobConf

236

);

237

```

238

239

## Configuration Patterns

240

241

### Basic Configuration

242

243

Standard configuration for common output scenarios.

244

245

```java

246

// mapred API configuration

247

JobConf jobConf = new JobConf();

248

jobConf.set("mapred.output.dir", outputPath);

249

jobConf.setOutputFormat(outputFormatClass);

250

jobConf.setOutputKeyClass(keyClass);

251

jobConf.setOutputValueClass(valueClass);

252

253

// mapreduce API configuration

254

Job job = Job.getInstance();

255

job.setOutputFormatClass(outputFormatClass);

256

job.setOutputKeyClass(keyClass);

257

job.setOutputValueClass(valueClass);

258

FileOutputFormat.setOutputPath(job, new Path(outputPath));

259

```

260

261

### Compression Configuration

262

263

Enabling compression for output files.

264

265

```java

266

// Enable compression in JobConf

267

JobConf jobConf = new JobConf();

268

jobConf.setBoolean("mapred.output.compress", true);

269

jobConf.setClass("mapred.output.compression.codec",

270

GzipCodec.class, CompressionCodec.class);

271

jobConf.set("mapred.output.compression.type", "BLOCK");

272

273

// Enable compression in Job

274

Job job = Job.getInstance();

275

job.getConfiguration().setBoolean("mapreduce.output.fileoutputformat.compress", true);

276

job.getConfiguration().setClass("mapreduce.output.fileoutputformat.compress.codec",

277

GzipCodec.class, CompressionCodec.class);

278

```

279

280

### Custom Output Committer

281

282

Using custom OutputCommitter for advanced output coordination.

283

284

```java

285

import org.apache.hadoop.mapred.FileOutputCommitter;

286

287

// Custom committer that moves files to final location

288

public class CustomOutputCommitter extends FileOutputCommitter {

289

@Override

290

public void commitJob(JobContext context) throws IOException {

291

super.commitJob(context);

292

// Custom logic after job completion

293

}

294

}

295

296

HadoopOutputFormat<K, V> outputFormat =

297

new HadoopOutputFormat<>(

298

hadoopOutputFormat,

299

CustomOutputCommitter.class,

300

jobConf

301

);

302

```

303

304

## Key Design Patterns

305

306

### Tuple2 Input Convention

307

All output formats consume `Tuple2<K, V>` objects where:

308

- `f0` contains the key to be written

309

- `f1` contains the value to be written

310

311

### Configuration Flexibility

312

Both JobConf (mapred) and Job (mapreduce) configuration objects are supported, allowing use of existing Hadoop configuration patterns.

313

314

### OutputCommitter Support

315

Custom OutputCommitter classes can be specified for advanced output coordination and cleanup operations.

316

317

### Exception Handling

318

IOException is thrown for write operations, maintaining consistency with Hadoop's exception handling patterns.