tessl/pypi-ray-streaming

Ray Streaming - a distributed stream processing framework built on Ray that provides fault-tolerant stream processing with Python API


Cross-Language Integration

This document covers Ray Streaming's cross-language support, which lets a single streaming job mix Python and Java operators and exchange data between them.

Overview

Ray Streaming provides comprehensive cross-language support that allows:

  • Mixed Pipelines: Combine Python and Java operators in the same streaming job
  • Stream Conversion: Convert between Python and Java streams seamlessly
  • Cross-Language Serialization: Automatic data serialization between languages
  • Operator Interoperability: Use Java operators from Python and vice versa
  • Unified Execution: Single runtime handles both Python and Java components

Language Stream Types

Ray Streaming provides separate stream classes for each language with conversion capabilities.

Python Streams

Standard Python-based streams with native Python operators.

from ray.streaming.datastream import DataStream

class DataStream:
    # Python operators (each takes a Python callable)
    def map(self, func) -> "DataStream": ...
    def flat_map(self, func) -> "DataStream": ...
    def filter(self, func) -> "DataStream": ...
    def key_by(self, func) -> "KeyDataStream": ...
    def reduce(self, func) -> "DataStream": ...
    def sink(self, func) -> "StreamSink": ...

    # Convert to Java stream
    def as_java_stream(self) -> "JavaDataStream": ...

Java Streams

Java-based streams for using Java operators from Python.

from ray.streaming.datastream import JavaDataStream

class JavaDataStream:
    # Java operators (each takes a fully qualified Java class name)
    def map(self, java_func_class: str) -> "JavaDataStream": ...
    def flat_map(self, java_func_class: str) -> "JavaDataStream": ...
    def filter(self, java_func_class: str) -> "JavaDataStream": ...
    def key_by(self, java_func_class: str) -> "JavaKeyDataStream": ...
    def sink(self, java_func_class: str) -> "JavaStreamSink": ...

    # Convert to Python stream
    def as_python_stream(self) -> "DataStream": ...

Capabilities

Stream Conversion

Convert between Python and Java streams to use operators from either language.

from ray.streaming import StreamingContext

ctx = StreamingContext.Builder().build()

# Start with Python stream
python_stream = ctx.from_values("hello", "world", "ray", "streaming") \
    .map(lambda x: x.upper())

# Convert to Java stream for Java operators
java_stream = python_stream.as_java_stream() \
    .map("io.ray.streaming.examples.WordCapitalizer") \
    .filter("io.ray.streaming.examples.LongWordFilter")

# Convert back to Python stream
result_stream = java_stream.as_python_stream() \
    .map(lambda x: f"Final: {x}") \
    .sink(print)

ctx.submit("cross_language_job")

Mixed Processing Pipelines

Create processing pipelines that leverage the strengths of both languages.

Example: Text Processing with Java NLP and Python Analytics

from ray.streaming import StreamingContext

ctx = StreamingContext.Builder().build()

# Start with Python data ingestion
text_stream = ctx.read_text_file("documents.txt") \
    .flat_map(lambda line: line.split('.'))  # Split into sentences

# Use Java for NLP processing
processed_stream = text_stream.as_java_stream() \
    .map("com.example.nlp.SentimentAnalyzer") \
    .filter("com.example.nlp.PositiveSentimentFilter") \
    .map("com.example.nlp.EntityExtractor")

# Return to Python for data analysis
analytics_stream = processed_stream.as_python_stream() \
    .map(lambda result: parse_java_result(result)) \
    .key_by(lambda analysis: analysis['entity_type']) \
    .reduce(lambda old, new: combine_analytics(old, new)) \
    .sink(lambda stats: save_to_database(stats))

ctx.submit("nlp_analytics_job")

Example: Real-time ML Pipeline

# Python for data preprocessing, Java for ML inference
ml_pipeline = ctx.source(sensor_data_source) \
    .map(lambda raw: preprocess_sensor_data(raw)) \
    .filter(lambda data: data['quality_score'] > 0.8) \
    .as_java_stream() \
    .map("com.example.ml.TensorFlowPredictor") \
    .map("com.example.ml.ModelEnsemble") \
    .as_python_stream() \
    .map(lambda prediction: post_process_prediction(prediction)) \
    .sink(lambda result: send_alert_if_anomaly(result))

ctx.submit("ml_inference_job")

Java Operator Integration

Use Java operators from Python by specifying the fully qualified class name.

Java Operator Classes

// Example Java operators that can be used from Python
// (in a real project, each public class lives in its own file)
package com.example.operators;

public class StringReverser implements MapFunction<String, String> {
    @Override
    public String map(String value) {
        return new StringBuilder(value).reverse().toString();
    }
}

public class LengthFilter implements FilterFunction<String> {
    @Override
    public boolean filter(String value) {
        return value.length() > 5;
    }
}

public class WordCounter implements ReduceFunction<Tuple2<String, Integer>> {
    @Override
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> oldValue,
                                          Tuple2<String, Integer> newValue) {
        // 'new' is a reserved word in Java, so the parameters are renamed
        return new Tuple2<>(oldValue.f0, oldValue.f1 + newValue.f1);
    }
}

Using Java Operators from Python

from ray.streaming import StreamingContext

ctx = StreamingContext.Builder().build()

# Use Java operators with fully qualified class names
ctx.from_values("hello", "streaming", "world", "processing") \
    .as_java_stream() \
    .map("com.example.operators.StringReverser") \
    .filter("com.example.operators.LengthFilter") \
    .as_python_stream() \
    .map(lambda x: f"Processed: {x}") \
    .sink(print)

# Mixed keyed operations
ctx.from_values("apple", "banana", "apple", "cherry", "banana") \
    .map(lambda word: (word, 1)) \
    .as_java_stream() \
    .key_by("com.example.operators.TupleKeyExtractor") \
    .reduce("com.example.operators.WordCounter") \
    .as_python_stream() \
    .sink(lambda result: print(f"Count: {result}"))

ctx.submit("java_operators_job")

Python Operator Integration from Java

While the primary interface is Python, Java applications can also use Python operators.

Example Java Code Using Python Operators

// Java code using Python operators
StreamingContext context = StreamingContext.buildContext();
DataStreamSource<String> source = DataStreamSource.fromCollection(
    context, Arrays.asList("data1", "data2", "data3"));

source.map(x -> x.toUpperCase())
      .asPythonStream()
      .map("my_python_module", "custom_transform_function")  
      .filter("my_python_module", "quality_filter")
      .asJavaStream()
      .sink(value -> System.out.println("Result: " + value));

context.execute("mixed_java_python_job");

Python Module for Java Integration

# my_python_module.py - Python functions callable from Java

def custom_transform_function(data):
    """Transform data using Python libraries"""
    import json
    import pandas as pd
    
    # Use Python-specific libraries
    parsed = json.loads(data) if isinstance(data, str) else data
    df = pd.DataFrame([parsed])
    # Perform pandas operations
    return df.to_dict('records')[0]

def quality_filter(data):
    """Filter using Python logic"""
    return isinstance(data, dict) and data.get('quality', 0) > 0.5
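
Because the functions Java calls are plain Python, they can be exercised in isolation before being wired into a cross-language job. A minimal check of `quality_filter` (repeated here so the snippet runs without the module on the path):

```python
# quality_filter inlined from my_python_module.py for a self-contained check
def quality_filter(data):
    """Filter using Python logic"""
    return isinstance(data, dict) and data.get('quality', 0) > 0.5

# Records a Java stage might hand over after cross-language deserialization
records = [
    {"quality": 0.9, "payload": "ok"},   # passes
    {"quality": 0.2, "payload": "low"},  # filtered out: low quality
    "not-a-dict",                        # filtered out: wrong type
]

kept = [r for r in records if quality_filter(r)]
print(kept)  # only the high-quality dict survives
```

Testing these functions standalone catches type mismatches early, before they surface as harder-to-debug cross-language serialization errors at runtime.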

Data Serialization

Ray Streaming handles automatic serialization between Python and Java components.

Serialization Types

# Serialization type constants
from ray.streaming.runtime.serialization import Serializer

class Serializer:
    CROSS_LANG_TYPE_ID = 0    # Cross-language serialization
    JAVA_TYPE_ID = 1          # Java-specific serialization  
    PYTHON_TYPE_ID = 2        # Python-specific serialization
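
Conceptually, a cross-language payload pairs a type-id marker with the serialized bytes, so the receiving language knows which deserializer to apply. The actual wire format is internal to Ray Streaming; the sketch below is purely illustrative, using JSON as a stand-in encoding:

```python
import json
import struct

CROSS_LANG_TYPE_ID = 0  # mirrors Serializer.CROSS_LANG_TYPE_ID above

def encode(record):
    # Illustrative only: one type-id byte followed by a JSON-encoded body.
    body = json.dumps(record).encode("utf-8")
    return struct.pack("B", CROSS_LANG_TYPE_ID) + body

def decode(payload):
    # Dispatch on the type-id byte, then deserialize the rest.
    type_id = payload[0]
    assert type_id == CROSS_LANG_TYPE_ID, "unexpected serializer type"
    return json.loads(payload[1:].decode("utf-8"))

round_tripped = decode(encode({"key": "value"}))
print(round_tripped)
```

The takeaway is the dispatch pattern, not the encoding: each side inspects the type id and picks the matching deserializer, which is why data crossing the boundary must use types both runtimes understand.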

Supported Data Types

Ray Streaming automatically handles serialization for common data types:

  • Primitives: int, float, string, boolean
  • Collections: list, dict, tuple
  • Custom Objects: Objects implementing serialization interfaces
  • Complex Data: JSON-serializable structures

# Data types that work seamlessly across languages
simple_data = ctx.from_values(
    42,                           # int
    3.14,                        # float  
    "hello",                     # string
    True,                        # boolean
    [1, 2, 3],                  # list
    {"key": "value"},           # dict
    ("a", "b", "c")            # tuple
)

# Complex structured data
complex_data = ctx.from_values({
    "user_id": 12345,
    "name": "John Doe", 
    "scores": [95, 87, 92],
    "metadata": {
        "created": "2024-01-01",
        "active": True
    }
})

# Both work with cross-language operations
simple_data.as_java_stream() \
    .map("com.example.DataProcessor") \
    .as_python_stream() \
    .sink(print)

complex_data.as_java_stream() \
    .filter("com.example.ComplexDataFilter") \
    .as_python_stream() \
    .map(lambda x: f"Processed: {x}") \
    .sink(print)

Advanced Cross-Language Patterns

Language-Specific Processing Stages

Organize processing pipeline by language strengths.

def create_multi_language_pipeline(ctx):
    """Create pipeline leveraging each language's strengths"""
    
    # Stage 1: Python for data ingestion and preprocessing
    raw_data = ctx.source(custom_data_source) \
        .map(lambda x: json.loads(x)) \
        .filter(lambda x: validate_data_quality(x)) \
        .map(lambda x: normalize_data_format(x))
    
    # Stage 2: Java for high-performance processing
    processed_data = raw_data.as_java_stream() \
        .map("com.example.HighPerformanceProcessor") \
        .filter("com.example.BusinessRuleValidator") \
        .map("com.example.DataEnricher")
    
    # Stage 3: Python for ML and analytics
    analyzed_data = processed_data.as_python_stream() \
        .map(lambda x: apply_ml_model(x)) \
        .key_by(lambda x: x['category']) \
        .reduce(lambda old, new: aggregate_analytics(old, new))
    
    # Stage 4: Java for enterprise integration
    final_output = analyzed_data.as_java_stream() \
        .map("com.example.enterprise.MessageFormatter") \
        .sink("com.example.enterprise.KafkaSink")
    
    return final_output

Error Handling Across Languages

Handle errors that may occur in cross-language operations.

class RobustCrossLanguageProcessor:
    def process_with_fallback(self, ctx, data_stream):
        try:
            # Note: stream construction is lazy, so this only catches errors
            # raised while building the graph (e.g. a missing Java class),
            # not per-record failures at runtime.
            result = data_stream.as_java_stream() \
                .map("com.example.OptimizedProcessor") \
                .as_python_stream()
        except Exception as java_error:
            print(f"Java processing failed: {java_error}")
            # Fall back to a pure-Python operator
            result = data_stream.map(lambda x: self.python_fallback_processor(x))
        
        return result.sink(self.error_tolerant_sink)
    
    def python_fallback_processor(self, data):
        # Pure Python implementation as fallback
        return {"processed": True, "data": data, "method": "python_fallback"}
    
    def error_tolerant_sink(self, data):
        try:
            # Attempt to sink data
            print(f"Output: {data}")
        except Exception as e:
            print(f"Sink error: {e}, data: {data}")

Performance Optimization

Optimize cross-language pipelines for performance.

def optimized_cross_language_pipeline(ctx):
    """Performance-optimized cross-language pipeline"""
    
    # Minimize language switches
    data = ctx.source(high_volume_source) \
        .set_parallelism(8)  # High parallelism for ingestion
    
    # Batch Python operations together
    python_processed = data \
        .map(preprocess_func) \
        .filter(quality_check_func) \
        .map(feature_extraction_func)
    
    # Single switch to Java for batch operations
    java_processed = python_processed.as_java_stream() \
        .map("com.example.BatchProcessor") \
        .filter("com.example.BatchValidator") \
        .map("com.example.BatchEnricher")
    
    # Single switch back to Python for final operations
    final_result = java_processed.as_python_stream() \
        .key_by(lambda x: x['partition_key']) \
        .reduce(efficient_reduce_func) \
        .sink(optimized_sink_func)
    
    return final_result

Configuration for Cross-Language Jobs

Configure Ray Streaming for optimal cross-language performance.

Job Configuration

# Cross-language job configuration
ctx = StreamingContext.Builder() \
    .option("streaming.cross-lang.enabled", "true") \
    .option("streaming.serialization.type", "CROSS_LANG") \
    .option("streaming.java.classpath", "/path/to/java/classes") \
    .option("streaming.python.module.path", "/path/to/python/modules") \
    .build()

Memory and Resource Configuration

# Configure resources for mixed workloads
ctx = StreamingContext.Builder() \
    .option("streaming.worker-num", "6") \
    .option("streaming.java.worker.memory", "4GB") \
    .option("streaming.python.worker.memory", "2GB") \
    .option("streaming.serialization.buffer.size", "8MB") \
    .build()

Best Practices

Cross-Language Development Guidelines

  1. Minimize Language Switches: Group operations by language to reduce serialization overhead
  2. Use Appropriate Languages: Leverage each language's strengths (Java for performance, Python for flexibility)
  3. Handle Serialization: Ensure data types are compatible across language boundaries
  4. Error Handling: Implement robust error handling for cross-language failures
  5. Testing: Test both language paths thoroughly
  6. Performance Monitoring: Monitor serialization and conversion overhead

Example Best Practice Implementation

import time

class CrossLanguageBestPractices:
    def __init__(self, ctx):
        self.ctx = ctx
        
    def efficient_pipeline(self, source_data):
        """Implement best practices for cross-language pipeline"""
        
        # 1. Group Python operations
        python_stage = source_data \
            .map(self.validate_input) \
            .filter(self.quality_check) \
            .map(self.extract_features)
        
        # 2. Single conversion to Java for performance-critical operations
        java_stage = python_stage.as_java_stream() \
            .map("com.example.PerformanceCriticalProcessor") \
            .filter("com.example.HighThroughputFilter") \
            .map("com.example.OptimizedTransformer")
        
        # 3. Single conversion back to Python for final processing
        final_stage = java_stage.as_python_stream() \
            .map(self.post_process) \
            .sink(self.reliable_sink)
        
        return final_stage
    
    def validate_input(self, data):
        """Input validation in Python"""
        if not isinstance(data, dict) or 'id' not in data:
            raise ValueError("Invalid input format")
        return data
    
    def quality_check(self, data):
        """Quality filtering in Python"""  
        return data.get('quality_score', 0) > 0.7
    
    def extract_features(self, data):
        """Feature extraction using Python libraries"""
        # Use pandas, numpy, etc. for feature engineering
        return {"features": data, "timestamp": time.time()}
    
    def post_process(self, data):
        """Post-processing in Python"""
        return f"Final result: {data}"
    
    def reliable_sink(self, data):
        """Error-tolerant sink"""
        try:
            print(f"Output: {data}")
        except Exception as e:
            print(f"Sink error handled: {e}")

Troubleshooting

Common Issues and Solutions

  1. Serialization Errors: Ensure data types are serializable across languages
  2. ClassPath Issues: Verify Java classes are in the classpath
  3. Module Import Errors: Check Python module paths are configured correctly
  4. Performance Issues: Minimize language switches and optimize batch sizes
  5. Version Compatibility: Ensure Ray Streaming versions are compatible across languages
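
For issue 1, a cheap pre-flight guard is to confirm each record is JSON-serializable before it crosses a language boundary. `ensure_serializable` below is a hypothetical helper, not part of the Ray Streaming API:

```python
import json

def ensure_serializable(record):
    """Raise early, with a clear message, if a record cannot cross languages."""
    try:
        # JSON-serializability is a good proxy for the primitive/collection
        # types listed under "Supported Data Types" above.
        json.dumps(record)
    except (TypeError, ValueError) as e:
        raise TypeError(f"Record not cross-language serializable: {record!r}") from e
    return record

# Use as a guard map just before converting, e.g.:
#   stream.map(ensure_serializable).as_java_stream()...
print(ensure_serializable({"user_id": 1, "scores": [1, 2]}))
```

Failing inside this guard pinpoints the offending record and operator, rather than surfacing as an opaque serialization error in the other language's worker.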

Debugging Cross-Language Operations

# Enable debugging for cross-language operations
ctx = StreamingContext.Builder() \
    .option("streaming.cross-lang.debug", "true") \
    .option("streaming.log.level", "DEBUG") \
    .build()

# Add logging to track language conversions
def debug_conversion(data):
    print(f"Converting data: {type(data)} -> {data}")
    return data

data_stream.map(debug_conversion) \
    .as_java_stream() \
    .map("com.example.DebugProcessor") \
    .as_python_stream() \
    .map(debug_conversion) \
    .sink(print)

See Also

  • Data Streams Documentation - Stream classes and transformations
  • Stream Operations Documentation - Available stream operations
  • Streaming Context Documentation - Job management and configuration
  • Source Functions Documentation - Custom data source implementation

Install with Tessl CLI

npx tessl i tessl/pypi-ray-streaming
