Python Stateful Stream Processing Framework
Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run.
Interfaces and implementations for reading data from external systems. Sources provide the entry point for data into Bytewax dataflows, with support for various input patterns including polling, streaming, and batch processing.
Abstract base classes that define the source interface for different input patterns.
class Source[X]: ...

class StatefulSourcePartition[X, S]:
    def next_batch(self) -> Iterable[X]: ...
    def next_awake(self) -> Optional[datetime]: ...
    def snapshot(self) -> S: ...
    def close(self) -> None: ...

class StatelessSourcePartition[X]:
    def next_batch(self) -> Iterable[X]: ...
    def next_awake(self) -> Optional[datetime]: ...
    def close(self) -> None: ...

StatefulSourcePartition Methods:
- next_batch(): Return next batch of items (empty if none available)
- next_awake(): Return when to next call next_batch (None for immediate)
- snapshot(): Return state for recovery (must be pickle-able)
- close(): Clean up resources when dataflow completes

StatelessSourcePartition Methods:
- next_batch(): Return next batch of items
- next_awake(): Return when to next call next_batch
- close(): Clean up resources

Concrete source types for different partitioning strategies.
class FixedPartitionedSource[X, S](Source[X]):
    def list_parts(self) -> List[str]: ...
    def build_part(self, step_id: str, for_part: str, resume_state: Optional[S]) -> StatefulSourcePartition[X, S]: ...

class DynamicSource[X](Source[X]):
    def build(self, step_id: str, worker_index: int, worker_count: int) -> StatelessSourcePartition[X]: ...

class SimplePollingSource[X](DynamicSource[X]):
    def __init__(self, getter: Callable[[], Iterable[X]], interval: timedelta = timedelta(seconds=1)): ...

FixedPartitionedSource Methods:
- list_parts(): Return list of partition identifiers
- build_part(): Create partition handler for specific partition

DynamicSource Methods:
- build(): Create source partition for specific worker

SimplePollingSource Parameters:
- getter (Callable): Function that returns iterable of items
- interval (timedelta): Polling interval

Usage Examples:
from datetime import timedelta
# Simple polling source
def fetch_data():
# Poll external API or database
return api_client.get_latest_events()
polling_source = SimplePollingSource(fetch_data, interval=timedelta(seconds=5))
# Custom stateful source
class DatabaseSource(FixedPartitionedSource):
def __init__(self, connection_string, table_name):
self.connection_string = connection_string
self.table_name = table_name
def list_parts(self):
# Return partition identifiers (e.g., database shards)
return ["shard_0", "shard_1", "shard_2"]
def build_part(self, step_id, for_part, resume_state):
return DatabasePartition(self.connection_string, self.table_name, for_part, resume_state)Utility functions for working with sources and batching data.
def batch(it: Iterable[X], size: int) -> Iterator[List[X]]: ...
def batch_async(ait: AsyncIterable[X], size: int) -> AsyncIterator[List[X]]: ...
def batch_getter(getter: Callable[[], Iterable[X]], size: int) -> Callable[[], List[X]]: ...
def batch_getter_ex(getter: Callable[[], Iterable[X]], size: int) -> Callable[[], List[X]]: ...

batch Parameters:
- it (Iterable[X]): Input iterable to batch
- size (int): Maximum batch size

batch_getter Parameters:
- getter (Callable): Function returning iterable
- size (int): Maximum batch size

Usage Examples:
# Batch an iterable
for batch in batch([1, 2, 3, 4, 5], 2):
print(batch) # [1, 2], [3, 4], [5]
# Batch async iterable
async for batch in batch_async(async_generator(), 10):
process_batch(batch)
# Batch a getter function
batched_getter = batch_getter(lambda: fetch_items(), 100)Exceptions that can be raised by sources for testing and error handling.
class AbortExecution(RuntimeError):
"""Raise this from next_batch to abort for testing purposes."""
    ...

Usage Example:
class TestSource(StatelessSourcePartition):
def __init__(self, data, fail_after=None):
self.data = list(data)
self.index = 0
self.fail_after = fail_after
def next_batch(self):
if self.fail_after and self.index >= self.fail_after:
raise AbortExecution("Test failure")
if self.index < len(self.data):
item = self.data[self.index]
self.index += 1
return [item]
        return []

Polling Pattern:
class APIPollingSource(DynamicSource):
def __init__(self, api_endpoint, poll_interval=timedelta(seconds=10)):
self.api_endpoint = api_endpoint
self.poll_interval = poll_interval
def build(self, step_id, worker_index, worker_count):
return APIPartition(self.api_endpoint, self.poll_interval)
class APIPartition(StatelessSourcePartition):
def __init__(self, endpoint, interval):
self.endpoint = endpoint
self.interval = interval
self.last_fetch = None
def next_batch(self):
# Fetch data from API
response = requests.get(self.endpoint)
if response.ok:
return response.json().get('items', [])
return []
def next_awake(self):
if self.last_fetch:
return self.last_fetch + self.interval
return NoneFile Reading Pattern:
class FileSource(FixedPartitionedSource):
def __init__(self, file_paths):
self.file_paths = file_paths
def list_parts(self):
return [str(i) for i in range(len(self.file_paths))]
def build_part(self, step_id, for_part, resume_state):
file_index = int(for_part)
file_path = self.file_paths[file_index]
return FilePartition(file_path, resume_state)
class FilePartition(StatefulSourcePartition):
def __init__(self, file_path, resume_state):
self.file_path = file_path
self.position = resume_state or 0
self.file_handle = None
def next_batch(self):
if not self.file_handle:
self.file_handle = open(self.file_path, 'r')
self.file_handle.seek(self.position)
lines = []
for _ in range(100): # Read up to 100 lines per batch
line = self.file_handle.readline()
if not line: # End of file
raise StopIteration
lines.append(line.strip())
self.position = self.file_handle.tell()
return lines
def snapshot(self):
return self.position
def close(self):
if self.file_handle:
self.file_handle.close()Queue-based Pattern:
import queue
from threading import Thread
class QueueSource(DynamicSource):
def __init__(self, queue_size=1000):
self.queue = queue.Queue(maxsize=queue_size)
self.producer_thread = None
def build(self, step_id, worker_index, worker_count):
if not self.producer_thread:
self.producer_thread = Thread(target=self._producer)
self.producer_thread.start()
return QueuePartition(self.queue)
def _producer(self):
# Background thread that feeds the queue
while True:
data = external_data_source.fetch()
try:
self.queue.put(data, timeout=1)
except queue.Full:
pass # Drop data if queue is full
class QueuePartition(StatelessSourcePartition):
def __init__(self, queue):
self.queue = queue
def next_batch(self):
items = []
try:
# Get up to 10 items without blocking
for _ in range(10):
item = self.queue.get_nowait()
items.append(item)
except queue.Empty:
pass
return items
def next_awake(self):
# Check queue again in 100ms if empty
return datetime.now() + timedelta(milliseconds=100) if not items else NoneInstall with Tessl CLI
npx tessl i tessl/pypi-bytewax