Ray Streaming - a distributed stream processing framework built on Ray that provides fault-tolerant stream processing with Python API
—
This document covers the StreamingContext class, which serves as the main entry point for Ray streaming functionality and provides job lifecycle management.
The StreamingContext is the primary interface for creating and managing streaming jobs in Ray Streaming. It provides methods for:
The main class for Ray streaming operations, acting as a wrapper around the Java StreamingContext implementation.
from ray.streaming import StreamingContext
from ray.streaming.function import SourceFunction
from ray.streaming.datastream import StreamSource
# Main StreamingContext class
class StreamingContext:
def __init__(self)
# Create stream from custom source function
def source(self, source_func: SourceFunction) -> StreamSource
# Create stream from multiple values
def from_values(self, *values) -> StreamSource
# Create stream from collection/list
def from_collection(self, values) -> StreamSource
# Create stream from text file (line by line)
def read_text_file(self, filename: str) -> StreamSource
# Submit job for execution
def submit(self, job_name: str) -> None

Configuration builder for creating StreamingContext instances with custom settings.
class StreamingContext.Builder:
def __init__(self)
# Set configuration option(s)
def option(self, key=None, value=None, conf=None) -> Builder
# Create configured StreamingContext
def build(self) -> StreamingContext

Create and configure streaming contexts with various options.
from ray.streaming import StreamingContext
# Simple context creation
ctx = StreamingContext()
# Context with configuration
ctx = StreamingContext.Builder() \
.option("streaming.worker-num", "4") \
.option("streaming.context.backend.type", "MEMORY") \
.build()
# Multiple configuration options
config = {
"streaming.checkpoint.interval": "5000",
"streaming.queue.capacity": "1000"
}
ctx = StreamingContext.Builder() \
.option(conf=config) \
    .build()

Create data streams from various input sources.
# Create stream from values
stream = ctx.from_values("hello", "world", "ray", "streaming")
# Create stream from collection
data = [1, 2, 3, 4, 5]
stream = ctx.from_collection(data)
# Create stream from text file
stream = ctx.read_text_file("input.txt")
# Create stream from custom source function
from ray.streaming.function import SourceFunction
class MySource(SourceFunction):
def fetch(self, collector):
for i in range(100):
collector.collect(f"item-{i}")
stream = ctx.source(MySource())

Submit streaming jobs for execution on the Ray cluster.
# Build streaming pipeline
ctx.from_values(1, 2, 3, 4, 5) \
.map(lambda x: x * 2) \
.filter(lambda x: x > 4) \
.sink(lambda x: print(f"Result: {x}"))
# Submit job with descriptive name
ctx.submit("data_processing_job")

import ray
from ray.streaming import StreamingContext
# Initialize Ray
ray.init()
# Create streaming context
ctx = StreamingContext.Builder().build()
# Create and process stream
ctx.from_collection([1, 2, 3, 4, 5]) \
.map(lambda x: x ** 2) \
.filter(lambda x: x > 10) \
.sink(lambda x: print(f"Large square: {x}"))
# Submit job
ctx.submit("squares_job")

import ray
from ray.streaming import StreamingContext
ray.init()
ctx = StreamingContext.Builder() \
.option("streaming.worker-num", "2") \
.build()
# Process text file
ctx.read_text_file("document.txt") \
.flat_map(lambda line: line.split()) \
.map(lambda word: (word.lower(), 1)) \
.key_by(lambda pair: pair[0]) \
.reduce(lambda old, new: (old[0], old[1] + new[1])) \
.sink(lambda result: print(f"Word: {result[0]}, Count: {result[1]}"))
ctx.submit("word_count")

import ray
from ray.streaming import StreamingContext
from ray.streaming.function import SourceFunction
import time
class TimestampSource(SourceFunction):
def init(self, parallel_id, num_parallel):
self.count = 0
self.max_count = 10
def fetch(self, collector):
while self.count < self.max_count:
timestamp = int(time.time())
collector.collect(f"timestamp-{timestamp}-{self.count}")
self.count += 1
time.sleep(1)
ray.init()
ctx = StreamingContext.Builder().build()
ctx.source(TimestampSource()) \
.map(lambda x: x.upper()) \
.sink(lambda x: print(f"Processed: {x}"))
ctx.submit("timestamp_job")

Ray Streaming supports various configuration options for job tuning:
# Performance tuning
ctx = StreamingContext.Builder() \
.option("streaming.worker-num", "8") \
.option("streaming.queue.capacity", "2000") \
.option("streaming.checkpoint.interval", "10000") \
.build()
# Backend configuration
ctx = StreamingContext.Builder() \
.option("streaming.context.backend.type", "LOCAL_FILE") \
.option("streaming.context.backend.path", "/tmp/streaming") \
.build()
# Multiple options via dictionary
config = {
"streaming.worker-num": "4",
"streaming.context.backend.type": "MEMORY",
"streaming.checkpoint.interval": "5000",
"streaming.queue.capacity": "1000"
}
ctx = StreamingContext.Builder() \
.option(conf=config) \
    .build()

StreamingContext integrates seamlessly with Ray's distributed computing capabilities:
try:
    ctx.from_collection([1, 2, 3]) \
        .map(lambda x: x / 0) \
        .sink(print)  # division by zero will cause the job to fail
    ctx.submit("error_job")
except Exception as e:
    print(f"Job failed: {e}")

# Configure resources for streaming job
ctx = StreamingContext.Builder() \
.option("streaming.worker-num", "4") \
.option("streaming.worker-cpu", "2") \
.option("streaming.worker-memory", "2GB") \
    .build()

Use set_parallelism() on streams to control processing parallelism.

Install with Tessl CLI
npx tessl i tessl/pypi-ray-streaming