tessl/pypi-bytewax

Python Stateful Stream Processing Framework


Built-in Connectors

Pre-built connectors for common external systems including Kafka, files, stdio, and demo sources. These connectors provide production-ready integration with popular data systems and serve as examples for building custom connectors.

Capabilities

Kafka Connectors

High-performance Kafka integration with built-in serialization/deserialization support.

# Kafka operators (from bytewax.connectors.kafka.operators)
def input(step_id: str, flow: Dataflow, brokers: List[str], topics: List[str], starting_offset: str = "stored", consumer_configs: Optional[Dict[str, Any]] = None, batch_size: int = 1000) -> KafkaSourceOut: ...
# KafkaSourceOut bundles the successfully consumed messages (.oks, a Stream[KafkaSourceMessage])
# and the failed ones (.errs) as two separate streams

def output(step_id: str, up: Stream[KafkaSinkMessage], brokers: List[str], topic: str, producer_configs: Optional[Dict[str, Any]] = None) -> None: ...

# Message types
class KafkaSourceMessage:
    key: Optional[bytes]
    value: bytes
    topic: str
    partition: int
    offset: int
    timestamp: Optional[datetime]

class KafkaSinkMessage:
    def __init__(self, key: Optional[bytes], value: bytes): ...
    key: Optional[bytes]
    value: bytes

Kafka Input Parameters:

  • step_id (str): Unique identifier
  • flow (Dataflow): Target dataflow
  • brokers (List[str]): Kafka broker addresses
  • topics (List[str]): Topics to consume from
  • starting_offset (str): Where to start consuming ("earliest", "latest", "stored")
  • consumer_configs (Dict): Additional Kafka consumer configuration
  • batch_size (int): Maximum messages per batch

Kafka Output Parameters:

  • step_id (str): Unique identifier
  • up (Stream[KafkaSinkMessage]): Stream of messages to send
  • brokers (List[str]): Kafka broker addresses
  • topic (str): Topic to produce to
  • producer_configs (Dict): Additional Kafka producer configuration

Usage Examples:

from bytewax.connectors.kafka import KafkaSinkMessage
from bytewax.connectors.kafka import operators as kop
import bytewax.operators as op
import json

# Kafka input with deserialization
kafka_stream = kop.input(
    "kafka_input",
    flow,
    brokers=["localhost:9092"],
    topics=["events", "metrics"],
    starting_offset="latest"
)

# Access the successful messages and errors separately
messages = kafka_stream.oks
errors = kafka_stream.errs

# Process messages
def deserialize_json(msg):
    try:
        return json.loads(msg.value.decode('utf-8'))
    except json.JSONDecodeError:
        return None

events = op.filter_map("deserialize", messages, deserialize_json)

# Kafka output with serialization
def create_kafka_message(event):
    user_id = event.get("user_id")
    key = user_id.encode("utf-8") if user_id else None
    value = json.dumps(event).encode("utf-8")
    return KafkaSinkMessage(key, value)

kafka_messages = op.map("serialize", processed_events, create_kafka_message)

kop.output(
    "kafka_output",
    kafka_messages,
    brokers=["localhost:9092"],
    topic="processed_events"
)

Kafka Serialization/Deserialization

Built-in serializers and deserializers for common data formats.

# From bytewax.connectors.kafka.serde
class BytesDeserializer: ...
class JsonDeserializer: ...
class AvroDeserializer:
    def __init__(self, schema_registry_url: str, schema_id: Optional[int] = None): ...

class BytesSerializer: ...
class JsonSerializer: ...
class AvroSerializer:
    def __init__(self, schema_registry_url: str, schema: str): ...

Usage Example:

from bytewax.connectors.kafka.serde import JsonDeserializer, JsonSerializer

# Using deserializer
json_deserializer = JsonDeserializer()
deserialized = op.map("deserialize", kafka_messages, lambda msg: json_deserializer(msg.value))

# Using serializer  
json_serializer = JsonSerializer()
serialized = op.map("serialize", events, lambda event: json_serializer(event))
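As a rough mental model for the JSON serde pair (a hedged sketch with stand-in functions, not the actual bytewax classes), serialization and deserialization should be inverse transforms over UTF-8 JSON bytes:

```python
import json

# Hypothetical stand-ins illustrating the expected round-trip contract;
# the real JsonSerializer/JsonDeserializer may carry extra configuration.
def ser_json(event: dict) -> bytes:
    return json.dumps(event).encode("utf-8")

def de_json(payload: bytes) -> dict:
    return json.loads(payload.decode("utf-8"))

event = {"user": "alice", "action": "login"}
assert de_json(ser_json(event)) == event  # round trip preserves the event
```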

File Connectors

File system integration for reading from and writing to files with various formats.

class FileSource:
    def __init__(self, paths: List[str], batch_size: int = 1000, encoding: str = "utf-8"): ...

class FileSink:
    def __init__(self, path: str, encoding: str = "utf-8", append: bool = False): ...

class DirSource:
    def __init__(self, paths: List[str], pattern: str = "*", recursive: bool = False): ...

class CSVSource:
    def __init__(self, paths: List[str], **csv_kwargs): ...

FileSource Parameters:

  • paths (List[str]): File paths to read from
  • batch_size (int): Lines per batch
  • encoding (str): File encoding

FileSink Parameters:

  • path (str): Output file path
  • encoding (str): File encoding
  • append (bool): Append to existing file

Usage Examples:

from bytewax.connectors.files import FileSource, FileSink, CSVSource

# Read from multiple files
file_stream = op.input("files", flow, FileSource([
    "/data/logs/app.log",
    "/data/logs/error.log"
], batch_size=500))

# Write to file
op.output("file_output", processed_data, FileSink("/output/results.jsonl"))

# Read CSV files
csv_stream = op.input("csv", flow, CSVSource([
    "/data/sales.csv",
    "/data/inventory.csv"
], delimiter=',', skip_header=True))
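DirSource has no example above; its advertised pattern/recursive matching can be sketched with pathlib (a standalone illustration of the matching semantics implied by the constructor shown above, not the connector itself):

```python
from pathlib import Path
from tempfile import TemporaryDirectory

def list_matching(root: str, pattern: str = "*", recursive: bool = False):
    """Mimic DirSource-style file discovery: glob `pattern` under `root`,
    descending into subdirectories only when `recursive` is True."""
    base = Path(root)
    found = base.rglob(pattern) if recursive else base.glob(pattern)
    return sorted(str(p) for p in found if p.is_file())

with TemporaryDirectory() as d:
    (Path(d) / "a.log").write_text("x")
    (Path(d) / "sub").mkdir()
    (Path(d) / "sub" / "b.log").write_text("y")
    assert len(list_matching(d, "*.log")) == 1                  # top level only
    assert len(list_matching(d, "*.log", recursive=True)) == 2  # includes sub/
```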

Standard I/O Connectors

Simple connectors for stdin/stdout, useful for command-line tools and debugging.

class StdInSource: ...

class StdOutSink: ...

Usage Examples:

from bytewax.connectors.stdio import StdInSource, StdOutSink

# Read from stdin
stdin_stream = op.input("stdin", flow, StdInSource())

# Write to stdout (useful for debugging)
op.output("stdout", results, StdOutSink())

# Pipeline example: process stdin to stdout
flow = Dataflow("stdin_processor")
lines = op.input("stdin", flow, StdInSource())
processed = op.map("process", lines, lambda line: line.upper().strip())
op.output("stdout", processed, StdOutSink())

Demo and Testing Connectors

Connectors for generating demo data and testing scenarios.

class DemoSource:
    def __init__(self, data: Iterable[Any], interval: timedelta = timedelta(seconds=1)): ...

class RandomMetricSource:
    def __init__(self, metric_names: List[str], interval: timedelta = timedelta(seconds=1), min_value: float = 0.0, max_value: float = 100.0): ...

DemoSource Parameters:

  • data (Iterable): Data to emit cyclically
  • interval (timedelta): Time between emissions

RandomMetricSource Parameters:

  • metric_names (List[str]): Names of metrics to generate
  • interval (timedelta): Generation interval
  • min_value, max_value (float): Value range

Usage Examples:

from bytewax.connectors.demo import DemoSource, RandomMetricSource
from datetime import timedelta

# Cycle through demo data
demo_data = [
    {"user": "alice", "action": "login"},
    {"user": "bob", "action": "purchase"},
    {"user": "alice", "action": "logout"}
]

demo_stream = op.input("demo", flow, DemoSource(demo_data, interval=timedelta(seconds=2)))

# Generate random metrics
metrics_stream = op.input("metrics", flow, RandomMetricSource(
    metric_names=["cpu_usage", "memory_usage", "disk_io"],
    interval=timedelta(milliseconds=500),
    min_value=0.0,
    max_value=100.0
))

Custom Connector Patterns

HTTP API Connector:

import requests
from datetime import datetime, timedelta

from bytewax.inputs import DynamicSource, StatelessSourcePartition

class HTTPSource(DynamicSource):
    def __init__(self, url, headers=None, poll_interval=timedelta(seconds=10)):
        self.url = url
        self.headers = headers or {}
        self.poll_interval = poll_interval
    
    def build(self, step_id, worker_index, worker_count):
        return HTTPPartition(self.url, self.headers, self.poll_interval)

class HTTPPartition(StatelessSourcePartition):
    def __init__(self, url, headers, poll_interval):
        self.url = url
        self.headers = headers
        self.poll_interval = poll_interval
        self.last_poll = None
    
    def next_batch(self):
        # Record the poll time so next_awake can throttle subsequent requests
        self.last_poll = datetime.now()
        response = requests.get(self.url, headers=self.headers)
        if response.ok:
            data = response.json()
            return data.get('items', [])
        return []
    
    def next_awake(self):
        # Wake one poll_interval after the last poll; immediately on the first call
        if self.last_poll:
            return self.last_poll + self.poll_interval
        return datetime.now()
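The `next_batch` above degrades gracefully: a failed request or a payload without an `items` key yields an empty batch. That extraction step, pulled out as a plain function (a hypothetical helper, not part of any connector API), behaves like:

```python
def extract_items(payload):
    """Mirror the next_batch handling of a decoded JSON payload:
    emit the 'items' list when present, otherwise an empty batch."""
    if not isinstance(payload, dict):
        return []
    return payload.get("items", [])

assert extract_items({"items": [1, 2, 3]}) == [1, 2, 3]
assert extract_items({"other": True}) == []
assert extract_items(None) == []
```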

WebSocket Connector:

import json
import threading
from datetime import datetime, timedelta
from queue import Queue, Empty

import websocket  # provided by the websocket-client package

from bytewax.inputs import DynamicSource, StatelessSourcePartition

class WebSocketSource(DynamicSource):
    def __init__(self, url):
        self.url = url
        self.message_queue = Queue()
        self.ws = None
    
    def build(self, step_id, worker_index, worker_count):
        if not self.ws:
            self.ws = websocket.WebSocketApp(
                self.url,
                on_message=self._on_message,
                on_error=self._on_error
            )
            # Run the WebSocket client in a background daemon thread
            ws_thread = threading.Thread(target=self.ws.run_forever)
            ws_thread.daemon = True
            ws_thread.start()
        
        return WebSocketPartition(self.message_queue)
    
    def _on_message(self, ws, message):
        try:
            data = json.loads(message)
            self.message_queue.put(data)
        except json.JSONDecodeError:
            pass
    
    def _on_error(self, ws, error):
        print(f"WebSocket error: {error}")

class WebSocketPartition(StatelessSourcePartition):
    def __init__(self, message_queue):
        self.message_queue = message_queue
    
    def next_batch(self):
        messages = []
        try:
            # Get up to 100 messages without blocking
            for _ in range(100):
                message = self.message_queue.get_nowait()
                messages.append(message)
        except Empty:
            pass
        return messages
    
    def next_awake(self):
        # Check again in 100ms if no messages
        return datetime.now() + timedelta(milliseconds=100)
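The non-blocking drain inside `WebSocketPartition.next_batch` is a reusable stdlib pattern; isolated as its own function (a sketch, not bytewax API), it reads:

```python
from queue import Queue, Empty

def drain(queue: Queue, max_items: int = 100) -> list:
    """Pull up to max_items from the queue without blocking,
    stopping early as soon as the queue is empty."""
    batch = []
    try:
        for _ in range(max_items):
            batch.append(queue.get_nowait())
    except Empty:
        pass
    return batch

q = Queue()
for i in range(5):
    q.put(i)
assert drain(q) == [0, 1, 2, 3, 4]  # everything available, in FIFO order
assert drain(q) == []               # queue is now empty
```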

Database Streaming Connector:

import psycopg2

from bytewax.inputs import FixedPartitionedSource, StatefulSourcePartition

class DatabaseStreamSource(FixedPartitionedSource):
    def __init__(self, connection_string, query, partition_column="id"):
        self.connection_string = connection_string
        self.query = query
        self.partition_column = partition_column
    
    def list_parts(self):
        # Could partition by date, hash, etc.
        return ["partition_0", "partition_1", "partition_2", "partition_3"]
    
    def build_part(self, step_id, for_part, resume_state):
        partition_id = int(for_part.split("_")[1])
        return DatabasePartition(
            self.connection_string, 
            self.query, 
            self.partition_column,
            partition_id, 
            len(self.list_parts()),  # total partitions
            resume_state
        )

class DatabasePartition(StatefulSourcePartition):
    def __init__(self, connection_string, query, partition_column, partition_id, total_partitions, resume_state):
        self.connection_string = connection_string
        self.base_query = query
        self.partition_column = partition_column
        self.partition_id = partition_id
        self.total_partitions = total_partitions
        self.last_id = resume_state or 0
        self.connection = None
    
    def next_batch(self):
        if not self.connection:
            self.connection = psycopg2.connect(self.connection_string)
        
        # Append partition and resume predicates (assumes base_query has no
        # WHERE clause); %% renders as a literal % (modulo) under psycopg2
        query = f"""
            {self.base_query}
            WHERE {self.partition_column} > %s 
            AND {self.partition_column} %% %s = %s
            ORDER BY {self.partition_column}
            LIMIT 1000
        """
        
        cursor = self.connection.cursor()
        cursor.execute(query, (self.last_id, self.total_partitions, self.partition_id))
        
        rows = cursor.fetchall()
        if rows:
            # Update last seen ID
            self.last_id = max(row[0] for row in rows)  # Assume first column is ID
            return [dict(zip([desc[0] for desc in cursor.description], row)) for row in rows]
        
        return []
    
    def snapshot(self):
        return self.last_id
    
    def close(self):
        if self.connection:
            self.connection.close()
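The WHERE clause built in `next_batch` does two jobs: it resumes past rows already emitted (`> last_id`) and assigns each row to exactly one partition by modulo on the partition column. Simulated over in-memory rows (a plain-Python model of the SQL predicate, not a database call):

```python
def partition_filter(rows, last_id, partition_id, total_partitions, key="id"):
    """Simulate the SQL predicate: keep rows with key > last_id that map
    (by modulo) to this partition, returned in key order."""
    selected = [r for r in rows
                if r[key] > last_id and r[key] % total_partitions == partition_id]
    return sorted(selected, key=lambda r: r[key])

rows = [{"id": i} for i in range(1, 9)]
# Partition 0 of 4 owns ids 4 and 8; resuming past id 4 leaves only 8.
assert [r["id"] for r in partition_filter(rows, 0, 0, 4)] == [4, 8]
assert [r["id"] for r in partition_filter(rows, 4, 0, 4)] == [8]
```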

Install with Tessl CLI

npx tessl i tessl/pypi-bytewax
