# Cross-Language Integration

This document covers Ray Streaming's cross-language support, which enables mixed Python and Java streaming applications with seamless integration and data exchange.

## Overview

Ray Streaming provides comprehensive cross-language support that allows:

- **Mixed Pipelines**: Combine Python and Java operators in the same streaming job
- **Stream Conversion**: Convert between Python and Java streams seamlessly
- **Cross-Language Serialization**: Automatic data serialization between languages
- **Operator Interoperability**: Use Java operators from Python and vice versa
- **Unified Execution**: A single runtime handles both Python and Java components

## Language Stream Types

Ray Streaming provides separate stream classes for each language, with methods to convert between them.

### Python Streams

Standard Python-based streams with native Python operators.

```python { .api }
from ray.streaming.datastream import DataStream

class DataStream:
    # Python operators (using Python functions)
    def map(self, func) -> DataStream
    def flat_map(self, func) -> DataStream
    def filter(self, func) -> DataStream
    def key_by(self, func) -> KeyDataStream
    def reduce(self, func) -> DataStream
    def sink(self, func) -> StreamSink

    # Convert to Java stream
    def as_java_stream(self) -> JavaDataStream
```

### Java Streams

Java-based streams for using Java operators from Python.

```python { .api }
from ray.streaming.datastream import JavaDataStream

class JavaDataStream:
    # Java operators (using Java class names)
    def map(self, java_func_class: str) -> JavaDataStream
    def flat_map(self, java_func_class: str) -> JavaDataStream
    def filter(self, java_func_class: str) -> JavaDataStream
    def key_by(self, java_func_class: str) -> JavaKeyDataStream
    def sink(self, java_func_class: str) -> JavaStreamSink

    # Convert to Python stream
    def as_python_stream(self) -> DataStream
```

## Capabilities

### Stream Conversion

Convert between Python and Java streams to use operators from either language.

```python
from ray.streaming import StreamingContext

ctx = StreamingContext.Builder().build()

# Start with Python stream
python_stream = ctx.from_values("hello", "world", "ray", "streaming") \
    .map(lambda x: x.upper())

# Convert to Java stream for Java operators
java_stream = python_stream.as_java_stream() \
    .map("io.ray.streaming.examples.WordCapitalizer") \
    .filter("io.ray.streaming.examples.LongWordFilter")

# Convert back to Python stream
result_stream = java_stream.as_python_stream() \
    .map(lambda x: f"Final: {x}") \
    .sink(print)

ctx.submit("cross_language_job")
```

### Mixed Processing Pipelines

Create processing pipelines that leverage the strengths of both languages.

#### Example: Text Processing with Java NLP and Python Analytics

```python
from ray.streaming import StreamingContext

ctx = StreamingContext.Builder().build()

# Start with Python data ingestion
text_stream = ctx.read_text_file("documents.txt") \
    .flat_map(lambda line: line.split('.'))  # Split into sentences

# Use Java for NLP processing
processed_stream = text_stream.as_java_stream() \
    .map("com.example.nlp.SentimentAnalyzer") \
    .filter("com.example.nlp.PositiveSentimentFilter") \
    .map("com.example.nlp.EntityExtractor")

# Return to Python for data analysis
analytics_stream = processed_stream.as_python_stream() \
    .map(lambda result: parse_java_result(result)) \
    .key_by(lambda analysis: analysis['entity_type']) \
    .reduce(lambda old, new: combine_analytics(old, new)) \
    .sink(lambda stats: save_to_database(stats))

ctx.submit("nlp_analytics_job")
```

#### Example: Real-time ML Pipeline

```python
# Python for data preprocessing, Java for ML inference
ml_pipeline = ctx.source(sensor_data_source) \
    .map(lambda raw: preprocess_sensor_data(raw)) \
    .filter(lambda data: data['quality_score'] > 0.8) \
    .as_java_stream() \
    .map("com.example.ml.TensorFlowPredictor") \
    .map("com.example.ml.ModelEnsemble") \
    .as_python_stream() \
    .map(lambda prediction: post_process_prediction(prediction)) \
    .sink(lambda result: send_alert_if_anomaly(result))

ctx.submit("ml_inference_job")
```

### Java Operator Integration

Use Java operators from Python by specifying the fully qualified class name.

#### Java Operator Classes

```java
// Example Java operators that can be used from Python.
// (In a real project, each public class lives in its own source file.)
package com.example.operators;

public class StringReverser implements MapFunction<String, String> {
    @Override
    public String map(String value) {
        return new StringBuilder(value).reverse().toString();
    }
}

public class LengthFilter implements FilterFunction<String> {
    @Override
    public boolean filter(String value) {
        return value.length() > 5;
    }
}

public class WordCounter implements ReduceFunction<Tuple2<String, Integer>> {
    @Override
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> oldValue,
                                          Tuple2<String, Integer> newValue) {
        // "new" is a reserved word in Java, so the accumulator
        // arguments are named oldValue/newValue.
        return new Tuple2<>(oldValue.f0, oldValue.f1 + newValue.f1);
    }
}
```

#### Using Java Operators from Python

```python
from ray.streaming import StreamingContext

ctx = StreamingContext.Builder().build()

# Use Java operators with fully qualified class names
ctx.from_values("hello", "streaming", "world", "processing") \
    .as_java_stream() \
    .map("com.example.operators.StringReverser") \
    .filter("com.example.operators.LengthFilter") \
    .as_python_stream() \
    .map(lambda x: f"Processed: {x}") \
    .sink(print)

# Mixed keyed operations
ctx.from_values("apple", "banana", "apple", "cherry", "banana") \
    .map(lambda word: (word, 1)) \
    .as_java_stream() \
    .key_by("com.example.operators.TupleKeyExtractor") \
    .reduce("com.example.operators.WordCounter") \
    .as_python_stream() \
    .sink(lambda result: print(f"Count: {result}"))

ctx.submit("java_operators_job")
```

### Python Operator Integration from Java

While the primary interface is Python, Java applications can also use Python operators.

#### Example Java Code Using Python Operators

```java
// Java code using Python operators
StreamingContext context = StreamingContext.buildContext();
DataStreamSource<String> source = DataStreamSource.fromCollection(
    context, Arrays.asList("data1", "data2", "data3"));

source.map(x -> x.toUpperCase())
    .asPythonStream()
    .map("my_python_module", "custom_transform_function")
    .filter("my_python_module", "quality_filter")
    .asJavaStream()
    .sink(value -> System.out.println("Result: " + value));

context.execute("mixed_java_python_job");
```

214
#### Python Module for Java Integration
215
216
```python
217
# my_python_module.py - Python functions callable from Java
218
219
def custom_transform_function(data):
220
"""Transform data using Python libraries"""
221
import json
222
import pandas as pd
223
224
# Use Python-specific libraries
225
parsed = json.loads(data) if isinstance(data, str) else data
226
df = pd.DataFrame([parsed])
227
# Perform pandas operations
228
return df.to_dict('records')[0]
229
230
def quality_filter(data):
231
"""Filter using Python logic"""
232
return isinstance(data, dict) and data.get('quality', 0) > 0.5
233
```
234
## Data Serialization

Ray Streaming handles automatic serialization between Python and Java components.

### Serialization Types

```python { .api }
# Serialization type constants
from ray.streaming.runtime.serialization import Serializer

class Serializer:
    CROSS_LANG_TYPE_ID = 0  # Cross-language serialization
    JAVA_TYPE_ID = 1        # Java-specific serialization
    PYTHON_TYPE_ID = 2      # Python-specific serialization
```

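These type IDs tag each serialized payload so the receiving worker knows which decoder to apply. The following is a hedged sketch of that dispatch idea only, not Ray Streaming's actual wire format; `pickle` and `msgpack` stand in for the Python-specific and cross-language encodings:

```python
import pickle

import msgpack  # assumption: a msgpack-style cross-language encoding

from ray.streaming.runtime.serialization import Serializer

def encode(value, type_id):
    # Tag the payload with a one-byte serializer type ID.
    if type_id == Serializer.PYTHON_TYPE_ID:
        body = pickle.dumps(value)
    else:
        body = msgpack.packb(value)
    return bytes([type_id]) + body

def decode(payload):
    # Route to the matching deserializer based on the leading byte.
    type_id, body = payload[0], payload[1:]
    if type_id == Serializer.PYTHON_TYPE_ID:
        return pickle.loads(body)
    if type_id == Serializer.CROSS_LANG_TYPE_ID:
        return msgpack.unpackb(body)
    raise ValueError(f"Unknown serializer type ID: {type_id}")
```

The practical consequence: Python-specific payloads may carry arbitrary Python objects between Python workers, while the cross-language format is limited to types both languages understand.
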
### Supported Data Types

Ray Streaming automatically handles serialization for common data types:

- **Primitives**: int, float, string, boolean
- **Collections**: list, dict, tuple
- **Custom Objects**: Objects implementing serialization interfaces (see the custom-object pattern after the example below)
- **Complex Data**: JSON-serializable structures

```python
# Data types that work seamlessly across languages
simple_data = ctx.from_values(
    42,                 # int
    3.14,               # float
    "hello",            # string
    True,               # boolean
    [1, 2, 3],          # list
    {"key": "value"},   # dict
    ("a", "b", "c")     # tuple
)

# Complex structured data
complex_data = ctx.from_values({
    "user_id": 12345,
    "name": "John Doe",
    "scores": [95, 87, 92],
    "metadata": {
        "created": "2024-01-01",
        "active": True
    }
})

# Both work with cross-language operations
simple_data.as_java_stream() \
    .map("com.example.DataProcessor") \
    .as_python_stream() \
    .sink(print)

complex_data.as_java_stream() \
    .filter("com.example.ComplexDataFilter") \
    .as_python_stream() \
    .map(lambda x: f"Processed: {x}") \
    .sink(print)
```

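For custom classes, the safest pattern is to flatten objects into plain, JSON-like dicts before crossing the language boundary and rebuild them on the way back. A hedged sketch (the Java class name here is hypothetical and assumed to emit dicts with the same keys):

```python
from dataclasses import dataclass, asdict

@dataclass
class SensorReading:
    sensor_id: str
    value: float

readings = ctx.from_values(
    SensorReading("s1", 0.42),
    SensorReading("s2", 0.87)
)

# Flatten to a dict before the Java stage, rebuild afterwards.
# "com.example.ReadingNormalizer" is illustrative, not a real operator.
readings.map(asdict) \
    .as_java_stream() \
    .map("com.example.ReadingNormalizer") \
    .as_python_stream() \
    .map(lambda d: SensorReading(**d)) \
    .sink(print)
```
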
## Advanced Cross-Language Patterns

### Language-Specific Processing Stages

Organize the processing pipeline into stages according to each language's strengths.

```python
import json

def create_multi_language_pipeline(ctx):
    """Create a pipeline leveraging each language's strengths"""

    # Stage 1: Python for data ingestion and preprocessing
    raw_data = ctx.source(custom_data_source) \
        .map(lambda x: json.loads(x)) \
        .filter(lambda x: validate_data_quality(x)) \
        .map(lambda x: normalize_data_format(x))

    # Stage 2: Java for high-performance processing
    processed_data = raw_data.as_java_stream() \
        .map("com.example.HighPerformanceProcessor") \
        .filter("com.example.BusinessRuleValidator") \
        .map("com.example.DataEnricher")

    # Stage 3: Python for ML and analytics
    analyzed_data = processed_data.as_python_stream() \
        .map(lambda x: apply_ml_model(x)) \
        .key_by(lambda x: x['category']) \
        .reduce(lambda old, new: aggregate_analytics(old, new))

    # Stage 4: Java for enterprise integration
    final_output = analyzed_data.as_java_stream() \
        .map("com.example.enterprise.MessageFormatter") \
        .sink("com.example.enterprise.KafkaSink")

    return final_output
```

### Error Handling Across Languages

Handle errors that may occur in cross-language operations.

```python
class RobustCrossLanguageProcessor:
    def process_with_fallback(self, ctx, data_stream):
        try:
            # Try Java processing first
            result = data_stream.as_java_stream() \
                .map("com.example.OptimizedProcessor") \
                .as_python_stream()
        except Exception as java_error:
            print(f"Java processing failed: {java_error}")
            # Fallback to Python processing
            result = data_stream.map(lambda x: self.python_fallback_processor(x))

        return result.sink(self.error_tolerant_sink)

    def python_fallback_processor(self, data):
        # Pure Python implementation as fallback
        return {"processed": True, "data": data, "method": "python_fallback"}

    def error_tolerant_sink(self, data):
        try:
            # Attempt to sink data
            print(f"Output: {data}")
        except Exception as e:
            print(f"Sink error: {e}, data: {data}")
```

### Performance Optimization

Optimize cross-language pipelines for performance.

```python
def optimized_cross_language_pipeline(ctx):
    """Performance-optimized cross-language pipeline"""

    # Minimize language switches
    data = ctx.source(high_volume_source) \
        .set_parallelism(8)  # High parallelism for ingestion

    # Batch Python operations together
    python_processed = data \
        .map(preprocess_func) \
        .filter(quality_check_func) \
        .map(feature_extraction_func)

    # Single switch to Java for batch operations
    java_processed = python_processed.as_java_stream() \
        .map("com.example.BatchProcessor") \
        .filter("com.example.BatchValidator") \
        .map("com.example.BatchEnricher")

    # Single switch back to Python for final operations
    final_result = java_processed.as_python_stream() \
        .key_by(lambda x: x['partition_key']) \
        .reduce(efficient_reduce_func) \
        .sink(optimized_sink_func)

    return final_result
```

## Configuration for Cross-Language Jobs

Configure Ray Streaming for optimal cross-language performance.

### Job Configuration

```python
# Cross-language job configuration
ctx = StreamingContext.Builder() \
    .option("streaming.cross-lang.enabled", "true") \
    .option("streaming.serialization.type", "CROSS_LANG") \
    .option("streaming.java.classpath", "/path/to/java/classes") \
    .option("streaming.python.module.path", "/path/to/python/modules") \
    .build()
```

### Memory and Resource Configuration

```python
# Configure resources for mixed workloads
ctx = StreamingContext.Builder() \
    .option("streaming.worker-num", "6") \
    .option("streaming.java.worker.memory", "4GB") \
    .option("streaming.python.worker.memory", "2GB") \
    .option("streaming.serialization.buffer.size", "8MB") \
    .build()
```

## Best Practices

### Cross-Language Development Guidelines

1. **Minimize Language Switches**: Group operations by language to reduce serialization overhead
2. **Use Appropriate Languages**: Leverage each language's strengths (Java for performance, Python for flexibility)
3. **Handle Serialization**: Ensure data types are compatible across language boundaries
4. **Error Handling**: Implement robust error handling for cross-language failures
5. **Testing**: Test both language paths thoroughly
6. **Performance Monitoring**: Monitor serialization and conversion overhead

### Example Best Practice Implementation

```python
import time

class CrossLanguageBestPractices:
    def __init__(self, ctx):
        self.ctx = ctx

    def efficient_pipeline(self, source_data):
        """Implement best practices for a cross-language pipeline"""

        # 1. Group Python operations
        python_stage = source_data \
            .map(self.validate_input) \
            .filter(self.quality_check) \
            .map(self.extract_features)

        # 2. Single conversion to Java for performance-critical operations
        java_stage = python_stage.as_java_stream() \
            .map("com.example.PerformanceCriticalProcessor") \
            .filter("com.example.HighThroughputFilter") \
            .map("com.example.OptimizedTransformer")

        # 3. Single conversion back to Python for final processing
        final_stage = java_stage.as_python_stream() \
            .map(self.post_process) \
            .sink(self.reliable_sink)

        return final_stage

    def validate_input(self, data):
        """Input validation in Python"""
        if not isinstance(data, dict) or 'id' not in data:
            raise ValueError("Invalid input format")
        return data

    def quality_check(self, data):
        """Quality filtering in Python"""
        return data.get('quality_score', 0) > 0.7

    def extract_features(self, data):
        """Feature extraction using Python libraries"""
        # Use pandas, numpy, etc. for feature engineering
        return {"features": data, "timestamp": time.time()}

    def post_process(self, data):
        """Post-processing in Python"""
        return f"Final result: {data}"

    def reliable_sink(self, data):
        """Error-tolerant sink"""
        try:
            print(f"Output: {data}")
        except Exception as e:
            print(f"Sink error handled: {e}")
```

## Troubleshooting

### Common Issues and Solutions

1. **Serialization Errors**: Ensure data types are serializable across languages (a quick pre-flight check is sketched below)
2. **ClassPath Issues**: Verify Java classes are on the configured classpath
3. **Module Import Errors**: Check that Python module paths are configured correctly
4. **Performance Issues**: Minimize language switches and optimize batch sizes
5. **Version Compatibility**: Ensure Ray Streaming versions are compatible across languages

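For issue 1, a quick pre-flight check — a hedged sketch, with `msgpack` standing in for the cross-language format — is to round-trip a sample record before submitting the job:

```python
import msgpack  # assumption: msgpack-style cross-language encoding

sample = {"user_id": 1, "scores": [95, 87, 92], "active": True}

# If this round trip fails or alters the record, the type is unlikely
# to survive the Python <-> Java boundary unmodified.
assert msgpack.unpackb(msgpack.packb(sample)) == sample
```
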
### Debugging Cross-Language Operations

```python
# Enable debugging for cross-language operations
ctx = StreamingContext.Builder() \
    .option("streaming.cross-lang.debug", "true") \
    .option("streaming.log.level", "DEBUG") \
    .build()

# Add logging to track language conversions
def debug_conversion(data):
    print(f"Converting data: {type(data)} -> {data}")
    return data

data_stream.map(debug_conversion) \
    .as_java_stream() \
    .map("com.example.DebugProcessor") \
    .as_python_stream() \
    .map(debug_conversion) \
    .sink(print)
```

## See Also

- [Data Streams Documentation](./data-streams.md) - Stream classes and transformations
- [Stream Operations Documentation](./stream-operations.md) - Available stream operations
- [Streaming Context Documentation](./streaming-context.md) - Job management and configuration
- [Source Functions Documentation](./source-functions.md) - Custom data source implementation