tessl/pypi-ray-streaming

Ray Streaming - a distributed stream processing framework built on Ray that provides fault-tolerant stream processing with Python API

docs/stream-operations.md

Stream Processing Operations

This document describes the stream transformation operations available in Ray Streaming, including transformation function interfaces, partitioning strategies, and advanced stream processing patterns.

Overview

Ray Streaming provides a comprehensive set of stream transformation operations:

  • Element Transformations: map, flat_map, filter
  • Keyed Operations: key_by, reduce
  • Stream Composition: union
  • Partitioning: broadcast, partition_by, forward
  • Output Operations: sink
  • Function Interfaces: MapFunction, FilterFunction, ReduceFunction, etc.
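
The element-wise semantics of the core transformations can be sketched in plain Python. This is illustrative only: the real operators execute distributed over Ray, but their per-element behavior matches these list-based equivalents (the helper names `apply_map`, `apply_filter`, and `apply_flat_map` are hypothetical, not part of the API).

```python
# Pure-Python sketch of the core transformation semantics (illustrative only).

def apply_map(elements, fn):
    # map: exactly one output per input
    return [fn(x) for x in elements]

def apply_filter(elements, pred):
    # filter: keep elements where the predicate is True
    return [x for x in elements if pred(x)]

def apply_flat_map(elements, fn):
    # flat_map: zero or more outputs per input, flattened into one stream
    return [y for x in elements for y in fn(x)]

words = apply_flat_map(["hello world", "ray streaming"], str.split)
lengths = apply_map(words, len)
long_only = apply_filter(lengths, lambda n: n > 3)
print(long_only)  # [5, 5, 9]
```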

Function Interfaces

Ray Streaming defines function interfaces for type-safe and extensible stream processing.

Core Function Interface

from ray.streaming.function import Function

class Function:
    def open(self, runtime_context) -> None
    def close(self) -> None
    def save_checkpoint(self) -> object
    def load_checkpoint(self, checkpoint_obj) -> None
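
The lifecycle above can be illustrated with a standalone stateful function (a hypothetical sketch, driven by hand here rather than by the Ray Streaming runtime, which normally calls `open`, `save_checkpoint`, and `load_checkpoint` itself):

```python
# Hypothetical stateful function demonstrating the Function lifecycle.
# In real code this would subclass ray.streaming.function.MapFunction.

class CountingMapFunction:
    """Tags each value with a running count that survives restarts."""

    def open(self, runtime_context):
        # Called once before any elements are processed.
        self.count = 0

    def map(self, value):
        self.count += 1
        return (self.count, value)

    def save_checkpoint(self):
        # Return the state the framework should persist.
        return self.count

    def load_checkpoint(self, checkpoint_obj):
        # Restore state after a failure or restart.
        self.count = checkpoint_obj

    def close(self):
        pass

fn = CountingMapFunction()
fn.open(runtime_context=None)
fn.map("a")
fn.map("b")
saved = fn.save_checkpoint()        # saved == 2

restored = CountingMapFunction()
restored.open(runtime_context=None)
restored.load_checkpoint(saved)
print(restored.map("c"))            # (3, 'c')
```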

Transformation Function Interfaces

# Map function interface
class MapFunction(Function):
    def map(self, value) -> object

# FlatMap function interface  
class FlatMapFunction(Function):
    def flat_map(self, value, collector) -> None

# Filter function interface
class FilterFunction(Function):
    def filter(self, value) -> bool

# Key extraction function interface
class KeyFunction(Function):
    def key_by(self, value) -> object

# Reduce function interface
class ReduceFunction(Function):
    def reduce(self, old_value, new_value) -> object

# Sink function interface
class SinkFunction(Function):
    def sink(self, value) -> None

Element Transformations

Map Operation

Transform each element in the stream using a one-to-one function.

def map(self, func) -> DataStream:
    """
    Apply a function to each element in the stream.
    
    Args:
        func: Function or MapFunction instance for transformation
        
    Returns:
        New DataStream with transformed elements
    """

Usage Examples

from ray.streaming import StreamingContext
from ray.streaming.function import MapFunction

# Using lambda function
ctx = StreamingContext.Builder().build()
ctx.from_collection([1, 2, 3, 4, 5]) \
    .map(lambda x: x * 2) \
    .sink(lambda x: print(f"Doubled: {x}"))

# Using custom MapFunction
class SquareMapFunction(MapFunction):
    def map(self, value):
        return value ** 2

ctx.from_collection([1, 2, 3, 4, 5]) \
    .map(SquareMapFunction()) \
    .sink(lambda x: print(f"Squared: {x}"))

# Complex transformation
ctx.from_collection(["hello", "world", "ray", "streaming"]) \
    .map(lambda word: {"word": word, "length": len(word), "upper": word.upper()}) \
    .sink(lambda obj: print(f"Word: {obj['word']}, Length: {obj['length']}"))

ctx.submit("map_operations")

FlatMap Operation

Transform each element into zero or more output elements.

def flat_map(self, func) -> DataStream:
    """
    Transform each element into multiple output elements.
    
    Args:
        func: Function or FlatMapFunction that returns iterable
        
    Returns:
        New DataStream with flattened results
    """

Usage Examples

from ray.streaming.function import FlatMapFunction

# Split sentences into words
ctx.from_values("hello world", "ray streaming", "distributed computing") \
    .flat_map(lambda sentence: sentence.split()) \
    .map(lambda word: word.upper()) \
    .sink(lambda word: print(f"Word: {word}"))

# Using custom FlatMapFunction
class TokenizeFlatMapFunction(FlatMapFunction):
    def flat_map(self, value, collector):
        words = value.split()
        for word in words:
            if len(word) > 3:  # Only emit words longer than 3 characters
                collector.collect(word.lower())

ctx.from_values("The quick brown fox jumps over the lazy dog") \
    .flat_map(TokenizeFlatMapFunction()) \
    .sink(print)

# Generate multiple outputs per input
ctx.from_collection([1, 2, 3]) \
    .flat_map(lambda x: [x, x*2, x*3]) \
    .sink(lambda x: print(f"Generated: {x}"))

ctx.submit("flatmap_operations")

Filter Operation

Keep only elements that satisfy a predicate condition.

def filter(self, func) -> DataStream:
    """
    Filter elements based on a predicate function.
    
    Args:
        func: Function or FilterFunction returning boolean
        
    Returns:
        New DataStream with filtered elements
    """

Usage Examples

from ray.streaming.function import FilterFunction

# Simple filtering
ctx.from_collection(range(10)) \
    .filter(lambda x: x % 2 == 0) \
    .sink(lambda x: print(f"Even: {x}"))

# Using custom FilterFunction
class PositiveFilterFunction(FilterFunction):
    def filter(self, value):
        return value > 0

ctx.from_collection([-3, -1, 0, 1, 5, -2, 8]) \
    .filter(PositiveFilterFunction()) \
    .sink(lambda x: print(f"Positive: {x}"))

# Complex filtering with string operations
ctx.from_values("apple", "banana", "cherry", "date", "elderberry") \
    .filter(lambda fruit: len(fruit) > 5 and 'e' in fruit) \
    .sink(lambda fruit: print(f"Long fruit with 'e': {fruit}"))

ctx.submit("filter_operations")

Keyed Operations

Key-By Operation

Partition stream by key for stateful operations.

def key_by(self, func) -> KeyDataStream:
    """
    Partition stream by key extracted using the provided function.
    
    Args:
        func: Function or KeyFunction to extract key from elements
        
    Returns:
        KeyDataStream partitioned by the key function
    """

Usage Examples

from ray.streaming.function import KeyFunction

# Group by key for word counting
ctx.from_values("apple", "banana", "apple", "cherry", "banana", "apple") \
    .map(lambda word: (word, 1)) \
    .key_by(lambda pair: pair[0]) \
    .reduce(lambda old, new: (old[0], old[1] + new[1])) \
    .sink(lambda result: print(f"Count: {result[0]} = {result[1]}"))

# Using custom KeyFunction
class CategoryKeyFunction(KeyFunction):
    def key_by(self, value):
        # Group items by their category
        categories = {
            'fruits': ['apple', 'banana', 'cherry'],
            'vegetables': ['carrot', 'broccoli', 'spinach'],
            'grains': ['rice', 'wheat', 'oats']
        }
        for category, items in categories.items():
            if value in items:
                return category
        return 'other'

ctx.from_values("apple", "carrot", "banana", "rice", "broccoli") \
    .key_by(CategoryKeyFunction()) \
    .reduce(lambda old, new: f"{old},{new}") \
    .sink(lambda result: print(f"Category items: {result}"))

ctx.submit("keyed_operations")

Reduce Operation

Combine elements with the same key using a reduce function.

def reduce(self, func) -> DataStream:
    """
    Reduce elements with the same key using the provided function.
    
    Args:
        func: Function or ReduceFunction for combining values
        
    Returns:
        DataStream with reduced values per key
    """

Usage Examples

from ray.streaming.function import ReduceFunction

# Sum values by key
ctx.from_values(("A", 10), ("B", 5), ("A", 15), ("B", 20), ("A", 8)) \
    .key_by(lambda pair: pair[0]) \
    .reduce(lambda old, new: (old[0], old[1] + new[1])) \
    .sink(lambda result: print(f"Sum for {result[0]}: {result[1]}"))

# Using custom ReduceFunction
class MaxReduceFunction(ReduceFunction):
    def reduce(self, old_value, new_value):
        return max(old_value, new_value, key=lambda x: x[1])

ctx.from_values(("user1", 100), ("user2", 85), ("user1", 120), ("user2", 95)) \
    .key_by(lambda pair: pair[0]) \
    .reduce(MaxReduceFunction()) \
    .sink(lambda result: print(f"Max score for {result[0]}: {result[1]}"))

# String aggregation
ctx.from_values(("group1", "a"), ("group2", "x"), ("group1", "b"), ("group2", "y")) \
    .key_by(lambda pair: pair[0]) \
    .reduce(lambda old, new: (old[0], old[1] + new[1])) \
    .sink(lambda result: print(f"Concatenated {result[0]}: {result[1]}"))

ctx.submit("reduce_operations")
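
A point worth making explicit: in streaming reduce, each incoming element updates the aggregate for its key, and the updated aggregate is emitted downstream immediately. The per-key state handling can be sketched in plain Python (the helper `keyed_reduce` is hypothetical, shown only to illustrate the semantics):

```python
# Sketch of keyed-reduce semantics: per-key running aggregates,
# re-emitted on every update (illustrative only).

def keyed_reduce(pairs, reduce_fn):
    state = {}     # per-key running aggregate
    emitted = []
    for key, value in pairs:
        if key in state:
            state[key] = reduce_fn(state[key], (key, value))
        else:
            state[key] = (key, value)   # first element for this key
        emitted.append(state[key])
    return emitted

out = keyed_reduce(
    [("A", 10), ("B", 5), ("A", 15)],
    lambda old, new: (old[0], old[1] + new[1]),
)
print(out)  # [('A', 10), ('B', 5), ('A', 25)]
```

Note that intermediate aggregates (`('A', 10)` before `('A', 25)`) reach the sink too, which is why the word-count examples above print running counts rather than a single final total.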

Stream Composition

Union Operation

Merge multiple streams of the same type.

def union(self, *streams) -> UnionStream:
    """
    Union this stream with other streams.
    
    Args:
        *streams: DataStreams to union with this stream
        
    Returns:
        UnionStream containing elements from all input streams
    """

Usage Examples

# Union multiple data sources
stream1 = ctx.from_values("source1-a", "source1-b", "source1-c")
stream2 = ctx.from_values("source2-x", "source2-y", "source2-z") 
stream3 = ctx.from_values("source3-1", "source3-2", "source3-3")

# Union all streams
unified = stream1.union(stream2, stream3) \
    .map(lambda x: f"Unified: {x}") \
    .sink(print)

# Union with different processing branches
numbers = ctx.from_collection(range(20))
evens = numbers.filter(lambda x: x % 2 == 0).map(lambda x: f"Even: {x}")
odds = numbers.filter(lambda x: x % 2 == 1).map(lambda x: f"Odd: {x}")

evens.union(odds).sink(print)

ctx.submit("union_operations")

Partitioning Strategies

Broadcast Partitioning

Send all elements to every parallel instance of the next operator.

def broadcast(self) -> DataStream:
    """
    Broadcast all elements to every parallel instance.
    
    Returns:
        DataStream with broadcast partitioning
    """

Usage Examples

# Broadcast configuration data to all workers
ctx.from_values("config1", "config2", "config3") \
    .broadcast() \
    .map(lambda config: f"All workers got: {config}") \
    .sink(print)

# Broadcast lookup table
ctx.from_values(("key1", "value1"), ("key2", "value2")) \
    .broadcast() \
    .map(lambda pair: f"Lookup: {pair[0]} -> {pair[1]}") \
    .sink(print)

ctx.submit("broadcast_operations")

Custom Partitioning

Use custom partitioning logic to control data distribution.

def partition_by(self, partition_func) -> DataStream:
    """
    Partition stream using custom partitioning function.
    
    Args:
        partition_func: Function or Partition instance for custom partitioning
        
    Returns:
        DataStream with custom partitioning
    """

Usage Examples

from ray.streaming.partition import Partition

# Hash-based partitioning (assumes 4 downstream partitions)
def hash_partition(element):
    return hash(str(element)) % 4

ctx.from_collection(range(20)) \
    .partition_by(hash_partition) \
    .map(lambda x: f"Partitioned: {x}") \
    .sink(print)

# Custom partitioning class
class RegionPartition(Partition):
    def partition(self, record, num_partitions):
        region_map = {"US": 0, "EU": 1, "ASIA": 2}
        region = record.get("region", "OTHER")
        return region_map.get(region, 3) % num_partitions

regions_data = [
    {"id": 1, "region": "US", "data": "user1"},
    {"id": 2, "region": "EU", "data": "user2"}, 
    {"id": 3, "region": "ASIA", "data": "user3"}
]

ctx.from_collection(regions_data) \
    .partition_by(RegionPartition()) \
    .map(lambda record: f"Region {record['region']}: {record['data']}") \
    .sink(print)

ctx.submit("partition_operations")

Forward Partitioning

Forward elements locally to the next operator.

def forward(self) -> DataStream:
    """
    Forward elements locally to avoid network transfer.
    
    Returns:
        DataStream with forward partitioning
    """
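
The routing difference between forward and broadcast can be sketched as channel selection (a hypothetical model of the dispatch logic, not the actual Ray Streaming internals):

```python
# Illustrative channel-selection sketch: which downstream instances
# receive a record under each partitioning strategy.

def forward_route(upstream_index, num_channels):
    # forward: one-to-one local handoff to the same-index instance,
    # avoiding shuffling and network transfer.
    return [upstream_index % num_channels]

def broadcast_route(upstream_index, num_channels):
    # broadcast: every downstream instance receives a copy.
    return list(range(num_channels))

print(forward_route(2, 4))    # [2]
print(broadcast_route(2, 4))  # [0, 1, 2, 3]
```

Forward is the cheapest strategy when upstream and downstream parallelism match, since each record stays on its local worker.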

Output Operations

Sink Operation

Define output behavior for stream processing results.

def sink(self, func) -> StreamSink:
    """
    Create a sink for the stream.
    
    Args:
        func: Function or SinkFunction for handling output
        
    Returns:
        StreamSink representing the output operation
    """

Usage Examples

from ray.streaming.function import SinkFunction

# Simple sink with lambda
ctx.from_collection([1, 2, 3, 4, 5]) \
    .map(lambda x: x ** 2) \
    .sink(lambda x: print(f"Result: {x}"))

# Custom SinkFunction
class FileSinkFunction(SinkFunction):
    def __init__(self, filename):
        self.filename = filename
        self.file = None
        
    def open(self, runtime_context):
        self.file = open(self.filename, 'w')
        
    def sink(self, value):
        self.file.write(f"{value}\n")
        self.file.flush()
        
    def close(self):
        if self.file:
            self.file.close()

ctx.from_values("line1", "line2", "line3") \
    .sink(FileSinkFunction("output.txt"))

# Database sink simulation
class DatabaseSinkFunction(SinkFunction):
    def open(self, runtime_context):
        print("Connecting to database...")
        self.connection = "mock_db_connection"
        
    def sink(self, value):
        print(f"INSERT INTO results VALUES ('{value}')")
        
    def close(self):
        print("Closing database connection")

ctx.from_collection(range(5)) \
    .map(lambda x: f"record_{x}") \
    .sink(DatabaseSinkFunction())

ctx.submit("sink_operations")

Advanced Stream Patterns

Windowing Simulation

Although Ray Streaming doesn't have built-in windowing, you can simulate time-based processing.

import time
from datetime import datetime

from ray.streaming.function import SourceFunction

# Time-based processing simulation
def timestamped_data():
    for i in range(10):
        yield {"timestamp": datetime.now().isoformat(), "value": i}
        time.sleep(0.5)

class TimestampedSource(SourceFunction):
    def fetch(self, ctx):
        for data in timestamped_data():
            ctx.collect(data)

ctx.from_source(TimestampedSource()) \
    .map(lambda record: f"Time: {record['timestamp']}, Value: {record['value']}") \
    .sink(print)

ctx.submit("windowing_simulation")
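
Given such timestamped records, the window grouping itself can be done downstream. A minimal sketch, assuming numeric timestamps and a hypothetical `tumbling_window_sum` helper (Ray Streaming provides no window operator; this only shows the grouping logic):

```python
# Tumbling-window aggregation sketch: bucket records into fixed-size
# windows by timestamp, then aggregate per window (illustrative only).

from collections import defaultdict

def tumbling_window_sum(records, window_size):
    windows = defaultdict(int)
    for ts, value in records:
        # Align each record to the start of its window.
        window_start = (ts // window_size) * window_size
        windows[window_start] += value
    return dict(windows)

events = [(0.2, 1), (0.7, 2), (1.1, 3), (1.9, 4), (2.5, 5)]
print(tumbling_window_sum(events, window_size=1))
# {0.0: 3, 1.0: 7, 2.0: 5}
```

In a pipeline, this logic would live in a stateful function that buffers records per window and emits each window's aggregate once its end passes.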

Multi-Stream Processing

Process multiple streams with different operations.

# Create multiple processing branches
source = ctx.from_collection(range(100))

# Branch 1: Even numbers processing
evens = source.filter(lambda x: x % 2 == 0) \
    .map(lambda x: f"Even: {x}")

# Branch 2: Odd numbers processing  
odds = source.filter(lambda x: x % 2 == 1) \
    .map(lambda x: f"Odd: {x}")

# Branch 3: Multiples of 5
fives = source.filter(lambda x: x % 5 == 0) \
    .map(lambda x: f"Multiple of 5: {x}")

# Combine all branches
evens.union(odds, fives) \
    .sink(print)

ctx.submit("multi_stream_operations")

Performance Considerations

Operator Chaining

Control operator chaining for performance optimization.

# Disable chaining for better parallelism
stream = ctx.from_collection(large_dataset) \
    .disable_chain() \
    .map(expensive_transformation) \
    .disable_chain() \
    .filter(complex_predicate)

# Use forward for local processing
local_stream = ctx.from_collection(data) \
    .forward() \
    .map(local_transformation)

Parallelism Configuration

Configure parallelism for different operations.

# Different parallelism for different operations
ctx.from_collection(data) \
    .set_parallelism(8) \
    .map(cpu_intensive_function) \
    .set_parallelism(4) \
    .reduce(reduction_function) \
    .set_parallelism(1) \
    .sink(output_function)

See Also

  • Data Streams Documentation - Stream classes and basic transformations
  • Source Functions Documentation - Custom data source implementation
  • Streaming Context Documentation - StreamingContext and job management
  • Cross-Language Support Documentation - Python/Java integration details

Install with Tessl CLI

npx tessl i tessl/pypi-ray-streaming
