
# Output Format Integration


The Output Format Integration capability enables writing Flink DataSets to Hadoop OutputFormats, providing seamless integration with Hadoop ecosystem storage systems and custom output processing pipelines.


## Overview


Flink's Hadoop compatibility layer wraps Hadoop OutputFormats so that they accept data from Flink DataSets. The integration supports both the legacy MapRed API and the modern MapReduce API, automatically converting Flink `Tuple2` objects or Scala tuples to Hadoop key-value pairs.
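
The overall pattern is always the same: convert each element to a key-value pair, then hand the DataSet to the wrapped format. A minimal sketch of the MapRed variant (it assumes an existing `ExecutionEnvironment` named `env` and a configured `JobConf` named `conf`; the `Writable` types are only illustrative):

```java
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextOutputFormat;

// Convert records into Hadoop key-value pairs...
DataSet<Tuple2<LongWritable, Text>> pairs = env
    .fromElements("a", "bb", "ccc")
    .map(s -> new Tuple2<>(new LongWritable(s.length()), new Text(s)))
    // With Java lambdas, Flink cannot infer the Tuple2 type parameters,
    // so a type hint is needed:
    .returns(new TypeHint<Tuple2<LongWritable, Text>>() {});

// ...then write them through the wrapped mapred OutputFormat.
pairs.output(new HadoopOutputFormat<>(new TextOutputFormat<>(), conf));
```

The sections below give complete examples for text files, sequence files, and the MapReduce API.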


## HadoopOutputFormat Classes


### MapRed HadoopOutputFormat (Java)


```java { .api }
@Public
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {

    // Constructor with JobConf
    public HadoopOutputFormat(
        org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat,
        JobConf job);

    // Constructor with OutputCommitter and JobConf
    public HadoopOutputFormat(
        org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat,
        Class<OutputCommitter> outputCommitterClass,
        JobConf job);

    // Write a record to the output
    public void writeRecord(Tuple2<K, V> record) throws IOException;
}
```
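
When writing, the wrapper passes `record.f0` as the Hadoop key and `record.f1` as the value to the underlying Hadoop `RecordWriter`.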


### MapReduce HadoopOutputFormat (Java)


```java { .api }
@Public
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {

    // Constructor with Job
    public HadoopOutputFormat(
        org.apache.hadoop.mapreduce.OutputFormat<K, V> mapreduceOutputFormat,
        Job job);

    // Write a record to the output
    public void writeRecord(Tuple2<K, V> record) throws IOException;
}
```


### MapRed HadoopOutputFormat (Scala)


```scala { .api }
@Public
class HadoopOutputFormat[K, V] extends HadoopOutputFormatBase[K, V, (K, V)] {

  // Constructor with JobConf
  def this(mapredOutputFormat: OutputFormat[K, V], job: JobConf)

  // Constructor with OutputCommitter and JobConf
  def this(
      mapredOutputFormat: OutputFormat[K, V],
      outputCommitterClass: Class[OutputCommitter],
      job: JobConf)

  // Write a record to the output
  def writeRecord(record: (K, V)): Unit
}
```


## Usage Examples


### Writing Text Files (Java)


```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

// Configure output
JobConf conf = new JobConf();
conf.setOutputFormat(TextOutputFormat.class);
conf.setOutputKeyClass(NullWritable.class);
conf.setOutputValueClass(Text.class);
TextOutputFormat.setOutputPath(conf, new Path("hdfs://namenode:port/output"));

// Create output format
HadoopOutputFormat<NullWritable, Text> hadoopOutput =
    new HadoopOutputFormat<>(new TextOutputFormat<>(), conf);

// Prepare data for output
DataSet<String> textLines = env.fromElements("Line 1", "Line 2", "Line 3");
DataSet<Tuple2<NullWritable, Text>> outputData = textLines.map(
    line -> new Tuple2<>(NullWritable.get(), new Text(line))
);

// Write to Hadoop output
outputData.output(hadoopOutput);
```
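
Note that `output(...)` only declares a sink; the files are actually written when `env.execute(...)` runs.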


### Writing Sequence Files (Java)


```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
// (other imports as in the text file example above)

// Configure sequence file output
JobConf conf = new JobConf();
conf.setOutputFormat(SequenceFileOutputFormat.class);
conf.setOutputKeyClass(IntWritable.class);
conf.setOutputValueClass(Text.class);
SequenceFileOutputFormat.setOutputPath(conf, new Path("hdfs://namenode:port/sequence-output"));

// Create output format
HadoopOutputFormat<IntWritable, Text> seqOutput =
    new HadoopOutputFormat<>(new SequenceFileOutputFormat<>(), conf);

// Prepare key-value data
DataSet<Tuple2<IntWritable, Text>> keyValueData = env.fromElements(
    new Tuple2<>(new IntWritable(1), new Text("First")),
    new Tuple2<>(new IntWritable(2), new Text("Second")),
    new Tuple2<>(new IntWritable(3), new Text("Third"))
);

// Write sequence file
keyValueData.output(seqOutput);
```


### Writing with MapReduce API (Java)


```java
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Configure MapReduce job
Job job = Job.getInstance();
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
TextOutputFormat.setOutputPath(job, new Path("hdfs://namenode:port/mapreduce-output"));

// Create MapReduce output format
HadoopOutputFormat<NullWritable, Text> mapreduceOutput =
    new HadoopOutputFormat<>(new TextOutputFormat<>(), job);

// Write data (`textLines` is the DataSet<String> from the text file example)
DataSet<Tuple2<NullWritable, Text>> outputData = textLines.map(
    line -> new Tuple2<>(NullWritable.get(), new Text(line))
);
outputData.output(mapreduceOutput);
```


### Scala Usage


```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.fs.Path

// Configure output
val conf = new JobConf()
conf.setOutputFormat(classOf[TextOutputFormat[NullWritable, Text]])
conf.setOutputKeyClass(classOf[NullWritable])
conf.setOutputValueClass(classOf[Text])
// In Scala, inherited Java statics must be called on the declaring class,
// so use FileOutputFormat rather than TextOutputFormat here
FileOutputFormat.setOutputPath(conf, new Path("hdfs://namenode:port/scala-output"))

// Create output format
val hadoopOutput = new HadoopOutputFormat(new TextOutputFormat[NullWritable, Text](), conf)

// Prepare and write data
val textLines = env.fromElements("Scala line 1", "Scala line 2", "Scala line 3")
val outputData = textLines.map(line => (NullWritable.get(), new Text(line)))
outputData.output(hadoopOutput)
```


### Custom Output Formats


```java
import com.example.CustomOutputFormat;
import com.example.CustomKey;
import com.example.CustomValue;

// Configure custom output format
JobConf conf = new JobConf();
conf.setOutputFormat(CustomOutputFormat.class);
conf.setOutputKeyClass(CustomKey.class);
conf.setOutputValueClass(CustomValue.class);
conf.set("custom.output.property", "custom-value");

// Use custom output format
HadoopOutputFormat<CustomKey, CustomValue> customOutput =
    new HadoopOutputFormat<>(new CustomOutputFormat(), conf);

// Process and write custom data
DataSet<Tuple2<CustomKey, CustomValue>> customData = processedData.map(
    data -> new Tuple2<>(new CustomKey(data.getId()), new CustomValue(data.getContent()))
);
customData.output(customOutput);
```


## Output Committer Integration


Hadoop OutputFormats often use OutputCommitters to manage the output lifecycle (task setup, commit, and cleanup). The Hadoop compatibility layer integrates with these committers.


```java
import org.apache.hadoop.mapred.FileOutputCommitter;

// Specify custom OutputCommitter
JobConf conf = new JobConf();
conf.setOutputFormat(TextOutputFormat.class);
// OutputCommitter is automatically handled, but can be customized if needed

HadoopOutputFormat<NullWritable, Text> outputWithCommitter =
    new HadoopOutputFormat<>(
        new TextOutputFormat<>(),
        FileOutputCommitter.class, // Custom committer class
        conf
    );
```


## Partitioning and Multiple Outputs


When writing to partitioned outputs or multiple files (typically with a `MultipleTextOutputFormat` subclass that derives file names from keys):


```java
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

// Configure multiple output files
JobConf conf = new JobConf();
conf.setOutputFormat(MultipleTextOutputFormat.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);

// With a MultipleTextOutputFormat subclass that overrides
// generateFileNameForKeyValue, the key can determine the output file name
DataSet<Tuple2<Text, Text>> partitionedData = processedData.map(
    data -> new Tuple2<>(
        new Text("partition-" + data.getPartition()), // File name prefix
        new Text(data.getContent()) // Content
    )
);

HadoopOutputFormat<Text, Text> multiOutput =
    new HadoopOutputFormat<>(new MultipleTextOutputFormat<>(), conf);
partitionedData.output(multiOutput);
```


## Error Handling


Output format operations may encounter various errors:


```java
try {
    outputData.output(hadoopOutput);
    env.execute("Hadoop Output Job");
} catch (IOException e) {
    // Handle I/O errors during writing
    logger.error("Failed to write to Hadoop output: " + e.getMessage());
} catch (Exception e) {
    // Handle other execution errors
    logger.error("Job execution failed: " + e.getMessage());
}
```


Common exceptions include:

- `IOException` - File system or network errors during writing
- `IllegalArgumentException` - Invalid configuration or parameters
- `RuntimeException` - Various Hadoop-related runtime errors
- `JobExecutionException` - Flink job execution failures (see the sketch below)

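Because sinks are lazy, most failures only surface once `env.execute()` runs, and Flink reports them wrapped in a `JobExecutionException`. A minimal sketch for logging the underlying cause (assuming Flink's `org.apache.flink.runtime.client.JobExecutionException` and the same `logger` as above):

```java
import org.apache.flink.runtime.client.JobExecutionException;

try {
    outputData.output(hadoopOutput);
    env.execute("Hadoop Output Job");
} catch (JobExecutionException e) {
    // Flink wraps the failing task's exception; getCause() holds the real
    // error, e.g. an IOException from HDFS or an invalid output path
    logger.error("Job failed, root cause: " + e.getCause(), e);
} catch (Exception e) {
    logger.error("Unexpected failure: " + e.getMessage(), e);
}
```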

## Configuration Best Practices


### Setting Output Paths


```java
// Always use absolute paths for distributed file systems
TextOutputFormat.setOutputPath(conf, new Path("hdfs://namenode:port/full/path/to/output"));

// For local file system (testing only)
TextOutputFormat.setOutputPath(conf, new Path("file:///tmp/local/output"));
```


### Compression Configuration


```java
// Enable compression for text output
conf.setBoolean("mapred.output.compress", true);
conf.setClass("mapred.output.compression.codec",
    org.apache.hadoop.io.compress.GzipCodec.class,
    CompressionCodec.class);

// For sequence files
SequenceFileOutputFormat.setCompressOutput(conf, true);
SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK);
```
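
On Hadoop 2 and later, the `mapred.output.compress*` keys above are deprecated aliases; both spellings are honored, but the current property names can be used instead:

```java
// Same configuration using the non-deprecated Hadoop 2+ property names
conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
conf.setClass("mapreduce.output.fileoutputformat.compress.codec",
    org.apache.hadoop.io.compress.GzipCodec.class,
    CompressionCodec.class);
```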


### Performance Tuning


```java
// Set appropriate block size for HDFS
conf.setLong("dfs.block.size", 134217728); // 128 MB

// Configure buffer sizes
conf.setInt("io.file.buffer.size", 65536); // 64 KB

// Set replication factor
conf.setInt("dfs.replication", 3);
```