tessl/pypi-ray-streaming

Ray Streaming - a distributed stream processing framework built on Ray that provides fault-tolerant stream processing with a Python API

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/ray@1.10.x#streaming (PyPI)

To install, run

npx @tessl/cli install tessl/pypi-ray-streaming@1.10.0


Ray Streaming

A distributed stream processing framework built on Ray that provides fault-tolerant, scalable stream processing with a Python API and cross-language support.

Package Information

  • Package Name: ray (streaming module)
  • Package Type: PyPI
  • Language: Python
  • Installation: pip install ray

Overview

Ray Streaming provides a distributed stream processing framework built on Ray's actor model. It enables fault-tolerant stream processing with automatic checkpointing, resource management, and cross-language integration between Python and Java workloads.

Key Features:

  • Distributed stream processing with fault tolerance
  • Single-node failover mechanism for fast recovery
  • Cross-language support (Python and Java operators)
  • Built-in checkpointing and state management
  • Integration with Ray's distributed computing capabilities

Core Imports

from ray.streaming import StreamingContext
from ray.streaming.datastream import DataStream, StreamSource
from ray.streaming.function import SourceFunction, CollectionSourceFunction

Basic Usage

Simple Word Count Example

import ray
from ray.streaming import StreamingContext

# Initialize Ray and create streaming context
ray.init()
ctx = StreamingContext.Builder().build()

# Create a simple streaming pipeline
ctx.read_text_file("input.txt") \
    .set_parallelism(1) \
    .flat_map(lambda x: x.split()) \
    .map(lambda x: (x, 1)) \
    .key_by(lambda x: x[0]) \
    .reduce(lambda old_value, new_value: 
            (old_value[0], old_value[1] + new_value[1])) \
    .filter(lambda x: "ray" not in x) \
    .sink(lambda x: print("result", x))

# Submit the job
ctx.submit("word_count_job")
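To see what this pipeline computes without running Ray, the same chain of transformations can be reproduced in plain Python. This is a sketch of the operator semantics only, not the Ray API; the sample input lines are made up for illustration:

```python
from collections import defaultdict

lines = ["ray is fast", "streams on ray"]

# flat_map: split each line into words
words = [w for line in lines for w in line.split()]
# map: pair each word with a count of 1
pairs = [(w, 1) for w in words]
# key_by + reduce: accumulate counts per key
counts = defaultdict(int)
for word, n in pairs:
    counts[word] += n
# filter: drop tuples containing the element "ray"
results = [(w, c) for w, c in counts.items() if "ray" not in (w, c)]
print(sorted(results))  # [('fast', 1), ('is', 1), ('on', 1), ('streams', 1)]
```

Note that the filter in the pipeline tests tuple membership: `"ray" not in x` checks whether any element of the `(word, count)` tuple equals `"ray"`, not substring containment.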

Creating Data Streams from Collections

from ray.streaming import StreamingContext

ctx = StreamingContext.Builder().build()

# Create stream from values
ctx.from_values("a", "b", "c") \
    .map(lambda x: x.upper()) \
    .sink(lambda x: print(x))

# Create stream from collection
data = [1, 2, 3, 4, 5]
ctx.from_collection(data) \
    .filter(lambda x: x % 2 == 0) \
    .map(lambda x: x * 2) \
    .sink(lambda x: print(f"Even doubled: {x}"))

ctx.submit("collection_processing")

Architecture

Ray Streaming implements a master-worker architecture using Ray actors:

Core Components

  • StreamingContext: Main entry point for creating and configuring streaming jobs
  • DataStream: Represents a stream of data elements that can be transformed
  • StreamSource: Entry points for data ingestion into the streaming pipeline
  • Operators: Transformation functions (map, filter, reduce, etc.) applied to streams
  • Internal Runtime: Java-based execution engine handling distributed processing, fault tolerance, and resource management

Fault Tolerance

Ray Streaming provides automatic fault tolerance through:

  • Periodic checkpointing of operator state
  • Single-node failover that only restarts failed components
  • Automatic recovery from the last successful checkpoint
  • Message replay from upstream operators during recovery
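The recovery model behind these bullets can be illustrated with a toy stateful operator. This is a conceptual sketch, not Ray's actual checkpoint API; the class and method names are invented for illustration:

```python
class CountingOperator:
    """Toy stateful operator: counts occurrences per word."""
    def __init__(self):
        self.state = {}

    def process(self, word):
        self.state[word] = self.state.get(word, 0) + 1

    def checkpoint(self):
        return dict(self.state)  # snapshot of operator state

    def restore(self, snapshot):
        self.state = dict(snapshot)

op = CountingOperator()
for w in ["a", "b", "a"]:
    op.process(w)
snapshot = op.checkpoint()  # periodic checkpoint taken here

op.process("c")             # processed after the checkpoint
# ... failure: only this operator is restarted (single-node failover),
# and its state is restored from the last successful checkpoint
op.restore(snapshot)
# upstream operators replay messages sent after the checkpoint
op.process("c")
print(op.state)  # {'a': 2, 'b': 1, 'c': 1}
```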

Capabilities

Streaming Context and Job Management

Main entry point for Ray streaming functionality with job lifecycle management.

class StreamingContext:
    class Builder:
        def option(self, key=None, value=None, conf=None) -> Builder
        def build(self) -> StreamingContext
    
    def source(self, source_func: SourceFunction) -> StreamSource
    def from_values(self, *values) -> StreamSource  
    def from_collection(self, values) -> StreamSource
    def read_text_file(self, filename: str) -> StreamSource
    def submit(self, job_name: str) -> None

→ Streaming Context Documentation

Data Streams and Transformations

Stream transformation operations and data flow management.

class DataStream:
    def map(self, func) -> DataStream
    def flat_map(self, func) -> DataStream
    def filter(self, func) -> DataStream
    def key_by(self, func) -> KeyDataStream
    def union(self, *others) -> DataStream
    def sink(self, func) -> DataStreamSink
    def set_parallelism(self, parallelism: int) -> DataStream

→ Data Streams Documentation
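Of these operations, union is the only one not shown in the examples elsewhere on this page; conceptually it merges several streams into one, which downstream operators then consume as a single stream. A plain-Python sketch of the semantics (in a distributed run the interleaving of elements across inputs is not guaranteed):

```python
# three input streams, modeled here as lists
stream_a = ["a", "b"]
stream_b = ["c"]
stream_c = ["d", "e"]

# union: merge all elements into one stream
merged = [*stream_a, *stream_b, *stream_c]

# downstream operators see the combined stream
upper = [x.upper() for x in merged]
print(upper)  # ['A', 'B', 'C', 'D', 'E']
```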

Source Functions and Data Ingestion

Data source implementations and custom source function interfaces.

class SourceFunction:
    def init(self, parallel_id: int, num_parallel: int) -> None
    def fetch(self, collector) -> None
    
class CollectionSourceFunction(SourceFunction):
    def __init__(self, values)
    
class LocalFileSourceFunction(SourceFunction):  
    def __init__(self, filename: str)

→ Source Functions Documentation
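The init method receives the instance's index and the total parallelism, which a parallel source typically uses to partition its input so each instance reads a disjoint slice. A plain-Python illustration of that contract (the round-robin split below is hypothetical, not CollectionSourceFunction's actual code):

```python
def partition(values, parallel_id, num_parallel):
    # round-robin split: instance i takes elements i, i+n, i+2n, ...
    return values[parallel_id::num_parallel]

values = list(range(10))
print(partition(values, 0, 3))  # [0, 3, 6, 9]
print(partition(values, 1, 3))  # [1, 4, 7]
print(partition(values, 2, 3))  # [2, 5, 8]
```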

Stream Processing Operations

Core stream transformation operators and windowing functions.

# Transformation Operations
DataStream.map(func)                    # One-to-one transformation
DataStream.flat_map(func)               # One-to-many transformation  
DataStream.filter(func)                 # Element filtering
DataStream.union(*others)               # Stream union

# Keyed Operations  
KeyDataStream.reduce(func)              # Stateful reduction
KeyDataStream.window(window_func)       # Windowing operations

→ Stream Operations Documentation
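key_by partitions the stream so that all elements with the same key reach the same reduce instance, which then folds each new element into its accumulated state. The accumulation step can be sketched in plain Python using the same reduce function as the word count example above:

```python
# the reduce function from the word count pipeline
reduce_func = lambda old, new: (old[0], old[1] + new[1])

# elements arriving for the key "ray" at one reduce instance
state = None
for element in [("ray", 1), ("ray", 1), ("ray", 1)]:
    # the first element initializes the state; later ones are folded in
    state = element if state is None else reduce_func(state, element)
print(state)  # ('ray', 3)
```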

Cross-Language Integration

Support for mixed Python/Java streaming applications.

# Convert to Java stream for Java operators
DataStream.as_java_stream()

# Use Java operators with class names
java_stream.map("com.example.MyMapper")
java_stream.filter("com.example.MyFilter") 

# Convert back to Python stream
java_stream.as_python_stream()

→ Cross-Language Support Documentation

Advanced Features

Custom Source Functions

Create custom data sources by implementing the SourceFunction interface:

from ray.streaming.function import SourceFunction

class CustomSourceFunction(SourceFunction):
    def init(self, parallel_id, num_parallel):
        self.counter = 0
        self.max_count = 1000
    
    def fetch(self, collector):
        if self.counter < self.max_count:
            collector.collect(f"message-{self.counter}")
            self.counter += 1
        else:
            # Signal end of stream
            collector.close()
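The runtime drives a source by calling fetch repeatedly, passing it a collector. That interplay can be exercised without Ray using a minimal stand-in collector (the ListCollector below is invented for this sketch; only collect and close mirror the collector used above):

```python
class ListCollector:
    """Minimal stand-in for the runtime's collector."""
    def __init__(self):
        self.items = []
        self.closed = False

    def collect(self, item):
        self.items.append(item)

    def close(self):
        self.closed = True

class CountingSource:
    """Mirrors CustomSourceFunction above, with a small max_count."""
    def init(self, parallel_id, num_parallel):
        self.counter = 0
        self.max_count = 3

    def fetch(self, collector):
        if self.counter < self.max_count:
            collector.collect(f"message-{self.counter}")
            self.counter += 1
        else:
            collector.close()

source = CountingSource()
source.init(parallel_id=0, num_parallel=1)
collector = ListCollector()
while not collector.closed:
    source.fetch(collector)
print(collector.items)  # ['message-0', 'message-1', 'message-2']
```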

Configuration Options

Configure streaming jobs with various options:

ctx = StreamingContext.Builder() \
    .option("streaming.worker-num", "4") \
    .option("streaming.context.backend.type", "MEMORY") \
    .option("streaming.checkpoint.interval", "5000") \
    .build()

Error Handling and Monitoring

Ray Streaming integrates with Ray's monitoring and error handling:

  • Built-in metrics collection and reporting
  • Integration with Ray Dashboard for job monitoring
  • Automatic error recovery and notification
  • Configurable retry policies and failure handling

Getting Started

  1. Install Ray: pip install ray
  2. Initialize Ray: ray.init()
  3. Create Context: ctx = StreamingContext.Builder().build()
  4. Build Pipeline: Chain stream operations using the fluent API
  5. Submit Job: ctx.submit("job_name")

For detailed examples and API references, see the individual capability documentation pages linked above.