Ray Streaming - a distributed stream processing framework built on Ray that provides fault-tolerant stream processing with Python API
—
This document covers the detailed stream transformation operations available in Ray Streaming, including transformation functions, partitioning strategies, and advanced stream processing patterns.
Ray Streaming provides a comprehensive set of stream transformation operations:
Ray Streaming defines function interfaces for type-safe and extensible stream processing.
from ray.streaming.function import Function
class Function:
def open(self, runtime_context) -> None
def close(self) -> None
def save_checkpoint(self) -> object
def load_checkpoint(self, checkpoint_obj) -> None
# Map function interface
class MapFunction(Function):
def map(self, value) -> object
# FlatMap function interface
class FlatMapFunction(Function):
def flat_map(self, value, collector) -> None
# Filter function interface
class FilterFunction(Function):
def filter(self, value) -> bool
# Key extraction function interface
class KeyFunction(Function):
def key_by(self, value) -> object
# Reduce function interface
class ReduceFunction(Function):
def reduce(self, old_value, new_value) -> object
# Sink function interface
class SinkFunction(Function):
def sink(self, value) -> None
Transform each element in the stream using a one-to-one function.
def map(self, func) -> DataStream:
"""
Apply a function to each element in the stream.
Args:
func: Function or MapFunction instance for transformation
Returns:
New DataStream with transformed elements
"""from ray.streaming import StreamingContext
from ray.streaming.function import MapFunction
# Using lambda function
ctx = StreamingContext.Builder().build()
ctx.from_collection([1, 2, 3, 4, 5]) \
.map(lambda x: x * 2) \
.sink(lambda x: print(f"Doubled: {x}"))
# Using custom MapFunction
class SquareMapFunction(MapFunction):
    """MapFunction that emits the square of each incoming element."""

    def map(self, value):
        # One-to-one transform: element -> element squared.
        return pow(value, 2)
ctx.from_collection([1, 2, 3, 4, 5]) \
.map(SquareMapFunction()) \
.sink(lambda x: print(f"Squared: {x}"))
# Complex transformation
ctx.from_collection(["hello", "world", "ray", "streaming"]) \
.map(lambda word: {"word": word, "length": len(word), "upper": word.upper()}) \
.sink(lambda obj: print(f"Word: {obj['word']}, Length: {obj['length']}"))
ctx.submit("map_operations")Transform each element into zero or more output elements.
def flat_map(self, func) -> DataStream:
"""
Transform each element into multiple output elements.
Args:
func: Function or FlatMapFunction that returns iterable
Returns:
New DataStream with flattened results
"""from ray.streaming.function import FlatMapFunction
# Split sentences into words
ctx.from_values("hello world", "ray streaming", "distributed computing") \
.flat_map(lambda sentence: sentence.split()) \
.map(lambda word: word.upper()) \
.sink(lambda word: print(f"Word: {word}"))
# Using custom FlatMapFunction
class TokenizeFlatMapFunction(FlatMapFunction):
    """Splits an input string into whitespace-delimited tokens and emits
    the lower-cased form of each token longer than 3 characters."""

    def flat_map(self, value, collector):
        for token in value.split():
            # Only emit words longer than 3 characters.
            if len(token) > 3:
                collector.collect(token.lower())
ctx.from_values("The quick brown fox jumps over the lazy dog") \
.flat_map(TokenizeFlatMapFunction()) \
.sink(print)
# Generate multiple outputs per input
ctx.from_collection([1, 2, 3]) \
.flat_map(lambda x: [x, x*2, x*3]) \
.sink(lambda x: print(f"Generated: {x}"))
ctx.submit("flatmap_operations")Keep only elements that satisfy a predicate condition.
def filter(self, func) -> DataStream:
"""
Filter elements based on a predicate function.
Args:
func: Function or FilterFunction returning boolean
Returns:
New DataStream with filtered elements
"""from ray.streaming.function import FilterFunction
# Simple filtering
ctx.from_collection(range(10)) \
.filter(lambda x: x % 2 == 0) \
.sink(lambda x: print(f"Even: {x}"))
# Using custom FilterFunction
class PositiveFilterFunction(FilterFunction):
    """Keeps only strictly positive values (zero and negatives dropped)."""

    def filter(self, value):
        return 0 < value
ctx.from_collection([-3, -1, 0, 1, 5, -2, 8]) \
.filter(PositiveFilterFunction()) \
.sink(lambda x: print(f"Positive: {x}"))
# Complex filtering with string operations
ctx.from_values("apple", "banana", "cherry", "date", "elderberry") \
.filter(lambda fruit: len(fruit) > 5 and 'e' in fruit) \
.sink(lambda fruit: print(f"Long fruit with 'e': {fruit}"))
ctx.submit("filter_operations")Partition stream by key for stateful operations.
def key_by(self, func) -> KeyDataStream:
"""
Partition stream by key extracted using the provided function.
Args:
func: Function or KeyFunction to extract key from elements
Returns:
KeyDataStream partitioned by the key function
"""from ray.streaming.function import KeyFunction
# Group by key for word counting
ctx.from_values("apple", "banana", "apple", "cherry", "banana", "apple") \
.map(lambda word: (word, 1)) \
.key_by(lambda pair: pair[0]) \
.reduce(lambda old, new: (old[0], old[1] + new[1])) \
.sink(lambda result: print(f"Count: {result[0]} = {result[1]}"))
# Using custom KeyFunction
class CategoryKeyFunction(KeyFunction):
    """KeyFunction that maps a food item to its category name.

    Items not found in the lookup table are grouped under 'other'.
    """

    # Inverted item -> category map, built once at class-definition time.
    # The original rebuilt the category dict and linearly scanned it on
    # every key_by call; a dict lookup is O(1) and allocation-free.
    _ITEM_TO_CATEGORY = {
        item: category
        for category, items in {
            'fruits': ['apple', 'banana', 'cherry'],
            'vegetables': ['carrot', 'broccoli', 'spinach'],
            'grains': ['rice', 'wheat', 'oats'],
        }.items()
        for item in items
    }

    def key_by(self, value):
        # Group items by their category; unknown items fall back to 'other'.
        return self._ITEM_TO_CATEGORY.get(value, 'other')
ctx.from_values("apple", "carrot", "banana", "rice", "broccoli") \
.key_by(CategoryKeyFunction()) \
.reduce(lambda old, new: f"{old},{new}" if isinstance(old, str) else f"{old},{new}") \
.sink(lambda result: print(f"Category items: {result}"))
ctx.submit("keyed_operations")Combine elements with the same key using a reduce function.
def reduce(self, func) -> DataStream:
"""
Reduce elements with the same key using the provided function.
Args:
func: Function or ReduceFunction for combining values
Returns:
DataStream with reduced values per key
"""from ray.streaming.function import ReduceFunction
# Sum values by key
ctx.from_values(("A", 10), ("B", 5), ("A", 15), ("B", 20), ("A", 8)) \
.key_by(lambda pair: pair[0]) \
.reduce(lambda old, new: (old[0], old[1] + new[1])) \
.sink(lambda result: print(f"Sum for {result[0]}: {result[1]}"))
# Using custom ReduceFunction
class MaxReduceFunction(ReduceFunction):
    """Keeps whichever (key, score) pair has the larger score."""

    def reduce(self, old_value, new_value):
        # Compare on the second tuple element; on a tie the previously
        # accumulated value wins (same as max() with a key function).
        if new_value[1] > old_value[1]:
            return new_value
        return old_value
ctx.from_values(("user1", 100), ("user2", 85), ("user1", 120), ("user2", 95)) \
.key_by(lambda pair: pair[0]) \
.reduce(MaxReduceFunction()) \
.sink(lambda result: print(f"Max score for {result[0]}: {result[1]}"))
# String aggregation
ctx.from_values(("group1", "a"), ("group2", "x"), ("group1", "b"), ("group2", "y")) \
.key_by(lambda pair: pair[0]) \
.reduce(lambda old, new: (old[0], old[1] + new[1])) \
.sink(lambda result: print(f"Concatenated {result[0]}: {result[1]}"))
ctx.submit("reduce_operations")Merge multiple streams of the same type.
def union(self, *streams) -> UnionStream:
"""
Union this stream with other streams.
Args:
*streams: DataStreams to union with this stream
Returns:
UnionStream containing elements from all input streams
"""# Union multiple data sources
stream1 = ctx.from_values("source1-a", "source1-b", "source1-c")
stream2 = ctx.from_values("source2-x", "source2-y", "source2-z")
stream3 = ctx.from_values("source3-1", "source3-2", "source3-3")
# Union all streams
unified = stream1.union(stream2, stream3) \
.map(lambda x: f"Unified: {x}") \
.sink(print)
# Union with different processing branches
numbers = ctx.from_collection(range(20))
evens = numbers.filter(lambda x: x % 2 == 0).map(lambda x: f"Even: {x}")
odds = numbers.filter(lambda x: x % 2 == 1).map(lambda x: f"Odd: {x}")
evens.union(odds).sink(print)
ctx.submit("union_operations")Send all elements to every parallel instance of the next operator.
def broadcast(self) -> DataStream:
"""
Broadcast all elements to every parallel instance.
Returns:
DataStream with broadcast partitioning
"""# Broadcast configuration data to all workers
config_stream = ctx.from_values("config1", "config2", "config3") \
.broadcast() \
.map(lambda config: f"All workers got: {config}") \
.sink(print)
# Broadcast lookup table
lookup_data = ctx.from_values(("key1", "value1"), ("key2", "value2")) \
.broadcast() \
.map(lambda pair: f"Lookup: {pair[0]} -> {pair[1]}") \
.sink(print)
ctx.submit("broadcast_operations")Use custom partitioning logic to control data distribution.
def partition_by(self, partition_func) -> DataStream:
"""
Partition stream using custom partitioning function.
Args:
partition_func: Function or Partition instance for custom partitioning
Returns:
DataStream with custom partitioning
"""from ray.streaming.partition import Partition
# Hash-based partitioning
def hash_partition(element):
    """Route *element* to one of 4 partitions by hashing its string form.

    NOTE(review): str hashing is salted per process (PYTHONHASHSEED), so
    partition assignments are stable within a run but not across runs.
    """
    bucket_count = 4
    return hash(str(element)) % bucket_count
ctx.from_collection(range(20)) \
.partition_by(hash_partition) \
.map(lambda x: f"Partitioned: {x}") \
.sink(print)
# Custom partitioning class
class RegionPartition(Partition):
    """Routes records to partitions according to their 'region' field."""

    def partition(self, record, num_partitions):
        # Known regions get dedicated slots; anything else maps to slot 3.
        # The final modulo keeps the slot valid for any parallelism level.
        slot = {"US": 0, "EU": 1, "ASIA": 2}.get(record.get("region", "OTHER"), 3)
        return slot % num_partitions
regions_data = [
{"id": 1, "region": "US", "data": "user1"},
{"id": 2, "region": "EU", "data": "user2"},
{"id": 3, "region": "ASIA", "data": "user3"}
]
ctx.from_collection(regions_data) \
.partition_by(RegionPartition()) \
.map(lambda record: f"Region {record['region']}: {record['data']}") \
.sink(print)
ctx.submit("partition_operations")Forward elements locally to the next operator.
def forward(self) -> DataStream:
"""
Forward elements locally to avoid network transfer.
Returns:
DataStream with forward partitioning
"""Define output behavior for stream processing results.
def sink(self, func) -> StreamSink:
"""
Create a sink for the stream.
Args:
func: Function or SinkFunction for handling output
Returns:
StreamSink representing the output operation
"""from ray.streaming.function import SinkFunction
# Simple sink with lambda
ctx.from_collection([1, 2, 3, 4, 5]) \
.map(lambda x: x ** 2) \
.sink(lambda x: print(f"Result: {x}"))
# Custom SinkFunction
class FileSinkFunction(SinkFunction):
    """Sink that writes each record as one line to a local file.

    The file handle is acquired in open() and released in close(),
    following the Function lifecycle shown in this document.
    """
    def __init__(self, filename):
        # Target path only; the handle is created lazily in open().
        self.filename = filename
        self.file = None
    def open(self, runtime_context):
        # 'w' mode truncates any existing file on each run.
        self.file = open(self.filename, 'w')
    def sink(self, value):
        self.file.write(f"{value}\n")
        # Flush per record so output is visible immediately
        # (trades throughput for visibility — fine for an example).
        self.file.flush()
    def close(self):
        # Guard: close() may run even if open() never did.
        if self.file:
            self.file.close()
ctx.from_values("line1", "line2", "line3") \
.sink(FileSinkFunction("output.txt"))
# Database sink simulation
class DatabaseSinkFunction(SinkFunction):
    """Sink that simulates a database writer by printing mock SQL."""
    def open(self, runtime_context):
        print("Connecting to database...")
        # Placeholder standing in for a real connection object.
        self.connection = "mock_db_connection"
    def sink(self, value):
        # NOTE(review): this mock builds SQL via string interpolation;
        # real code must use parameterized queries to avoid injection.
        print(f"INSERT INTO results VALUES ('{value}')")
    def close(self):
        print("Closing database connection")
ctx.from_collection(range(5)) \
.map(lambda x: f"record_{x}") \
.sink(DatabaseSinkFunction())
ctx.submit("sink_operations")Although Ray Streaming doesn't have built-in windowing, you can simulate time-based processing.
import time
from datetime import datetime
# Time-based processing simulation
def timestamped_data():
    """Yield 10 records tagged with the current ISO timestamp.

    Sleeps 0.5 s *after* each yield to simulate a slow event source.
    """
    for value in range(10):
        yield {"timestamp": datetime.now().isoformat(), "value": value}
        time.sleep(0.5)
class TimestampedSource(SourceFunction):
    """Source that forwards every record from timestamped_data()."""

    def fetch(self, ctx):
        # Push each generated record into the runtime-provided collector.
        for record in timestamped_data():
            ctx.collect(record)
ctx.from_source(TimestampedSource()) \
.map(lambda record: f"Time: {record['timestamp']}, Value: {record['value']}") \
.sink(print)
Process multiple streams with different operations.
# Create multiple processing branches
source = ctx.from_collection(range(100))
# Branch 1: Even numbers processing
evens = source.filter(lambda x: x % 2 == 0) \
.map(lambda x: f"Even: {x}")
# Branch 2: Odd numbers processing
odds = source.filter(lambda x: x % 2 == 1) \
.map(lambda x: f"Odd: {x}")
# Branch 3: Multiples of 5
fives = source.filter(lambda x: x % 5 == 0) \
.map(lambda x: f"Multiple of 5: {x}")
# Combine all branches
evens.union(odds, fives) \
.sink(print)
Control operator chaining for performance optimization.
# Disable chaining for better parallelism
stream = ctx.from_collection(large_dataset) \
.disable_chain() \
.map(expensive_transformation) \
.disable_chain() \
.filter(complex_predicate)
# Use forward for local processing
local_stream = ctx.from_collection(data) \
.forward() \
.map(local_transformation)
Configure parallelism for different operations.
# Different parallelism for different operations
ctx.from_collection(data) \
.set_parallelism(8) \
.map(cpu_intensive_function) \
.set_parallelism(4) \
.reduce(reduction_function) \
.set_parallelism(1) \
.sink(output_function)
Install with Tessl CLI
npx tessl i tessl/pypi-ray-streaming