# Cross-Language Processing (Tether)

The Tether framework lets you implement MapReduce jobs in non-Java languages while retaining Avro data integration and schema compatibility. Tether enables developers to write MapReduce logic in Python, C++, or any other language that can communicate over standard input/output, while still benefiting from Avro's schema evolution and efficient serialization.

## Capabilities

### Tether Job Configuration

Main class for configuring and executing Tether-based MapReduce jobs.

```java { .api }
public class TetherJob {
  // Executable configuration
  public static void setExecutable(JobConf job, File executable);
  public static void setExecutable(JobConf job, String executable);

  // Schema configuration
  public static void setInputSchema(JobConf job, Schema schema);
  public static void setMapOutputSchema(JobConf job, Schema schema);
  public static void setOutputSchema(JobConf job, Schema schema);

  // Protocol configuration
  public static void setProtocol(JobConf job, Protocol protocol);

  // Job execution
  public static void submit(JobConf job) throws IOException;
  public static RunningJob runJob(JobConf job) throws IOException;
}
```

#### Usage Example

```java
import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.mapred.Pair;
import org.apache.avro.mapred.tether.TetherJob;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

// Configure Tether job for Python MapReduce script
JobConf job = new JobConf();
job.setJobName("Python Word Count via Tether");

// Set executable (Python script)
TetherJob.setExecutable(job, new File("/path/to/wordcount.py"));

// Configure schemas
Schema stringSchema = Schema.create(Schema.Type.STRING);
Schema intSchema = Schema.create(Schema.Type.INT);
Schema pairSchema = Pair.getPairSchema(stringSchema, intSchema);

TetherJob.setInputSchema(job, stringSchema);
TetherJob.setMapOutputSchema(job, pairSchema);
TetherJob.setOutputSchema(job, pairSchema);

// Set input/output paths
FileInputFormat.setInputPaths(job, new Path("/input"));
FileOutputFormat.setOutputPath(job, new Path("/output"));

// Submit job
TetherJob.runJob(job);
```

### Tether Input/Output Formats

Specialized formats for reading and writing data in Tether jobs.

```java { .api }
public class TetherInputFormat extends FileInputFormat<AvroKey<Object>, AvroValue<Object>> {
  public RecordReader<AvroKey<Object>, AvroValue<Object>> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}

public class TetherOutputFormat extends AvroOutputFormat<Object> {
  public RecordWriter<AvroKey<Object>, AvroValue<Object>> getRecordWriter(TaskAttemptContext context)
      throws IOException, InterruptedException;
}

public class TetherRecordReader extends RecordReader<AvroKey<Object>, AvroValue<Object>> {
  public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
  public boolean nextKeyValue() throws IOException, InterruptedException;
  public AvroKey<Object> getCurrentKey() throws IOException, InterruptedException;
  public AvroValue<Object> getCurrentValue() throws IOException, InterruptedException;
  public float getProgress() throws IOException, InterruptedException;
  public void close() throws IOException;
}
```

### Tether Execution Framework

Classes that manage the execution of external processes and communication protocols.

```java { .api }
public class TetherMapRunner implements MapRunnable<AvroKey<Object>, AvroValue<Object>, AvroKey<Object>, AvroValue<Object>> {
  public void run(RecordReader<AvroKey<Object>, AvroValue<Object>> input,
                  OutputCollector<AvroKey<Object>, AvroValue<Object>> output,
                  Reporter reporter) throws IOException;
}

public class TetherReducer implements Reducer<AvroKey<Object>, AvroValue<Object>, AvroKey<Object>, AvroValue<Object>> {
  public void reduce(AvroKey<Object> key, Iterator<AvroValue<Object>> values,
                     OutputCollector<AvroKey<Object>, AvroValue<Object>> output,
                     Reporter reporter) throws IOException;
  public void configure(JobConf job);
  public void close() throws IOException;
}

public class TetheredProcess {
  // Process management
  public TetheredProcess(JobConf job, TaskAttemptContext context);
  public void startProcess() throws IOException;
  public void stopProcess() throws IOException;

  // Communication
  public void writeInput(Object datum) throws IOException;
  public Object readOutput() throws IOException;

  // Status monitoring
  public boolean isAlive();
  public int getExitCode();
}
```
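
The `TetheredProcess` lifecycle above (start, exchange data over the pipes, stop, inspect the exit code) can be sketched with Python's standard `subprocess` module. The class and the upper-casing child command below are illustrative stand-ins for explanation, not the framework's actual implementation.

```python
import subprocess
import sys

class SketchTetheredProcess:
    """Illustrative stand-in for TetheredProcess: manages a child process
    that reads lines on stdin and writes results on stdout."""

    def __init__(self, argv):
        self.argv = argv
        self.proc = None

    def start_process(self):
        # Launch the external executable with piped stdin/stdout.
        self.proc = subprocess.Popen(
            self.argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)

    def write_input(self, datum):
        # Send one datum (here: one text line) to the child.
        self.proc.stdin.write(datum + "\n")
        self.proc.stdin.flush()

    def read_output(self):
        # Read one result line back from the child.
        return self.proc.stdout.readline().rstrip("\n")

    def stop_process(self):
        # Closing stdin signals end-of-input; then wait for clean exit.
        self.proc.stdin.close()
        self.proc.wait()

    def is_alive(self):
        return self.proc is not None and self.proc.poll() is None

    def get_exit_code(self):
        return self.proc.returncode

# Child command standing in for a tethered mapper: upper-cases each line.
child = [sys.executable, "-c",
         "import sys\nwhile True:\n    line = sys.stdin.readline()\n"
         "    if not line:\n        break\n"
         "    print(line.strip().upper(), flush=True)"]

p = SketchTetheredProcess(child)
p.start_process()
p.write_input("hello tether")
result = p.read_output()   # "HELLO TETHER"
p.stop_process()
```

The real framework exchanges Avro-encoded binary datums rather than text lines, but the start/write/read/stop lifecycle is the same shape.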

### Tether Data Handling

Classes for managing data serialization and protocol communication with external processes.

```java { .api }
public class TetherData {
  // Data serialization for external processes
  public static void writeDatum(OutputStream out, Object datum, Schema schema) throws IOException;
  public static Object readDatum(InputStream in, Schema schema) throws IOException;

  // Protocol message handling
  public static void writeMessage(OutputStream out, Object message) throws IOException;
  public static Object readMessage(InputStream in) throws IOException;

  // Schema transmission
  public static void sendSchema(OutputStream out, Schema schema) throws IOException;
  public static Schema receiveSchema(InputStream in) throws IOException;
}

public class TetherKeySerialization implements Serialization<AvroKey<Object>> {
  public boolean accept(Class<?> c);
  public Deserializer<AvroKey<Object>> getDeserializer(Class<AvroKey<Object>> c);
  public Serializer<AvroKey<Object>> getSerializer(Class<AvroKey<Object>> c);
}

public class TetherKeyComparator implements RawComparator<AvroKey<Object>>, Configurable {
  public int compare(AvroKey<Object> o1, AvroKey<Object> o2);
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
  public void setConf(Configuration conf);
  public Configuration getConf();
}

public class TetherPartitioner implements Partitioner<AvroKey<Object>, AvroValue<Object>>, Configurable {
  public int getPartition(AvroKey<Object> key, AvroValue<Object> value, int numPartitions);
  public void configure(JobConf job);
}
```
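
The exact wire format behind `writeDatum`/`readDatum` is not shown here; a common technique for streaming encoded datums between processes is length-prefixed framing. The sketch below illustrates that idea with the standard `struct` module — the function names mirror the API above but are an assumption, not the framework's actual encoding.

```python
import io
import struct

def write_datum(out, payload: bytes) -> None:
    # Frame each datum as a 4-byte big-endian length followed by the bytes.
    out.write(struct.pack(">I", len(payload)))
    out.write(payload)

def read_datum(stream) -> bytes:
    # Read the 4-byte length header, then exactly that many payload bytes.
    header = stream.read(4)
    if len(header) < 4:
        raise EOFError("stream closed")
    (length,) = struct.unpack(">I", header)
    return stream.read(length)

# Round-trip two datums through an in-memory stream.
buf = io.BytesIO()
write_datum(buf, b"hello")
write_datum(buf, b"tether")
buf.seek(0)
first, second = read_datum(buf), read_datum(buf)
```

Length prefixes let the reader recover datum boundaries without parsing the payload, which matters when the payload is opaque Avro binary.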

### Tether Output Service

Service for handling output from external processes back to the Hadoop framework.

```java { .api }
public class TetherOutputService {
  // Service management
  public TetherOutputService(TaskAttemptContext context);
  public void start() throws IOException;
  public void stop() throws IOException;

  // Output handling
  public void handleOutput(Object key, Object value) throws IOException;
  public void handleComplete() throws IOException;
  public void handleError(String error) throws IOException;

  // Status reporting
  public void reportProgress(float progress);
  public void reportCounter(String group, String name, long value);
}
```

## Complete Example: Python Word Count

### Python Script (wordcount.py)

```python
#!/usr/bin/env python3
import sys

import avro.io
import avro.schema

def map_phase():
    """Map phase: read lines and emit word counts."""
    # Read input schema
    input_schema_json = sys.stdin.readline().strip()
    input_schema = avro.schema.parse(input_schema_json)

    # Read output schema
    output_schema_json = sys.stdin.readline().strip()
    output_schema = avro.schema.parse(output_schema_json)

    # Create readers/writers
    decoder = avro.io.BinaryDecoder(sys.stdin.buffer)
    encoder = avro.io.BinaryEncoder(sys.stdout.buffer)
    reader = avro.io.DatumReader(input_schema)
    writer = avro.io.DatumWriter(output_schema)

    try:
        while True:
            # Read input line
            line = reader.read(decoder)

            # Split into words and emit pairs
            words = line.lower().split()
            for word in words:
                if word:
                    # Create key-value pair
                    pair = {"key": word, "value": 1}
                    writer.write(pair, encoder)
                    sys.stdout.buffer.flush()

    except EOFError:
        pass  # End of input

def reduce_phase():
    """Reduce phase: sum counts for each word."""
    # Read schemas
    key_schema_json = sys.stdin.readline().strip()
    key_schema = avro.schema.parse(key_schema_json)

    value_schema_json = sys.stdin.readline().strip()
    value_schema = avro.schema.parse(value_schema_json)

    output_schema_json = sys.stdin.readline().strip()
    output_schema = avro.schema.parse(output_schema_json)

    # Create readers/writers
    decoder = avro.io.BinaryDecoder(sys.stdin.buffer)
    encoder = avro.io.BinaryEncoder(sys.stdout.buffer)
    key_reader = avro.io.DatumReader(key_schema)
    value_reader = avro.io.DatumReader(value_schema)
    writer = avro.io.DatumWriter(output_schema)

    current_key = None
    count = 0

    try:
        while True:
            # Read key-value pair
            key = key_reader.read(decoder)
            value = value_reader.read(decoder)

            if current_key != key:
                # Emit previous key's count
                if current_key is not None:
                    result = {"key": current_key, "value": count}
                    writer.write(result, encoder)
                    sys.stdout.buffer.flush()

                # Start new key
                current_key = key
                count = value
            else:
                # Accumulate count
                count += value

    except EOFError:
        # Emit final count
        if current_key is not None:
            result = {"key": current_key, "value": count}
            writer.write(result, encoder)
            sys.stdout.buffer.flush()

if __name__ == "__main__":
    phase = sys.argv[1] if len(sys.argv) > 1 else "map"
    if phase == "map":
        map_phase()
    elif phase == "reduce":
        reduce_phase()
```

### Java Job Configuration

```java
import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.mapred.tether.TetherInputFormat;
import org.apache.avro.mapred.tether.TetherJob;
import org.apache.avro.mapred.tether.TetherOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

public class PythonWordCountJob {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    job.setJobName("Python Word Count via Tether");

    // Configure Tether executable
    TetherJob.setExecutable(job, "/path/to/wordcount.py");

    // Define schemas
    Schema stringSchema = Schema.create(Schema.Type.STRING);
    Schema intSchema = Schema.create(Schema.Type.INT);
    Schema pairSchema = Schema.createRecord("Pair", null, null, false);
    pairSchema.setFields(Arrays.asList(
        new Schema.Field("key", stringSchema, null, null),
        new Schema.Field("value", intSchema, null, null)
    ));

    // Configure schemas
    TetherJob.setInputSchema(job, stringSchema);
    TetherJob.setMapOutputSchema(job, pairSchema);
    TetherJob.setOutputSchema(job, pairSchema);

    // Set formats and input/output paths
    job.setInputFormat(TetherInputFormat.class);
    job.setOutputFormat(TetherOutputFormat.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Run job
    TetherJob.runJob(job);
  }
}
```

## Protocol Communication

### Message Protocol

Tether uses a JSON-based protocol for communication between Java and external processes:

```json
{
  "type": "configure",
  "schemas": {
    "input": "{\"type\":\"string\"}",
    "output": "{\"type\":\"record\",\"name\":\"Pair\",...}"
  }
}

{
  "type": "map",
  "input": "Hello world"
}

{
  "type": "output",
  "data": {"key": "hello", "value": 1}
}

{
  "type": "reduce",
  "key": "hello",
  "values": [1, 1, 1]
}

{
  "type": "complete"
}
```
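
An external process could consume these messages by dispatching on the `type` field. The sketch below uses only the message shapes shown above; the handler bodies (a toy word-count map and a sum reduce) and the `handle_message` name are illustrative, not part of a Tether client API.

```python
import json

def handle_message(raw, state):
    """Dispatch one JSON protocol message; return any output messages."""
    msg = json.loads(raw)
    out = []
    if msg["type"] == "configure":
        # Remember the schemas sent by the framework.
        state["schemas"] = msg["schemas"]
    elif msg["type"] == "map":
        # Toy map logic: one "output" message per lowercase word.
        for word in msg["input"].lower().split():
            out.append({"type": "output", "data": {"key": word, "value": 1}})
    elif msg["type"] == "reduce":
        # Toy reduce logic: sum the grouped values for the key.
        out.append({"type": "output",
                    "data": {"key": msg["key"], "value": sum(msg["values"])}})
    elif msg["type"] == "complete":
        state["done"] = True
    return out

state = {}
mapped = handle_message('{"type": "map", "input": "Hello world"}', state)
reduced = handle_message('{"type": "reduce", "key": "hello", "values": [1, 1, 1]}', state)
```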
360
361
### Error Handling Protocol
362
363
```json
364
{
365
"type": "error",
366
"message": "Failed to process input",
367
"details": "Invalid schema format"
368
}
369
370
{
371
"type": "progress",
372
"value": 0.75
373
}
374
375
{
376
"type": "counter",
377
"group": "PROCESSING",
378
"name": "RECORDS_PROCESSED",
379
"value": 1000
380
}
381
```
382
383
## Language-Specific Examples
384
385
### Python Integration
386
387
```python
388
# Python Tether client library usage
389
from avro_tether import TetherMapper, TetherReducer
390
391
class WordCountMapper(TetherMapper):
392
def map(self, datum):
393
words = datum.lower().split()
394
for word in words:
395
self.emit({"key": word, "value": 1})
396
397
class WordCountReducer(TetherReducer):
398
def reduce(self, key, values):
399
total = sum(values)
400
self.emit({"key": key, "value": total})
401
402
# Run with framework
403
if __name__ == "__main__":
404
import sys
405
if sys.argv[1] == "map":
406
WordCountMapper().run()
407
else:
408
WordCountReducer().run()
409
```

### C++ Integration

```cpp
// C++ Tether client example
#include <sstream>
#include <string>

#include "avro_tether.h"

class WordCountMapper : public TetherMapper {
public:
  void map(const avro::GenericDatum& input) override {
    std::string line = input.value<std::string>();
    std::istringstream iss(line);
    std::string word;

    while (iss >> word) {
      avro::GenericRecord pair;
      pair.setField("key", word);
      pair.setField("value", 1);
      emit(pair);
    }
  }
};

int main(int argc, char** argv) {
  if (argc > 1 && std::string(argv[1]) == "map") {
    WordCountMapper mapper;
    mapper.run();
  }
  return 0;
}
```

## Performance Considerations

### Process Management

```java
// Configure process resources
job.setInt("mapreduce.map.memory.mb", 2048);
job.setInt("mapreduce.reduce.memory.mb", 4096);

// Set executable permissions
job.set("tether.executable.permissions", "755");

// Configure timeouts
job.setLong("tether.process.timeout", 300000); // 5 minutes
```

### Data Transfer Optimization

```java
// Configure buffer sizes for I/O
job.setInt("tether.io.buffer.size", 65536);

// Enable compression for data transfer
job.setBoolean("tether.compress.data", true);
job.set("tether.compression.codec", "snappy");
```

### Memory Management

```python
# Python: Process data in streaming fashion
def map_phase():
    for line in sys.stdin:
        # Process immediately, don't accumulate
        process_and_emit(line)

def reduce_phase():
    current_key = None
    count = 0

    for key, value in read_key_values():
        if current_key != key:
            if current_key is not None:
                emit(current_key, count)
            current_key = key
            count = value
        else:
            count += value

    # Emit the final key's count
    if current_key is not None:
        emit(current_key, count)
```
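
Assuming the pairs arrive sorted by key (the MapReduce shuffle guarantees this), the streaming reduce pattern above can be expressed compactly and testably with `itertools.groupby`, which never holds more than one group in memory at a time:

```python
from itertools import groupby
from operator import itemgetter

def streaming_reduce(sorted_pairs):
    # sorted_pairs is an iterable of (key, value) tuples, sorted by key.
    # Each group is consumed and summed before the next one begins.
    for key, group in groupby(sorted_pairs, key=itemgetter(0)):
        yield key, sum(value for _, value in group)

pairs = [("hello", 1), ("hello", 1), ("world", 1)]
result = list(streaming_reduce(pairs))  # [("hello", 2), ("world", 1)]
```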

## Error Handling and Debugging

### Process Monitoring

```java
// Monitor external process health
public class TetherProcessMonitor {
  public void monitorProcess(TetheredProcess process) throws InterruptedException {
    while (process.isAlive()) {
      if (!process.isResponding()) {
        logger.warn("Tether process not responding, restarting");
        process.restart();
      }
      Thread.sleep(5000);
    }
  }
}
```

### Error Recovery

```java
// Configure retry behavior
job.setInt("tether.process.max.retries", 3);
job.setLong("tether.process.retry.delay", 10000);

// Enable detailed logging
job.setBoolean("tether.debug.enabled", true);
job.set("tether.log.level", "DEBUG");
```

### Common Issues

- **Process Not Found**: Ensure executable path is correct and accessible on all nodes
- **Schema Mismatch**: Verify external process handles schemas correctly
- **Communication Timeout**: Increase timeout values for complex processing
- **Memory Issues**: Monitor memory usage in external processes
- **Permission Errors**: Ensure executable has proper permissions on cluster nodes
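
A small pre-flight check can catch the first and last of these issues before the job is submitted. The function below is a local sketch only — it checks the path on the machine it runs on and cannot verify remote cluster nodes; the function name and example path are illustrative.

```python
import os

def preflight_check(executable_path):
    """Return a list of problems found with a Tether executable path."""
    problems = []
    if not os.path.exists(executable_path):
        # Corresponds to "Process Not Found" above.
        problems.append("executable not found: " + executable_path)
    elif not os.access(executable_path, os.X_OK):
        # Corresponds to "Permission Errors" above.
        problems.append("executable is not executable: " + executable_path)
    return problems

# Example: a missing path is reported as a problem.
issues = preflight_check("/no/such/wordcount.py")
```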