or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cross-language-processing.md · data-wrappers.md · file-utilities.md · index.md · input-output-formats.md · job-configuration.md · mapreduce-processing.md · serialization.md

docs/cross-language-processing.md

0

# Cross-Language Processing (Tether)

1

2

Tether is a framework for implementing MapReduce jobs in non-Java languages while maintaining Avro data integration and schema compatibility. Tether enables developers to write MapReduce logic in languages like Python, C++, or any language that can communicate via standard input/output, while still benefiting from Avro's schema evolution and efficient serialization.

3

4

## Capabilities

5

6

### Tether Job Configuration

7

8

Main class for configuring and executing Tether-based MapReduce jobs.

9

10

```java { .api }

11

public class TetherJob {

12

// Executable configuration

13

public static void setExecutable(JobConf job, File executable);

14

public static void setExecutable(JobConf job, String executable);

15

16

// Schema configuration

17

public static void setInputSchema(JobConf job, Schema schema);

18

public static void setMapOutputSchema(JobConf job, Schema schema);

19

public static void setOutputSchema(JobConf job, Schema schema);

20

21

// Protocol configuration

22

public static void setProtocol(JobConf job, Protocol protocol);

23

24

// Job execution

25

public static void submit(JobConf job) throws IOException;

26

public static RunningJob runJob(JobConf job) throws IOException;

27

}

28

```

29

30

#### Usage Example

31

32

```java

33

import org.apache.avro.mapred.tether.TetherJob;

34

import org.apache.avro.Schema;

35

import org.apache.hadoop.mapred.JobConf;

36

37

// Configure Tether job for Python MapReduce script

38

JobConf job = new JobConf();

39

job.setJobName("Python Word Count via Tether");

40

41

// Set executable (Python script)

42

TetherJob.setExecutable(job, new File("/path/to/wordcount.py"));

43

44

// Configure schemas

45

Schema stringSchema = Schema.create(Schema.Type.STRING);

46

Schema intSchema = Schema.create(Schema.Type.INT);

47

Schema pairSchema = Pair.getPairSchema(stringSchema, intSchema);

48

49

TetherJob.setInputSchema(job, stringSchema);

50

TetherJob.setMapOutputSchema(job, pairSchema);

51

TetherJob.setOutputSchema(job, pairSchema);

52

53

// Set input/output paths

54

FileInputFormat.setInputPaths(job, new Path("/input"));

55

FileOutputFormat.setOutputPath(job, new Path("/output"));

56

57

// Submit job

58

TetherJob.runJob(job);

59

```

60

61

### Tether Input/Output Formats

62

63

Specialized formats for reading and writing data in Tether jobs.

64

65

```java { .api }

66

public class TetherInputFormat extends FileInputFormat<AvroKey<Object>, AvroValue<Object>> {

67

public RecordReader<AvroKey<Object>, AvroValue<Object>> createRecordReader(

68

InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;

69

}

70

71

public class TetherOutputFormat extends AvroOutputFormat<Object> {

72

public RecordWriter<AvroKey<Object>, AvroValue<Object>> getRecordWriter(TaskAttemptContext context)

73

throws IOException, InterruptedException;

74

}

75

76

public class TetherRecordReader extends RecordReader<AvroKey<Object>, AvroValue<Object>> {

77

public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;

78

public boolean nextKeyValue() throws IOException, InterruptedException;

79

public AvroKey<Object> getCurrentKey() throws IOException, InterruptedException;

80

public AvroValue<Object> getCurrentValue() throws IOException, InterruptedException;

81

public float getProgress() throws IOException, InterruptedException;

82

public void close() throws IOException;

83

}

84

```

85

86

### Tether Execution Framework

87

88

Classes that manage the execution of external processes and communication protocols.

89

90

```java { .api }

91

public class TetherMapRunner implements MapRunnable<AvroKey<Object>, AvroValue<Object>, AvroKey<Object>, AvroValue<Object>> {

92

public void run(RecordReader<AvroKey<Object>, AvroValue<Object>> input,

93

OutputCollector<AvroKey<Object>, AvroValue<Object>> output,

94

Reporter reporter) throws IOException;

95

}

96

97

public class TetherReducer implements Reducer<AvroKey<Object>, AvroValue<Object>, AvroKey<Object>, AvroValue<Object>> {

98

public void reduce(AvroKey<Object> key, Iterator<AvroValue<Object>> values,

99

OutputCollector<AvroKey<Object>, AvroValue<Object>> output,

100

Reporter reporter) throws IOException;

101

public void configure(JobConf job);

102

public void close() throws IOException;

103

}

104

105

public class TetheredProcess {

106

// Process management

107

public TetheredProcess(JobConf job, TaskAttemptContext context);

108

public void startProcess() throws IOException;

109

public void stopProcess() throws IOException;

110

111

// Communication

112

public void writeInput(Object datum) throws IOException;

113

public Object readOutput() throws IOException;

114

115

// Status monitoring

116

public boolean isAlive();

117

public int getExitCode();

118

}

119

```

120

121

### Tether Data Handling

122

123

Classes for managing data serialization and protocol communication with external processes.

124

125

```java { .api }

126

public class TetherData {

127

// Data serialization for external processes

128

public static void writeDatum(OutputStream out, Object datum, Schema schema) throws IOException;

129

public static Object readDatum(InputStream in, Schema schema) throws IOException;

130

131

// Protocol message handling

132

public static void writeMessage(OutputStream out, Object message) throws IOException;

133

public static Object readMessage(InputStream in) throws IOException;

134

135

// Schema transmission

136

public static void sendSchema(OutputStream out, Schema schema) throws IOException;

137

public static Schema receiveSchema(InputStream in) throws IOException;

138

}

139

140

public class TetherKeySerialization implements Serialization<AvroKey<Object>> {

141

public boolean accept(Class<?> c);

142

public Deserializer<AvroKey<Object>> getDeserializer(Class<AvroKey<Object>> c);

143

public Serializer<AvroKey<Object>> getSerializer(Class<AvroKey<Object>> c);

144

}

145

146

public class TetherKeyComparator implements RawComparator<AvroKey<Object>>, Configurable {

147

public int compare(AvroKey<Object> o1, AvroKey<Object> o2);

148

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);

149

public void setConf(Configuration conf);

150

public Configuration getConf();

151

}

152

153

public class TetherPartitioner implements Partitioner<AvroKey<Object>, AvroValue<Object>>, Configurable {

154

public int getPartition(AvroKey<Object> key, AvroValue<Object> value, int numPartitions);

155

public void configure(JobConf job);

156

}

157

```

158

159

### Tether Output Service

160

161

Service for handling output from external processes back to the Hadoop framework.

162

163

```java { .api }

164

public class TetherOutputService {

165

// Service management

166

public TetherOutputService(TaskAttemptContext context);

167

public void start() throws IOException;

168

public void stop() throws IOException;

169

170

// Output handling

171

public void handleOutput(Object key, Object value) throws IOException;

172

public void handleComplete() throws IOException;

173

public void handleError(String error) throws IOException;

174

175

// Status reporting

176

public void reportProgress(float progress);

177

public void reportCounter(String group, String name, long value);

178

}

179

```

180

181

## Complete Example: Python Word Count

182

183

### Python Script (wordcount.py)

184

185

```python

186

#!/usr/bin/env python3

187

import sys

188

import json

189

import avro.schema

190

import avro.io

191

import io

192

193

def map_phase():

194

"""Map phase: read lines and emit word counts."""

195

# Read input schema

196

input_schema_json = sys.stdin.readline().strip()

197

input_schema = avro.schema.parse(input_schema_json)

198

199

# Read output schema

200

output_schema_json = sys.stdin.readline().strip()

201

output_schema = avro.schema.parse(output_schema_json)

202

203

# Create readers/writers

204

decoder = avro.io.BinaryDecoder(sys.stdin.buffer)

205

encoder = avro.io.BinaryEncoder(sys.stdout.buffer)

206

reader = avro.io.DatumReader(input_schema)

207

writer = avro.io.DatumWriter(output_schema)

208

209

try:

210

while True:

211

# Read input line

212

line = reader.read(decoder)

213

214

# Split into words and emit pairs

215

words = line.lower().split()

216

for word in words:

217

if word:

218

# Create key-value pair

219

pair = {"key": word, "value": 1}

220

writer.write(pair, encoder)

221

sys.stdout.buffer.flush()

222

223

except EOFError:

224

pass # End of input

225

226

def reduce_phase():

227

"""Reduce phase: sum counts for each word."""

228

# Read schemas

229

key_schema_json = sys.stdin.readline().strip()

230

key_schema = avro.schema.parse(key_schema_json)

231

232

value_schema_json = sys.stdin.readline().strip()

233

value_schema = avro.schema.parse(value_schema_json)

234

235

output_schema_json = sys.stdin.readline().strip()

236

output_schema = avro.schema.parse(output_schema_json)

237

238

# Create readers/writers

239

decoder = avro.io.BinaryDecoder(sys.stdin.buffer)

240

encoder = avro.io.BinaryEncoder(sys.stdout.buffer)

241

key_reader = avro.io.DatumReader(key_schema)

242

value_reader = avro.io.DatumReader(value_schema)

243

writer = avro.io.DatumWriter(output_schema)

244

245

current_key = None

246

count = 0

247

248

try:

249

while True:

250

# Read key-value pair

251

key = key_reader.read(decoder)

252

value = value_reader.read(decoder)

253

254

if current_key != key:

255

# Emit previous key's count

256

if current_key is not None:

257

result = {"key": current_key, "value": count}

258

writer.write(result, encoder)

259

sys.stdout.buffer.flush()

260

261

# Start new key

262

current_key = key

263

count = value

264

else:

265

# Accumulate count

266

count += value

267

268

except EOFError:

269

# Emit final count

270

if current_key is not None:

271

result = {"key": current_key, "value": count}

272

writer.write(result, encoder)

273

sys.stdout.buffer.flush()

274

275

if __name__ == "__main__":

276

phase = sys.argv[1] if len(sys.argv) > 1 else "map"

277

if phase == "map":

278

map_phase()

279

elif phase == "reduce":

280

reduce_phase()

281

```

282

283

### Java Job Configuration

284

285

```java

286

import org.apache.avro.mapred.tether.TetherJob;

287

import org.apache.avro.Schema;

288

import org.apache.hadoop.mapred.JobConf;

289

import org.apache.hadoop.fs.Path;

290

291

public class PythonWordCountJob {

292

public static void main(String[] args) throws Exception {

293

JobConf job = new JobConf();

294

job.setJobName("Python Word Count via Tether");

295

296

// Configure Tether executable

297

TetherJob.setExecutable(job, "/path/to/wordcount.py");

298

299

// Define schemas

300

Schema stringSchema = Schema.create(Schema.Type.STRING);

301

Schema intSchema = Schema.create(Schema.Type.INT);

302

Schema pairSchema = Schema.createRecord("Pair", null, null, false);

303

pairSchema.setFields(Arrays.asList(

304

new Schema.Field("key", stringSchema, null, null),

305

new Schema.Field("value", intSchema, null, null)

306

));

307

308

// Configure schemas

309

TetherJob.setInputSchema(job, stringSchema);

310

TetherJob.setMapOutputSchema(job, pairSchema);

311

TetherJob.setOutputSchema(job, pairSchema);

312

313

// Set input/output paths

314

job.setInputFormat(TetherInputFormat.class);

315

job.setOutputFormat(TetherOutputFormat.class);

316

FileInputFormat.setInputPaths(job, new Path(args[0]));

317

FileOutputFormat.setOutputPath(job, new Path(args[1]));

318

319

// Run job

320

TetherJob.runJob(job);

321

}

322

}

323

```

324

325

## Protocol Communication

326

327

### Message Protocol

328

329

Tether uses a JSON-based protocol for communication between Java and external processes:

330

331

```json

332

{

333

"type": "configure",

334

"schemas": {

335

"input": "{\"type\":\"string\"}",

336

"output": "{\"type\":\"record\",\"name\":\"Pair\",...}"

337

}

338

}

339

340

{

341

"type": "map",

342

"input": "Hello world"

343

}

344

345

{

346

"type": "output",

347

"data": {"key": "hello", "value": 1}

348

}

349

350

{

351

"type": "reduce",

352

"key": "hello",

353

"values": [1, 1, 1]

354

}

355

356

{

357

"type": "complete"

358

}

359

```

360

361

### Error Handling Protocol

362

363

```json

364

{

365

"type": "error",

366

"message": "Failed to process input",

367

"details": "Invalid schema format"

368

}

369

370

{

371

"type": "progress",

372

"value": 0.75

373

}

374

375

{

376

"type": "counter",

377

"group": "PROCESSING",

378

"name": "RECORDS_PROCESSED",

379

"value": 1000

380

}

381

```

382

383

## Language-Specific Examples

384

385

### Python Integration

386

387

```python

388

# Python Tether client library usage

389

from avro_tether import TetherMapper, TetherReducer

390

391

class WordCountMapper(TetherMapper):

392

def map(self, datum):

393

words = datum.lower().split()

394

for word in words:

395

self.emit({"key": word, "value": 1})

396

397

class WordCountReducer(TetherReducer):

398

def reduce(self, key, values):

399

total = sum(values)

400

self.emit({"key": key, "value": total})

401

402

# Run with framework

403

if __name__ == "__main__":

404

import sys

405

if sys.argv[1] == "map":

406

WordCountMapper().run()

407

else:

408

WordCountReducer().run()

409

```

410

411

### C++ Integration

412

413

```cpp

414

// C++ Tether client example

415

#include "avro_tether.h"

416

417

class WordCountMapper : public TetherMapper {

418

public:

419

void map(const avro::GenericDatum& input) override {

420

std::string line = input.value<std::string>();

421

std::istringstream iss(line);

422

std::string word;

423

424

while (iss >> word) {

425

avro::GenericRecord pair;

426

pair.setField("key", word);

427

pair.setField("value", 1);

428

emit(pair);

429

}

430

}

431

};

432

433

int main(int argc, char** argv) {

434

if (std::string(argv[1]) == "map") {

435

WordCountMapper mapper;

436

mapper.run();

437

}

438

return 0;

439

}

440

```

441

442

## Performance Considerations

443

444

### Process Management

445

446

```java

447

// Configure process resources

448

job.setInt("mapreduce.map.memory.mb", 2048);

449

job.setInt("mapreduce.reduce.memory.mb", 4096);

450

451

// Set executable permissions

452

job.set("tether.executable.permissions", "755");

453

454

// Configure timeouts

455

job.setLong("tether.process.timeout", 300000); // 5 minutes

456

```

457

458

### Data Transfer Optimization

459

460

```java

461

// Configure buffer sizes for I/O

462

job.setInt("tether.io.buffer.size", 65536);

463

464

// Enable compression for data transfer

465

job.setBoolean("tether.compress.data", true);

466

job.set("tether.compression.codec", "snappy");

467

```

468

469

### Memory Management

470

471

```python

472

# Python: Process data in streaming fashion

473

def map_phase():

474

for line in sys.stdin:

475

# Process immediately, don't accumulate

476

process_and_emit(line)

477

478

def reduce_phase():

479

current_key = None

480

count = 0

481

482

for key, value in read_key_values():

483

if current_key != key:

484

if current_key is not None:

485

emit(current_key, count)

486

current_key = key

487

count = value

488

else:

489

count += value

490

```

491

492

## Error Handling and Debugging

493

494

### Process Monitoring

495

496

```java

497

// Monitor external process health

498

public class TetherProcessMonitor {

499

public void monitorProcess(TetheredProcess process) {

500

while (process.isAlive()) {

501

if (!process.isResponding()) {

502

logger.warn("Tether process not responding, restarting");

503

process.restart();

504

}

505

Thread.sleep(5000);

506

}

507

}

508

}

509

```

510

511

### Error Recovery

512

513

```java

514

// Configure retry behavior

515

job.setInt("tether.process.max.retries", 3);

516

job.setLong("tether.process.retry.delay", 10000);

517

518

// Enable detailed logging

519

job.setBoolean("tether.debug.enabled", true);

520

job.set("tether.log.level", "DEBUG");

521

```

522

523

### Common Issues

524

525

- **Process Not Found**: Ensure executable path is correct and accessible on all nodes

526

- **Schema Mismatch**: Verify external process handles schemas correctly

527

- **Communication Timeout**: Increase timeout values for complex processing

528

- **Memory Issues**: Monitor memory usage in external processes

529

- **Permission Errors**: Ensure executable has proper permissions on cluster nodes