
tessl/pypi-ray-streaming

Ray Streaming - a distributed, fault-tolerant stream processing framework built on Ray with a Python API

docs/data-streams.md

Data Streams and Transformations

This document covers the data stream classes and transformation operations in Ray Streaming, including DataStream, KeyDataStream, and their Java counterparts for cross-language integration.

Overview

Ray Streaming provides a fluent API for stream transformations through several stream classes:

  • DataStream: Main stream class for Python-based transformations
  • KeyDataStream: Keyed stream for stateful operations like reduce
  • StreamSource: Entry point streams from data sources
  • UnionStream: Result of union operations between multiple streams
  • JavaDataStream: Java-based stream for cross-language operations

Stream Base Class

All stream classes inherit from the abstract Stream base class.

Core API

from ray.streaming.datastream import Stream

class Stream:
    def get_parallelism(self) -> int
    def set_parallelism(self, parallelism: int) -> Stream
    def get_id(self) -> str
    def with_config(self, key=None, value=None, conf=None) -> Stream
    def get_config(self) -> dict
    def forward(self) -> Stream
    def disable_chain(self) -> Stream
    def get_input_stream(self) -> Stream
    def get_streaming_context(self) -> StreamingContext
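
The fluent configuration methods can be illustrated with a minimal plain-Python sketch of their semantics (illustrative only, not the Ray Streaming implementation; `with_config` accepting either a single key/value pair or a whole `conf` dict is an assumption read from the signature above):

```python
# Minimal sketch of the fluent config semantics shown above
# (illustrative only -- not the Ray Streaming implementation).
class StreamSketch:
    def __init__(self):
        self._conf = {}
        self._parallelism = 1

    def set_parallelism(self, parallelism):
        self._parallelism = parallelism
        return self  # fluent API: every setter returns the stream

    def get_parallelism(self):
        return self._parallelism

    def with_config(self, key=None, value=None, conf=None):
        # Assumed behavior: accept one key/value pair or a whole dict.
        if key is not None:
            self._conf[key] = value
        if conf is not None:
            self._conf.update(conf)
        return self

    def get_config(self):
        return dict(self._conf)


s = StreamSketch().set_parallelism(4).with_config("buffer.size", "1024")
s.with_config(conf={"timeout": "5000"})
```

Because every method returns the stream itself, configuration calls chain naturally, which is the pattern the real API relies on throughout this document.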

DataStream

The main stream class for Python-based stream processing and transformations.

Core API

from ray.streaming.datastream import DataStream

class DataStream(Stream):
    # Element-wise transformations
    def map(self, func) -> DataStream
    def flat_map(self, func) -> DataStream
    def filter(self, func) -> DataStream
    
    # Stream composition
    def union(self, *streams) -> UnionStream
    
    # Partitioning and keying
    def key_by(self, func) -> KeyDataStream
    def broadcast(self) -> DataStream
    def partition_by(self, partition_func) -> DataStream
    
    # Output operations
    def sink(self, func) -> StreamSink
    
    # Cross-language integration
    def as_java_stream(self) -> JavaDataStream

Capabilities

Element-wise Transformations

Apply functions to individual elements in the stream.

# Map transformation - one-to-one element transformation
def map(self, func) -> DataStream:
    """
    Transform each element using the provided function.
    
    Args:
        func: Function or MapFunction instance for transformation
        
    Returns:
        New DataStream with transformed elements
    """

# FlatMap transformation - one-to-many element transformation  
def flat_map(self, func) -> DataStream:
    """
    Transform each element into zero or more output elements.
    
    Args:
        func: Function or FlatMapFunction that returns iterable
        
    Returns:
        New DataStream with flattened results
    """

# Filter transformation - element filtering
def filter(self, func) -> DataStream:
    """
    Keep only elements that satisfy the predicate.
    
    Args:
        func: Function or FilterFunction returning boolean
        
    Returns:
        New DataStream with filtered elements
    """

Usage Examples

from ray.streaming import StreamingContext

ctx = StreamingContext.Builder().build()

# Map transformation
ctx.from_collection([1, 2, 3, 4, 5]) \
    .map(lambda x: x * 2) \
    .sink(lambda x: print(f"Doubled: {x}"))

# FlatMap transformation
ctx.from_values("hello world", "ray streaming") \
    .flat_map(lambda line: line.split()) \
    .map(lambda word: word.upper()) \
    .sink(lambda x: print(f"Word: {x}"))

# Filter transformation
ctx.from_collection(range(10)) \
    .filter(lambda x: x % 2 == 0) \
    .map(lambda x: x ** 2) \
    .sink(lambda x: print(f"Even square: {x}"))

Stream Composition

Combine multiple streams into unified processing pipelines.

def union(self, *streams) -> UnionStream:
    """
    Merge multiple streams of the same type.
    
    Args:
        *streams: DataStreams to union with this stream
        
    Returns:
        UnionStream containing elements from all input streams
    """

Usage Example

# Union multiple streams
stream1 = ctx.from_values(1, 2, 3)
stream2 = ctx.from_values(4, 5, 6) 
stream3 = ctx.from_values(7, 8, 9)

unified = stream1.union(stream2, stream3) \
    .map(lambda x: x * 10) \
    .sink(lambda x: print(f"Unified: {x}"))

Partitioning and Keying

Control data distribution and enable stateful operations.

def key_by(self, func) -> KeyDataStream:
    """
    Partition stream by key for stateful operations.
    
    Args:
        func: Function or KeyFunction to extract key from elements
        
    Returns:
        KeyDataStream partitioned by the key function
    """

def broadcast(self) -> DataStream:
    """
    Broadcast all elements to every parallel instance.
    
    Returns:
        DataStream with broadcast partitioning
    """

def partition_by(self, partition_func) -> DataStream:
    """
    Partition stream using custom partitioning function.
    
    Args:
        partition_func: Function or Partition instance for custom partitioning
        
    Returns:
        DataStream with custom partitioning
    """

Usage Examples

# Key-based processing
ctx.from_values(("a", 1), ("b", 2), ("a", 3), ("b", 4)) \
    .key_by(lambda pair: pair[0]) \
    .reduce(lambda old, new: (old[0], old[1] + new[1])) \
    .sink(lambda x: print(f"Sum for {x[0]}: {x[1]}"))

# Broadcast partitioning
ctx.from_values("broadcast", "message") \
    .broadcast() \
    .map(lambda x: x.upper()) \
    .sink(lambda x: print(f"Broadcast: {x}"))

# Custom partitioning
def custom_partition(element):
    return hash(element) % 4

ctx.from_collection(range(20)) \
    .partition_by(custom_partition) \
    .map(lambda x: f"Partition-{x}") \
    .sink(print)
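
Because `custom_partition` is an ordinary Python function, its routing behavior can be checked outside the framework. A sketch of how `partition_by` would distribute the 20 elements across four parallel channels:

```python
def custom_partition(element):
    return hash(element) % 4

# Simulate how partition_by routes elements into parallel channels:
# each element goes to exactly one of four buckets.
buckets = {i: [] for i in range(4)}
for x in range(20):
    buckets[custom_partition(x)].append(x)
```

Every element lands in exactly one bucket, so no data is duplicated or dropped by the partitioner itself.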

KeyDataStream

Specialized stream for keyed operations that maintain state per key.

Core API

from ray.streaming.datastream import KeyDataStream

class KeyDataStream(DataStream):
    def reduce(self, func) -> DataStream
    def as_java_stream(self) -> JavaKeyDataStream

Stateful Operations

def reduce(self, func) -> DataStream:
    """
    Apply reduce function to elements with the same key.
    
    Args:
        func: Function or ReduceFunction for combining values
        
    Returns:
        DataStream with reduced values per key
    """

Usage Examples

# Word count with reduce
ctx.read_text_file("document.txt") \
    .flat_map(lambda line: line.split()) \
    .map(lambda word: (word.lower(), 1)) \
    .key_by(lambda pair: pair[0]) \
    .reduce(lambda old, new: (old[0], old[1] + new[1])) \
    .sink(lambda result: print(f"{result[0]}: {result[1]}"))

# Sum by category
data = [("fruits", 10), ("vegetables", 5), ("fruits", 7), ("vegetables", 3)]
ctx.from_collection(data) \
    .key_by(lambda item: item[0]) \
    .reduce(lambda old, new: (old[0], old[1] + new[1])) \
    .sink(lambda result: print(f"Total {result[0]}: {result[1]}"))
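
The per-key reduce above keeps one running value per key. Its final state can be sketched in plain Python (a dict of per-key accumulators; note the real operator also emits each intermediate update downstream, which this sketch omits):

```python
def keyed_reduce(elements, key_fn, reduce_fn):
    # One running value per key, updated as each element arrives --
    # a plain-Python sketch of KeyDataStream.reduce semantics.
    state = {}
    for element in elements:
        k = key_fn(element)
        state[k] = element if k not in state else reduce_fn(state[k], element)
    return state

data = [("fruits", 10), ("vegetables", 5), ("fruits", 7), ("vegetables", 3)]
totals = keyed_reduce(
    data,
    key_fn=lambda item: item[0],
    reduce_fn=lambda old, new: (old[0], old[1] + new[1]),
)
```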

StreamSource

Entry point streams created from data sources.

Core API

from ray.streaming.datastream import StreamSource

class StreamSource(DataStream):
    @staticmethod
    def build_source(streaming_context, func) -> StreamSource

StreamSource inherits all DataStream transformation methods and serves as the starting point for stream processing pipelines.

Usage Examples

from ray.streaming.function import SourceFunction

class NumberSource(SourceFunction):
    def init(self, parallel_id, num_parallel):
        self.current = parallel_id
        self.step = num_parallel
        self.max_num = 100
    
    def fetch(self, collector):
        while self.current < self.max_num:
            collector.collect(self.current)
            self.current += self.step

# Create stream from custom source
source_stream = StreamSource.build_source(ctx, NumberSource())
source_stream.map(lambda x: x * 2) \
    .filter(lambda x: x > 10) \
    .sink(print)
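
The `parallel_id`/`num_parallel` pattern in `NumberSource` splits the range round-robin across source instances: each instance starts at its own id and steps by the total parallelism. In plain Python, the emitted sets are disjoint and together cover the full range:

```python
def emitted(parallel_id, num_parallel, max_num=100):
    # Mirrors NumberSource.fetch: start at parallel_id, step by num_parallel.
    return list(range(parallel_id, max_num, num_parallel))

num_parallel = 4
parts = [emitted(i, num_parallel) for i in range(num_parallel)]
```

This is why the source scales cleanly: adding instances changes who emits which numbers, never how many are emitted overall.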

Cross-Language Integration

Ray Streaming supports mixed Python/Java operations through stream conversion.

JavaDataStream

Java-based stream for cross-language operations.

from ray.streaming.datastream import JavaDataStream

class JavaDataStream(Stream):
    # Java operator methods (require Java class names)
    def map(self, java_func_class) -> JavaDataStream
    def flat_map(self, java_func_class) -> JavaDataStream
    def filter(self, java_func_class) -> JavaDataStream
    def union(self, *streams) -> JavaUnionStream
    def key_by(self, java_func_class) -> JavaKeyDataStream
    def sink(self, java_func_class) -> JavaStreamSink
    
    # Convert back to Python stream
    def as_python_stream(self) -> DataStream

Usage Examples

# Mixed Python/Java processing
python_stream = ctx.from_values("hello", "world", "ray") \
    .map(lambda x: x.upper())

# Convert to Java for Java operators
java_stream = python_stream.as_java_stream() \
    .map("com.example.JavaMapperFunction") \
    .filter("com.example.JavaFilterFunction")

# Convert back to Python
result_stream = java_stream.as_python_stream() \
    .map(lambda x: f"Processed: {x}") \
    .sink(print)

Stream Configuration

Configure stream behavior using the configuration system.

Stream-Level Configuration

# Configure individual streams
stream = ctx.from_collection(range(100)) \
    .set_parallelism(4) \
    .with_config("streaming.buffer.size", "1024") \
    .with_config("streaming.timeout", "5000")

# Multiple configuration options
config = {
    "streaming.checkpoint.interval": "10000",
    "streaming.queue.capacity": "500"
}
configured_stream = stream.with_config(conf=config)

Performance Optimization

# Optimize for throughput
high_throughput_stream = ctx.from_collection(large_dataset) \
    .set_parallelism(8) \
    .with_config("streaming.buffer.size", "4096") \
    .disable_chain()  # Prevent operator chaining

# Optimize for latency
low_latency_stream = ctx.from_collection(data) \
    .set_parallelism(1) \
    .forward() \
    .with_config("streaming.timeout", "100")  # forward() keeps data local

Advanced Patterns

Pipeline Branching

# Split stream into multiple processing branches
source = ctx.from_collection(range(100))

# Branch 1: Even numbers
evens = source.filter(lambda x: x % 2 == 0) \
    .map(lambda x: f"Even: {x}")

# Branch 2: Odd numbers  
odds = source.filter(lambda x: x % 2 == 1) \
    .map(lambda x: f"Odd: {x}")

# Merge branches
evens.union(odds).sink(print)

Stateful Processing

# Maintain running totals per key
ctx.from_values(("A", 10), ("B", 5), ("A", 15), ("B", 20)) \
    .key_by(lambda pair: pair[0]) \
    .reduce(lambda running_total, new_value: 
            (running_total[0], running_total[1] + new_value[1])) \
    .sink(lambda result: print(f"Running total for {result[0]}: {result[1]}"))

Error Handling

Handle errors in stream processing operations.

def safe_transform(x):
    try:
        return int(x) * 2
    except ValueError:
        return 0  # Default value for invalid input

ctx.from_values("1", "2", "invalid", "4") \
    .map(safe_transform) \
    .filter(lambda x: x > 0) \
    .sink(lambda x: print(f"Valid result: {x}"))

See Also

  • Streaming Context Documentation - StreamingContext and job management
  • Source Functions Documentation - Custom data source implementation
  • Stream Operations Documentation - Detailed transformation operations
  • Cross-Language Support Documentation - Python/Java integration details

Install with Tessl CLI

npx tessl i tessl/pypi-ray-streaming
