Hadoop MapReduce compatible API for using Avro serialization in distributed data processing pipelines
—
Tether framework for implementing MapReduce jobs in non-Java languages while maintaining Avro data integration and schema compatibility. Tether enables developers to write MapReduce logic in languages like Python, C++, or any language that can communicate via standard input/output, while still benefiting from Avro's schema evolution and efficient serialization.
Main class for configuring and executing Tether-based MapReduce jobs.
public class TetherJob {
// Executable configuration
public static void setExecutable(JobConf job, File executable);
public static void setExecutable(JobConf job, String executable);
// Schema configuration
public static void setInputSchema(JobConf job, Schema schema);
public static void setMapOutputSchema(JobConf job, Schema schema);
public static void setOutputSchema(JobConf job, Schema schema);
// Protocol configuration
public static void setProtocol(JobConf job, Protocol protocol);
// Job execution
public static void submit(JobConf job) throws IOException;
public static RunningJob runJob(JobConf job) throws IOException;
}
import org.apache.avro.mapred.tether.TetherJob;
import org.apache.avro.Schema;
import org.apache.hadoop.mapred.JobConf;
// Configure Tether job for Python MapReduce script
JobConf job = new JobConf();
job.setJobName("Python Word Count via Tether");
// Set executable (Python script)
TetherJob.setExecutable(job, new File("/path/to/wordcount.py"));
// Configure schemas
Schema stringSchema = Schema.create(Schema.Type.STRING);
Schema intSchema = Schema.create(Schema.Type.INT);
Schema pairSchema = Pair.getPairSchema(stringSchema, intSchema);
TetherJob.setInputSchema(job, stringSchema);
TetherJob.setMapOutputSchema(job, pairSchema);
TetherJob.setOutputSchema(job, pairSchema);
// Set input/output paths
FileInputFormat.setInputPaths(job, new Path("/input"));
FileOutputFormat.setOutputPath(job, new Path("/output"));
// Submit job
TetherJob.runJob(job);
Specialized formats for reading and writing data in Tether jobs.
public class TetherInputFormat extends FileInputFormat<AvroKey<Object>, AvroValue<Object>> {
public RecordReader<AvroKey<Object>, AvroValue<Object>> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}
public class TetherOutputFormat extends AvroOutputFormat<Object> {
public RecordWriter<AvroKey<Object>, AvroValue<Object>> getRecordWriter(TaskAttemptContext context)
throws IOException, InterruptedException;
}
public class TetherRecordReader extends RecordReader<AvroKey<Object>, AvroValue<Object>> {
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
public boolean nextKeyValue() throws IOException, InterruptedException;
public AvroKey<Object> getCurrentKey() throws IOException, InterruptedException;
public AvroValue<Object> getCurrentValue() throws IOException, InterruptedException;
public float getProgress() throws IOException, InterruptedException;
public void close() throws IOException;
}
Classes that manage the execution of external processes and communication protocols.
public class TetherMapRunner implements MapRunnable<AvroKey<Object>, AvroValue<Object>, AvroKey<Object>, AvroValue<Object>> {
public void run(RecordReader<AvroKey<Object>, AvroValue<Object>> input,
OutputCollector<AvroKey<Object>, AvroValue<Object>> output,
Reporter reporter) throws IOException;
}
public class TetherReducer implements Reducer<AvroKey<Object>, AvroValue<Object>, AvroKey<Object>, AvroValue<Object>> {
public void reduce(AvroKey<Object> key, Iterator<AvroValue<Object>> values,
OutputCollector<AvroKey<Object>, AvroValue<Object>> output,
Reporter reporter) throws IOException;
public void configure(JobConf job);
public void close() throws IOException;
}
public class TetheredProcess {
// Process management
public TetheredProcess(JobConf job, TaskAttemptContext context);
public void startProcess() throws IOException;
public void stopProcess() throws IOException;
// Communication
public void writeInput(Object datum) throws IOException;
public Object readOutput() throws IOException;
// Status monitoring
public boolean isAlive();
public int getExitCode();
}
Classes for managing data serialization and protocol communication with external processes.
public class TetherData {
// Data serialization for external processes
public static void writeDatum(OutputStream out, Object datum, Schema schema) throws IOException;
public static Object readDatum(InputStream in, Schema schema) throws IOException;
// Protocol message handling
public static void writeMessage(OutputStream out, Object message) throws IOException;
public static Object readMessage(InputStream in) throws IOException;
// Schema transmission
public static void sendSchema(OutputStream out, Schema schema) throws IOException;
public static Schema receiveSchema(InputStream in) throws IOException;
}
public class TetherKeySerialization implements Serialization<AvroKey<Object>> {
public boolean accept(Class<?> c);
public Deserializer<AvroKey<Object>> getDeserializer(Class<AvroKey<Object>> c);
public Serializer<AvroKey<Object>> getSerializer(Class<AvroKey<Object>> c);
}
public class TetherKeyComparator implements RawComparator<AvroKey<Object>>, Configurable {
public int compare(AvroKey<Object> o1, AvroKey<Object> o2);
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
public void setConf(Configuration conf);
public Configuration getConf();
}
public class TetherPartitioner implements Partitioner<AvroKey<Object>, AvroValue<Object>>, Configurable {
public int getPartition(AvroKey<Object> key, AvroValue<Object> value, int numPartitions);
public void configure(JobConf job);
}
Service for handling output from external processes back to the Hadoop framework.
public class TetherOutputService {
// Service management
public TetherOutputService(TaskAttemptContext context);
public void start() throws IOException;
public void stop() throws IOException;
// Output handling
public void handleOutput(Object key, Object value) throws IOException;
public void handleComplete() throws IOException;
public void handleError(String error) throws IOException;
// Status reporting
public void reportProgress(float progress);
public void reportCounter(String group, String name, long value);
}
#!/usr/bin/env python3
import sys
import json
import avro.schema
import avro.io
import io
def map_phase():
    """Map phase: read lines and emit word counts."""
    # The tether parent sends two JSON schema lines (input, then output)
    # before any binary Avro data appears on stdin.
    input_schema = avro.schema.parse(sys.stdin.readline().strip())
    output_schema = avro.schema.parse(sys.stdin.readline().strip())

    # Binary Avro I/O over the process's standard streams.
    in_decoder = avro.io.BinaryDecoder(sys.stdin.buffer)
    out_encoder = avro.io.BinaryEncoder(sys.stdout.buffer)
    line_reader = avro.io.DatumReader(input_schema)
    pair_writer = avro.io.DatumWriter(output_schema)

    try:
        while True:
            # Each datum is one input line of text.
            text = line_reader.read(in_decoder)
            for token in text.lower().split():
                if not token:
                    continue
                # Emit a {word, 1} pair and flush so the parent sees it promptly.
                pair_writer.write({"key": token, "value": 1}, out_encoder)
                sys.stdout.buffer.flush()
    except EOFError:
        pass  # End of input
def reduce_phase():
    """Reduce phase: sum counts for each word."""
    # Three JSON schema lines precede the binary stream: key, value, output.
    key_schema = avro.schema.parse(sys.stdin.readline().strip())
    value_schema = avro.schema.parse(sys.stdin.readline().strip())
    output_schema = avro.schema.parse(sys.stdin.readline().strip())

    in_decoder = avro.io.BinaryDecoder(sys.stdin.buffer)
    out_encoder = avro.io.BinaryEncoder(sys.stdout.buffer)
    key_reader = avro.io.DatumReader(key_schema)
    value_reader = avro.io.DatumReader(value_schema)
    result_writer = avro.io.DatumWriter(output_schema)

    def flush_group(word, total):
        # Write one {key, value} record and push it downstream immediately.
        result_writer.write({"key": word, "value": total}, out_encoder)
        sys.stdout.buffer.flush()

    # Input arrives sorted by key, so equal keys are consecutive and a
    # single running accumulator suffices.
    active_key = None
    running_total = 0
    try:
        while True:
            word = key_reader.read(in_decoder)
            amount = value_reader.read(in_decoder)
            if word == active_key:
                running_total += amount
            else:
                if active_key is not None:
                    flush_group(active_key, running_total)
                active_key = word
                running_total = amount
    except EOFError:
        # Emit the last group once the stream is exhausted.
        if active_key is not None:
            flush_group(active_key, running_total)
if __name__ == "__main__":
phase = sys.argv[1] if len(sys.argv) > 1 else "map"
if phase == "map":
map_phase()
elif phase == "reduce":
reduce_phase()
import org.apache.avro.mapred.tether.TetherJob;
import org.apache.avro.Schema;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.Path;
public class PythonWordCountJob {
public static void main(String[] args) throws Exception {
JobConf job = new JobConf();
job.setJobName("Python Word Count via Tether");
// Configure Tether executable
TetherJob.setExecutable(job, "/path/to/wordcount.py");
// Define schemas
Schema stringSchema = Schema.create(Schema.Type.STRING);
Schema intSchema = Schema.create(Schema.Type.INT);
Schema pairSchema = Schema.createRecord("Pair", null, null, false);
pairSchema.setFields(Arrays.asList(
new Schema.Field("key", stringSchema, null, null),
new Schema.Field("value", intSchema, null, null)
));
// Configure schemas
TetherJob.setInputSchema(job, stringSchema);
TetherJob.setMapOutputSchema(job, pairSchema);
TetherJob.setOutputSchema(job, pairSchema);
// Set input/output paths
job.setInputFormat(TetherInputFormat.class);
job.setOutputFormat(TetherOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// Run job
TetherJob.runJob(job);
}
}
Tether uses a message-based protocol for communication between Java and external processes; the messages are shown below as JSON for readability:
{
"type": "configure",
"schemas": {
"input": "{\"type\":\"string\"}",
"output": "{\"type\":\"record\",\"name\":\"Pair\",...}"
}
}
{
"type": "map",
"input": "Hello world"
}
{
"type": "output",
"data": {"key": "hello", "value": 1}
}
{
"type": "reduce",
"key": "hello",
"values": [1, 1, 1]
}
{
"type": "complete"
}
{
"type": "error",
"message": "Failed to process input",
"details": "Invalid schema format"
}
{
"type": "progress",
"value": 0.75
}
{
"type": "counter",
"group": "PROCESSING",
"name": "RECORDS_PROCESSED",
"value": 1000
}# Python Tether client library usage
from avro_tether import TetherMapper, TetherReducer
class WordCountMapper(TetherMapper):
    def map(self, datum):
        """Emit a {key, value} pair with count 1 for every word in `datum`."""
        for token in datum.lower().split():
            self.emit({"key": token, "value": 1})
class WordCountReducer(TetherReducer):
    def reduce(self, key, values):
        """Emit the summed count for `key` across all of `values`."""
        self.emit({"key": key, "value": sum(values)})
# Run with framework
if __name__ == "__main__":
import sys
if sys.argv[1] == "map":
WordCountMapper().run()
else:
WordCountReducer().run()
// C++ Tether client example
#include "avro_tether.h"
// C++ mapper for the Tether framework: splits each input line into
// whitespace-delimited words and emits one {key, value} pair per word.
class WordCountMapper : public TetherMapper {
public:
// Called once per input datum; `input` is expected to carry a single
// line of text as an Avro string.
void map(const avro::GenericDatum& input) override {
std::string line = input.value<std::string>();
// Stream extraction splits on any run of whitespace.
std::istringstream iss(line);
std::string word;
while (iss >> word) {
// NOTE(review): setField(name, rawValue) is illustrative; confirm against
// the actual avro::GenericRecord API, which is schema-driven and takes
// GenericDatum values.
avro::GenericRecord pair;
pair.setField("key", word);
pair.setField("value", 1);
emit(pair);
}
}
};
int main(int argc, char** argv) {
if (std::string(argv[1]) == "map") {
WordCountMapper mapper;
mapper.run();
}
return 0;
}
// Configure process resources
job.setInt("mapreduce.map.memory.mb", 2048);
job.setInt("mapreduce.reduce.memory.mb", 4096);
// Set executable permissions
job.set("tether.executable.permissions", "755");
// Configure timeouts
job.setLong("tether.process.timeout", 300000); // 5 minutes// Configure buffer sizes for I/O
job.setInt("tether.io.buffer.size", 65536);
// Enable compression for data transfer
job.setBoolean("tether.compress.data", true);
job.set("tether.compression.codec", "snappy");# Python: Process data in streaming fashion
def map_phase():
for line in sys.stdin:
# Process immediately, don't accumulate
process_and_emit(line)
def reduce_phase():
current_key = None
count = 0
for key, value in read_key_values():
if current_key != key:
if current_key is not None:
emit(current_key, count)
current_key = key
count = value
else:
count += value// Monitor external process health
public class TetherProcessMonitor {
    /** Poll interval between health checks, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 5000;

    /**
     * Polls the tethered process while it is alive, restarting it whenever it
     * stops responding. Returns when the process exits or the monitoring
     * thread is interrupted.
     *
     * <p>NOTE(review): {@code isResponding()} and {@code restart()} are not
     * part of the TetheredProcess API listed above — confirm they exist.
     *
     * @param process the external process to watch
     */
    public void monitorProcess(TetheredProcess process) {
        while (process.isAlive()) {
            if (!process.isResponding()) {
                logger.warn("Tether process not responding, restarting");
                process.restart();
            }
            // Bug fix: Thread.sleep throws the checked InterruptedException,
            // which the original neither caught nor declared (compile error).
            try {
                Thread.sleep(POLL_INTERVAL_MS);
            } catch (InterruptedException e) {
                // Preserve interrupt status and stop monitoring.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
// Configure retry behavior
job.setInt("tether.process.max.retries", 3);
job.setLong("tether.process.retry.delay", 10000);
// Enable detailed logging
job.setBoolean("tether.debug.enabled", true);
job.set("tether.log.level", "DEBUG");Install with Tessl CLI
npx tessl i tessl/maven-org-apache-avro--avro-mapred