
tessl/pypi-bytewax

Python Stateful Stream Processing Framework

docs/sinks.md

Output Sinks

Interfaces and implementations for writing data to external systems. Sinks provide the exit point for processed data from Bytewax dataflows, with support for exactly-once processing guarantees through state management and recovery.

Capabilities

Sink Base Classes

Abstract base classes that define the sink interface for different output patterns.

class Sink[X]: ...

class StatefulSinkPartition[X, S]:
    def write_batch(self, values: List[X]) -> None: ...
    def snapshot(self) -> S: ...
    def close(self) -> None: ...

class StatelessSinkPartition[X]:
    def write_batch(self, values: List[X]) -> None: ...
    def close(self) -> None: ...

StatefulSinkPartition Methods:

  • write_batch(values): Write a batch of values to the sink
  • snapshot(): Return state for recovery (must be pickle-able and immutable)
  • close(): Clean up resources when dataflow completes
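
To make the contract concrete, here is a minimal in-memory partition; the base class is stubbed out so the sketch runs standalone without bytewax installed, and `ListPartition` is a made-up name for illustration:

```python
from typing import Any, List

class StatefulSinkPartition:  # stub of the bytewax base class, for illustration
    def write_batch(self, values: List[Any]) -> None: ...
    def snapshot(self) -> Any: ...
    def close(self) -> None: ...

class ListPartition(StatefulSinkPartition):
    """Collects values in memory; state is the count of items written."""

    def __init__(self, resume_state=None):
        self.items = []
        self.count = resume_state or 0

    def write_batch(self, values):
        self.items.extend(values)
        self.count += len(values)

    def snapshot(self):
        # Must be pickle-able and immutable; an int qualifies
        return self.count

    def close(self):
        self.items.clear()

part = ListPartition()
part.write_batch([1, 2, 3])
state = part.snapshot()                    # state is 3
resumed = ListPartition(resume_state=state)  # picks up where we left off
```

On recovery, bytewax passes the last snapshot back as `resume_state`, which is the round-trip shown in the last two lines.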

StatelessSinkPartition Methods:

  • write_batch(values): Write a batch of values to the sink
  • close(): Clean up resources

Sink Implementations

Concrete sink types for different partitioning and delivery strategies.

class FixedPartitionedSink[X, S](Sink[Tuple[str, X]]):
    def list_parts(self) -> List[str]: ...
    def part_fn(self, item_key: str) -> int: ...
    def build_part(self, step_id: str, for_part: str, resume_state: Optional[S]) -> StatefulSinkPartition[X, S]: ...

class DynamicSink[X](Sink[X]):
    def build(self, step_id: str, worker_index: int, worker_count: int) -> StatelessSinkPartition[X]: ...

FixedPartitionedSink Methods:

  • list_parts(): Return list of partition identifiers available to this worker
  • part_fn(item_key): Map item key to partition index for routing
  • build_part(): Create partition handler for specific partition

DynamicSink Methods:

  • build(): Create sink partition for specific worker

Usage Examples:

# Custom database sink with partitioning
class DatabaseSink(FixedPartitionedSink):
    def __init__(self, connection_string, table_name):
        self.connection_string = connection_string
        self.table_name = table_name
    
    def list_parts(self):
        # Return partitions this worker can write to
        return ["partition_0", "partition_1"]
    
    def part_fn(self, item_key):
        # Route items by key hash. Caution: built-in hash() on str is
        # salted per interpreter process, so use a stable hash (e.g.
        # zlib.crc32) when routing must agree across workers and restarts.
        return hash(item_key) % 2
    
    def build_part(self, step_id, for_part, resume_state):
        return DatabasePartition(
            self.connection_string, 
            self.table_name, 
            for_part, 
            resume_state
        )

# Simple dynamic sink
class FileSink(DynamicSink):
    def __init__(self, file_path):
        self.file_path = file_path
    
    def build(self, step_id, worker_index, worker_count):
        worker_file = f"{self.file_path}.worker_{worker_index}"
        return FilePartition(worker_file)
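
The `part_fn` in `DatabaseSink` above uses the built-in `hash()`, which is salted per interpreter process for strings, so the same key can route to different partitions on different workers or after a restart. A stable alternative using only the stdlib (the function name here is illustrative, not part of the bytewax API):

```python
import zlib

def stable_part_fn(item_key: str, n_parts: int) -> int:
    # crc32 is deterministic across processes and Python versions,
    # unlike hash() on str, which is randomized per process
    return zlib.crc32(item_key.encode("utf-8")) % n_parts
```

Swap this into `part_fn` whenever partition routing must be reproducible.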

Sink Implementation Patterns

File Writing with State:

import json
import os

class FilePartition(StatefulSinkPartition):
    def __init__(self, file_path, resume_state=None):
        self.file_path = file_path
        self.bytes_written = resume_state or 0
        self.file_handle = None
        self._open_file()
    
    def _open_file(self):
        # Open in append mode; the write position resumes at end of file
        self.file_handle = open(self.file_path, 'a')
        if self.bytes_written > 0:
            # Verify file position matches state
            current_size = os.path.getsize(self.file_path)
            if current_size != self.bytes_written:
                raise ValueError(f"File size mismatch: expected {self.bytes_written}, got {current_size}")
    
    def write_batch(self, values):
        for value in values:
            line = json.dumps(value) + '\n'
            bytes_to_write = len(line.encode('utf-8'))
            self.file_handle.write(line)
            self.bytes_written += bytes_to_write
        
        self.file_handle.flush()
        os.fsync(self.file_handle.fileno())  # Ensure durability
    
    def snapshot(self):
        return self.bytes_written
    
    def close(self):
        if self.file_handle:
            self.file_handle.close()
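
The resume check above relies on an invariant: `bytes_written` must equal the on-disk size after every flushed batch, which only holds if you count encoded bytes rather than characters. A quick standalone check of that accounting (the sample values are made up for the demo):

```python
import json
import os
import tempfile

values = [{"id": 1}, {"id": 2, "name": "b"}]
bytes_written = 0

with tempfile.NamedTemporaryFile(mode="a", suffix=".jsonl", delete=False) as f:
    path = f.name
    for value in values:
        line = json.dumps(value) + "\n"
        # Count encoded bytes, not characters: len(line) would undercount
        # non-ASCII payloads
        bytes_written += len(line.encode("utf-8"))
        f.write(line)
    f.flush()
    os.fsync(f.fileno())

assert os.path.getsize(path) == bytes_written
os.unlink(path)
```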

Database Writing with Transactions:

import json

import psycopg2  # third-party driver: pip install psycopg2-binary

class DatabasePartition(StatefulSinkPartition):
    def __init__(self, connection_string, table_name, partition_id, resume_state=None):
        self.connection_string = connection_string
        self.table_name = table_name
        self.partition_id = partition_id
        self.last_offset = resume_state or 0
        self.connection = None
        self._connect()
    
    def _connect(self):
        self.connection = psycopg2.connect(self.connection_string)
        self.connection.autocommit = False
    
    def write_batch(self, values):
        cursor = self.connection.cursor()
        try:
            for i, value in enumerate(values):
                current_offset = self.last_offset + i + 1
                
                # Use the offset as an idempotency key. "offset" is quoted
                # because it is a reserved word in PostgreSQL; table_name
                # must come from trusted config, never from user input.
                cursor.execute("""
                    INSERT INTO {} (partition_id, "offset", data)
                    VALUES (%s, %s, %s)
                    ON CONFLICT (partition_id, "offset") DO NOTHING
                """.format(self.table_name),
                (self.partition_id, current_offset, json.dumps(value)))
            
            self.connection.commit()
            self.last_offset += len(values)
            
        except Exception as e:
            self.connection.rollback()
            raise e
        finally:
            cursor.close()
    
    def snapshot(self):
        return self.last_offset
    
    def close(self):
        if self.connection:
            self.connection.close()

HTTP API Sink with Retry:

import time
import requests
from typing import List

class APIPartition(StatelessSinkPartition):
    def __init__(self, endpoint, api_key, max_retries=3):
        self.endpoint = endpoint
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.max_retries = max_retries
    
    def write_batch(self, values):
        payload = {"events": values}
        
        for attempt in range(self.max_retries):
            try:
                response = requests.post(
                    self.endpoint,
                    json=payload,
                    headers=self.headers,
                    timeout=30
                )
                
                if response.status_code == 200:
                    return  # Success
                elif response.status_code == 429:  # Rate limited
                    time.sleep(2 ** attempt)  # Exponential backoff
                    continue
                else:
                    response.raise_for_status()
                    
            except requests.RequestException as e:
                if attempt == self.max_retries - 1:
                    raise e
                time.sleep(2 ** attempt)
        
        # All retries exhausted without success (e.g. persistent 429s);
        # don't return silently, or the batch would be dropped
        raise RuntimeError(f"Giving up after {self.max_retries} attempts")
    
    def close(self):
        # No resources to clean up
        pass
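
The sleep schedule above can be factored into a small helper; capping the delay avoids unbounded waits during long outages. The helper name and the 30-second cap are arbitrary choices for this sketch:

```python
def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Exponential backoff delay in seconds: base * 2**attempt, capped."""
    return min(cap, base * (2 ** attempt))

# attempt 0 -> 1s, 1 -> 2s, 2 -> 4s, ... then capped at 30s
```

Production retry loops often also add random jitter to the delay so that many workers retrying at once don't hit the API in lockstep.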

Batch Accumulation Pattern:

from datetime import datetime, timedelta

class BatchingSink(StatefulSinkPartition):
    def __init__(self, target_sink, batch_size=1000, flush_interval=timedelta(seconds=30)):
        self.target_sink = target_sink
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.buffer = []
        self.last_flush = datetime.now()
    
    def write_batch(self, values):
        self.buffer.extend(values)
        
        # Flush if buffer is full or enough time has passed
        now = datetime.now()
        should_flush = (
            len(self.buffer) >= self.batch_size or
            now - self.last_flush >= self.flush_interval
        )
        
        if should_flush and self.buffer:
            self.target_sink.write_batch(self.buffer)
            self.buffer.clear()
            self.last_flush = now
    
    def snapshot(self):
        # Include buffered data in state
        return {
            "buffer": self.buffer.copy(),
            "last_flush": self.last_flush.isoformat(),
            "target_state": self.target_sink.snapshot() if hasattr(self.target_sink, 'snapshot') else None
        }
    
    def close(self):
        # Flush remaining buffer on close
        if self.buffer:
            self.target_sink.write_batch(self.buffer)
        self.target_sink.close()
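
The size-triggered flush path can be exercised with stub classes. This is a simplified, size-only re-sketch of the pattern (`ListSink` and `BatchBuffer` are throwaway names, not bytewax APIs), runnable without bytewax:

```python
class ListSink:
    """Stand-in target that records each flushed batch."""
    def __init__(self):
        self.batches = []

    def write_batch(self, values):
        self.batches.append(list(values))

class BatchBuffer:
    """Size-only version of the flush logic: emit fixed-size batches."""
    def __init__(self, target, batch_size=1000):
        self.target = target
        self.batch_size = batch_size
        self.buffer = []

    def write_batch(self, values):
        self.buffer.extend(values)
        # Flush as many full batches as the buffer now holds
        while len(self.buffer) >= self.batch_size:
            self.target.write_batch(self.buffer[:self.batch_size])
            del self.buffer[:self.batch_size]

sink = ListSink()
buf = BatchBuffer(sink, batch_size=1000)
buf.write_batch(range(2500))
# Two full batches flushed; 500 items remain buffered until the next
# write or close
```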

Multi-destination Sink:

class FanOutSink(StatelessSinkPartition):
    def __init__(self, sinks: List[StatelessSinkPartition]):
        self.sinks = sinks
    
    def write_batch(self, values):
        # Write to all sinks, collecting any errors
        errors = []
        for i, sink in enumerate(self.sinks):
            try:
                sink.write_batch(values)
            except Exception as e:
                errors.append(f"Sink {i}: {e}")
        
        if errors:
            raise RuntimeError(f"Failed to write to some sinks: {'; '.join(errors)}")
    
    def close(self):
        for sink in self.sinks:
            try:
                sink.close()
            except Exception:
                pass  # Best effort cleanup

Exactly-Once Guarantees:

For exactly-once processing, sinks should:

  1. Use idempotent operations when possible (e.g., upserts instead of inserts)
  2. Maintain offset/sequence numbers in state for deduplication
  3. Use transactions to ensure atomic writes with state updates
  4. Implement proper error handling with appropriate retry logic

class ExactlyOnceSink(StatefulSinkPartition):
    def __init__(self, target_system, resume_state=None):
        self.target_system = target_system
        # Restore previously processed offsets after recovery. Note: this
        # set grows without bound; a production sink would prune offsets
        # once they are durably acknowledged.
        self.processed_offsets = set(resume_state or [])
    
    def write_batch(self, values):
        # Filter out already processed items using state
        new_values = []
        for offset, value in values:  # Assume values include offset
            if offset not in self.processed_offsets:
                new_values.append(value)
                self.processed_offsets.add(offset)
        
        if new_values:
            self.target_system.write_batch(new_values)
    
    def snapshot(self):
        return list(self.processed_offsets)
    
    def close(self):
        self.target_system.close()
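
A quick standalone check of the offset-dedup idea: replaying the same batch after a snapshot/restore cycle delivers nothing twice. The stub classes (`ListTarget`, `DedupSink`) are invented for this demo and need no bytewax install:

```python
class ListTarget:
    """Records everything it is asked to write."""
    def __init__(self):
        self.written = []

    def write_batch(self, values):
        self.written.extend(values)

class DedupSink:
    """Minimal sketch of offset-based deduplication."""
    def __init__(self, target, resume_state=None):
        self.target = target
        self.seen = set(resume_state or [])

    def write_batch(self, keyed_values):  # items are (offset, value) pairs
        fresh = [(o, v) for o, v in keyed_values if o not in self.seen]
        self.seen.update(o for o, _ in fresh)
        if fresh:
            self.target.write_batch([v for _, v in fresh])

    def snapshot(self):
        return list(self.seen)

target = ListTarget()
sink = DedupSink(target)
batch = [(0, "a"), (1, "b")]
sink.write_batch(batch)
# Simulate a crash and restore, then a replay of the same batch
restored = DedupSink(target, resume_state=sink.snapshot())
restored.write_batch(batch)
# target.written is ["a", "b"]: each value delivered exactly once
```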

Install with Tessl CLI

npx tessl i tessl/pypi-bytewax
