Ray Streaming - a distributed stream processing framework built on Ray that provides fault-tolerant stream processing with a Python API
npx @tessl/cli install tessl/pypi-ray-streaming@1.10.0

A distributed stream processing framework built on Ray that provides fault-tolerant, scalable stream processing with a Python API and cross-language support.
pip install ray

Ray Streaming provides a distributed stream processing framework built on Ray's actor model. It enables fault-tolerant stream processing with automatic checkpointing, resource management, and cross-language integration between Python and Java workloads.
Key features include fault-tolerant stream processing with automatic checkpointing, distributed execution and resource management on Ray's actor model, and cross-language integration between Python and Java operators.

The core classes are imported from ray.streaming and its submodules:
from ray.streaming import StreamingContext
from ray.streaming.datastream import DataStream, StreamSource
from ray.streaming.function import SourceFunction, CollectionSourceFunction

A complete word-count pipeline looks like this:

import ray
from ray.streaming import StreamingContext
# Initialize Ray and create streaming context
ray.init()
ctx = StreamingContext.Builder().build()
# Create a simple streaming pipeline
ctx.read_text_file("input.txt") \
   .set_parallelism(1) \
   .flat_map(lambda x: x.split()) \
   .map(lambda x: (x, 1)) \
   .key_by(lambda x: x[0]) \
   .reduce(lambda old_value, new_value:
           (old_value[0], old_value[1] + new_value[1])) \
   .filter(lambda x: "ray" not in x) \
   .sink(lambda x: print("result", x))

# Submit the job
ctx.submit("word_count_job")

Streams can also be created directly from in-memory values and collections:

from ray.streaming import StreamingContext
ctx = StreamingContext.Builder().build()
# Create stream from values
ctx.from_values("a", "b", "c") \
   .map(lambda x: x.upper()) \
   .sink(lambda x: print(x))

# Create stream from collection
data = [1, 2, 3, 4, 5]
ctx.from_collection(data) \
   .filter(lambda x: x % 2 == 0) \
   .map(lambda x: x * 2) \
   .sink(lambda x: print(f"Even doubled: {x}"))
ctx.submit("collection_processing")Ray Streaming implements a master-worker architecture using Ray actors:
Ray Streaming provides automatic fault tolerance through:
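As a minimal sketch, the checkpoint interval can be tuned when building the context; the key below is the streaming.checkpoint.interval option from the configuration example later in this document, and interpreting the value as milliseconds is an assumption:

from ray.streaming import StreamingContext

# Sketch: tighten the checkpoint interval so less work is replayed on recovery.
# Assumption: the interval value is interpreted as milliseconds.
ctx = StreamingContext.Builder() \
    .option("streaming.checkpoint.interval", "1000") \
    .build()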
Main entry point for Ray streaming functionality with job lifecycle management.
class StreamingContext:
    class Builder:
        def option(self, key=None, value=None, conf=None) -> Builder
        def build(self) -> StreamingContext

    def source(self, source_func: SourceFunction) -> StreamSource
    def from_values(self, *values) -> StreamSource
    def from_collection(self, values) -> StreamSource
    def read_text_file(self, filename: str) -> StreamSource
    def submit(self, job_name: str) -> None

→ Streaming Context Documentation
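As an illustrative sketch (not taken from the original examples), several options can presumably be supplied at once through the conf parameter of option(), given the option(key=None, value=None, conf=None) signature above; the option keys are the ones used in the configuration example later in this document:

from ray.streaming import StreamingContext

# Assumption: option(conf=...) accepts a dict of option key/value pairs.
ctx = StreamingContext.Builder() \
    .option(conf={
        "streaming.worker-num": "2",
        "streaming.context.backend.type": "MEMORY",
    }) \
    .build()

ctx.from_values("ray", "streaming", "example") \
   .map(lambda word: word.upper()) \
   .sink(lambda word: print(word))

ctx.submit("builder_options_demo")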
Stream transformation operations and data flow management.
class DataStream:
    def map(self, func) -> DataStream
    def flat_map(self, func) -> DataStream
    def filter(self, func) -> DataStream
    def key_by(self, func) -> KeyDataStream
    def union(self, *others) -> DataStream
    def sink(self, func) -> DataStreamSink
    def set_parallelism(self, parallelism: int) -> DataStream
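An illustrative sketch (the data and job name are made up) showing union() merging two streams and set_parallelism() controlling how many parallel instances run an operator:

from ray.streaming import StreamingContext

ctx = StreamingContext.Builder().build()

evens = ctx.from_collection([0, 2, 4])
odds = ctx.from_collection([1, 3, 5])

# Merge both streams, then process the combined stream with two parallel mappers.
evens.union(odds) \
     .set_parallelism(2) \
     .map(lambda x: x * 10) \
     .sink(lambda x: print("merged", x))

ctx.submit("union_demo")

Data source implementations and custom source function interfaces.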
class SourceFunction:
    def init(self, parallel_id: int, num_parallel: int) -> None
    def fetch(self, collector) -> None

class CollectionSourceFunction(SourceFunction):
    def __init__(self, values)

class LocalFileSourceFunction(SourceFunction):
    def __init__(self, filename: str)

→ Source Functions Documentation
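A short sketch of wiring a built-in source function into a pipeline through StreamingContext.source(); it assumes LocalFileSourceFunction is importable from ray.streaming.function alongside the other source functions, and the file name is a placeholder:

from ray.streaming import StreamingContext
from ray.streaming.function import LocalFileSourceFunction

ctx = StreamingContext.Builder().build()

# Build the source function explicitly instead of using read_text_file().
source = LocalFileSourceFunction("input.txt")

ctx.source(source) \
   .flat_map(lambda line: line.split()) \
   .sink(lambda word: print(word))

ctx.submit("file_source_demo")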
Core stream transformation operators and windowing functions.
# Transformation Operations
DataStream.map(func) # One-to-one transformation
DataStream.flat_map(func) # One-to-many transformation
DataStream.filter(func) # Element filtering
DataStream.union(*others) # Stream union
# Keyed Operations
KeyDataStream.reduce(func) # Stateful reduction
KeyDataStream.window(window_func)     # Windowing operations

→ Stream Operations Documentation
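An illustrative sketch of the keyed operations (the data and job name are invented for this example): key_by() partitions the stream by key, and reduce() folds each new element into the previously reduced value for that key:

from ray.streaming import StreamingContext

ctx = StreamingContext.Builder().build()

# Running sum of scores per user: reduce() receives the previously
# reduced value and the new element with the same key.
ctx.from_collection([("alice", 3), ("bob", 5), ("alice", 2)]) \
   .key_by(lambda pair: pair[0]) \
   .reduce(lambda old, new: (old[0], old[1] + new[1])) \
   .sink(lambda pair: print("total", pair))

ctx.submit("keyed_reduce_demo")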
Support for mixed Python/Java streaming applications.
# Convert to Java stream for Java operators
DataStream.as_java_stream()
# Use Java operators with class names
java_stream.map("com.example.MyMapper")
java_stream.filter("com.example.MyFilter")
# Convert back to Python stream
java_stream.as_python_stream()

→ Cross-Language Support Documentation
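Putting the pieces above together, a mixed-language pipeline might look like the following sketch; com.example.MyMapper is a placeholder and must name an operator class available on the job's Java classpath:

from ray.streaming import StreamingContext

ctx = StreamingContext.Builder().build()

# Hand elements to a Java operator, then return to Python for the sink.
ctx.from_values("a", "b", "c") \
   .as_java_stream() \
   .map("com.example.MyMapper") \
   .as_python_stream() \
   .sink(lambda x: print("from java", x))

ctx.submit("hybrid_stream_demo")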
Create custom data sources by implementing the SourceFunction interface:
from ray.streaming.function import SourceFunction
class CustomSourceFunction(SourceFunction):
    def init(self, parallel_id, num_parallel):
        self.counter = 0
        self.max_count = 1000

    def fetch(self, collector):
        if self.counter < self.max_count:
            collector.collect(f"message-{self.counter}")
            self.counter += 1
        else:
            # Signal end of stream
            collector.close()
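A brief sketch of running the custom source through StreamingContext.source() (the downstream operators and job name are illustrative):

from ray.streaming import StreamingContext

ctx = StreamingContext.Builder().build()

ctx.source(CustomSourceFunction()) \
   .map(lambda msg: msg.upper()) \
   .sink(lambda msg: print(msg))

ctx.submit("custom_source_demo")

Configure streaming jobs with various options: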
ctx = StreamingContext.Builder() \
    .option("streaming.worker-num", "4") \
    .option("streaming.context.backend.type", "MEMORY") \
    .option("streaming.checkpoint.interval", "5000") \
    .build()

Ray Streaming integrates with Ray's monitoring and error handling, so streaming jobs can be inspected with the same tooling as any other Ray application.
In short, the essential steps to run a job are:

pip install ray

ray.init()
ctx = StreamingContext.Builder().build()
ctx.submit("job_name")

For detailed examples and API references, see the individual capability documentation pages linked above.