# Job Configuration

Utilities for configuring MapReduce jobs with Avro schemas, data models, and processing classes. The functionality is split between two parallel classes: `org.apache.avro.mapred.AvroJob` for the legacy `org.apache.hadoop.mapred` API and `org.apache.avro.mapreduce.AvroJob` for the newer `org.apache.hadoop.mapreduce` API.

## Capabilities

### Legacy API Job Configuration

Configuration utilities for the legacy `org.apache.hadoop.mapred` API, providing schema management and class configuration for Avro-based MapReduce jobs.

```java { .api }
public class AvroJob {
  // Schema Configuration
  public static void setInputSchema(JobConf job, Schema schema);
  public static Schema getInputSchema(Configuration job);
  public static void setMapOutputSchema(JobConf job, Schema schema);
  public static Schema getMapOutputSchema(Configuration job);
  public static void setOutputSchema(JobConf job, Schema schema);
  public static Schema getOutputSchema(Configuration job);

  // Processing Class Configuration
  public static void setMapperClass(JobConf job, Class<? extends AvroMapper> c);
  public static void setCombinerClass(JobConf job, Class<? extends AvroReducer> c);
  public static void setReducerClass(JobConf job, Class<? extends AvroReducer> c);

  // Data Model Configuration
  public static void setDataModelClass(JobConf job, Class<? extends GenericData> modelClass);
  public static Class<? extends GenericData> getDataModelClass(Configuration conf);
  public static GenericData createDataModel(Configuration conf);
  public static GenericData createInputDataModel(Configuration conf);
  public static GenericData createMapOutputDataModel(Configuration conf);

  // Input Format and Reflection Configuration
  public static void setInputSequenceFile(JobConf job);
  public static void setReflect(JobConf job);
  public static void setInputReflect(JobConf job);
  public static void setMapOutputReflect(JobConf job);

  // Output Configuration
  public static void setOutputCodec(JobConf job, String codec);
  public static void setOutputMeta(JobConf job, String key, String value);
  public static void setOutputMeta(JobConf job, String key, long value);
  public static void setOutputMeta(JobConf job, String key, byte[] value);
}
```
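
#### Usage Example

```java
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

// Parse schemas (Schema.Parser replaces the deprecated static Schema.parse)
Schema userSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}");
Schema outputSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"UserCount\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"count\",\"type\":\"int\"}]}");

// With a reducer, the legacy API expects the map output schema to be a Pair schema:
// key = user name (string), value = partial count (int)
Schema mapOutputSchema = Pair.getPairSchema(
    Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.INT));

// Configure job; UserMapper extends AvroMapper and UserReducer extends AvroReducer
// (application classes, not shown)
JobConf job = new JobConf();
AvroJob.setInputSchema(job, userSchema);
AvroJob.setMapOutputSchema(job, mapOutputSchema);
AvroJob.setOutputSchema(job, outputSchema);
AvroJob.setMapperClass(job, UserMapper.class);
AvroJob.setReducerClass(job, UserReducer.class);

// Set compression and metadata; the codec only takes effect when output compression is on
FileOutputFormat.setCompressOutput(job, true);
AvroJob.setOutputCodec(job, "snappy");
AvroJob.setOutputMeta(job, "created.by", "MyApplication");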
```
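
To make the schema setters and getters above more concrete, here is a stdlib-only sketch of how we understand the legacy class to store schemas: each schema is serialized to its JSON string under one of the keys listed in the constants section, and `getMapOutputSchema` falls back to the output schema when no map output schema was set. `LegacySchemaConfigModel` is hypothetical, a plain `Map` stands in for Hadoop's `Configuration`, and the fallback rule reflects our reading of the Avro source rather than a documented guarantee.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the legacy AvroJob schema setters/getters.
// Assumptions: a Map stands in for Hadoop's Configuration, and schemas are
// passed as JSON strings (the real setters store Schema.toString()).
class LegacySchemaConfigModel {
  // Key names taken from the constants listed in this document
  static final String INPUT_SCHEMA = "avro.input.schema";
  static final String MAP_OUTPUT_SCHEMA = "avro.map.output.schema";
  static final String OUTPUT_SCHEMA = "avro.output.schema";

  static void setInputSchema(Map<String, String> conf, String schemaJson) {
    conf.put(INPUT_SCHEMA, schemaJson);
  }

  static void setMapOutputSchema(Map<String, String> conf, String schemaJson) {
    conf.put(MAP_OUTPUT_SCHEMA, schemaJson);
  }

  static void setOutputSchema(Map<String, String> conf, String schemaJson) {
    conf.put(OUTPUT_SCHEMA, schemaJson);
  }

  // Returns null when no input schema has been configured
  static String getInputSchema(Map<String, String> conf) {
    return conf.get(INPUT_SCHEMA);
  }

  // Assumed fallback: without an explicit map output schema, the output schema is used
  static String getMapOutputSchema(Map<String, String> conf) {
    return conf.getOrDefault(MAP_OUTPUT_SCHEMA, conf.get(OUTPUT_SCHEMA));
  }

  // Helper: resolve the map output schema for an output schema and an optional override
  static String resolveMapOutput(String outputJson, String mapOutputJson) {
    Map<String, String> conf = new HashMap<>();
    setOutputSchema(conf, outputJson);
    if (mapOutputJson != null) {
      setMapOutputSchema(conf, mapOutputJson);
    }
    return getMapOutputSchema(conf);
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    setOutputSchema(conf, "\"int\"");
    System.out.println(getMapOutputSchema(conf)); // prints "int" (the output schema JSON)
    System.out.println(getInputSchema(conf));     // prints null
  }
}
```

Storing schemas as strings keeps the job configuration serializable as plain key/value text, which is why the getters have to re-parse on every call.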

### New API Job Configuration

Configuration utilities for the newer `org.apache.hadoop.mapreduce` API. Instead of a single schema per stage, this class configures separate key and value schemas for input, map output, and final output.

```java { .api }
// org.apache.avro.mapreduce.AvroJob
public class AvroJob {
  // Input Schema Configuration
  public static void setInputKeySchema(Job job, Schema schema);
  public static Schema getInputKeySchema(Configuration conf);
  public static void setInputValueSchema(Job job, Schema schema);
  public static Schema getInputValueSchema(Configuration conf);

  // Map Output Schema Configuration
  public static void setMapOutputKeySchema(Job job, Schema schema);
  public static Schema getMapOutputKeySchema(Configuration conf);
  public static void setMapOutputValueSchema(Job job, Schema schema);
  public static Schema getMapOutputValueSchema(Configuration conf);

  // Output Schema Configuration
  public static void setOutputKeySchema(Job job, Schema schema);
  public static Schema getOutputKeySchema(Configuration conf);
  public static void setOutputValueSchema(Job job, Schema schema);
  public static Schema getOutputValueSchema(Configuration conf);

  // Data Model Configuration
  public static void setDataModelClass(Job job, Class<? extends GenericData> modelClass);
}
```

#### Usage Example

```java
import org.apache.avro.Schema;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.hadoop.mapreduce.Job;

// Build schemas (Schema.Parser replaces the deprecated static Schema.parse)
Schema keySchema = Schema.create(Schema.Type.STRING);
Schema valueSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}");
Schema countSchema = Schema.create(Schema.Type.INT);

// Configure job with separate key and value schemas for each stage
Job job = Job.getInstance();
AvroJob.setInputKeySchema(job, keySchema);
AvroJob.setInputValueSchema(job, valueSchema);
AvroJob.setMapOutputKeySchema(job, keySchema);
AvroJob.setMapOutputValueSchema(job, countSchema);
AvroJob.setOutputKeySchema(job, keySchema);
AvroJob.setOutputValueSchema(job, countSchema);
```

### Configuration Constants

Configuration keys under which the job configuration utilities store their settings in the Hadoop job configuration.

```java { .api }
// Legacy API constants (org.apache.avro.mapred.AvroJob)
public static final String INPUT_SCHEMA = "avro.input.schema";
public static final String MAP_OUTPUT_SCHEMA = "avro.map.output.schema";
public static final String OUTPUT_SCHEMA = "avro.output.schema";
public static final String OUTPUT_CODEC = "avro.output.codec";
public static final String REFLECT_DATA = "avro.reflect.data";
public static final String INPUT_IS_REFLECT = "avro.input.is.reflect";
public static final String MAP_OUTPUT_IS_REFLECT = "avro.map.output.is.reflect";

// New API constants (org.apache.avro.mapreduce.AvroJob)
public static final String CONF_OUTPUT_CODEC = "avro.output.codec";
```

### Data Model Integration

Configuration methods for plugging different Avro data models (Generic, Specific, Reflect) into MapReduce jobs. Both `AvroJob` classes offer `setDataModelClass`; `createDataModel` and the reflect flags belong to the legacy class only.

```java { .api }
// Set the data model class for custom data handling
public static void setDataModelClass(JobConf job, Class<? extends GenericData> modelClass); // org.apache.avro.mapred.AvroJob
public static void setDataModelClass(Job job, Class<? extends GenericData> modelClass);     // org.apache.avro.mapreduce.AvroJob

// Create a data model instance from the configuration (legacy API)
public static GenericData createDataModel(Configuration conf);

// Enable reflection-based data handling (legacy API)
public static void setReflect(JobConf job);          // Enable for both input and map output
public static void setInputReflect(JobConf job);     // Enable for input only
public static void setMapOutputReflect(JobConf job); // Enable for map output only
```

#### Data Model Usage Example

```java
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.reflect.ReflectData;
import org.apache.hadoop.mapred.JobConf;

JobConf job = new JobConf();

// Option 1: name the data model class explicitly (here: reflection for POJOs)
AvroJob.setDataModelClass(job, ReflectData.class);

// Option 2: enable reflection for both input and map output...
AvroJob.setReflect(job);

// ...or only for specific stages
AvroJob.setInputReflect(job);     // input uses reflection
AvroJob.setMapOutputReflect(job); // map output uses reflection
```
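
The precedence between `setDataModelClass` and the reflect flags is easy to get wrong. The sketch below models how we believe the legacy `createInputDataModel` and `createMapOutputDataModel` choose a model: an explicitly configured class wins, otherwise the stage's reflect flag selects `ReflectData`, otherwise `SpecificData` is used. `DataModelSelectionModel` and its methods are hypothetical, a `Map` stands in for the Hadoop configuration, and the data-model key name is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of data-model selection in the legacy AvroJob.
// Assumptions: a Map stands in for the Hadoop configuration, class names are
// returned as strings, and DATA_MODEL_KEY is our guess at the real key name.
class DataModelSelectionModel {
  static final String DATA_MODEL_KEY = "avro.serialization.data.model"; // assumed
  static final String INPUT_IS_REFLECT = "avro.input.is.reflect";
  static final String MAP_OUTPUT_IS_REFLECT = "avro.map.output.is.reflect";
  static final String REFLECT = "org.apache.avro.reflect.ReflectData";
  static final String SPECIFIC = "org.apache.avro.specific.SpecificData";

  static void setDataModelClass(Map<String, String> conf, String className) {
    conf.put(DATA_MODEL_KEY, className);
  }

  static void setInputReflect(Map<String, String> conf) {
    conf.put(INPUT_IS_REFLECT, "true");
  }

  static void setMapOutputReflect(Map<String, String> conf) {
    conf.put(MAP_OUTPUT_IS_REFLECT, "true");
  }

  // setReflect turns on both stage flags
  static void setReflect(Map<String, String> conf) {
    setInputReflect(conf);
    setMapOutputReflect(conf);
  }

  // Explicit class first, then the stage's reflect flag, then SpecificData
  private static String choose(Map<String, String> conf, String reflectFlagKey) {
    String explicit = conf.get(DATA_MODEL_KEY);
    if (explicit != null) {
      return explicit;
    }
    // Boolean.parseBoolean(null) is false, so an unset flag means "not reflect"
    return Boolean.parseBoolean(conf.get(reflectFlagKey)) ? REFLECT : SPECIFIC;
  }

  static String inputModel(Map<String, String> conf) {
    return choose(conf, INPUT_IS_REFLECT);
  }

  static String mapOutputModel(Map<String, String> conf) {
    return choose(conf, MAP_OUTPUT_IS_REFLECT);
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    setInputReflect(conf);
    System.out.println(inputModel(conf));     // org.apache.avro.reflect.ReflectData
    System.out.println(mapOutputModel(conf)); // org.apache.avro.specific.SpecificData
  }
}
```

Under this model, calling `setDataModelClass` makes the reflect flags irrelevant, so the two options in the usage example are alternatives rather than steps to combine.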

### Output Configuration

Legacy-API settings for output compression and output file metadata, plus a shortcut for reading SequenceFile input.

```java { .api }
// Compression configuration
public static void setOutputCodec(JobConf job, String codec);

// Metadata configuration (multiple overloads)
public static void setOutputMeta(JobConf job, String key, String value);
public static void setOutputMeta(JobConf job, String key, long value);
public static void setOutputMeta(JobConf job, String key, byte[] value);

// Special input format configuration
public static void setInputSequenceFile(JobConf job); // Enable SequenceFile input
```
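
#### Output Configuration Example

```java
import java.nio.charset.StandardCharsets;
import org.apache.avro.mapred.AvroJob;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

JobConf job = new JobConf();

// Set the compression codec; it only takes effect when output compression is enabled,
// and a later setOutputCodec call overwrites an earlier one
FileOutputFormat.setCompressOutput(job, true);
AvroJob.setOutputCodec(job, "snappy"); // or "deflate", "bzip2", "xz", "zstandard"

// Add metadata to the header of each output file
AvroJob.setOutputMeta(job, "created.by", "MyApplication");
AvroJob.setOutputMeta(job, "created.time", System.currentTimeMillis());
AvroJob.setOutputMeta(job, "version", "1.0".getBytes(StandardCharsets.UTF_8));

// Configure for SequenceFile input
AvroJob.setInputSequenceFile(job);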
```
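
Because a Hadoop configuration only holds strings, the three `setOutputMeta` overloads have to encode their values. The following sketch models how we understand the legacy class does this: text and `long` values go under an `avro.meta.text.` prefix, while `byte[]` values go under an `avro.meta.binary.` prefix after URL-encoding through ISO-8859-1. `OutputMetaModel` and `readBinaryMeta` are hypothetical, a `Map` stands in for `JobConf`, the prefixes reflect our reading of the Avro source, and the `Charset` overloads of `URLEncoder`/`URLDecoder` require Java 10 or later.

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of how the legacy setOutputMeta overloads record metadata.
// Assumptions: a Map stands in for JobConf; the prefixes and the ISO-8859-1
// URL-encoding of byte[] values reflect our reading of the Avro source.
class OutputMetaModel {
  static final String TEXT_PREFIX = "avro.meta.text.";
  static final String BINARY_PREFIX = "avro.meta.binary.";

  static void setOutputMeta(Map<String, String> conf, String key, String value) {
    conf.put(TEXT_PREFIX + key, value);
  }

  // long values are stored as their decimal string
  static void setOutputMeta(Map<String, String> conf, String key, long value) {
    conf.put(TEXT_PREFIX + key, Long.toString(value));
  }

  // ISO-8859-1 maps each byte 0..255 to exactly one char, so arbitrary bytes
  // survive the trip through a String-valued configuration entry
  static void setOutputMeta(Map<String, String> conf, String key, byte[] value) {
    String asChars = new String(value, StandardCharsets.ISO_8859_1);
    conf.put(BINARY_PREFIX + key, URLEncoder.encode(asChars, StandardCharsets.ISO_8859_1));
  }

  // Inverse of the byte[] overload: decode the entry back to the original bytes
  static byte[] readBinaryMeta(Map<String, String> conf, String key) {
    String encoded = conf.get(BINARY_PREFIX + key);
    return URLDecoder.decode(encoded, StandardCharsets.ISO_8859_1)
        .getBytes(StandardCharsets.ISO_8859_1);
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    setOutputMeta(conf, "created.by", "MyApplication");
    setOutputMeta(conf, "version", "1.0".getBytes(StandardCharsets.UTF_8));
    System.out.println(conf.get("avro.meta.text.created.by")); // MyApplication
    System.out.println(conf.get("avro.meta.binary.version"));  // 1.0 (all chars are URL-safe)
  }
}
```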

## Integration with Input/Output Formats

The job configuration utilities are used together with Avro's input and output format classes. Each API has its own format classes, set on a `JobConf` (legacy) or a `Job` (new):

```java
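import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroKeyValueInputFormat;
import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;

// Legacy API integration (JobConf)
JobConf jobConf = new JobConf();
jobConf.setInputFormat(AvroInputFormat.class);
jobConf.setOutputFormat(AvroOutputFormat.class);

// New API integration (Job)
Job job = Job.getInstance();
job.setInputFormatClass(AvroKeyInputFormat.class);
job.setOutputFormatClass(AvroKeyOutputFormat.class);

// Or key-value formats
job.setInputFormatClass(AvroKeyValueInputFormat.class);
job.setOutputFormatClass(AvroKeyValueOutputFormat.class);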
```
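
In the legacy API, the order of format-related calls matters less than it might seem. As we understand the Avro source, `AvroJob.setInputSchema` and `AvroJob.setOutputSchema` install `AvroInputFormat`/`AvroOutputFormat` only when no format has been chosen yet, whereas `setInputSequenceFile` always sets the input format. The sketch below models that rule; `FormatDefaultsModel`, its map keys, and the format labels are hypothetical stand-ins, not real Hadoop configuration keys.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of how we believe the legacy schema setters pick formats:
// they install the Avro formats only when no format has been chosen yet, while
// setInputSequenceFile always overwrites the input format.
// The map keys and format names are illustrative labels, not real Hadoop keys.
class FormatDefaultsModel {
  static final String INPUT_FORMAT = "input.format";   // hypothetical key
  static final String OUTPUT_FORMAT = "output.format"; // hypothetical key

  static void setInputSchema(Map<String, String> job, String schemaJson) {
    job.put("avro.input.schema", schemaJson);
    job.putIfAbsent(INPUT_FORMAT, "AvroInputFormat"); // keep an explicit choice
  }

  static void setOutputSchema(Map<String, String> job, String schemaJson) {
    job.put("avro.output.schema", schemaJson);
    job.putIfAbsent(OUTPUT_FORMAT, "AvroOutputFormat");
  }

  static void setInputSequenceFile(Map<String, String> job) {
    job.put(INPUT_FORMAT, "SequenceFileInputFormat"); // unconditional
  }

  // Input format after calling the two setters in either order
  static String inputFormat(boolean sequenceFileFirst) {
    Map<String, String> job = new HashMap<>();
    if (sequenceFileFirst) {
      setInputSequenceFile(job);
      setInputSchema(job, "\"string\"");
    } else {
      setInputSchema(job, "\"string\"");
      setInputSequenceFile(job);
    }
    return job.get(INPUT_FORMAT);
  }

  public static void main(String[] args) {
    Map<String, String> job = new HashMap<>();
    setOutputSchema(job, "\"int\"");
    System.out.println(job.get(OUTPUT_FORMAT)); // AvroOutputFormat
    System.out.println(inputFormat(true));      // SequenceFileInputFormat
    System.out.println(inputFormat(false));     // SequenceFileInputFormat
  }
}
```

Under this model, setting the legacy formats explicitly as shown above is harmless but usually redundant once the schemas are set.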
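
## Error Handling

Common configuration errors and troubleshooting:

- **Schema Parse Errors**: Ensure JSON schema strings are well-formed JSON and valid Avro schemas; `new Schema.Parser().parse(...)` throws `SchemaParseException` otherwise
- **Class Not Found**: Verify mapper/reducer classes are on the classpath and extend the appropriate Avro base classes (`AvroMapper`/`AvroReducer` in the legacy API)
- **Codec Errors**: Use codec names that Avro's `CodecFactory` recognizes (`"null"`, `"deflate"`, `"snappy"`, `"bzip2"`, `"xz"`, `"zstandard"`); `"snappy"` and `"zstandard"` also require their compression libraries on the classpath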
- **Data Model Conflicts**: Ensure consistent data model usage across input, map output, and output configurations
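
Codec mistakes typically surface only when the first output file is written, so a cheap pre-flight check in the job driver can help. The sketch below is a hypothetical helper, not part of Avro: it accepts the codec strings we believe `CodecFactory.fromString` recognizes, and it assumes (as we understand the Avro output formats) that the codec setting is ignored unless output compression is enabled and that deflate is used when compression is on but no codec is named. It deliberately ignores the Hadoop-codec mapping that Avro also consults in that last case.

```java
import java.util.Set;

// Hypothetical pre-flight check for AvroJob.setOutputCodec arguments.
// KNOWN_CODECS lists the codec names we believe Avro's CodecFactory accepts;
// the compression rules below are a simplified model, not Avro's own code.
class CodecCheck {
  static final Set<String> KNOWN_CODECS =
      Set.of("null", "deflate", "snappy", "bzip2", "xz", "zstandard");

  // Returns the codec that would be written to the file header under the model
  static String effectiveCodec(boolean compressOutput, String codecName) {
    if (!compressOutput) {
      return "null"; // codec setting not consulted, so no validation either
    }
    if (codecName == null) {
      return "deflate"; // assumed default when compression is on
    }
    if (!KNOWN_CODECS.contains(codecName)) {
      throw new IllegalArgumentException(
          "Unrecognized Avro codec: " + codecName + " (expected one of " + KNOWN_CODECS + ")");
    }
    return codecName;
  }

  public static void main(String[] args) {
    System.out.println(effectiveCodec(true, "snappy"));  // snappy
    System.out.println(effectiveCodec(false, "snappy")); // null
    try {
      effectiveCodec(true, "zstd"); // the Avro name is "zstandard"
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```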