Ray Streaming - a distributed stream processing framework built on Ray that provides fault-tolerant stream processing with Python API
—
This document covers the source function interfaces and implementations in Ray Streaming, including built-in source functions and patterns for creating custom data sources.
Source functions are the entry points for data ingestion in Ray Streaming. They provide the mechanism to bring data from in-memory collections, local files, and external systems into a stream processing pipeline.
The base interface for all source functions in Ray Streaming.
from ray.streaming.function import SourceFunction, SourceContext
class SourceFunction:
def init(self, parallel: int, index: int) -> None
def fetch(self, ctx: SourceContext) -> None
def close(self) -> None
# Inherited from Function base class
def open(self, runtime_context) -> None
def save_checkpoint(self) -> object
def load_checkpoint(self, checkpoint_obj) -> None

The context interface used by source functions to emit elements.
class SourceContext:
def collect(self, element) -> None

Ray Streaming provides several built-in source function implementations.
Creates a stream from a Python collection (list, tuple, etc.).
from ray.streaming.function import CollectionSourceFunction
class CollectionSourceFunction(SourceFunction):
def __init__(self, values)
def init(self, parallel: int, index: int) -> None
def fetch(self, ctx: SourceContext) -> None

Reads data from a local text file line by line.
from ray.streaming.function import LocalFileSourceFunction
class LocalFileSourceFunction(SourceFunction):
def __init__(self, filename: str)
def init(self, parallel: int, index: int) -> None
def fetch(self, ctx: SourceContext) -> None

Use built-in collection source for in-memory data.
from ray.streaming import StreamingContext
from ray.streaming.function import CollectionSourceFunction
# Build the streaming context — the entry point for defining a job.
ctx = StreamingContext.Builder().build()
# Using StreamingContext convenience method
stream1 = ctx.from_collection([1, 2, 3, 4, 5])
# Using source function directly
collection_source = CollectionSourceFunction([10, 20, 30, 40, 50])
stream2 = ctx.source(collection_source)
# Process the streams: double the first, keep values > 25 from the second.
stream1.map(lambda x: x * 2).sink(print)
stream2.filter(lambda x: x > 25).sink(print)
ctx.submit("collection_job")

Use built-in file source for line-by-line file processing.
from ray.streaming import StreamingContext
from ray.streaming.function import LocalFileSourceFunction
ctx = StreamingContext.Builder().build()
# Using StreamingContext convenience method
stream1 = ctx.read_text_file("input.txt")
# Using source function directly
file_source = LocalFileSourceFunction("data.txt")
stream2 = ctx.source(file_source)
# Process file content: split each line into words and lowercase them.
stream1.flat_map(lambda line: line.split()) \
.map(lambda word: word.lower()) \
.sink(print)
ctx.submit("file_processing_job")

Create custom source functions by implementing the SourceFunction interface.
from ray.streaming.function import SourceFunction
import time
import random
class NumberGeneratorSource(SourceFunction):
    """Source that emits a bounded sequence of numbers, sharded by parallel index."""

    def init(self, parallel, index):
        """Record parallelism info and reset the emission counter."""
        self.parallel = parallel
        self.index = index
        self.count = 0
        self.max_count = 100

    def fetch(self, ctx):
        """Emit up to max_count numbers for this instance."""
        while self.count < self.max_count:
            # Stride by the parallelism so each instance produces a disjoint set.
            ctx.collect(self.count * self.parallel + self.index)
            self.count += 1
            time.sleep(0.1)  # Simulate data arrival rate

    def close(self):
        """Report how many elements this instance produced before shutdown."""
        print(f"Source {self.index} closing after {self.count} elements")
# Use custom source: wire it into a context, tag each number, and print.
ctx = StreamingContext.Builder().build()
stream = ctx.source(NumberGeneratorSource())
stream.map(lambda x: f"Generated: {x}").sink(print)
ctx.submit("number_generator_job")

Maintain state across checkpoint cycles for fault tolerance.
from ray.streaming.function import SourceFunction
import json
class StatefulCounterSource(SourceFunction):
def init(self, parallel, index):
self.parallel = parallel
self.index = index
self.counter = 0
self.max_count = 1000
def fetch(self, ctx):
while self.counter < self.max_count:
ctx.collect(f"count-{self.counter}-from-{self.index}")
self.counter += 1
# Simulate processing delay
if self.counter % 100 == 0:
time.sleep(0.5)
def save_checkpoint(self):
"""Save current state for fault tolerance"""
return {
'counter': self.counter,
'parallel': self.parallel,
'index': self.index
}
def load_checkpoint(self, checkpoint_obj):
"""Restore state from checkpoint"""
if checkpoint_obj:
self.counter = checkpoint_obj['counter']
self.parallel = checkpoint_obj['parallel']
self.index = checkpoint_obj['index']Connect to external systems like databases or message queues.
from ray.streaming.function import SourceFunction
import time
class DatabaseSource(SourceFunction):
    """Source that streams rows produced by a (simulated) database query."""

    def __init__(self, connection_string, query):
        self.connection_string = connection_string
        self.query = query
        self.connection = None
        self.cursor = None

    def init(self, parallel, index):
        """Remember which of the parallel instances this one is."""
        self.parallel = parallel
        self.index = index

    def open(self, runtime_context):
        """Initialize database connection"""
        # Simulated database connection
        print(f"Connecting to database: {self.connection_string}")
        self.connection = "mock_connection"
        self.cursor = "mock_cursor"

    def fetch(self, ctx):
        """Fetch data from database"""
        # Simulated database query results: 100 mock rows emitted one by one.
        for row_id in range(1, 101):
            ctx.collect({"id": row_id, "name": f"record_{row_id}", "value": row_id * 10})
            time.sleep(0.01)  # Simulate query time

    def close(self):
        """Clean up database connections"""
        if self.cursor:
            print("Closing cursor")
        if self.connection:
            print("Closing database connection")
# Usage: stream query results and format each record for printing.
ctx = StreamingContext.Builder().build()
db_source = DatabaseSource("postgresql://localhost/mydb", "SELECT * FROM users")
stream = ctx.source(db_source)
stream.map(lambda record: f"User: {record['name']}, Value: {record['value']}") \
.sink(print)
ctx.submit("database_job")

Handle parallel execution across multiple source instances.
from ray.streaming.function import SourceFunction
import time
class ParallelRangeSource(SourceFunction):
    """Source that splits a half-open integer range across parallel instances."""

    def __init__(self, start, end):
        self.start = start
        self.end = end

    def init(self, parallel, index):
        """Distribute range across parallel instances"""
        self.parallel = parallel
        self.index = index
        # Each instance takes an equal slice; the final one absorbs the remainder.
        per_instance = (self.end - self.start) // parallel
        self.local_start = self.start + index * per_instance
        self.local_end = self.end if index == parallel - 1 else self.local_start + per_instance
        print(f"Instance {index}/{parallel} handling range {self.local_start}-{self.local_end}")

    def fetch(self, ctx):
        """Generate numbers in assigned range"""
        for num in range(self.local_start, self.local_end):
            ctx.collect(num)
            if num % 1000 == 0:
                time.sleep(0.1)  # Periodic pause
# Usage with parallelism: four instances each cover a slice of 0-10000.
ctx = StreamingContext.Builder().build()
parallel_source = ParallelRangeSource(0, 10000)
stream = ctx.source(parallel_source)
# Square every number and print the result.
stream.set_parallelism(4) \
.map(lambda x: x ** 2) \
.sink(lambda x: print(f"Square: {x}"))
ctx.submit("parallel_range_job")

Create continuous data streams that simulate real-time data sources.
from ray.streaming.function import SourceFunction
import time
import random
import json
from datetime import datetime
class SensorDataSource(SourceFunction):
    """Unbounded source emitting simulated temperature/humidity readings."""

    def __init__(self, sensor_id, measurement_interval=1.0):
        self.sensor_id = sensor_id
        self.measurement_interval = measurement_interval

    def init(self, parallel, index):
        self.parallel = parallel
        self.index = index
        # Each parallel instance simulates different sensors
        self.actual_sensor_id = f"{self.sensor_id}_{index}"

    def fetch(self, ctx):
        """Emit readings forever at the configured interval."""
        reading_count = 0
        while True:  # unbounded stream — runs until the job is stopped
            ctx.collect({
                "sensor_id": self.actual_sensor_id,
                "timestamp": datetime.now().isoformat(),
                "temperature": round(20 + random.uniform(-5, 15), 2),
                "humidity": round(50 + random.uniform(-20, 30), 2),
                "reading_count": reading_count,
            })
            reading_count += 1
            time.sleep(self.measurement_interval)
# Process sensor data stream: alert on readings above 25°C.
ctx = StreamingContext.Builder().build()
sensor_stream = ctx.source(SensorDataSource("temp_sensor", 0.5))
sensor_stream.set_parallelism(3) \
.filter(lambda reading: reading["temperature"] > 25) \
.map(lambda reading: f"HIGH TEMP: {reading['sensor_id']} - {reading['temperature']}°C") \
.sink(print)
ctx.submit("sensor_monitoring_job")

Handle errors gracefully in source functions.
from ray.streaming.function import SourceFunction
import time
import random
class ResilientSource(SourceFunction):
    """Source that simulates transient failures and retries with backoff.

    Emits up to 1000 data strings; each emission may randomly fail with
    probability ``failure_rate``. Failures are retried up to ``max_retries``
    times with a linearly increasing delay before giving up.
    """

    def __init__(self, failure_rate=0.1):
        self.failure_rate = failure_rate
        self.retry_count = 0
        self.max_retries = 3

    def init(self, parallel, index):
        self.parallel = parallel
        self.index = index
        self.processed_count = 0

    def fetch(self, ctx):
        while self.processed_count < 1000:
            try:
                # Simulate potential failures
                if random.random() < self.failure_rate:
                    raise Exception(f"Simulated failure at count {self.processed_count}")
                # Normal processing
                data = f"data-{self.processed_count}-{self.index}"
                ctx.collect(data)
                self.processed_count += 1
                self.retry_count = 0  # Reset retry count on success
            except Exception as e:
                self.retry_count += 1
                if self.retry_count <= self.max_retries:
                    print(f"Source {self.index} error: {e}, retry {self.retry_count}/{self.max_retries}")
                    # Linear backoff: wait 1s, 2s, 3s between successive retries.
                    # (FIX: the original comment claimed "exponential backoff",
                    # which the code never implemented.)
                    time.sleep(1 * self.retry_count)
                else:
                    print(f"Source {self.index} failed permanently after {self.max_retries} retries")
                    # FIX: bare `raise` re-raises the active exception and
                    # preserves the original traceback (unlike `raise e`).
                    raise
            time.sleep(0.1)
# Usage: a source that fails ~5% of the time, retried transparently.
ctx = StreamingContext.Builder().build()
resilient_stream = ctx.source(ResilientSource(failure_rate=0.05))
resilient_stream.map(lambda x: f"Processed: {x}").sink(print)
ctx.submit("resilient_job")

Best practices: use init() for parallel-aware setup and open() for resource initialization, and release resources in the close() method.

# Configure source parallelism and performance
# NOTE(review): the checkpoint interval value is presumably milliseconds —
# verify against the Ray Streaming configuration documentation.
ctx = StreamingContext.Builder() \
.option("streaming.source.parallelism", "4") \
.option("streaming.checkpoint.interval", "10000") \
.build()
source_stream = ctx.source(MyCustomSource()) \
.set_parallelism(4) \
.with_config("streaming.buffer.size", "2048")

Install with Tessl CLI
npx tessl i tessl/pypi-ray-streaming