tessl/pypi-bytewax

Python Stateful Stream Processing Framework

Input Sources

Interfaces and implementations for reading data from external systems. Sources provide the entry point for data into Bytewax dataflows, with support for various input patterns including polling, streaming, and batch processing.

Capabilities

Source Base Classes

Abstract base classes that define the source interface for different input patterns.

class Source[X]: ...

class StatefulSourcePartition[X, S]:
    def next_batch(self) -> Iterable[X]: ...
    def next_awake(self) -> Optional[datetime]: ...
    def snapshot(self) -> S: ...
    def close(self) -> None: ...

class StatelessSourcePartition[X]:
    def next_batch(self) -> Iterable[X]: ...
    def next_awake(self) -> Optional[datetime]: ...
    def close(self) -> None: ...

StatefulSourcePartition Methods:

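  • next_batch(): Return the next batch of items (an empty batch if none are available yet); raise StopIteration when the partition has no more input
  • next_awake(): Return the time at which next_batch should next be called (None to be called again without a scheduled delay)
  • snapshot(): Return the state needed to resume this partition during recovery (must be pickle-able)
  • close(): Clean up resources when the dataflow completes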

StatelessSourcePartition Methods:

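  • next_batch(): Return the next batch of items (an empty batch if none are available yet); raise StopIteration when the partition has no more input
  • next_awake(): Return the time at which next_batch should next be called (None to be called again without a scheduled delay)
  • close(): Clean up resources when the dataflow completes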
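
The method contract above can be tried out without Bytewax installed. The following is a minimal sketch, not the runtime's actual scheduler: `CountingPartition` and `drive` are hypothetical names, the class only duck-types the stateful partition interface instead of subclassing it, and it assumes that a partition signals the end of its input by raising StopIteration from next_batch.

```python
from datetime import datetime, timezone
from typing import List, Optional


class CountingPartition:
    """Hypothetical partition that emits 0..limit-1 in batches of two."""

    def __init__(self, limit: int, resume_state: Optional[int] = None):
        self.limit = limit
        # Resume from the snapshot if one was provided, else start at 0.
        self.next_value = resume_state or 0
        self.closed = False

    def next_batch(self) -> List[int]:
        if self.next_value >= self.limit:
            raise StopIteration()  # Assumed end-of-input signal.
        end = min(self.next_value + 2, self.limit)
        items = list(range(self.next_value, end))
        self.next_value = end
        return items

    def next_awake(self) -> Optional[datetime]:
        return None  # No scheduling preference.

    def snapshot(self) -> int:
        return self.next_value

    def close(self) -> None:
        self.closed = True


def drive(part: CountingPartition):
    """Simplified stand-in for a runtime loop: poll, snapshot, then close."""
    emitted, snapshots = [], []
    try:
        while True:
            awake_at = part.next_awake()
            if awake_at is not None and awake_at > datetime.now(timezone.utc):
                continue  # A real scheduler would sleep here instead of spinning.
            try:
                emitted.extend(part.next_batch())
            except StopIteration:
                break
            snapshots.append(part.snapshot())
    finally:
        part.close()
    return emitted, snapshots
```

Passing one of the recorded snapshots back in as `resume_state` makes a new partition continue from that point, which is the role snapshot() plays in recovery.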

Source Implementations

Concrete source types for different partitioning strategies.

class FixedPartitionedSource[X, S](Source[X]):
    def list_parts(self) -> List[str]: ...
    def build_part(self, step_id: str, for_part: str, resume_state: Optional[S]) -> StatefulSourcePartition[X, S]: ...

class DynamicSource[X](Source[X]):
    def build(self, step_id: str, worker_index: int, worker_count: int) -> StatelessSourcePartition[X]: ...

class SimplePollingSource[X](FixedPartitionedSource[X, None]):
    def __init__(self, interval: timedelta, align_to: Optional[datetime] = None): ...
    def next_item(self) -> X: ...

FixedPartitionedSource Methods:

  • list_parts(): Return the identifiers of all partitions this source can read
  • build_part(): Build the partition for for_part, resuming from resume_state when it is not None

DynamicSource Methods:

  • build(): Build the stateless partition for one worker

SimplePollingSource Parameters:

  • interval (timedelta): Time between calls to next_item
  • align_to (Optional[datetime]): Align polling times to this datetime

SimplePollingSource Methods:

  • next_item(): Override this to return the next polled item

Usage Examples:

from datetime import timedelta

from bytewax.inputs import FixedPartitionedSource, SimplePollingSource

# Simple polling source: subclass and implement next_item()
class LatestEventsSource(SimplePollingSource):
    def next_item(self):
        # Poll external API or database; api_client is a placeholder
        return api_client.get_latest_events()

polling_source = LatestEventsSource(interval=timedelta(seconds=5))

# Custom stateful source
class DatabaseSource(FixedPartitionedSource):
    def __init__(self, connection_string, table_name):
        self.connection_string = connection_string
        self.table_name = table_name
    
    def list_parts(self):
        # Return partition identifiers (e.g., database shards)
        return ["shard_0", "shard_1", "shard_2"]
    
    def build_part(self, step_id, for_part, resume_state):
        return DatabasePartition(self.connection_string, self.table_name, for_part, resume_state)
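
`DatabasePartition` is referenced above but not defined. A minimal sketch of what it could look like follows, under stated assumptions: `FAKE_SHARDS` is a hypothetical in-memory stand-in for the real database, the resume state is taken to be a row offset, and the class duck-types the partition interface rather than subclassing StatefulSourcePartition so that it runs on its own.

```python
from typing import Dict, List, Optional

# Hypothetical stand-in for real database shards.
FAKE_SHARDS: Dict[str, List[str]] = {
    "shard_0": ["a", "b", "c"],
    "shard_1": ["d"],
    "shard_2": [],
}


class DatabasePartition:
    """Sketch of a per-shard reader that resumes from a row offset."""

    def __init__(self, connection_string, table_name, shard, resume_state: Optional[int]):
        self.connection_string = connection_string  # Unused in this sketch.
        self.table_name = table_name  # Unused in this sketch.
        self.rows = FAKE_SHARDS[shard]
        # A missing resume state means "start from the first row".
        self.offset = resume_state if resume_state is not None else 0

    def next_batch(self) -> List[str]:
        if self.offset >= len(self.rows):
            raise StopIteration()  # Assumed end-of-input signal.
        rows = self.rows[self.offset : self.offset + 2]
        self.offset += len(rows)
        return rows

    def next_awake(self):
        return None

    def snapshot(self) -> int:
        return self.offset

    def close(self) -> None:
        pass
```

Storing only the offset keeps the snapshot small and pickle-able. A real implementation would more likely track a primary key or timestamp that the query can filter on.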

Helper Functions

Utility functions for working with sources and batching data.

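def batch(ib: Iterable[X], batch_size: int) -> Iterator[List[X]]: ...

def batch_async(aib: AsyncIterable[X], timeout: timedelta, batch_size: int, loop=None) -> Iterator[List[X]]: ...

def batch_getter(getter: Callable[[], X], batch_size: int, yield_on: Optional[X] = None) -> Iterator[List[X]]: ...

def batch_getter_ex(getter: Callable[[], X], batch_size: int, yield_ex: Type[Exception] = queue.Empty) -> Iterator[List[X]]: ...

batch Parameters:

  • ib (Iterable[X]): Input iterable to batch
  • batch_size (int): Maximum number of items per batch

batch_async Parameters:

  • aib (AsyncIterable[X]): Input async iterable to batch
  • timeout (timedelta): How long to poll the async iterable for items before yielding the current batch
  • batch_size (int): Maximum number of items per batch

batch_getter Parameters:

  • getter (Callable[[], X]): Function returning the next item
  • batch_size (int): Maximum number of items per batch
  • yield_on (Optional[X]): Sentinel return value meaning that no more items are available yet

Usage Examples:

from datetime import timedelta

from bytewax.inputs import batch, batch_async, batch_getter

# Batch an iterable (the loop variable must not shadow the batch function)
for chunk in batch([1, 2, 3, 4, 5], 2):
    print(chunk)  # [1, 2], then [3, 4], then [5]

# Batch an async iterable; the result is a regular iterator, so use a plain for loop
for chunk in batch_async(async_generator(), timedelta(seconds=0.5), 10):
    process_batch(chunk)

# Batch a getter function that returns one item per call; fetch_item is a placeholder
batcher = batch_getter(fetch_item, 100)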
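
To make the chunking behavior concrete, here is a pure-Python sketch of the same idea. `chunked` is a hypothetical name used for illustration; it is not the library's implementation, only a stand-alone function with the semantics described for batch.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

X = TypeVar("X")


def chunked(items: Iterable[X], size: int) -> Iterator[List[X]]:
    """Yield lists of at most `size` items until the input is exhausted."""
    if size < 1:
        raise ValueError("size must be at least 1")
    it = iter(items)
    while True:
        # islice pulls up to `size` items without consuming more than that.
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk
```

Because the function is a generator, a partition can keep one instance around and return `next(...)` of it from each next_batch call, so that no call reads more than one chunk.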

Exception Handling

Exceptions that can be raised by sources for testing and error handling.

class AbortExecution(RuntimeError):
    """Raise this from next_batch to abort for testing purposes."""
    ...

Usage Example:

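from bytewax.inputs import AbortExecution, StatelessSourcePartition

class TestSource(StatelessSourcePartition):
    def __init__(self, data, fail_after=None):
        self.data = list(data)
        self.index = 0
        self.fail_after = fail_after
    
    def next_batch(self):
        # Compare against None so that fail_after=0 aborts on the first call
        if self.fail_after is not None and self.index >= self.fail_after:
            raise AbortExecution("Test failure")
        
        if self.index < len(self.data):
            item = self.data[self.index]
            self.index += 1
            return [item]
        # All items have been emitted: signal end of input
        raise StopIteration()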

Source Implementation Patterns

Polling Pattern:

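from datetime import datetime, timedelta, timezone

import requests
from bytewax.inputs import DynamicSource, StatelessSourcePartition

class APIPollingSource(DynamicSource):
    def __init__(self, api_endpoint, poll_interval=timedelta(seconds=10)):
        self.api_endpoint = api_endpoint
        self.poll_interval = poll_interval
    
    def build(self, step_id, worker_index, worker_count):
        # Each worker builds its own partition, so each worker polls the endpoint
        return APIPartition(self.api_endpoint, self.poll_interval)

class APIPartition(StatelessSourcePartition):
    def __init__(self, endpoint, interval):
        self.endpoint = endpoint
        self.interval = interval
        self.last_fetch = None
    
    def next_batch(self):
        # Record the poll time so next_awake can schedule the following poll
        self.last_fetch = datetime.now(timezone.utc)
        # Fetch data from API
        response = requests.get(self.endpoint, timeout=10)
        if response.ok:
            return response.json().get('items', [])
        return []
    
    def next_awake(self):
        # Wake one interval after the last poll; no delay before the first poll
        if self.last_fetch is not None:
            return self.last_fetch + self.interval
        return None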
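
The scheduling logic of the polling pattern can be checked in isolation by injecting the fetch function and the clock. The sketch below is an illustration under assumptions: `PollingPartition`, `fetch`, and `clock` are hypothetical names, and the class does not subclass StatelessSourcePartition so that it runs without Bytewax or network access.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable, List, Optional


class PollingPartition:
    """Sketch: poll `fetch` and ask to be woken one interval after each poll."""

    def __init__(
        self,
        fetch: Callable[[], List[dict]],
        interval: timedelta,
        clock: Callable[[], datetime] = lambda: datetime.now(timezone.utc),
    ):
        self.fetch = fetch
        self.interval = interval
        self.clock = clock
        self.last_fetch: Optional[datetime] = None

    def next_batch(self) -> List[dict]:
        # Record the poll time first so a slow fetch does not shift the schedule.
        self.last_fetch = self.clock()
        return list(self.fetch())

    def next_awake(self) -> Optional[datetime]:
        if self.last_fetch is None:
            return None  # First poll: no delay requested.
        return self.last_fetch + self.interval

    def close(self) -> None:
        pass
```

With a fixed clock, a test can assert the exact wake-up time instead of sleeping for the interval.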

File Reading Pattern:

from bytewax.inputs import FixedPartitionedSource, StatefulSourcePartition

class FileSource(FixedPartitionedSource):
    def __init__(self, file_paths):
        self.file_paths = file_paths
    
    def list_parts(self):
        return [str(i) for i in range(len(self.file_paths))]
    
    def build_part(self, step_id, for_part, resume_state):
        file_index = int(for_part)
        file_path = self.file_paths[file_index]
        return FilePartition(file_path, resume_state)

class FilePartition(StatefulSourcePartition):
    def __init__(self, file_path, resume_state):
        self.file_path = file_path
        self.position = resume_state or 0
        self.file_handle = None
    
    def next_batch(self):
        if self.file_handle is None:
            self.file_handle = open(self.file_path, 'r')
            self.file_handle.seek(self.position)
        
        lines = []
        for _ in range(100):  # Read up to 100 lines per batch
            line = self.file_handle.readline()
            if not line:  # End of file
                if lines:
                    # Emit the final partial batch; the next call raises StopIteration
                    break
                raise StopIteration()
            lines.append(line.strip())
        # Record the offset once per batch so snapshot() matches what was emitted
        self.position = self.file_handle.tell()
        
        return lines
    
    def snapshot(self):
        return self.position
    
    def close(self):
        if self.file_handle:
            self.file_handle.close()
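
The resume behavior of the file pattern depends on pairing `seek()` with an offset previously obtained from `tell()`. A minimal stand-alone sketch of that pairing follows; `read_batch` is a hypothetical helper, not part of Bytewax, and it reopens the file on every call only to keep the example short.

```python
from typing import List, Tuple


def read_batch(path: str, position: int, max_lines: int = 100) -> Tuple[List[str], int]:
    """Read up to max_lines starting at position; return lines and the new offset."""
    lines: List[str] = []
    with open(path, "r", encoding="utf-8") as handle:
        handle.seek(position)
        for _ in range(max_lines):
            # readline() keeps tell() usable in text mode, unlike `for line in handle`.
            line = handle.readline()
            if not line:
                break
            lines.append(line.rstrip("\n"))
        return lines, handle.tell()
```

The returned offset plays the role of the value that snapshot() would hand back, and passing it into the next call resumes after the last line already read.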

Queue-based Pattern:

import queue
from datetime import datetime, timedelta, timezone
from threading import Thread

from bytewax.inputs import DynamicSource, StatelessSourcePartition

class QueueSource(DynamicSource):
    def __init__(self, queue_size=1000):
        self.queue = queue.Queue(maxsize=queue_size)
        self.producer_thread = None
    
    def build(self, step_id, worker_index, worker_count):
        if not self.producer_thread:
            # Daemon thread so the endless producer loop does not block interpreter exit
            self.producer_thread = Thread(target=self._producer, daemon=True)
            self.producer_thread.start()
        return QueuePartition(self.queue)
    
    def _producer(self):
        # Background thread that feeds the queue
        while True:
            # external_data_source is a placeholder for the real upstream client
            data = external_data_source.fetch()
            try:
                self.queue.put(data, timeout=1)
            except queue.Full:
                pass  # Drop data if queue is full

class QueuePartition(StatelessSourcePartition):
    def __init__(self, source_queue):
        self.queue = source_queue
        self.last_batch_empty = False
    
    def next_batch(self):
        items = []
        try:
            # Get up to 10 items without blocking
            for _ in range(10):
                items.append(self.queue.get_nowait())
        except queue.Empty:
            pass
        # Remember whether anything arrived so next_awake can back off
        self.last_batch_empty = not items
        return items
    
    def next_awake(self):
        # Check the queue again in 100 ms if the last batch was empty
        if self.last_batch_empty:
            return datetime.now(timezone.utc) + timedelta(milliseconds=100)
        return None
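
The non-blocking drain at the heart of the queue pattern can be sketched and tested on its own. `drain` is a hypothetical helper name, and the code uses only the standard library `queue` module.

```python
import queue
from typing import Any, List


def drain(q: queue.Queue, max_items: int) -> List[Any]:
    """Take up to max_items from the queue without ever blocking."""
    items: List[Any] = []
    for _ in range(max_items):
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            break  # Nothing more right now; return what we have.
    return items
```

Capping the batch size keeps a single next_batch call short even when the producer has filled the queue, and an empty result is the cue to ask for a later wake-up.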

Install with Tessl CLI

npx tessl i tessl/pypi-bytewax
