
tessl/pypi-ray-streaming

Ray Streaming - a distributed, fault-tolerant stream processing framework built on Ray with a Python API


docs/streaming-context.md

Streaming Context and Job Management

This document covers the StreamingContext class, which serves as the main entry point for Ray streaming functionality and provides job lifecycle management.

Overview

The StreamingContext is the primary interface for creating and managing streaming jobs in Ray Streaming. It provides methods for:

  • Creating data streams from various sources
  • Configuring streaming job parameters
  • Submitting jobs for execution
  • Managing the streaming job lifecycle

StreamingContext

The main class for Ray streaming operations, acting as a wrapper around the Java StreamingContext implementation.

Core API

from ray.streaming import StreamingContext
from ray.streaming.function import SourceFunction
from ray.streaming.datastream import StreamSource

# Main StreamingContext class
class StreamingContext:
    def __init__(self)
    
    # Create stream from custom source function
    def source(self, source_func: SourceFunction) -> StreamSource
    
    # Create stream from multiple values
    def from_values(self, *values) -> StreamSource
    
    # Create stream from collection/list
    def from_collection(self, values) -> StreamSource
    
    # Create stream from text file (line by line)
    def read_text_file(self, filename: str) -> StreamSource
    
    # Submit job for execution
    def submit(self, job_name: str) -> None

StreamingContext.Builder

Configuration builder for creating StreamingContext instances with custom settings.

# Nested builder class, accessed as StreamingContext.Builder
class Builder:
    def __init__(self)
    
    # Set configuration option(s)
    def option(self, key=None, value=None, conf=None) -> Builder
    
    # Create configured StreamingContext
    def build(self) -> StreamingContext

Capabilities

Context Creation and Configuration

Create and configure streaming contexts with various options.

from ray.streaming import StreamingContext

# Simple context creation
ctx = StreamingContext()

# Context with configuration
ctx = StreamingContext.Builder() \
    .option("streaming.worker-num", "4") \
    .option("streaming.context.backend.type", "MEMORY") \
    .build()

# Multiple configuration options
config = {
    "streaming.checkpoint.interval": "5000",
    "streaming.queue.capacity": "1000"
}
ctx = StreamingContext.Builder() \
    .option(conf=config) \
    .build()

Data Source Creation

Create data streams from various input sources.

# Create stream from values
stream = ctx.from_values("hello", "world", "ray", "streaming")

# Create stream from collection
data = [1, 2, 3, 4, 5]
stream = ctx.from_collection(data)

# Create stream from text file
stream = ctx.read_text_file("input.txt")

# Create stream from custom source function
from ray.streaming.function import SourceFunction

class MySource(SourceFunction):
    def fetch(self, collector):
        for i in range(100):
            collector.collect(f"item-{i}")

stream = ctx.source(MySource())
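A custom source's fetch logic can be checked outside a running job by handing it a minimal stand-in for the collector. The ListCollector below is a hypothetical test helper, not part of the ray.streaming API; it only assumes the collector exposes a collect() method, as the examples above do:

```python
# Hypothetical test helper: captures collected items in a list so a
# source's fetch() logic can be exercised without a Ray cluster.
class ListCollector:
    def __init__(self):
        self.items = []

    def collect(self, value):
        self.items.append(value)

# Stand-alone copy of MySource's fetch logic from the example above
class MySource:
    def fetch(self, collector):
        for i in range(100):
            collector.collect(f"item-{i}")

collector = ListCollector()
MySource().fetch(collector)
print(len(collector.items), collector.items[0], collector.items[-1])
```

The same pattern works for any SourceFunction subclass whose fetch() only depends on the collector it is given.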

Job Submission and Execution

Submit streaming jobs for execution on the Ray cluster.

# Build streaming pipeline
ctx.from_values(1, 2, 3, 4, 5) \
    .map(lambda x: x * 2) \
    .filter(lambda x: x > 4) \
    .sink(lambda x: print(f"Result: {x}"))

# Submit job with descriptive name
ctx.submit("data_processing_job")

Usage Examples

Basic Streaming Job

import ray
from ray.streaming import StreamingContext

# Initialize Ray
ray.init()

# Create streaming context
ctx = StreamingContext.Builder().build()

# Create and process stream
ctx.from_collection([1, 2, 3, 4, 5]) \
    .map(lambda x: x ** 2) \
    .filter(lambda x: x > 10) \
    .sink(lambda x: print(f"Large square: {x}"))

# Submit job
ctx.submit("squares_job")

Word Count Example

import ray
from ray.streaming import StreamingContext

ray.init()
ctx = StreamingContext.Builder() \
    .option("streaming.worker-num", "2") \
    .build()

# Process text file
ctx.read_text_file("document.txt") \
    .flat_map(lambda line: line.split()) \
    .map(lambda word: (word.lower(), 1)) \
    .key_by(lambda pair: pair[0]) \
    .reduce(lambda old, new: (old[0], old[1] + new[1])) \
    .sink(lambda result: print(f"Word: {result[0]}, Count: {result[1]}"))

ctx.submit("word_count")

Custom Source Function

import ray
from ray.streaming import StreamingContext
from ray.streaming.function import SourceFunction
import time

class TimestampSource(SourceFunction):
    def init(self, parallel_id, num_parallel):
        self.count = 0
        self.max_count = 10
    
    def fetch(self, collector):
        while self.count < self.max_count:
            timestamp = int(time.time())
            collector.collect(f"timestamp-{timestamp}-{self.count}")
            self.count += 1
            time.sleep(1)

ray.init()
ctx = StreamingContext.Builder().build()

ctx.source(TimestampSource()) \
    .map(lambda x: x.upper()) \
    .sink(lambda x: print(f"Processed: {x}"))

ctx.submit("timestamp_job")

Configuration Options

Ray Streaming supports various configuration options for job tuning:

# Performance tuning
ctx = StreamingContext.Builder() \
    .option("streaming.worker-num", "8") \
    .option("streaming.queue.capacity", "2000") \
    .option("streaming.checkpoint.interval", "10000") \
    .build()

# Backend configuration  
ctx = StreamingContext.Builder() \
    .option("streaming.context.backend.type", "LOCAL_FILE") \
    .option("streaming.context.backend.path", "/tmp/streaming") \
    .build()

# Multiple options via dictionary
config = {
    "streaming.worker-num": "4",
    "streaming.context.backend.type": "MEMORY",
    "streaming.checkpoint.interval": "5000",
    "streaming.queue.capacity": "1000"
}
ctx = StreamingContext.Builder() \
    .option(conf=config) \
    .build()

Integration with Ray

StreamingContext builds directly on Ray's distributed computing capabilities:

  • Ray Actors: Streaming workers run as Ray actors for distributed processing
  • Ray Dashboard: Job monitoring and metrics available through Ray Dashboard
  • Resource Management: Leverages Ray's resource allocation and scheduling
  • Fault Tolerance: Built on Ray's actor supervision and recovery mechanisms

Advanced Usage

Error Handling

try:
    # The division by zero inside map() surfaces when the job executes
    ctx.from_collection([1, 2, 3]) \
        .map(lambda x: x / 0) \
        .sink(print)
    ctx.submit("error_job")
except Exception as e:
    print(f"Job failed: {e}")

Resource Configuration

# Configure resources for streaming job
ctx = StreamingContext.Builder() \
    .option("streaming.worker-num", "4") \
    .option("streaming.worker-cpu", "2") \
    .option("streaming.worker-memory", "2GB") \
    .build()

Best Practices

  1. Resource Planning: Configure worker count based on data volume and processing complexity
  2. Checkpoint Intervals: Set appropriate checkpoint intervals for fault tolerance vs. performance
  3. Parallelism: Use set_parallelism() on streams to control processing parallelism
  4. Error Handling: Implement proper error handling in user functions
  5. Resource Cleanup: Ensure Ray cluster is properly shut down after job completion
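For point 4, a common pattern is to wrap user functions so that a bad record is logged and mapped to a default value instead of crashing the worker. The safe_fn helper below is a hypothetical example of such a wrapper, not part of the ray.streaming API:

```python
# Hypothetical wrapper: returns a function that applies fn and falls back
# to a default (plus a log line) when a record raises, so a single bad
# record does not fail the whole streaming job.
def safe_fn(fn, default=None, on_error=print):
    def wrapped(record):
        try:
            return fn(record)
        except Exception as e:
            on_error(f"bad record {record!r}: {e}")
            return default
    return wrapped

safe_div = safe_fn(lambda x: 10 / x, default=0)
results = [safe_div(x) for x in [1, 2, 0, 5]]
# the zero record is logged and mapped to the default instead of raising
```

In a pipeline this would be used as .map(safe_fn(my_transform)), so per-record failures are reported without tearing down the job.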

See Also

  • Data Streams Documentation - Stream transformation operations
  • Source Functions Documentation - Custom data source implementation
  • Stream Operations Documentation - Available stream transformations

Install with Tessl CLI

npx tessl i tessl/pypi-ray-streaming
