
tessl/pypi-ray-streaming

Ray Streaming - a distributed, fault-tolerant stream processing framework built on Ray with a Python API


Source Functions and Data Ingestion

This document covers the source function interfaces and implementations in Ray Streaming, including built-in source functions and patterns for creating custom data sources.

Overview

Source functions are the entry points for data ingestion in Ray Streaming. They provide the mechanism to:

  • Read data from external systems (files, databases, message queues)
  • Generate synthetic data streams
  • Create custom data ingestion patterns
  • Handle parallel data source execution

SourceFunction Interface

The base interface for all source functions in Ray Streaming.

Core API

from ray.streaming.function import SourceFunction, SourceContext

class SourceFunction:
    def init(self, parallel: int, index: int) -> None
    def fetch(self, ctx: SourceContext) -> None
    def close(self) -> None
    
    # Inherited from Function base class
    def open(self, runtime_context) -> None
    def save_checkpoint(self) -> object
    def load_checkpoint(self, checkpoint_obj) -> None

SourceContext Interface

The context interface used by source functions to emit elements.

class SourceContext:
    def collect(self, element) -> None
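The contract between a source and its context can be sketched with plain Python stand-ins. The `ListSourceContext` and `RangeSource` names here are illustrative only, not part of the Ray Streaming API; they just mirror the interface shapes above.

```python
# Minimal stand-ins illustrating the SourceFunction/SourceContext contract.
# These are NOT Ray Streaming classes - just the same method shapes.

class ListSourceContext:
    """Collects emitted elements into a list instead of a downstream channel."""
    def __init__(self):
        self.elements = []

    def collect(self, element):
        self.elements.append(element)


class RangeSource:
    """Toy source emitting integers 0..n-1, following the interface above."""
    def __init__(self, n):
        self.n = n

    def init(self, parallel, index):
        self.parallel = parallel
        self.index = index

    def fetch(self, ctx):
        for i in range(self.n):
            ctx.collect(i)

    def close(self):
        pass


source = RangeSource(5)
source.init(parallel=1, index=0)   # framework would call this per instance
ctx = ListSourceContext()
source.fetch(ctx)                  # framework drives fetch with a real context
source.close()
print(ctx.elements)  # [0, 1, 2, 3, 4]
```

In the real framework, `init()` and `fetch()` are invoked by the runtime and `collect()` forwards elements to downstream operators rather than a list.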

Built-in Source Functions

Ray Streaming provides several built-in source function implementations.

CollectionSourceFunction

Creates a stream from a Python collection (list, tuple, etc.).

from ray.streaming.function import CollectionSourceFunction

class CollectionSourceFunction(SourceFunction):
    def __init__(self, values)
    def init(self, parallel: int, index: int) -> None
    def fetch(self, ctx: SourceContext) -> None

LocalFileSourceFunction

Reads data from a local text file line by line.

from ray.streaming.function import LocalFileSourceFunction

class LocalFileSourceFunction(SourceFunction):
    def __init__(self, filename: str)
    def init(self, parallel: int, index: int) -> None
    def fetch(self, ctx: SourceContext) -> None

Capabilities

Creating Streams from Collections

Use the built-in collection source for in-memory data.

from ray.streaming import StreamingContext
from ray.streaming.function import CollectionSourceFunction

ctx = StreamingContext.Builder().build()

# Using StreamingContext convenience method
stream1 = ctx.from_collection([1, 2, 3, 4, 5])

# Using source function directly
collection_source = CollectionSourceFunction([10, 20, 30, 40, 50])
stream2 = ctx.source(collection_source)

# Process the streams
stream1.map(lambda x: x * 2).sink(print)
stream2.filter(lambda x: x > 25).sink(print)

ctx.submit("collection_job")

Reading from Text Files

Use the built-in file source for line-by-line file processing.

from ray.streaming import StreamingContext
from ray.streaming.function import LocalFileSourceFunction

ctx = StreamingContext.Builder().build()

# Using StreamingContext convenience method
stream1 = ctx.read_text_file("input.txt")

# Using source function directly
file_source = LocalFileSourceFunction("data.txt")
stream2 = ctx.source(file_source)

# Process file content
stream1.flat_map(lambda line: line.split()) \
    .map(lambda word: word.lower()) \
    .sink(print)

ctx.submit("file_processing_job")
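The flat_map/map chain above is equivalent to this plain-Python transformation of the file's lines, shown here on an in-memory list rather than a real file:

```python
# Plain-Python equivalent of: flat_map(line.split) -> map(str.lower)
lines = ["Hello World", "Ray Streaming"]

# flat_map(lambda line: line.split()) flattens each line into words
words = [word for line in lines for word in line.split()]

# map(lambda word: word.lower()) normalizes case
lowered = [word.lower() for word in words]

print(lowered)  # ['hello', 'world', 'ray', 'streaming']
```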

Custom Source Functions

Create custom source functions by implementing the SourceFunction interface.

from ray.streaming import StreamingContext
from ray.streaming.function import SourceFunction
import time

class NumberGeneratorSource(SourceFunction):
    def init(self, parallel, index):
        """Initialize source function with parallelism info"""
        self.parallel = parallel
        self.index = index
        self.count = 0
        self.max_count = 100
    
    def fetch(self, ctx):
        """Generate and emit elements"""
        while self.count < self.max_count:
            # Generate numbers based on parallel index
            number = (self.count * self.parallel) + self.index
            ctx.collect(number)
            self.count += 1
            time.sleep(0.1)  # Simulate data arrival rate
    
    def close(self):
        """Cleanup resources"""
        print(f"Source {self.index} closing after {self.count} elements")

# Use custom source
ctx = StreamingContext.Builder().build()
stream = ctx.source(NumberGeneratorSource())
stream.map(lambda x: f"Generated: {x}").sink(print)
ctx.submit("number_generator_job")
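The `(count * parallel) + index` formula above gives each parallel instance a disjoint, interleaved slice of the output space, which can be checked in isolation:

```python
# Verify that the numbering scheme produces disjoint values per instance.
parallel = 4
max_count = 100

emitted = {}
for index in range(parallel):
    emitted[index] = {(count * parallel) + index for count in range(max_count)}

# No two instances emit the same number
all_values = set()
for values in emitted.values():
    assert all_values.isdisjoint(values)
    all_values |= values

print(len(all_values))  # 400 distinct values across 4 instances
```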

Advanced Source Patterns

Stateful Source Functions

Maintain state across checkpoint cycles for fault tolerance.

from ray.streaming.function import SourceFunction
import time

class StatefulCounterSource(SourceFunction):
    def init(self, parallel, index):
        self.parallel = parallel
        self.index = index
        self.counter = 0
        self.max_count = 1000
    
    def fetch(self, ctx):
        while self.counter < self.max_count:
            ctx.collect(f"count-{self.counter}-from-{self.index}")
            self.counter += 1
            
            # Simulate processing delay
            if self.counter % 100 == 0:
                time.sleep(0.5)
    
    def save_checkpoint(self):
        """Save current state for fault tolerance"""
        return {
            'counter': self.counter,
            'parallel': self.parallel,
            'index': self.index
        }
    
    def load_checkpoint(self, checkpoint_obj):
        """Restore state from checkpoint"""
        if checkpoint_obj:
            self.counter = checkpoint_obj['counter']
            self.parallel = checkpoint_obj['parallel'] 
            self.index = checkpoint_obj['index']
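Because `save_checkpoint()` returns a plain object and `load_checkpoint()` consumes one, the restore logic can be exercised outside the framework. This standalone sketch mirrors the save/load pattern of the class above (no Ray runtime involved):

```python
# Standalone sketch of a checkpoint round trip (plain class, no Ray imports).
class CounterState:
    """Mirrors the save/load pattern of StatefulCounterSource above."""
    def init(self, parallel, index):
        self.parallel = parallel
        self.index = index
        self.counter = 0

    def save_checkpoint(self):
        return {'counter': self.counter,
                'parallel': self.parallel,
                'index': self.index}

    def load_checkpoint(self, checkpoint_obj):
        if checkpoint_obj:
            self.counter = checkpoint_obj['counter']
            self.parallel = checkpoint_obj['parallel']
            self.index = checkpoint_obj['index']


# Simulate progress, checkpoint, "failure", and restore on a fresh instance
original = CounterState()
original.init(parallel=2, index=1)
original.counter = 42
snapshot = original.save_checkpoint()

restored = CounterState()
restored.init(parallel=2, index=1)
restored.load_checkpoint(snapshot)
print(restored.counter)  # 42 - emission resumes where the checkpoint left off
```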

External System Source

Connect to external systems like databases or message queues.

from ray.streaming import StreamingContext
from ray.streaming.function import SourceFunction
import time

class DatabaseSource(SourceFunction):
    def __init__(self, connection_string, query):
        self.connection_string = connection_string
        self.query = query
        self.connection = None
        self.cursor = None
    
    def init(self, parallel, index):
        self.parallel = parallel
        self.index = index
        
    def open(self, runtime_context):
        """Initialize database connection"""
        # Simulated database connection
        print(f"Connecting to database: {self.connection_string}")
        self.connection = "mock_connection"
        self.cursor = "mock_cursor"
    
    def fetch(self, ctx):
        """Fetch data from database"""
        # Simulated database query results
        mock_results = [
            {"id": i, "name": f"record_{i}", "value": i * 10}
            for i in range(1, 101)
        ]
        
        for record in mock_results:
            ctx.collect(record)
            time.sleep(0.01)  # Simulate query time
    
    def close(self):
        """Clean up database connections"""
        if self.cursor:
            print("Closing cursor")
        if self.connection:
            print("Closing database connection")

# Usage
ctx = StreamingContext.Builder().build()
db_source = DatabaseSource("postgresql://localhost/mydb", "SELECT * FROM users")
stream = ctx.source(db_source)
stream.map(lambda record: f"User: {record['name']}, Value: {record['value']}") \
    .sink(print)
ctx.submit("database_job")

Parallel Source Processing

Handle parallel execution across multiple source instances.

from ray.streaming import StreamingContext
from ray.streaming.function import SourceFunction
import time

class ParallelRangeSource(SourceFunction):
    def __init__(self, start, end):
        self.start = start
        self.end = end
        
    def init(self, parallel, index):
        """Distribute range across parallel instances"""
        self.parallel = parallel
        self.index = index
        
        # Calculate range for this parallel instance
        total_range = self.end - self.start
        range_per_instance = total_range // parallel
        
        self.local_start = self.start + (index * range_per_instance)
        if index == parallel - 1:  # Last instance gets remainder
            self.local_end = self.end
        else:
            self.local_end = self.local_start + range_per_instance
            
        print(f"Instance {index}/{parallel} handling range {self.local_start}-{self.local_end}")
    
    def fetch(self, ctx):
        """Generate numbers in assigned range"""
        for num in range(self.local_start, self.local_end):
            ctx.collect(num)
            if num % 1000 == 0:
                time.sleep(0.1)  # Periodic pause

# Usage with parallelism
ctx = StreamingContext.Builder().build()
parallel_source = ParallelRangeSource(0, 10000)
stream = ctx.source(parallel_source)
stream.set_parallelism(4) \
    .map(lambda x: x ** 2) \
    .sink(lambda x: print(f"Square: {x}"))
ctx.submit("parallel_range_job")
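The range-splitting arithmetic in `init()` can be verified on its own: every parallel instance gets a contiguous slice, and the slices tile the full range exactly once, with the last instance absorbing any remainder.

```python
def split_range(start, end, parallel, index):
    """Same partitioning arithmetic as ParallelRangeSource.init above."""
    total_range = end - start
    range_per_instance = total_range // parallel
    local_start = start + (index * range_per_instance)
    # Last instance gets the remainder when the range doesn't divide evenly
    local_end = end if index == parallel - 1 else local_start + range_per_instance
    return local_start, local_end


parallel = 4
covered = []
for index in range(parallel):
    lo, hi = split_range(0, 10000, parallel, index)
    covered.extend(range(lo, hi))

# Slices tile 0..9999 with no gaps or overlaps
print(len(covered))  # 10000
```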

Real-time Data Sources

Streaming Data Simulation

Create continuous data streams that simulate real-time data sources.

from ray.streaming import StreamingContext
from ray.streaming.function import SourceFunction
import random
import time
from datetime import datetime

class SensorDataSource(SourceFunction):
    def __init__(self, sensor_id, measurement_interval=1.0):
        self.sensor_id = sensor_id
        self.measurement_interval = measurement_interval
        
    def init(self, parallel, index):
        self.parallel = parallel
        self.index = index
        # Each parallel instance simulates different sensors
        self.actual_sensor_id = f"{self.sensor_id}_{index}"
        
    def fetch(self, ctx):
        """Generate continuous sensor readings"""
        reading_count = 0
        while True:  # Continuous stream
            timestamp = datetime.now().isoformat()
            temperature = 20 + random.uniform(-5, 15)
            humidity = 50 + random.uniform(-20, 30)
            
            sensor_reading = {
                "sensor_id": self.actual_sensor_id,
                "timestamp": timestamp,
                "temperature": round(temperature, 2),
                "humidity": round(humidity, 2),
                "reading_count": reading_count
            }
            
            ctx.collect(sensor_reading)
            reading_count += 1
            time.sleep(self.measurement_interval)

# Process sensor data stream
ctx = StreamingContext.Builder().build()
sensor_stream = ctx.source(SensorDataSource("temp_sensor", 0.5))
sensor_stream.set_parallelism(3) \
    .filter(lambda reading: reading["temperature"] > 25) \
    .map(lambda reading: f"HIGH TEMP: {reading['sensor_id']} - {reading['temperature']}°C") \
    .sink(print)
ctx.submit("sensor_monitoring_job")

Error Handling and Resilience

Robust Source Implementation

Handle errors gracefully in source functions.

from ray.streaming import StreamingContext
from ray.streaming.function import SourceFunction
import random
import time

class ResilientSource(SourceFunction):
    def __init__(self, failure_rate=0.1):
        self.failure_rate = failure_rate
        self.retry_count = 0
        self.max_retries = 3
        
    def init(self, parallel, index):
        self.parallel = parallel
        self.index = index
        self.processed_count = 0
        
    def fetch(self, ctx):
        while self.processed_count < 1000:
            try:
                # Simulate potential failures
                if random.random() < self.failure_rate:
                    raise Exception(f"Simulated failure at count {self.processed_count}")
                
                # Normal processing
                data = f"data-{self.processed_count}-{self.index}"
                ctx.collect(data)
                self.processed_count += 1
                self.retry_count = 0  # Reset retry count on success
                
            except Exception as e:
                self.retry_count += 1
                if self.retry_count <= self.max_retries:
                    print(f"Source {self.index} error: {e}, retry {self.retry_count}/{self.max_retries}")
                    time.sleep(1 * self.retry_count)  # Linear backoff between retries
                else:
                    print(f"Source {self.index} failed permanently after {self.max_retries} retries")
                    raise e
                    
            time.sleep(0.1)

# Usage
ctx = StreamingContext.Builder().build()
resilient_stream = ctx.source(ResilientSource(failure_rate=0.05))
resilient_stream.map(lambda x: f"Processed: {x}").sink(print)
ctx.submit("resilient_job")
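The `time.sleep(1 * retry_count)` call above implements a linear backoff schedule. Its worst-case cost before the source gives up can be computed directly:

```python
# Backoff schedule implied by time.sleep(1 * retry_count) with max_retries = 3
max_retries = 3
delays = [1 * retry for retry in range(1, max_retries + 1)]

print(delays)       # [1, 2, 3] - seconds waited after each failed attempt
print(sum(delays))  # 6 seconds total before the source fails permanently
```

For a true exponential backoff, the delay would instead grow as `base ** retry_count`, trading longer worst-case waits for gentler pressure on a struggling upstream system.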

Best Practices

Source Function Guidelines

  1. Initialization: Use init() for parallel-aware setup, open() for resource initialization
  2. Resource Management: Always clean up resources in close() method
  3. Checkpointing: Implement checkpoint methods for stateful sources
  4. Error Handling: Handle failures gracefully with retry logic
  5. Parallelism: Design sources to work correctly with parallel execution
  6. Performance: Consider data generation rate and memory usage

Configuration and Tuning

# Configure source parallelism and performance
ctx = StreamingContext.Builder() \
    .option("streaming.source.parallelism", "4") \
    .option("streaming.checkpoint.interval", "10000") \
    .build()

source_stream = ctx.source(MyCustomSource()) \
    .set_parallelism(4) \
    .with_config("streaming.buffer.size", "2048")

See Also

  • Streaming Context Documentation - StreamingContext and job management
  • Data Streams Documentation - Stream transformation operations
  • Stream Operations Documentation - Available stream transformations
  • Cross-Language Support Documentation - Python/Java integration details
