Ray Streaming - a distributed stream processing framework built on Ray that provides fault-tolerant stream processing with Python API
—
This document covers Ray Streaming's cross-language support, enabling mixed Python and Java streaming applications with seamless integration and data exchange.
Ray Streaming provides comprehensive cross-language support that allows:
Ray Streaming provides separate stream classes for each language with conversion capabilities.
Standard Python-based streams with native Python operators.
from ray.streaming.datastream import DataStream
class DataStream:
# Python operators (using Python functions)
def map(self, func) -> DataStream
def flat_map(self, func) -> DataStream
def filter(self, func) -> DataStream
def key_by(self, func) -> KeyDataStream
def reduce(self, func) -> DataStream
def sink(self, func) -> StreamSink
# Convert to Java stream
def as_java_stream(self) -> JavaDataStream

Java-based streams for using Java operators from Python.
from ray.streaming.datastream import JavaDataStream
class JavaDataStream:
# Java operators (using Java class names)
def map(self, java_func_class: str) -> JavaDataStream
def flat_map(self, java_func_class: str) -> JavaDataStream
def filter(self, java_func_class: str) -> JavaDataStream
def key_by(self, java_func_class: str) -> JavaKeyDataStream
def sink(self, java_func_class: str) -> JavaStreamSink
# Convert to Python stream
def as_python_stream(self) -> DataStream

Convert between Python and Java streams to use operators from either language.
from ray.streaming import StreamingContext
ctx = StreamingContext.Builder().build()
# Start with Python stream
python_stream = ctx.from_values("hello", "world", "ray", "streaming") \
.map(lambda x: x.upper())
# Convert to Java stream for Java operators
java_stream = python_stream.as_java_stream() \
.map("io.ray.streaming.examples.WordCapitalizer") \
.filter("io.ray.streaming.examples.LongWordFilter")
# Convert back to Python stream
result_stream = java_stream.as_python_stream() \
.map(lambda x: f"Final: {x}") \
.sink(print)
ctx.submit("cross_language_job")

Create processing pipelines that leverage the strengths of both languages.
from ray.streaming import StreamingContext
ctx = StreamingContext.Builder().build()
# Start with Python data ingestion
text_stream = ctx.read_text_file("documents.txt") \
.flat_map(lambda line: line.split('.')) # Split into sentences
# Use Java for NLP processing
processed_stream = text_stream.as_java_stream() \
.map("com.example.nlp.SentimentAnalyzer") \
.filter("com.example.nlp.PositiveSentimentFilter") \
.map("com.example.nlp.EntityExtractor")
# Return to Python for data analysis
analytics_stream = processed_stream.as_python_stream() \
.map(lambda result: parse_java_result(result)) \
.key_by(lambda analysis: analysis['entity_type']) \
.reduce(lambda old, new: combine_analytics(old, new)) \
.sink(lambda stats: save_to_database(stats))
ctx.submit("nlp_analytics_job")

# Python for data preprocessing, Java for ML inference
ml_pipeline = ctx.source(sensor_data_source) \
.map(lambda raw: preprocess_sensor_data(raw)) \
.filter(lambda data: data['quality_score'] > 0.8) \
.as_java_stream() \
.map("com.example.ml.TensorFlowPredictor") \
.map("com.example.ml.ModelEnsemble") \
.as_python_stream() \
.map(lambda prediction: post_process_prediction(prediction)) \
.sink(lambda result: send_alert_if_anomaly(result))
ctx.submit("ml_inference_job")

Use Java operators from Python by specifying the fully qualified class name.
// Example Java operators that can be used from Python
package com.example.operators;
public class StringReverser implements MapFunction<String, String> {
    /** Maps each input string to its character-wise reversal. */
    @Override
    public String map(String value) {
        char[] chars = value.toCharArray();
        StringBuilder reversed = new StringBuilder(chars.length);
        // Append characters back-to-front instead of using StringBuilder.reverse().
        for (int i = chars.length - 1; i >= 0; i--) {
            reversed.append(chars[i]);
        }
        return reversed.toString();
    }
}
public class LengthFilter implements FilterFunction<String> {
    /**
     * Keeps only strings longer than five characters.
     *
     * Fixed return type: Ray Streaming's FilterFunction contract declares a
     * primitive {@code boolean}; returning the boxed {@code Boolean} under
     * {@code @Override} would fail to compile (primitives are not covariant
     * with their wrapper types).
     */
    @Override
    public boolean filter(String value) {
        return value.length() > 5;
    }
}
public class WordCounter implements ReduceFunction<Tuple2<String, Integer>> {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> old, Tuple2<String, Integer> new) {
return new Tuple2<>(old.f0, old.f1 + new.f1);
}
}

from ray.streaming import StreamingContext
ctx = StreamingContext.Builder().build()
# Use Java operators with fully qualified class names
ctx.from_values("hello", "streaming", "world", "processing") \
.as_java_stream() \
.map("com.example.operators.StringReverser") \
.filter("com.example.operators.LengthFilter") \
.as_python_stream() \
.map(lambda x: f"Processed: {x}") \
.sink(print)
# Mixed keyed operations
ctx.from_values("apple", "banana", "apple", "cherry", "banana") \
.map(lambda word: (word, 1)) \
.as_java_stream() \
.key_by("com.example.operators.TupleKeyExtractor") \
.reduce("com.example.operators.WordCounter") \
.as_python_stream() \
.sink(lambda result: print(f"Count: {result}"))
ctx.submit("java_operators_job")

While the primary interface is Python, Java applications can also use Python operators.
// Java code using Python operators
StreamingContext context = StreamingContext.buildContext();
DataStreamSource<String> source = DataStreamSource.fromCollection(
context, Arrays.asList("data1", "data2", "data3"));
source.map(x -> x.toUpperCase())
.asPythonStream()
.map("my_python_module", "custom_transform_function")
.filter("my_python_module", "quality_filter")
.asJavaStream()
.sink(value -> System.out.println("Result: " + value));
context.execute("mixed_java_python_job");

# my_python_module.py - Python functions callable from Java
def custom_transform_function(data):
    """Normalize a record via a pandas round trip.

    JSON strings are parsed first; dict-like inputs pass straight through.
    Returns the record as a plain dict produced by DataFrame.to_dict.
    """
    import json
    import pandas as pd

    record = data if not isinstance(data, str) else json.loads(data)
    # Round-trip through a single-row DataFrame so downstream stages
    # always receive a uniformly-shaped dict.
    frame = pd.DataFrame([record])
    return frame.to_dict('records')[0]
def quality_filter(data):
"""Filter using Python logic"""
return isinstance(data, dict) and data.get('quality', 0) > 0.5

Ray Streaming handles automatic serialization between Python and Java components.
# Serialization type constants
from ray.streaming.runtime.serialization import Serializer
class Serializer:
CROSS_LANG_TYPE_ID = 0 # Cross-language serialization
JAVA_TYPE_ID = 1 # Java-specific serialization
PYTHON_TYPE_ID = 2 # Python-specific serialization

Ray Streaming automatically handles serialization for common data types:
# Data types that work seamlessly across languages
simple_data = ctx.from_values(
42, # int
3.14, # float
"hello", # string
True, # boolean
[1, 2, 3], # list
{"key": "value"}, # dict
("a", "b", "c") # tuple
)
# Complex structured data
complex_data = ctx.from_values({
"user_id": 12345,
"name": "John Doe",
"scores": [95, 87, 92],
"metadata": {
"created": "2024-01-01",
"active": True
}
})
# Both work with cross-language operations
simple_data.as_java_stream() \
.map("com.example.DataProcessor") \
.as_python_stream() \
.sink(print)
complex_data.as_java_stream() \
.filter("com.example.ComplexDataFilter") \
.as_python_stream() \
.map(lambda x: f"Processed: {x}") \
.sink(print)

Organize processing pipeline by language strengths.
def create_multi_language_pipeline(ctx):
"""Create pipeline leveraging each language's strengths"""
# Stage 1: Python for data ingestion and preprocessing
raw_data = ctx.source(custom_data_source) \
.map(lambda x: json.loads(x)) \
.filter(lambda x: validate_data_quality(x)) \
.map(lambda x: normalize_data_format(x))
# Stage 2: Java for high-performance processing
processed_data = raw_data.as_java_stream() \
.map("com.example.HighPerformanceProcessor") \
.filter("com.example.BusinessRuleValidator") \
.map("com.example.DataEnricher")
# Stage 3: Python for ML and analytics
analyzed_data = processed_data.as_python_stream() \
.map(lambda x: apply_ml_model(x)) \
.key_by(lambda x: x['category']) \
.reduce(lambda old, new: aggregate_analytics(old, new))
# Stage 4: Java for enterprise integration
final_output = analyzed_data.as_java_stream() \
.map("com.example.enterprise.MessageFormatter") \
.sink("com.example.enterprise.KafkaSink")
return final_outputHandle errors that may occur in cross-language operations.
class RobustCrossLanguageProcessor:
def process_with_fallback(self, ctx, data_stream):
try:
# Try Java processing first
result = data_stream.as_java_stream() \
.map("com.example.OptimizedProcessor") \
.as_python_stream()
except Exception as java_error:
print(f"Java processing failed: {java_error}")
# Fallback to Python processing
result = data_stream.map(lambda x: self.python_fallback_processor(x))
return result.sink(self.error_tolerant_sink)
def python_fallback_processor(self, data):
# Pure Python implementation as fallback
return {"processed": True, "data": data, "method": "python_fallback"}
def error_tolerant_sink(self, data):
try:
# Attempt to sink data
print(f"Output: {data}")
except Exception as e:
print(f"Sink error: {e}, data: {data}")Optimize cross-language pipelines for performance.
def optimized_cross_language_pipeline(ctx):
"""Performance-optimized cross-language pipeline"""
# Minimize language switches
data = ctx.source(high_volume_source) \
.set_parallelism(8) # High parallelism for ingestion
# Batch Python operations together
python_processed = data \
.map(preprocess_func) \
.filter(quality_check_func) \
.map(feature_extraction_func)
# Single switch to Java for batch operations
java_processed = python_processed.as_java_stream() \
.map("com.example.BatchProcessor") \
.filter("com.example.BatchValidator") \
.map("com.example.BatchEnricher")
# Single switch back to Python for final operations
final_result = java_processed.as_python_stream() \
.key_by(lambda x: x['partition_key']) \
.reduce(efficient_reduce_func) \
.sink(optimized_sink_func)
return final_resultConfigure Ray Streaming for optimal cross-language performance.
# Cross-language job configuration
ctx = StreamingContext.Builder() \
.option("streaming.cross-lang.enabled", "true") \
.option("streaming.serialization.type", "CROSS_LANG") \
.option("streaming.java.classpath", "/path/to/java/classes") \
.option("streaming.python.module.path", "/path/to/python/modules") \
.build()

# Configure resources for mixed workloads
ctx = StreamingContext.Builder() \
    .option("streaming.worker-num", "6") \
    .option("streaming.java.worker.memory", "4GB") \
    .option("streaming.python.worker.memory", "2GB") \
    .option("streaming.serialization.buffer.size", "8MB") \
    .build()

class CrossLanguageBestPractices:
    """Reference implementation of cross-language pipeline guidelines."""

    def __init__(self, ctx):
        self.ctx = ctx

    def efficient_pipeline(self, source_data):
        """Implement best practices for cross-language pipeline"""
        # 1. Keep all Python-side preparation in one contiguous stage.
        prepared = source_data \
            .map(self.validate_input) \
            .filter(self.quality_check) \
            .map(self.extract_features)
        # 2. Cross into Java exactly once for the heavy operators.
        java_side = prepared.as_java_stream() \
            .map("com.example.PerformanceCriticalProcessor") \
            .filter("com.example.HighThroughputFilter") \
            .map("com.example.OptimizedTransformer")
        # 3. Cross back once for final formatting and the sink.
        return java_side.as_python_stream() \
            .map(self.post_process) \
            .sink(self.reliable_sink)

    def validate_input(self, data):
        """Reject records that are not dicts carrying an 'id' key."""
        if not isinstance(data, dict) or 'id' not in data:
            raise ValueError("Invalid input format")
        return data

    def quality_check(self, data):
        """Keep only records whose quality_score exceeds 0.7."""
        return data.get('quality_score', 0) > 0.7

    def extract_features(self, data):
        """Wrap the record with an extraction timestamp."""
        # NOTE(review): relies on time.time(); ensure `import time` exists at
        # module level -- it is not visible in this snippet.
        return {"features": data, "timestamp": time.time()}

    def post_process(self, data):
        """Format the final result as a display string."""
        return f"Final result: {data}"

    def reliable_sink(self, data):
        """Error-tolerant sink: log failures rather than crash the job."""
        try:
            print(f"Output: {data}")
        except Exception as e:
            print(f"Sink error handled: {e}")

# Enable debugging for cross-language operations
ctx = StreamingContext.Builder() \
.option("streaming.cross-lang.debug", "true") \
.option("streaming.log.level", "DEBUG") \
.build()
# Add logging to track language conversions
def debug_conversion(data):
print(f"Converting data: {type(data)} -> {data}")
return data
data_stream.map(debug_conversion) \
.as_java_stream() \
.map("com.example.DebugProcessor") \
.as_python_stream() \
.map(debug_conversion) \
.sink(print)

Install with Tessl CLI
npx tessl i tessl/pypi-ray-streaming