# Cross-Language Integration

This document covers Ray Streaming's cross-language support, enabling mixed Python and Java streaming applications with seamless integration and data exchange.

## Overview

Ray Streaming provides comprehensive cross-language support that allows:

- **Mixed Pipelines**: Combine Python and Java operators in the same streaming job
- **Stream Conversion**: Convert between Python and Java streams seamlessly
- **Cross-Language Serialization**: Automatic data serialization between languages
- **Operator Interoperability**: Use Java operators from Python and vice versa
- **Unified Execution**: Single runtime handles both Python and Java components
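
For a quick taste of what this looks like end to end, here is a minimal mixed pipeline; it uses only the conversion API detailed in the sections below, and the Java class name is a placeholder:

```python
from ray.streaming import StreamingContext

ctx = StreamingContext.Builder().build()

# Python source and operator, one Java operator in the middle,
# then back to Python for the sink.
ctx.from_values("ray", "streaming") \
    .map(lambda x: x.upper()) \
    .as_java_stream() \
    .map("com.example.MyJavaMapper") \
    .as_python_stream() \
    .sink(print)

ctx.submit("overview_example")
```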

## Language Stream Types

Ray Streaming provides separate stream classes for each language with conversion capabilities.

### Python Streams

Standard Python-based streams with native Python operators.

```python { .api }
from ray.streaming.datastream import DataStream

class DataStream:
    # Python operators (using Python functions)
    def map(self, func) -> DataStream
    def flat_map(self, func) -> DataStream
    def filter(self, func) -> DataStream
    def key_by(self, func) -> KeyDataStream
    def reduce(self, func) -> DataStream
    def sink(self, func) -> StreamSink

    # Convert to Java stream
    def as_java_stream(self) -> JavaDataStream
```

### Java Streams

Java-based streams for using Java operators from Python.

```python { .api }
from ray.streaming.datastream import JavaDataStream

class JavaDataStream:
    # Java operators (using Java class names)
    def map(self, java_func_class: str) -> JavaDataStream
    def flat_map(self, java_func_class: str) -> JavaDataStream
    def filter(self, java_func_class: str) -> JavaDataStream
    def key_by(self, java_func_class: str) -> JavaKeyDataStream
    def sink(self, java_func_class: str) -> JavaStreamSink

    # Convert to Python stream
    def as_python_stream(self) -> DataStream
```

## Capabilities

### Stream Conversion

Convert between Python and Java streams to use operators from either language.

```python
from ray.streaming import StreamingContext

ctx = StreamingContext.Builder().build()

# Start with a Python stream
python_stream = ctx.from_values("hello", "world", "ray", "streaming") \
    .map(lambda x: x.upper())

# Convert to a Java stream for Java operators
java_stream = python_stream.as_java_stream() \
    .map("io.ray.streaming.examples.WordCapitalizer") \
    .filter("io.ray.streaming.examples.LongWordFilter")

# Convert back to a Python stream
result_stream = java_stream.as_python_stream() \
    .map(lambda x: f"Final: {x}") \
    .sink(print)

ctx.submit("cross_language_job")
```

### Mixed Processing Pipelines

Create processing pipelines that leverage the strengths of both languages.

#### Example: Text Processing with Java NLP and Python Analytics

```python
from ray.streaming import StreamingContext

ctx = StreamingContext.Builder().build()

# Start with Python data ingestion
text_stream = ctx.read_text_file("documents.txt") \
    .flat_map(lambda line: line.split('.'))  # Split into sentences

# Use Java for NLP processing
processed_stream = text_stream.as_java_stream() \
    .map("com.example.nlp.SentimentAnalyzer") \
    .filter("com.example.nlp.PositiveSentimentFilter") \
    .map("com.example.nlp.EntityExtractor")

# Return to Python for data analysis
analytics_stream = processed_stream.as_python_stream() \
    .map(lambda result: parse_java_result(result)) \
    .key_by(lambda analysis: analysis['entity_type']) \
    .reduce(lambda old, new: combine_analytics(old, new)) \
    .sink(lambda stats: save_to_database(stats))

ctx.submit("nlp_analytics_job")
```

#### Example: Real-time ML Pipeline

```python
# Python for data preprocessing, Java for ML inference
ml_pipeline = ctx.source(sensor_data_source) \
    .map(lambda raw: preprocess_sensor_data(raw)) \
    .filter(lambda data: data['quality_score'] > 0.8) \
    .as_java_stream() \
    .map("com.example.ml.TensorFlowPredictor") \
    .map("com.example.ml.ModelEnsemble") \
    .as_python_stream() \
    .map(lambda prediction: post_process_prediction(prediction)) \
    .sink(lambda result: send_alert_if_anomaly(result))

ctx.submit("ml_inference_job")
```

### Java Operator Integration

Use Java operators from Python by specifying the fully qualified class name.

#### Java Operator Classes

```java
// Example Java operators that can be used from Python
package com.example.operators;

public class StringReverser implements MapFunction<String, String> {
    @Override
    public String map(String value) {
        return new StringBuilder(value).reverse().toString();
    }
}

public class LengthFilter implements FilterFunction<String> {
    @Override
    public boolean filter(String value) {
        return value.length() > 5;
    }
}

public class WordCounter implements ReduceFunction<Tuple2<String, Integer>> {
    @Override
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> oldValue, Tuple2<String, Integer> newValue) {
        return new Tuple2<>(oldValue.f0, oldValue.f1 + newValue.f1);
    }
}
```

#### Using Java Operators from Python

```python
from ray.streaming import StreamingContext

ctx = StreamingContext.Builder().build()

# Use Java operators with fully qualified class names
ctx.from_values("hello", "streaming", "world", "processing") \
    .as_java_stream() \
    .map("com.example.operators.StringReverser") \
    .filter("com.example.operators.LengthFilter") \
    .as_python_stream() \
    .map(lambda x: f"Processed: {x}") \
    .sink(print)

# Mixed keyed operations
ctx.from_values("apple", "banana", "apple", "cherry", "banana") \
    .map(lambda word: (word, 1)) \
    .as_java_stream() \
    .key_by("com.example.operators.TupleKeyExtractor") \
    .reduce("com.example.operators.WordCounter") \
    .as_python_stream() \
    .sink(lambda result: print(f"Count: {result}"))

ctx.submit("java_operators_job")
```

### Python Operator Integration from Java

While the primary interface is Python, Java applications can also use Python operators.

#### Example Java Code Using Python Operators

```java
// Java code using Python operators
StreamingContext context = StreamingContext.buildContext();
DataStreamSource<String> source = DataStreamSource.fromCollection(
    context, Arrays.asList("data1", "data2", "data3"));

source.map(x -> x.toUpperCase())
    .asPythonStream()
    .map("my_python_module", "custom_transform_function")
    .filter("my_python_module", "quality_filter")
    .asJavaStream()
    .sink(value -> System.out.println("Result: " + value));

context.execute("mixed_java_python_job");
```

#### Python Module for Java Integration

```python
# my_python_module.py - Python functions callable from Java

def custom_transform_function(data):
    """Transform data using Python libraries"""
    import json
    import pandas as pd

    # Use Python-specific libraries
    parsed = json.loads(data) if isinstance(data, str) else data
    df = pd.DataFrame([parsed])
    # Perform pandas operations
    return df.to_dict('records')[0]

def quality_filter(data):
    """Filter using Python logic"""
    return isinstance(data, dict) and data.get('quality', 0) > 0.5
```

## Data Serialization

Ray Streaming handles automatic serialization between Python and Java components.

### Serialization Types

```python { .api }
# Serialization type constants
from ray.streaming.runtime.serialization import Serializer

class Serializer:
    CROSS_LANG_TYPE_ID = 0  # Cross-language serialization
    JAVA_TYPE_ID = 1        # Java-specific serialization
    PYTHON_TYPE_ID = 2      # Python-specific serialization
```
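
To make the type IDs concrete, the sketch below shows how a one-byte type-ID prefix lets the receiving worker pick the right decoder. This is illustrative only; the byte layout and codec choices (msgpack for cross-language data, pickle for Python-only data) are assumptions, not documented wire format:

```python
import pickle

import msgpack  # commonly used for language-neutral encoding

CROSS_LANG_TYPE_ID = 0
PYTHON_TYPE_ID = 2

def serialize(value, cross_lang: bool) -> bytes:
    # Prefix the payload with a type-ID byte so the peer knows
    # which decoder to apply.
    if cross_lang:
        return bytes([CROSS_LANG_TYPE_ID]) + msgpack.packb(value)
    return bytes([PYTHON_TYPE_ID]) + pickle.dumps(value)

def deserialize(payload: bytes):
    type_id, body = payload[0], payload[1:]
    if type_id == CROSS_LANG_TYPE_ID:
        return msgpack.unpackb(body)
    if type_id == PYTHON_TYPE_ID:
        return pickle.loads(body)
    raise ValueError(f"Unknown serializer type ID: {type_id}")
```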

### Supported Data Types

Ray Streaming automatically handles serialization for common data types:

- **Primitives**: int, float, string, boolean
- **Collections**: list, dict, tuple
- **Custom Objects**: Objects implementing serialization interfaces
- **Complex Data**: JSON-serializable structures

```python
# Data types that work seamlessly across languages
simple_data = ctx.from_values(
    42,                # int
    3.14,              # float
    "hello",           # string
    True,              # boolean
    [1, 2, 3],         # list
    {"key": "value"},  # dict
    ("a", "b", "c")    # tuple
)

# Complex structured data
complex_data = ctx.from_values({
    "user_id": 12345,
    "name": "John Doe",
    "scores": [95, 87, 92],
    "metadata": {
        "created": "2024-01-01",
        "active": True
    }
})

# Both work with cross-language operations
simple_data.as_java_stream() \
    .map("com.example.DataProcessor") \
    .as_python_stream() \
    .sink(print)

complex_data.as_java_stream() \
    .filter("com.example.ComplexDataFilter") \
    .as_python_stream() \
    .map(lambda x: f"Processed: {x}") \
    .sink(print)
```

## Advanced Cross-Language Patterns

### Language-Specific Processing Stages

Organize the processing pipeline around each language's strengths.

```python
def create_multi_language_pipeline(ctx):
    """Create a pipeline that leverages each language's strengths"""

    # Stage 1: Python for data ingestion and preprocessing
    raw_data = ctx.source(custom_data_source) \
        .map(lambda x: json.loads(x)) \
        .filter(lambda x: validate_data_quality(x)) \
        .map(lambda x: normalize_data_format(x))

    # Stage 2: Java for high-performance processing
    processed_data = raw_data.as_java_stream() \
        .map("com.example.HighPerformanceProcessor") \
        .filter("com.example.BusinessRuleValidator") \
        .map("com.example.DataEnricher")

    # Stage 3: Python for ML and analytics
    analyzed_data = processed_data.as_python_stream() \
        .map(lambda x: apply_ml_model(x)) \
        .key_by(lambda x: x['category']) \
        .reduce(lambda old, new: aggregate_analytics(old, new))

    # Stage 4: Java for enterprise integration
    final_output = analyzed_data.as_java_stream() \
        .map("com.example.enterprise.MessageFormatter") \
        .sink("com.example.enterprise.KafkaSink")

    return final_output
```

### Error Handling Across Languages

Handle errors that may occur in cross-language operations.

```python
class RobustCrossLanguageProcessor:
    def process_with_fallback(self, ctx, data_stream):
        try:
            # Try Java processing first
            result = data_stream.as_java_stream() \
                .map("com.example.OptimizedProcessor") \
                .as_python_stream()
        except Exception as java_error:
            print(f"Java processing failed: {java_error}")
            # Fall back to Python processing
            result = data_stream.map(lambda x: self.python_fallback_processor(x))

        return result.sink(self.error_tolerant_sink)

    def python_fallback_processor(self, data):
        # Pure Python implementation as fallback
        return {"processed": True, "data": data, "method": "python_fallback"}

    def error_tolerant_sink(self, data):
        try:
            # Attempt to sink data
            print(f"Output: {data}")
        except Exception as e:
            print(f"Sink error: {e}, data: {data}")
```

### Performance Optimization

Cross-language pipelines pay a serialization cost at every language switch, so group same-language operations together and cross the boundary as few times as possible.

```python
def optimized_cross_language_pipeline(ctx):
    """Performance-optimized cross-language pipeline"""

    # Minimize language switches
    data = ctx.source(high_volume_source) \
        .set_parallelism(8)  # High parallelism for ingestion

    # Batch Python operations together
    python_processed = data \
        .map(preprocess_func) \
        .filter(quality_check_func) \
        .map(feature_extraction_func)

    # Single switch to Java for batch operations
    java_processed = python_processed.as_java_stream() \
        .map("com.example.BatchProcessor") \
        .filter("com.example.BatchValidator") \
        .map("com.example.BatchEnricher")

    # Single switch back to Python for final operations
    final_result = java_processed.as_python_stream() \
        .key_by(lambda x: x['partition_key']) \
        .reduce(efficient_reduce_func) \
        .sink(optimized_sink_func)

    return final_result
```

## Configuration for Cross-Language Jobs

Configure Ray Streaming for optimal cross-language performance.

### Job Configuration

```python
# Cross-language job configuration
ctx = StreamingContext.Builder() \
    .option("streaming.cross-lang.enabled", "true") \
    .option("streaming.serialization.type", "CROSS_LANG") \
    .option("streaming.java.classpath", "/path/to/java/classes") \
    .option("streaming.python.module.path", "/path/to/python/modules") \
    .build()
```

### Memory and Resource Configuration

```python
# Configure resources for mixed workloads
ctx = StreamingContext.Builder() \
    .option("streaming.worker-num", "6") \
    .option("streaming.java.worker.memory", "4GB") \
    .option("streaming.python.worker.memory", "2GB") \
    .option("streaming.serialization.buffer.size", "8MB") \
    .build()
```

## Best Practices

### Cross-Language Development Guidelines

1. **Minimize Language Switches**: Group operations by language to reduce serialization overhead
2. **Use Appropriate Languages**: Leverage each language's strengths (Java for performance, Python for flexibility)
3. **Handle Serialization**: Ensure data types are compatible across language boundaries
4. **Error Handling**: Implement robust error handling for cross-language failures
5. **Testing**: Test both language paths thoroughly
6. **Performance Monitoring**: Monitor serialization and conversion overhead

### Example Best Practice Implementation

```python
import time

class CrossLanguageBestPractices:
    def __init__(self, ctx):
        self.ctx = ctx

    def efficient_pipeline(self, source_data):
        """Implement best practices for a cross-language pipeline"""

        # 1. Group Python operations
        python_stage = source_data \
            .map(self.validate_input) \
            .filter(self.quality_check) \
            .map(self.extract_features)

        # 2. Single conversion to Java for performance-critical operations
        java_stage = python_stage.as_java_stream() \
            .map("com.example.PerformanceCriticalProcessor") \
            .filter("com.example.HighThroughputFilter") \
            .map("com.example.OptimizedTransformer")

        # 3. Single conversion back to Python for final processing
        final_stage = java_stage.as_python_stream() \
            .map(self.post_process) \
            .sink(self.reliable_sink)

        return final_stage

    def validate_input(self, data):
        """Input validation in Python"""
        if not isinstance(data, dict) or 'id' not in data:
            raise ValueError("Invalid input format")
        return data

    def quality_check(self, data):
        """Quality filtering in Python"""
        return data.get('quality_score', 0) > 0.7

    def extract_features(self, data):
        """Feature extraction using Python libraries"""
        # Use pandas, numpy, etc. for feature engineering
        return {"features": data, "timestamp": time.time()}

    def post_process(self, data):
        """Post-processing in Python"""
        return f"Final result: {data}"

    def reliable_sink(self, data):
        """Error-tolerant sink"""
        try:
            print(f"Output: {data}")
        except Exception as e:
            print(f"Sink error handled: {e}")
```

## Troubleshooting

### Common Issues and Solutions

1. **Serialization Errors**: Ensure data types are serializable across languages (see the pre-check sketch after this list)
2. **ClassPath Issues**: Verify Java classes are on the configured classpath
3. **Module Import Errors**: Check that Python module paths are configured correctly
4. **Performance Issues**: Minimize language switches and optimize batch sizes
5. **Version Compatibility**: Ensure Ray Streaming versions are compatible across languages
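
For the first issue, a cheap guard is to check that each record survives a language-neutral encoding before it crosses the boundary. The helper below is a minimal sketch, not a built-in API; it assumes JSON round-tripping is a reasonable proxy for the cross-language serializer:

```python
import json

def is_cross_lang_safe(record) -> bool:
    """Return True if the record can be encoded language-neutrally."""
    try:
        json.dumps(record)
        return True
    except (TypeError, ValueError):
        return False

# Drop unserializable records before handing the stream to Java
safe_stream = data_stream.filter(is_cross_lang_safe)
```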

### Debugging Cross-Language Operations

```python
# Enable debugging for cross-language operations
ctx = StreamingContext.Builder() \
    .option("streaming.cross-lang.debug", "true") \
    .option("streaming.log.level", "DEBUG") \
    .build()

# Add logging to track language conversions
def debug_conversion(data):
    print(f"Converting data: {type(data)} -> {data}")
    return data

data_stream.map(debug_conversion) \
    .as_java_stream() \
    .map("com.example.DebugProcessor") \
    .as_python_stream() \
    .map(debug_conversion) \
    .sink(print)
```

## See Also

- [Data Streams Documentation](./data-streams.md) - Stream classes and transformations
- [Stream Operations Documentation](./stream-operations.md) - Available stream operations
- [Streaming Context Documentation](./streaming-context.md) - Job management and configuration
- [Source Functions Documentation](./source-functions.md) - Custom data source implementation