or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cross-language-processing.md, data-wrappers.md, file-utilities.md, index.md, input-output-formats.md, job-configuration.md, mapreduce-processing.md, serialization.md

docs/mapreduce-processing.md

# MapReduce Processing Framework

Base classes and utilities for implementing Avro-aware mappers and reducers in the legacy MapReduce API. This framework provides abstract base classes that handle Avro data serialization and deserialization automatically, allowing developers to focus on business logic while maintaining type safety and schema evolution support.

## Capabilities

### Avro Mapper Base Class

Abstract base class for implementing mappers that process Avro data in the legacy MapReduce API.

```java { .api }
public abstract class AvroMapper<IN,OUT> extends Configured implements JobConfigurable, Closeable {
  // Abstract method to implement business logic
  public abstract void map(IN datum, AvroCollector<OUT> collector, Reporter reporter)
      throws IOException;

  // Lifecycle methods (from interfaces)
  public void configure(JobConf jobConf);
  public void close() throws IOException;
}
```

#### Usage Example

```java
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.mapred.Reporter;

// Implement mapper for processing user records
public class UserMapper extends AvroMapper<GenericRecord, GenericRecord> {

  @Override
  public void map(GenericRecord user, AvroCollector<GenericRecord> collector, Reporter reporter)
      throws IOException {

    // Access Avro data directly (no wrapper needed)
    String name = user.get("name").toString();
    Integer age = (Integer) user.get("age");

    // Filter and transform data
    if (age >= 18) {
      GenericRecord output = new GenericRecordBuilder(outputSchema)
          .set("name", name.toUpperCase())
          .set("age", age)
          .set("category", "adult")
          .build();

      // Collect output (automatically wrapped)
      collector.collect(output);
    }

    // Update counters
    reporter.incrCounter("USER_PROCESSING", "TOTAL_USERS", 1);
    if (age >= 18) {
      reporter.incrCounter("USER_PROCESSING", "ADULT_USERS", 1);
    }
  }

  @Override
  public void configure(JobConf jobConf) {
    // Initialize mapper with job configuration
    this.outputSchema = AvroJob.getMapOutputSchema(jobConf);
  }

  private Schema outputSchema;
}
```

### Avro Reducer Base Class

Abstract base class for implementing reducers that process grouped Avro data.

```java { .api }
public abstract class AvroReducer<K,V,OUT> extends Configured implements JobConfigurable, Closeable {
  // Abstract method for business logic
  public abstract void reduce(K key, Iterable<V> values, AvroCollector<OUT> collector, Reporter reporter)
      throws IOException;

  // Lifecycle methods (from interfaces)
  public void configure(JobConf jobConf);
  public void close() throws IOException;
}
```

#### Usage Example

```java
import org.apache.avro.mapred.AvroReducer;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.mapred.Reporter;

// Implement reducer for aggregating user data by department
public class UserAggregateReducer extends AvroReducer<CharSequence, GenericRecord, GenericRecord> {

  @Override
  public void reduce(CharSequence department, Iterable<GenericRecord> users,
                     AvroCollector<GenericRecord> collector, Reporter reporter)
      throws IOException {

    int totalUsers = 0;
    int totalAge = 0;
    List<String> userNames = new ArrayList<>();

    // Process all users in this department
    for (GenericRecord user : users) {
      totalUsers++;
      totalAge += (Integer) user.get("age");
      userNames.add(user.get("name").toString());
    }

    // Create aggregated output
    GenericRecord summary = new GenericRecordBuilder(outputSchema)
        .set("department", department.toString())
        .set("user_count", totalUsers)
        .set("average_age", totalAge / totalUsers)
        .set("user_names", userNames)
        .build();

    collector.collect(summary);

    // Update counters
    reporter.incrCounter("AGGREGATION", "DEPARTMENTS_PROCESSED", 1);
    reporter.incrCounter("AGGREGATION", "USERS_AGGREGATED", totalUsers);
  }

  @Override
  public void configure(JobConf jobConf) {
    this.outputSchema = AvroJob.getOutputSchema(jobConf);
  }

  private Schema outputSchema;
}
```

### Avro Collector

Abstract collector interface for gathering output from mappers and reducers.

```java { .api }
public abstract class AvroCollector<T> extends Configured {
  // Core collection method
  public abstract void collect(T datum) throws IOException;
}
```

The framework provides concrete implementations that automatically wrap collected data in AvroWrapper objects for integration with Hadoop's MapReduce infrastructure.

#### Usage Example

```java
// In mapper or reducer
public void map(GenericRecord input, AvroCollector<GenericRecord> collector, Reporter reporter) {
  // Process input
  GenericRecord output = processRecord(input);

  // Collect output - automatically wrapped for Hadoop
  collector.collect(output);

  // Multiple outputs are supported
  if (shouldEmitSummary(input)) {
    GenericRecord summary = createSummary(input);
    collector.collect(summary);
  }
}
```

### Hadoop Integration Classes

Classes that bridge Avro processing with standard Hadoop mappers and reducers.

```java { .api }
public abstract class HadoopMapper<IN,OUT> extends AvroMapper<IN,OUT> {
  // Integration with standard Hadoop Mapper interface
}

public abstract class HadoopReducer<K,V,OUT> extends AvroReducer<K,V,OUT> {
  // Integration with standard Hadoop Reducer interface
}

public abstract class HadoopCombiner<K,V,OUT> extends AvroReducer<K,V,OUT> {
  // Specialized for combine operations
}

public abstract class HadoopReducerBase<K,V,OUT> extends AvroReducer<K,V,OUT> {
  // Base class with additional Hadoop-specific functionality
}
```

### Map Collector Implementation

Specific collector implementation for map phase output.

```java { .api }
public class MapCollector<T> extends AvroCollector<T> {
  // Constructor
  public MapCollector(OutputCollector<AvroWrapper<T>, NullWritable> collector);

  // Collect implementation
  public void collect(T datum) throws IOException;
}
```

## Complete Example: Word Count

Here's a complete example showing mapper and reducer implementation:

### Word Count Mapper

```java
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.Pair;
import org.apache.avro.util.Utf8;

public class WordCountMapper extends AvroMapper<Utf8, Pair<Utf8, Integer>> {

  @Override
  public void map(Utf8 line, AvroCollector<Pair<Utf8, Integer>> collector, Reporter reporter)
      throws IOException {

    // Split line into words
    String[] words = line.toString().toLowerCase().split("\\W+");

    // Emit each word with count of 1
    for (String word : words) {
      if (!word.isEmpty()) {
        Pair<Utf8, Integer> pair = new Pair<>(new Utf8(word), 1);
        collector.collect(pair);
      }
    }

    reporter.incrCounter("WORDS", "LINES_PROCESSED", 1);
    reporter.incrCounter("WORDS", "WORDS_EMITTED", words.length);
  }
}
```

### Word Count Reducer

```java
import org.apache.avro.mapred.AvroReducer;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.Pair;
import org.apache.avro.util.Utf8;

public class WordCountReducer extends AvroReducer<Utf8, Integer, Pair<Utf8, Integer>> {

  @Override
  public void reduce(Utf8 word, Iterable<Integer> counts,
                     AvroCollector<Pair<Utf8, Integer>> collector, Reporter reporter)
      throws IOException {

    // Sum all counts for this word
    int totalCount = 0;
    for (Integer count : counts) {
      totalCount += count;
    }

    // Emit word with total count
    Pair<Utf8, Integer> result = new Pair<>(word, totalCount);
    collector.collect(result);

    reporter.incrCounter("WORDS", "UNIQUE_WORDS", 1);
  }
}
```

### Job Configuration

```java
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.hadoop.mapred.JobConf;

// Configure word count job
JobConf job = new JobConf();
job.setJobName("Avro Word Count");

// Set input/output formats
job.setInputFormat(AvroInputFormat.class);
job.setOutputFormat(AvroOutputFormat.class);

// Configure schemas
Schema stringSchema = Schema.create(Schema.Type.STRING);
Schema pairSchema = Pair.getPairSchema(stringSchema, Schema.create(Schema.Type.INT));

AvroJob.setInputSchema(job, stringSchema);
AvroJob.setMapOutputSchema(job, pairSchema);
AvroJob.setOutputSchema(job, pairSchema);

// Set mapper and reducer classes
AvroJob.setMapperClass(job, WordCountMapper.class);
AvroJob.setReducerClass(job, WordCountReducer.class);

// Set input/output paths
FileInputFormat.setInputPaths(job, new Path("/input"));
FileOutputFormat.setOutputPath(job, new Path("/output"));

// Run job
JobClient.runJob(job);
```

## Advanced Patterns

### Custom Initialization

```java
public class ConfigurableMapper extends AvroMapper<GenericRecord, GenericRecord> {
  private Schema outputSchema;
  private String filterField;
  private Object filterValue;

  @Override
  public void configure(JobConf jobConf) {
    // Get schemas and configuration
    this.outputSchema = AvroJob.getMapOutputSchema(jobConf);
    this.filterField = jobConf.get("filter.field");
    this.filterValue = jobConf.get("filter.value");
  }

  @Override
  public void map(GenericRecord input, AvroCollector<GenericRecord> collector, Reporter reporter)
      throws IOException {

    // Use configuration in processing
    if (input.get(filterField).equals(filterValue)) {
      collector.collect(transformRecord(input));
    }
  }
}
```

### Multiple Output Types

```java
public class MultiOutputMapper extends AvroMapper<GenericRecord, GenericRecord> {

  @Override
  public void map(GenericRecord input, AvroCollector<GenericRecord> collector, Reporter reporter)
      throws IOException {

    String recordType = input.get("type").toString();

    switch (recordType) {
      case "user":
        collector.collect(processUser(input));
        break;
      case "event":
        collector.collect(processEvent(input));
        break;
      case "transaction":
        collector.collect(processTransaction(input));
        break;
    }
  }
}
```

### Error Handling

```java
public class RobustMapper extends AvroMapper<GenericRecord, GenericRecord> {

  @Override
  public void map(GenericRecord input, AvroCollector<GenericRecord> collector, Reporter reporter)
      throws IOException {

    try {
      // Process record
      GenericRecord output = processRecord(input);
      collector.collect(output);

      reporter.incrCounter("PROCESSING", "SUCCESS", 1);

    } catch (Exception e) {
      // Log error and continue processing
      System.err.println("Failed to process record: " + input + ", error: " + e.getMessage());
      reporter.incrCounter("PROCESSING", "ERRORS", 1);

      // Optionally emit error record
      GenericRecord errorRecord = createErrorRecord(input, e);
      collector.collect(errorRecord);
    }
  }
}
```

## Performance Considerations

### Object Reuse

```java
public class EfficientMapper extends AvroMapper<GenericRecord, GenericRecord> {
  private GenericRecord reusableOutput;

  @Override
  public void configure(JobConf jobConf) {
    Schema outputSchema = AvroJob.getMapOutputSchema(jobConf);
    this.reusableOutput = new GenericData.Record(outputSchema);
  }

  @Override
  public void map(GenericRecord input, AvroCollector<GenericRecord> collector, Reporter reporter)
      throws IOException {

    // Reuse output object to reduce GC pressure
    reusableOutput.put("field1", input.get("field1"));
    reusableOutput.put("field2", processField(input.get("field2")));

    collector.collect(reusableOutput);
  }
}
```

### Memory Management

```java
public class MemoryEfficientReducer extends AvroReducer<Utf8, GenericRecord, GenericRecord> {

  @Override
  public void reduce(Utf8 key, Iterable<GenericRecord> values,
                     AvroCollector<GenericRecord> collector, Reporter reporter)
      throws IOException {

    // Process values in streaming fashion to avoid loading all into memory
    int count = 0;
    GenericRecord first = null;

    for (GenericRecord value : values) {
      if (first == null) {
        first = GenericData.get().deepCopy(value.getSchema(), value);
      }
      count++;

      // Process without accumulating
      if (count % 1000 == 0) {
        // Periodically report progress
        reporter.progress();
      }
    }

    // Create summary without holding all values
    GenericRecord summary = createSummary(key, first, count);
    collector.collect(summary);
  }
}
```

## Troubleshooting

Common issues and solutions:

- **Schema Mismatch**: Ensure input/output schemas are properly configured via AvroJob
- **NullPointerException**: Check for null values in Avro records before processing
- **ClassCastException**: Verify data types match schema expectations
- **Configuration Errors**: Ensure mapper/reducer classes are properly registered with AvroJob
- **Memory Issues**: Use object reuse patterns and streaming processing for large datasets