Python Stateful Stream Processing Framework
—
Quality: Pending — does it follow best practices?
Impact: Pending — no eval scenarios have been run.
Interfaces and implementations for writing data to external systems. Sinks provide the exit point for processed data from Bytewax dataflows, with support for exactly-once processing guarantees through state management and recovery.
Abstract base classes that define the sink interface for different output patterns.
class Sink[X]: ...

class StatefulSinkPartition[X, S]:
    def write_batch(self, values: List[X]) -> None: ...
    def snapshot(self) -> S: ...
    def close(self) -> None: ...

class StatelessSinkPartition[X]:
    def write_batch(self, values: List[X]) -> None: ...
    def close(self) -> None: ...

StatefulSinkPartition Methods:
- write_batch(values): Write a batch of values to the sink
- snapshot(): Return state for recovery (must be pickle-able and immutable)
- close(): Clean up resources when dataflow completes

StatelessSinkPartition Methods:
- write_batch(values): Write a batch of values to the sink
- close(): Clean up resources

Concrete sink types for different partitioning and delivery strategies.
class FixedPartitionedSink[X, S](Sink[Tuple[str, X]]):
    def list_parts(self) -> List[str]: ...
    def part_fn(self, item_key: str) -> int: ...
    def build_part(self, step_id: str, for_part: str, resume_state: Optional[S]) -> StatefulSinkPartition[X, S]: ...

class DynamicSink[X](Sink[X]):
    def build(self, step_id: str, worker_index: int, worker_count: int) -> StatelessSinkPartition[X]: ...

FixedPartitionedSink Methods:
- list_parts(): Return list of partition identifiers available to this worker
- part_fn(item_key): Map item key to partition index for routing
- build_part(): Create partition handler for specific partition

DynamicSink Methods:
- build(): Create sink partition for specific worker

Usage Examples:
# Custom database sink with partitioning
class DatabaseSink(FixedPartitionedSink):
    """Example fixed-partitioned sink that routes keyed items to DB partitions.

    Every item key is mapped deterministically to one of the partitions
    returned by ``list_parts``, so all items with the same key are written
    by the same ``DatabasePartition``.
    """

    def __init__(self, connection_string, table_name):
        self.connection_string = connection_string
        self.table_name = table_name

    def list_parts(self):
        # Partitions this worker can write to.
        return ["partition_0", "partition_1"]

    def part_fn(self, item_key):
        # Route items by a *stable* hash. The builtin hash() of a str is
        # randomized per process (PYTHONHASHSEED), so it would route the same
        # key to a different partition after a restart, breaking the recovery
        # guarantee that a key always lands on the same partition's state.
        import zlib

        return zlib.crc32(item_key.encode("utf-8")) % 2

    def build_part(self, step_id, for_part, resume_state):
        # Build the stateful writer for one partition, resuming from the
        # snapshot taken before a crash (resume_state is None on first start).
        return DatabasePartition(
            self.connection_string,
            self.table_name,
            for_part,
            resume_state,
        )
# Simple dynamic sink
class FileSink(DynamicSink):
def __init__(self, file_path):
self.file_path = file_path
def build(self, step_id, worker_index, worker_count):
worker_file = f"{self.file_path}.worker_{worker_index}"
return FilePartition(worker_file)File Writing with State:
class FilePartition(StatefulSinkPartition):
def __init__(self, file_path, resume_state=None):
self.file_path = file_path
self.bytes_written = resume_state or 0
self.file_handle = None
self._open_file()
def _open_file(self):
# Open file and seek to resume position
self.file_handle = open(self.file_path, 'a')
if self.bytes_written > 0:
# Verify file position matches state
current_size = os.path.getsize(self.file_path)
if current_size != self.bytes_written:
raise ValueError(f"File size mismatch: expected {self.bytes_written}, got {current_size}")
def write_batch(self, values):
for value in values:
line = json.dumps(value) + '\n'
bytes_to_write = len(line.encode('utf-8'))
self.file_handle.write(line)
self.bytes_written += bytes_to_write
self.file_handle.flush()
os.fsync(self.file_handle.fileno()) # Ensure durability
def snapshot(self):
return self.bytes_written
def close(self):
if self.file_handle:
self.file_handle.close()Database Writing with Transactions:
class DatabasePartition(StatefulSinkPartition):
def __init__(self, connection_string, table_name, partition_id, resume_state=None):
self.connection_string = connection_string
self.table_name = table_name
self.partition_id = partition_id
self.last_offset = resume_state or 0
self.connection = None
self._connect()
def _connect(self):
self.connection = psycopg2.connect(self.connection_string)
self.connection.autocommit = False
def write_batch(self, values):
cursor = self.connection.cursor()
try:
for i, value in enumerate(values):
current_offset = self.last_offset + i + 1
# Use offset as idempotency key
cursor.execute("""
INSERT INTO {} (partition_id, offset, data)
VALUES (%s, %s, %s)
ON CONFLICT (partition_id, offset) DO NOTHING
""".format(self.table_name),
(self.partition_id, current_offset, json.dumps(value)))
self.connection.commit()
self.last_offset += len(values)
except Exception as e:
self.connection.rollback()
raise e
finally:
cursor.close()
def snapshot(self):
return self.last_offset
def close(self):
if self.connection:
self.connection.close()HTTP API Sink with Retry:
import time
import requests
from typing import List
class APIPartition(StatelessSinkPartition):
def __init__(self, endpoint, api_key, max_retries=3):
self.endpoint = endpoint
self.headers = {"Authorization": f"Bearer {api_key}"}
self.max_retries = max_retries
def write_batch(self, values):
payload = {"events": values}
for attempt in range(self.max_retries):
try:
response = requests.post(
self.endpoint,
json=payload,
headers=self.headers,
timeout=30
)
if response.status_code == 200:
return # Success
elif response.status_code == 429: # Rate limited
time.sleep(2 ** attempt) # Exponential backoff
continue
else:
response.raise_for_status()
except requests.RequestException as e:
if attempt == self.max_retries - 1:
raise e
time.sleep(2 ** attempt)
def close(self):
# No resources to clean up
passBatch Accumulation Pattern:
class BatchingSink(StatefulSinkPartition):
def __init__(self, target_sink, batch_size=1000, flush_interval=timedelta(seconds=30)):
self.target_sink = target_sink
self.batch_size = batch_size
self.flush_interval = flush_interval
self.buffer = []
self.last_flush = datetime.now()
def write_batch(self, values):
self.buffer.extend(values)
# Flush if buffer is full or enough time has passed
now = datetime.now()
should_flush = (
len(self.buffer) >= self.batch_size or
now - self.last_flush >= self.flush_interval
)
if should_flush and self.buffer:
self.target_sink.write_batch(self.buffer)
self.buffer.clear()
self.last_flush = now
def snapshot(self):
# Include buffered data in state
return {
"buffer": self.buffer.copy(),
"last_flush": self.last_flush.isoformat(),
"target_state": self.target_sink.snapshot() if hasattr(self.target_sink, 'snapshot') else None
}
def close(self):
# Flush remaining buffer on close
if self.buffer:
self.target_sink.write_batch(self.buffer)
self.target_sink.close()Multi-destination Sink:
class FanOutSink(StatelessSinkPartition):
def __init__(self, sinks: List[StatelessSinkPartition]):
self.sinks = sinks
def write_batch(self, values):
# Write to all sinks, collecting any errors
errors = []
for i, sink in enumerate(self.sinks):
try:
sink.write_batch(values)
except Exception as e:
errors.append(f"Sink {i}: {e}")
if errors:
raise RuntimeError(f"Failed to write to some sinks: {'; '.join(errors)}")
def close(self):
for sink in self.sinks:
try:
sink.close()
except Exception:
pass # Best effort cleanupExactly-Once Guarantees:
For exactly-once processing, sinks should track which items have already been written (for example by offset), restore that state from resume_state on recovery, and skip duplicates when a batch is replayed:
class ExactlyOnceSink(StatefulSinkPartition):
def __init__(self, target_system):
self.target_system = target_system
self.processed_offsets = set() # Track processed items
def write_batch(self, values):
# Filter out already processed items using state
new_values = []
for offset, value in values: # Assume values include offset
if offset not in self.processed_offsets:
new_values.append(value)
self.processed_offsets.add(offset)
if new_values:
self.target_system.write_batch(new_values)
def snapshot(self):
return list(self.processed_offsets)Install with Tessl CLI
npx tessl i tessl/pypi-bytewax