or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cross-language-processing.md, data-wrappers.md, file-utilities.md, index.md, input-output-formats.md, job-configuration.md, mapreduce-processing.md, serialization.md

docs/input-output-formats.md

0

# Input and Output Formats

1

2

Specialized InputFormat and OutputFormat implementations for reading and writing Avro data in various formats within MapReduce jobs. These formats provide seamless integration between Avro's schema-based serialization and Hadoop's distributed file system, supporting both legacy and modern MapReduce APIs.

3

4

## Capabilities

5

6

### Legacy API Input Formats

7

8

Input formats for the legacy `org.apache.hadoop.mapred` API that read Avro data and present it as wrapped objects.

9

10

```java { .api }

11

public class AvroInputFormat<T> extends FileInputFormat<AvroWrapper<T>, NullWritable> {

12

public RecordReader<AvroWrapper<T>, NullWritable> getRecordReader(

13

InputSplit split, JobConf job, Reporter reporter) throws IOException;

14

15

// Configuration constants

16

public static final String IGNORE_FILES_WITHOUT_EXTENSION_KEY = "avro.mapred.ignore.inputs.without.extension";

17

public static final boolean IGNORE_INPUTS_WITHOUT_EXTENSION_DEFAULT = true;

18

}

19

20

public class AvroAsTextInputFormat<T> extends AvroInputFormat<T> {

21

// Reads Avro data but presents as text representation

22

}

23

24

public class AvroUtf8InputFormat extends AvroInputFormat<Utf8> {

25

// Specialized for reading Avro UTF-8 string data

26

}

27

```

28

29

#### Usage Example

30

31

```java

32

import org.apache.avro.mapred.AvroInputFormat;

33

import org.apache.avro.mapred.AvroJob;

34

import org.apache.hadoop.mapred.JobConf;

35

36

// Configure job for Avro input

37

JobConf job = new JobConf();

38

job.setInputFormat(AvroInputFormat.class);

39

AvroJob.setInputSchema(job, userSchema);

40

41

// Input format will read Avro container files and produce AvroWrapper<T> keys

42

```

43

44

### Legacy API Output Formats

45

46

Output formats for writing Avro data from MapReduce jobs using the legacy API.

47

48

```java { .api }

49

public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWritable> {

50

public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(

51

FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException;

52

}

53

54

public class AvroTextOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWritable> {

55

// Writes Avro data as text representation

56

}

57

```

58

59

#### Usage Example

60

61

```java

62

import org.apache.avro.mapred.AvroOutputFormat;

63

import org.apache.avro.mapred.AvroJob;

64

65

// Configure job for Avro output

66

job.setOutputFormat(AvroOutputFormat.class);

67

AvroJob.setOutputSchema(job, outputSchema);

68

AvroJob.setOutputCodec(job, "snappy");

69

70

// Output format will write AvroWrapper<T> data as Avro container files

71

```

72

73

### New API Input Formats

74

75

Input formats for the modern `org.apache.hadoop.mapreduce` API with enhanced key-value separation.

76

77

```java { .api }

78

public class AvroKeyInputFormat<T> extends FileInputFormat<AvroKey<T>, NullWritable> {

79

public RecordReader<AvroKey<T>, NullWritable> createRecordReader(

80

InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;

81

}

82

83

public class AvroKeyValueInputFormat<K,V> extends FileInputFormat<AvroKey<K>, AvroValue<V>> {

84

public RecordReader<AvroKey<K>, AvroValue<V>> createRecordReader(

85

InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;

86

}

87

88

public class CombineAvroKeyValueFileInputFormat<K,V> extends CombineFileInputFormat<AvroKey<K>, AvroValue<V>> {

89

// Optimized for processing many small Avro files

90

public RecordReader<AvroKey<K>, AvroValue<V>> createRecordReader(

91

InputSplit split, TaskAttemptContext context) throws IOException;

92

}

93

```

94

95

#### Usage Example

96

97

```java

98

import org.apache.avro.mapreduce.AvroKeyInputFormat;

99

import org.apache.avro.mapreduce.AvroKeyValueInputFormat;

100

import org.apache.avro.mapreduce.AvroJob;

101

import org.apache.hadoop.mapreduce.Job;

102

103

// Single key input format

104

Job job = Job.getInstance();

105

job.setInputFormatClass(AvroKeyInputFormat.class);

106

AvroJob.setInputKeySchema(job, keySchema);

107

108

// Key-value input format

109

job.setInputFormatClass(AvroKeyValueInputFormat.class);

110

AvroJob.setInputKeySchema(job, keySchema);

111

AvroJob.setInputValueSchema(job, valueSchema);

112

113

// For many small files

114

job.setInputFormatClass(CombineAvroKeyValueFileInputFormat.class);

115

```

116

117

### New API Output Formats

118

119

Output formats for writing Avro data using the modern MapReduce API.

120

121

```java { .api }

122

public class AvroKeyOutputFormat<T> extends FileOutputFormat<AvroKey<T>, NullWritable> {

123

public RecordWriter<AvroKey<T>, NullWritable> getRecordWriter(TaskAttemptContext context)

124

throws IOException, InterruptedException;

125

}

126

127

public class AvroKeyValueOutputFormat<K,V> extends FileOutputFormat<AvroKey<K>, AvroValue<V>> {

128

public RecordWriter<AvroKey<K>, AvroValue<V>> getRecordWriter(TaskAttemptContext context)

129

throws IOException, InterruptedException;

130

}

131

132

public abstract class AvroOutputFormatBase<K,V> extends FileOutputFormat<K,V> {

133

// Base class providing common functionality for Avro output formats

134

protected static class AvroRecordWriter<K,V> extends RecordWriter<K,V> {

135

public void write(K key, V value) throws IOException, InterruptedException;

136

public void close(TaskAttemptContext context) throws IOException, InterruptedException;

137

}

138

}

139

```

140

141

#### Usage Example

142

143

```java

144

import org.apache.avro.mapreduce.AvroKeyOutputFormat;

145

import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;

146

import org.apache.avro.mapreduce.AvroJob;

147

148

// Single key output format

149

job.setOutputFormatClass(AvroKeyOutputFormat.class);

150

AvroJob.setOutputKeySchema(job, outputKeySchema);

151

152

// Key-value output format

153

job.setOutputFormatClass(AvroKeyValueOutputFormat.class);

154

AvroJob.setOutputKeySchema(job, outputKeySchema);

155

AvroJob.setOutputValueSchema(job, outputValueSchema);

156

```

157

158

### Sequence File Integration

159

160

Formats for reading and writing Avro data in Hadoop SequenceFile format.

161

162

```java { .api }

163

public class AvroSequenceFileInputFormat<K,V> extends FileInputFormat<AvroKey<K>, AvroValue<V>> {

164

public RecordReader<AvroKey<K>, AvroValue<V>> createRecordReader(

165

InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;

166

}

167

168

public class AvroSequenceFileOutputFormat<K,V> extends FileOutputFormat<AvroKey<K>, AvroValue<V>> {

169

public RecordWriter<AvroKey<K>, AvroValue<V>> getRecordWriter(TaskAttemptContext context)

170

throws IOException, InterruptedException;

171

}

172

173

// Legacy API equivalent

174

public class SequenceFileInputFormat<K,V> extends FileInputFormat<AvroWrapper<K>, AvroWrapper<V>> {

175

public RecordReader<AvroWrapper<K>, AvroWrapper<V>> getRecordReader(

176

InputSplit split, JobConf job, Reporter reporter) throws IOException;

177

}

178

```

179

180

#### Usage Example

181

182

```java

183

import org.apache.avro.mapreduce.AvroSequenceFileInputFormat;

184

import org.apache.avro.mapreduce.AvroSequenceFileOutputFormat;

185

186

// Read from SequenceFile with Avro serialization

187

job.setInputFormatClass(AvroSequenceFileInputFormat.class);

188

AvroJob.setInputKeySchema(job, keySchema);

189

AvroJob.setInputValueSchema(job, valueSchema);

190

191

// Write to SequenceFile with Avro serialization

192

job.setOutputFormatClass(AvroSequenceFileOutputFormat.class);

193

AvroJob.setOutputKeySchema(job, keySchema);

194

AvroJob.setOutputValueSchema(job, valueSchema);

195

```

196

197

### Record Readers and Writers

198

199

Base classes and implementations for reading and writing Avro records.

200

201

```java { .api }

202

public abstract class AvroRecordReaderBase<K,V,T> extends RecordReader<K,V> {

203

// Base class for Avro record readers

204

public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;

205

public boolean nextKeyValue() throws IOException, InterruptedException;

206

public float getProgress() throws IOException, InterruptedException;

207

public void close() throws IOException;

208

209

// Abstract methods for specific implementations

210

public abstract K getCurrentKey() throws IOException, InterruptedException;

211

public abstract V getCurrentValue() throws IOException, InterruptedException;

212

}

213

214

public class AvroKeyRecordReader<T> extends AvroRecordReaderBase<AvroKey<T>, NullWritable, T> {

215

public AvroKey<T> getCurrentKey() throws IOException, InterruptedException;

216

public NullWritable getCurrentValue() throws IOException, InterruptedException;

217

}

218

219

public class AvroKeyValueRecordReader<K,V> extends AvroRecordReaderBase<AvroKey<K>, AvroValue<V>, Pair<K,V>> {

220

public AvroKey<K> getCurrentKey() throws IOException, InterruptedException;

221

public AvroValue<V> getCurrentValue() throws IOException, InterruptedException;

222

}

223

224

public class AvroKeyRecordWriter<T> extends RecordWriter<AvroKey<T>, NullWritable> {

225

public void write(AvroKey<T> key, NullWritable value) throws IOException, InterruptedException;

226

public void close(TaskAttemptContext context) throws IOException, InterruptedException;

227

}

228

229

public class AvroKeyValueRecordWriter<K,V> extends RecordWriter<AvroKey<K>, AvroValue<V>> {

230

public void write(AvroKey<K> key, AvroValue<V> value) throws IOException, InterruptedException;

231

public void close(TaskAttemptContext context) throws IOException, InterruptedException;

232

}

233

```

234

235

### Multiple Output Support

236

237

Support for writing to multiple output files from a single job.

238

239

```java { .api }

240

// Legacy API

241

public class AvroMultipleOutputs {

242

public AvroMultipleOutputs(JobConf job);

243

public <T> void write(String name, AvroWrapper<T> key, NullWritable value) throws IOException;

244

public void close() throws IOException;

245

246

public static void addNamedOutput(JobConf job, String name, Class<? extends OutputFormat> outputFormat, Schema schema);

247

public static void setCountersEnabled(JobConf job, boolean enabled);

248

}

249

250

// New API

251

public class org.apache.avro.mapreduce.AvroMultipleOutputs {

252

public AvroMultipleOutputs(TaskAttemptContext context);

253

public <K> void write(K key, NullWritable value, String baseOutputPath) throws IOException, InterruptedException;

254

public <K,V> void write(K key, V value, String baseOutputPath) throws IOException, InterruptedException;

255

public void close() throws IOException, InterruptedException;

256

257

public static void addNamedOutput(Job job, String name, Class<? extends OutputFormat> outputFormat,

258

Class<?> keyClass, Class<?> valueClass);

259

public static void setCountersEnabled(Job job, boolean enabled);

260

}

261

```

262

263

#### Usage Example

264

265

```java

266

import org.apache.avro.mapreduce.AvroMultipleOutputs;

267

import org.apache.avro.mapreduce.AvroKeyOutputFormat;

268

269

// Configure multiple outputs

270

AvroMultipleOutputs.addNamedOutput(job, "users", AvroKeyOutputFormat.class, AvroKey.class, NullWritable.class);

271

AvroMultipleOutputs.addNamedOutput(job, "events", AvroKeyOutputFormat.class, AvroKey.class, NullWritable.class);

272

273

// In reducer

274

public class MyReducer extends Reducer<Text, IntWritable, AvroKey<GenericRecord>, NullWritable> {

275

private AvroMultipleOutputs multipleOutputs;

276

277

protected void setup(Context context) {

278

multipleOutputs = new AvroMultipleOutputs(context);

279

}

280

281

public void reduce(Text key, Iterable<IntWritable> values, Context context) {

282

// Write to different outputs based on logic

283

if (key.toString().startsWith("user_")) {

284

multipleOutputs.write(new AvroKey<>(userRecord), NullWritable.get(), "users");

285

} else {

286

multipleOutputs.write(new AvroKey<>(eventRecord), NullWritable.get(), "events");

287

}

288

}

289

290

protected void cleanup(Context context) throws IOException, InterruptedException {

291

multipleOutputs.close();

292

}

293

}

294

```

295

296

## Configuration and Integration

297

298

### File Extensions and Filtering

299

300

Control which files are processed by input formats:

301

302

```java

303

// Ignore files without .avro extension

304

job.setBoolean(AvroInputFormat.IGNORE_FILES_WITHOUT_EXTENSION_KEY, true);

305

306

// Process all files regardless of extension

307

job.setBoolean(AvroInputFormat.IGNORE_FILES_WITHOUT_EXTENSION_KEY, false);

308

```

309

310

### Compression Support

311

312

All output formats support Avro's compression codecs:

313

314

```java

315

import org.apache.avro.mapred.AvroJob;

316

import org.apache.avro.mapreduce.AvroJob;

317

318

// Legacy API

319

AvroJob.setOutputCodec(job, "snappy");

320

AvroJob.setOutputCodec(job, "deflate");

321

AvroJob.setOutputCodec(job, "bzip2");

322

323

// New API - via configuration

324

job.getConfiguration().set("avro.mapreduce.output.codec", "snappy");

325

```

326

327

### Schema Configuration Integration

328

329

Input/output formats automatically use schemas configured via AvroJob:

330

331

```java

332

// Schemas set via AvroJob are automatically picked up by formats

333

AvroJob.setInputSchema(job, inputSchema); // Used by AvroInputFormat

334

AvroJob.setOutputSchema(job, outputSchema); // Used by AvroOutputFormat

335

336

// New API with separate key/value schemas

337

AvroJob.setInputKeySchema(job, keySchema); // Used by AvroKeyInputFormat

338

AvroJob.setInputValueSchema(job, valueSchema); // Used by AvroKeyValueInputFormat

339

```

340

341

## Performance Considerations

342

343

### Small Files Optimization

344

345

Use `CombineAvroKeyValueFileInputFormat` for many small files:

346

347

```java

348

job.setInputFormatClass(CombineAvroKeyValueFileInputFormat.class);

349

350

// Configure combine parameters

351

job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", 128 * 1024 * 1024);

352

job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.minsize", 64 * 1024 * 1024);

353

```

354

355

### Memory Management

356

357

Input formats handle memory efficiently by:

358

- Reusing reader objects

359

- Supporting lazy deserialization

360

- Proper resource cleanup

361

362

## Error Handling

363

364

Common issues and solutions:

365

366

- **Schema Not Found**: Ensure schema is configured via AvroJob before setting input/output format

367

- **File Format Errors**: Verify input files are valid Avro container files

368

- **Codec Errors**: Ensure output codec is supported and available on all nodes

369

- **Split Size Issues**: For large files, tune split size parameters

370

- **Memory Issues**: For large records, increase task memory limits