CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-avro--avro-mapred

Hadoop MapReduce compatible API for using Avro serialization in distributed data processing pipelines

Pending
Overview
Eval results
Files

cross-language-processing.mddocs/

Cross-Language Processing (Tether)

Tether framework for implementing MapReduce jobs in non-Java languages while maintaining Avro data integration and schema compatibility. Tether enables developers to write MapReduce logic in languages like Python, C++, or any language that can communicate via standard input/output, while still benefiting from Avro's schema evolution and efficient serialization.

Capabilities

Tether Job Configuration

Main class for configuring and executing Tether-based MapReduce jobs.

public class TetherJob {
    // Executable configuration
    public static void setExecutable(JobConf job, File executable);
    public static void setExecutable(JobConf job, String executable);
    
    // Schema configuration
    public static void setInputSchema(JobConf job, Schema schema);
    public static void setMapOutputSchema(JobConf job, Schema schema);  
    public static void setOutputSchema(JobConf job, Schema schema);
    
    // Protocol configuration
    public static void setProtocol(JobConf job, Protocol protocol);
    
    // Job execution
    public static void submit(JobConf job) throws IOException;
    public static RunningJob runJob(JobConf job) throws IOException;
}

Usage Example

import org.apache.avro.mapred.tether.TetherJob;
import org.apache.avro.Schema;
import org.apache.hadoop.mapred.JobConf;

// Configure Tether job for Python MapReduce script
JobConf job = new JobConf();
job.setJobName("Python Word Count via Tether");

// Set executable (Python script)
TetherJob.setExecutable(job, new File("/path/to/wordcount.py"));

// Configure schemas
Schema stringSchema = Schema.create(Schema.Type.STRING);
Schema intSchema = Schema.create(Schema.Type.INT);
Schema pairSchema = Pair.getPairSchema(stringSchema, intSchema);

TetherJob.setInputSchema(job, stringSchema);
TetherJob.setMapOutputSchema(job, pairSchema);
TetherJob.setOutputSchema(job, pairSchema);

// Set input/output paths
FileInputFormat.setInputPaths(job, new Path("/input"));
FileOutputFormat.setOutputPath(job, new Path("/output"));

// Submit job
TetherJob.runJob(job);

Tether Input/Output Formats

Specialized formats for reading and writing data in Tether jobs.

public class TetherInputFormat extends FileInputFormat<AvroKey<Object>, AvroValue<Object>> {
    public RecordReader<AvroKey<Object>, AvroValue<Object>> createRecordReader(
        InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}

public class TetherOutputFormat extends AvroOutputFormat<Object> {
    public RecordWriter<AvroKey<Object>, AvroValue<Object>> getRecordWriter(TaskAttemptContext context) 
        throws IOException, InterruptedException;
}

public class TetherRecordReader extends RecordReader<AvroKey<Object>, AvroValue<Object>> {
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
    public boolean nextKeyValue() throws IOException, InterruptedException;
    public AvroKey<Object> getCurrentKey() throws IOException, InterruptedException;  
    public AvroValue<Object> getCurrentValue() throws IOException, InterruptedException;
    public float getProgress() throws IOException, InterruptedException;
    public void close() throws IOException;
}

Tether Execution Framework

Classes that manage the execution of external processes and communication protocols.

public class TetherMapRunner implements MapRunnable<AvroKey<Object>, AvroValue<Object>, AvroKey<Object>, AvroValue<Object>> {
    public void run(RecordReader<AvroKey<Object>, AvroValue<Object>> input,
                   OutputCollector<AvroKey<Object>, AvroValue<Object>> output,
                   Reporter reporter) throws IOException;
}

public class TetherReducer implements Reducer<AvroKey<Object>, AvroValue<Object>, AvroKey<Object>, AvroValue<Object>> {
    public void reduce(AvroKey<Object> key, Iterator<AvroValue<Object>> values,
                      OutputCollector<AvroKey<Object>, AvroValue<Object>> output,
                      Reporter reporter) throws IOException;
    public void configure(JobConf job);
    public void close() throws IOException;
}

public class TetheredProcess {
    // Process management
    public TetheredProcess(JobConf job, TaskAttemptContext context);
    public void startProcess() throws IOException;
    public void stopProcess() throws IOException;
    
    // Communication
    public void writeInput(Object datum) throws IOException;
    public Object readOutput() throws IOException;
    
    // Status monitoring
    public boolean isAlive();
    public int getExitCode();
}

Tether Data Handling

Classes for managing data serialization and protocol communication with external processes.

public class TetherData {
    // Data serialization for external processes
    public static void writeDatum(OutputStream out, Object datum, Schema schema) throws IOException;
    public static Object readDatum(InputStream in, Schema schema) throws IOException;
    
    // Protocol message handling
    public static void writeMessage(OutputStream out, Object message) throws IOException;
    public static Object readMessage(InputStream in) throws IOException;
    
    // Schema transmission
    public static void sendSchema(OutputStream out, Schema schema) throws IOException;
    public static Schema receiveSchema(InputStream in) throws IOException;
}

public class TetherKeySerialization implements Serialization<AvroKey<Object>> {
    public boolean accept(Class<?> c);
    public Deserializer<AvroKey<Object>> getDeserializer(Class<AvroKey<Object>> c);
    public Serializer<AvroKey<Object>> getSerializer(Class<AvroKey<Object>> c);
}

public class TetherKeyComparator implements RawComparator<AvroKey<Object>>, Configurable {
    public int compare(AvroKey<Object> o1, AvroKey<Object> o2);
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
    public void setConf(Configuration conf);
    public Configuration getConf();
}

public class TetherPartitioner implements Partitioner<AvroKey<Object>, AvroValue<Object>>, Configurable {
    public int getPartition(AvroKey<Object> key, AvroValue<Object> value, int numPartitions);
    public void configure(JobConf job);
}

Tether Output Service

Service for handling output from external processes back to the Hadoop framework.

public class TetherOutputService {
    // Service management
    public TetherOutputService(TaskAttemptContext context);
    public void start() throws IOException;
    public void stop() throws IOException;
    
    // Output handling
    public void handleOutput(Object key, Object value) throws IOException;
    public void handleComplete() throws IOException;
    public void handleError(String error) throws IOException;
    
    // Status reporting
    public void reportProgress(float progress);
    public void reportCounter(String group, String name, long value);
}

Complete Example: Python Word Count

Python Script (wordcount.py)

#!/usr/bin/env python3
import sys
import json
import avro.schema
import avro.io
import io

def map_phase():
    """Map phase: read lines and emit word counts."""
    # Read input schema
    input_schema_json = sys.stdin.readline().strip()
    input_schema = avro.schema.parse(input_schema_json)
    
    # Read output schema  
    output_schema_json = sys.stdin.readline().strip()
    output_schema = avro.schema.parse(output_schema_json)
    
    # Create readers/writers
    decoder = avro.io.BinaryDecoder(sys.stdin.buffer)
    encoder = avro.io.BinaryEncoder(sys.stdout.buffer)
    reader = avro.io.DatumReader(input_schema)
    writer = avro.io.DatumWriter(output_schema)
    
    try:
        while True:
            # Read input line
            line = reader.read(decoder)
            
            # Split into words and emit pairs
            words = line.lower().split()
            for word in words:
                if word:
                    # Create key-value pair
                    pair = {"key": word, "value": 1}
                    writer.write(pair, encoder)
                    sys.stdout.buffer.flush()
                    
    except EOFError:
        pass  # End of input

def reduce_phase():
    """Reduce phase: sum counts for each word."""
    # Read schemas
    key_schema_json = sys.stdin.readline().strip()
    key_schema = avro.schema.parse(key_schema_json)
    
    value_schema_json = sys.stdin.readline().strip()  
    value_schema = avro.schema.parse(value_schema_json)
    
    output_schema_json = sys.stdin.readline().strip()
    output_schema = avro.schema.parse(output_schema_json)
    
    # Create readers/writers
    decoder = avro.io.BinaryDecoder(sys.stdin.buffer)
    encoder = avro.io.BinaryEncoder(sys.stdout.buffer)
    key_reader = avro.io.DatumReader(key_schema)
    value_reader = avro.io.DatumReader(value_schema)
    writer = avro.io.DatumWriter(output_schema)
    
    current_key = None
    count = 0
    
    try:
        while True:
            # Read key-value pair
            key = key_reader.read(decoder)
            value = value_reader.read(decoder)
            
            if current_key != key:
                # Emit previous key's count
                if current_key is not None:
                    result = {"key": current_key, "value": count}
                    writer.write(result, encoder)
                    sys.stdout.buffer.flush()
                
                # Start new key
                current_key = key
                count = value
            else:
                # Accumulate count
                count += value
                
    except EOFError:
        # Emit final count
        if current_key is not None:
            result = {"key": current_key, "value": count}
            writer.write(result, encoder)
            sys.stdout.buffer.flush()

if __name__ == "__main__":
    phase = sys.argv[1] if len(sys.argv) > 1 else "map"
    if phase == "map":
        map_phase()
    elif phase == "reduce":
        reduce_phase()

Java Job Configuration

import org.apache.avro.mapred.tether.TetherJob;
import org.apache.avro.Schema;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.Path;

public class PythonWordCountJob {
    public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        job.setJobName("Python Word Count via Tether");
        
        // Configure Tether executable
        TetherJob.setExecutable(job, "/path/to/wordcount.py");
        
        // Define schemas
        Schema stringSchema = Schema.create(Schema.Type.STRING);
        Schema intSchema = Schema.create(Schema.Type.INT);  
        Schema pairSchema = Schema.createRecord("Pair", null, null, false);
        pairSchema.setFields(Arrays.asList(
            new Schema.Field("key", stringSchema, null, null),
            new Schema.Field("value", intSchema, null, null)
        ));
        
        // Configure schemas
        TetherJob.setInputSchema(job, stringSchema);
        TetherJob.setMapOutputSchema(job, pairSchema);
        TetherJob.setOutputSchema(job, pairSchema);
        
        // Set input/output paths
        job.setInputFormat(TetherInputFormat.class);
        job.setOutputFormat(TetherOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        // Run job
        TetherJob.runJob(job);
    }
}

Protocol Communication

Message Protocol

Tether uses a JSON-based protocol for communication between Java and external processes:

{
  "type": "configure",
  "schemas": {
    "input": "{\"type\":\"string\"}",
    "output": "{\"type\":\"record\",\"name\":\"Pair\",...}"
  }
}

{
  "type": "map",
  "input": "Hello world"
}

{
  "type": "output", 
  "data": {"key": "hello", "value": 1}
}

{
  "type": "reduce",
  "key": "hello",
  "values": [1, 1, 1]
}

{
  "type": "complete"
}

Error Handling Protocol

{
  "type": "error",
  "message": "Failed to process input",
  "details": "Invalid schema format"
}

{
  "type": "progress",
  "value": 0.75
}

{
  "type": "counter",
  "group": "PROCESSING",
  "name": "RECORDS_PROCESSED", 
  "value": 1000
}

Language-Specific Examples

Python Integration

# Python Tether client library usage
from avro_tether import TetherMapper, TetherReducer

class WordCountMapper(TetherMapper):
    def map(self, datum):
        words = datum.lower().split()
        for word in words:
            self.emit({"key": word, "value": 1})

class WordCountReducer(TetherReducer):
    def reduce(self, key, values):
        total = sum(values)
        self.emit({"key": key, "value": total})

# Run with framework
if __name__ == "__main__":
    import sys
    if sys.argv[1] == "map":
        WordCountMapper().run()
    else:
        WordCountReducer().run()

C++ Integration

// C++ Tether client example
#include "avro_tether.h"

class WordCountMapper : public TetherMapper {
public:
    void map(const avro::GenericDatum& input) override {
        std::string line = input.value<std::string>();
        std::istringstream iss(line);
        std::string word;
        
        while (iss >> word) {
            avro::GenericRecord pair;
            pair.setField("key", word);
            pair.setField("value", 1);
            emit(pair);
        }
    }
};

int main(int argc, char** argv) {
    if (std::string(argv[1]) == "map") {
        WordCountMapper mapper;
        mapper.run();
    }
    return 0;
}

Performance Considerations

Process Management

// Configure process resources
job.setInt("mapreduce.map.memory.mb", 2048);
job.setInt("mapreduce.reduce.memory.mb", 4096);

// Set executable permissions
job.set("tether.executable.permissions", "755");

// Configure timeouts
job.setLong("tether.process.timeout", 300000); // 5 minutes

Data Transfer Optimization

// Configure buffer sizes for I/O
job.setInt("tether.io.buffer.size", 65536);

// Enable compression for data transfer  
job.setBoolean("tether.compress.data", true);
job.set("tether.compression.codec", "snappy");

Memory Management

# Python: Process data in streaming fashion
def map_phase():
    for line in sys.stdin:
        # Process immediately, don't accumulate
        process_and_emit(line)
        
def reduce_phase():
    current_key = None
    count = 0
    
    for key, value in read_key_values():
        if current_key != key:
            if current_key is not None:
                emit(current_key, count)
            current_key = key
            count = value
        else:
            count += value

Error Handling and Debugging

Process Monitoring

// Monitor external process health
public class TetherProcessMonitor {
    public void monitorProcess(TetheredProcess process) {
        while (process.isAlive()) {
            if (!process.isResponding()) {
                logger.warn("Tether process not responding, restarting");
                process.restart();
            }
            Thread.sleep(5000);
        }
    }
}

Error Recovery

// Configure retry behavior
job.setInt("tether.process.max.retries", 3);
job.setLong("tether.process.retry.delay", 10000);

// Enable detailed logging
job.setBoolean("tether.debug.enabled", true);
job.set("tether.log.level", "DEBUG");

Common Issues

  • Process Not Found: Ensure executable path is correct and accessible on all nodes
  • Schema Mismatch: Verify external process handles schemas correctly
  • Communication Timeout: Increase timeout values for complex processing
  • Memory Issues: Monitor memory usage in external processes
  • Permission Errors: Ensure executable has proper permissions on cluster nodes

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-avro--avro-mapred

docs

cross-language-processing.md

data-wrappers.md

file-utilities.md

index.md

input-output-formats.md

job-configuration.md

mapreduce-processing.md

serialization.md

tile.json