Ray Streaming - a distributed stream processing framework built on Ray that provides fault-tolerant stream processing with Python API
—
This document covers the data stream classes and transformation operations in Ray Streaming, including DataStream, KeyDataStream, and their Java counterparts for cross-language integration.
Ray Streaming provides a fluent API for stream transformations through several stream classes:
All stream classes inherit from the abstract Stream base class.
from ray.streaming.datastream import Stream
class Stream:
def get_parallelism(self) -> int
def set_parallelism(self, parallelism: int) -> Stream
def get_id(self) -> str
def with_config(self, key=None, value=None, conf=None) -> Stream
def get_config(self) -> dict
def forward(self) -> Stream
def disable_chain(self) -> Stream
def get_input_stream(self) -> Stream
def get_streaming_context(self) -> StreamingContext
The main stream class for Python-based stream processing and transformations.
from ray.streaming.datastream import DataStream
class DataStream(Stream):
# Element-wise transformations
def map(self, func) -> DataStream
def flat_map(self, func) -> DataStream
def filter(self, func) -> DataStream
# Stream composition
def union(self, *streams) -> UnionStream
# Partitioning and keying
def key_by(self, func) -> KeyDataStream
def broadcast(self) -> DataStream
def partition_by(self, partition_func) -> DataStream
# Output operations
def sink(self, func) -> StreamSink
# Cross-language integration
def as_java_stream(self) -> JavaDataStream
Apply functions to individual elements in the stream.
# Map transformation - one-to-one element transformation
def map(self, func) -> DataStream:
"""
Transform each element using the provided function.
Args:
func: Function or MapFunction instance for transformation
Returns:
New DataStream with transformed elements
"""
# FlatMap transformation - one-to-many element transformation
def flat_map(self, func) -> DataStream:
"""
Transform each element into zero or more output elements.
Args:
func: Function or FlatMapFunction that returns iterable
Returns:
New DataStream with flattened results
"""
# Filter transformation - element filtering
def filter(self, func) -> DataStream:
"""
Keep only elements that satisfy the predicate.
Args:
func: Function or FilterFunction returning boolean
Returns:
New DataStream with filtered elements
"""from ray.streaming import StreamingContext
ctx = StreamingContext.Builder().build()
# Map transformation
ctx.from_collection([1, 2, 3, 4, 5]) \
.map(lambda x: x * 2) \
.sink(lambda x: print(f"Doubled: {x}"))
# FlatMap transformation
ctx.from_values("hello world", "ray streaming") \
.flat_map(lambda line: line.split()) \
.map(lambda word: word.upper()) \
.sink(lambda x: print(f"Word: {x}"))
# Filter transformation
ctx.from_collection(range(10)) \
.filter(lambda x: x % 2 == 0) \
.map(lambda x: x ** 2) \
.sink(lambda x: print(f"Even square: {x}"))
Combine multiple streams into unified processing pipelines.
def union(self, *streams) -> UnionStream:
"""
Merge multiple streams of the same type.
Args:
*streams: DataStreams to union with this stream
Returns:
UnionStream containing elements from all input streams
"""# Union multiple streams
stream1 = ctx.from_values(1, 2, 3)
stream2 = ctx.from_values(4, 5, 6)
stream3 = ctx.from_values(7, 8, 9)
unified = stream1.union(stream2, stream3) \
.map(lambda x: x * 10) \
.sink(lambda x: print(f"Unified: {x}"))
Control data distribution and enable stateful operations.
def key_by(self, func) -> KeyDataStream:
"""
Partition stream by key for stateful operations.
Args:
func: Function or KeyFunction to extract key from elements
Returns:
KeyDataStream partitioned by the key function
"""
def broadcast(self) -> DataStream:
"""
Broadcast all elements to every parallel instance.
Returns:
DataStream with broadcast partitioning
"""
def partition_by(self, partition_func) -> DataStream:
"""
Partition stream using custom partitioning function.
Args:
partition_func: Function or Partition instance for custom partitioning
Returns:
DataStream with custom partitioning
"""# Key-based processing
ctx.from_values(("a", 1), ("b", 2), ("a", 3), ("b", 4)) \
.key_by(lambda pair: pair[0]) \
.reduce(lambda old, new: (old[0], old[1] + new[1])) \
.sink(lambda x: print(f"Sum for {x[0]}: {x[1]}"))
# Broadcast partitioning
ctx.from_values("broadcast", "message") \
.broadcast() \
.map(lambda x: x.upper()) \
.sink(lambda x: print(f"Broadcast: {x}"))
# Custom partitioning
def custom_partition(element):
    """Assign *element* to one of four partitions based on its hash."""
    bucket = hash(element)
    return bucket % 4
ctx.from_collection(range(20)) \
.partition_by(custom_partition) \
.map(lambda x: f"Partition-{x}") \
.sink(print)
Specialized stream for keyed operations that maintain state per key.
from ray.streaming.datastream import KeyDataStream
class KeyDataStream(DataStream):
def reduce(self, func) -> DataStream
def as_java_stream(self) -> JavaKeyDataStream
def reduce(self, func) -> DataStream:
"""
Apply reduce function to elements with the same key.
Args:
func: Function or ReduceFunction for combining values
Returns:
DataStream with reduced values per key
"""# Word count with reduce
ctx.read_text_file("document.txt") \
.flat_map(lambda line: line.split()) \
.map(lambda word: (word.lower(), 1)) \
.key_by(lambda pair: pair[0]) \
.reduce(lambda old, new: (old[0], old[1] + new[1])) \
.sink(lambda result: print(f"{result[0]}: {result[1]}"))
# Sum by category
data = [("fruits", 10), ("vegetables", 5), ("fruits", 7), ("vegetables", 3)]
ctx.from_collection(data) \
.key_by(lambda item: item[0]) \
.reduce(lambda old, new: (old[0], old[1] + new[1])) \
.sink(lambda result: print(f"Total {result[0]}: {result[1]}"))
Entry point streams created from data sources.
from ray.streaming.datastream import StreamSource
class StreamSource(DataStream):
@staticmethod
def build_source(streaming_context, func) -> StreamSource
StreamSource inherits all DataStream transformation methods and serves as the starting point for stream processing pipelines.
from ray.streaming.function import SourceFunction
class NumberSource(SourceFunction):
    """Example source emitting integers below 100, striped across parallel instances."""

    def init(self, parallel_id, num_parallel):
        # Each parallel instance starts at its own index and strides by the
        # total parallelism, so instances emit disjoint number sequences.
        self.current = parallel_id
        self.step = num_parallel
        self.max_num = 100

    def fetch(self, collector):
        # Emit remaining numbers up to (but not including) the upper bound,
        # advancing the cursor after each emission so progress is retained.
        while self.current < self.max_num:
            collector.collect(self.current)
            self.current = self.current + self.step
# Create stream from custom source
source_stream = StreamSource.build_source(ctx, NumberSource())
source_stream.map(lambda x: x * 2) \
.filter(lambda x: x > 10) \
.sink(print)
Ray Streaming supports mixed Python/Java operations through stream conversion.
Java-based stream for cross-language operations.
from ray.streaming.datastream import JavaDataStream
class JavaDataStream(Stream):
# Java operator methods (require Java class names)
def map(self, java_func_class) -> JavaDataStream
def flat_map(self, java_func_class) -> JavaDataStream
def filter(self, java_func_class) -> JavaDataStream
def union(self, *streams) -> JavaUnionStream
def key_by(self, java_func_class) -> JavaKeyDataStream
def sink(self, java_func_class) -> JavaStreamSink
# Convert back to Python stream
def as_python_stream(self) -> DataStream
# Mixed Python/Java processing
python_stream = ctx.from_values("hello", "world", "ray") \
.map(lambda x: x.upper())
# Convert to Java for Java operators
java_stream = python_stream.as_java_stream() \
.map("com.example.JavaMapperFunction") \
.filter("com.example.JavaFilterFunction")
# Convert back to Python
result_stream = java_stream.as_python_stream() \
.map(lambda x: f"Processed: {x}") \
.sink(print)
Configure stream behavior using the configuration system.
# Configure individual streams
stream = ctx.from_collection(range(100)) \
.set_parallelism(4) \
.with_config("streaming.buffer.size", "1024") \
.with_config("streaming.timeout", "5000")
# Multiple configuration options
config = {
"streaming.checkpoint.interval": "10000",
"streaming.queue.capacity": "500"
}
configured_stream = stream.with_config(conf=config)
# Optimize for throughput
high_throughput_stream = ctx.from_collection(large_dataset) \
.set_parallelism(8) \
.with_config("streaming.buffer.size", "4096") \
.disable_chain() # Prevent operator chaining
# Optimize for latency
low_latency_stream = ctx.from_collection(data) \
.set_parallelism(1) \
.forward() \
.with_config("streaming.timeout", "100")  # .forward() enables local forwarding
# Split stream into multiple processing branches
source = ctx.from_collection(range(100))
# Branch 1: Even numbers
evens = source.filter(lambda x: x % 2 == 0) \
.map(lambda x: f"Even: {x}")
# Branch 2: Odd numbers
odds = source.filter(lambda x: x % 2 == 1) \
.map(lambda x: f"Odd: {x}")
# Merge branches
evens.union(odds).sink(print)
# Maintain running totals per key
ctx.from_values(("A", 10), ("B", 5), ("A", 15), ("B", 20)) \
.key_by(lambda pair: pair[0]) \
.reduce(lambda running_total, new_value:
(running_total[0], running_total[1] + new_value[1])) \
.sink(lambda result: print(f"Running total for {result[0]}: {result[1]}"))
Handle errors in stream processing operations.
def safe_transform(x):
    """Parse *x* as an integer and double it; return 0 for unparseable input."""
    try:
        doubled = int(x) * 2
    except ValueError:
        # Unparseable values map to the neutral default so the pipeline
        # keeps flowing instead of crashing on bad records.
        return 0
    return doubled
ctx.from_values("1", "2", "invalid", "4") \
.map(safe_transform) \
.filter(lambda x: x > 0) \
.sink(lambda x: print(f"Valid result: {x}"))
Install with Tessl CLI
npx tessl i tessl/pypi-ray-streaming