Python Stateful Stream Processing Framework
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pre-built connectors for common external systems including Kafka, files, stdio, and demo sources. These connectors provide production-ready integration with popular data systems and serve as examples for building custom connectors.
High-performance Kafka integration with built-in serialization/deserialization support.
# Kafka operators (from bytewax.connectors.kafka.operators)
def input(step_id: str, flow: Dataflow, brokers: List[str], topics: List[str], starting_offset: str = "stored", consumer_configs: Optional[Dict[str, Any]] = None, batch_size: int = 1000) -> KafkaSourceMessage: ...
def output(step_id: str, up: Stream[KafkaSinkMessage], brokers: List[str], topic: str, producer_configs: Optional[Dict[str, Any]] = None) -> None: ...
# Message types
class KafkaSourceMessage:
    """A single record consumed from Kafka, together with its source metadata."""

    key: Optional[bytes]  # record key; None for unkeyed messages
    value: bytes  # raw, undeserialized message payload
    topic: str  # topic the record was read from
    partition: int  # partition within the topic
    offset: int  # offset of the record within its partition
    timestamp: Optional[datetime]  # broker/producer timestamp, if present
class KafkaSinkMessage:
def __init__(self, key: Optional[bytes], value: bytes): ...
key: Optional[bytes]
value: bytes

Kafka Input Parameters:
- step_id (str): Unique identifier
- flow (Dataflow): Target dataflow
- brokers (List[str]): Kafka broker addresses
- topics (List[str]): Topics to consume from
- starting_offset (str): Where to start consuming ("earliest", "latest", "stored")
- consumer_configs (Dict): Additional Kafka consumer configuration
- batch_size (int): Maximum messages per batch

Kafka Output Parameters:
- step_id (str): Unique identifier
- up (Stream[KafkaSinkMessage]): Stream of messages to send
- brokers (List[str]): Kafka broker addresses
- topic (str): Topic to produce to
- producer_configs (Dict): Additional Kafka producer configuration

Usage Examples:
from bytewax.connectors.kafka import operators as kop
import bytewax.operators as op
import json
# Kafka input with deserialization
kafka_stream = kop.input(
"kafka_input",
flow,
brokers=["localhost:9092"],
topics=["events", "metrics"],
starting_offset="latest"
)
# Access the successful messages and errors separately
messages = kafka_stream.oks
errors = kafka_stream.errs
# Process messages
def deserialize_json(msg):
try:
return json.loads(msg.value.decode('utf-8'))
except json.JSONDecodeError:
return None
events = op.filter_map("deserialize", messages, deserialize_json)
# Kafka output with serialization
def create_kafka_message(event):
key = event.get("user_id", "").encode('utf-8') if event.get("user_id") else None
value = json.dumps(event).encode('utf-8')
return KafkaSinkMessage(key, value)
kafka_messages = op.map("serialize", processed_events, create_kafka_message)
kop.output(
"kafka_output",
kafka_messages,
brokers=["localhost:9092"],
topic="processed_events"
)

Built-in serializers and deserializers for common data formats.
# From bytewax.connectors.kafka.serde
class BytesDeserializer: ...
class JsonDeserializer: ...
class AvroDeserializer:
def __init__(self, schema_registry_url: str, schema_id: Optional[int] = None): ...
class BytesSerializer: ...
class JsonSerializer: ...
class AvroSerializer:
def __init__(self, schema_registry_url: str, schema: str): ...

Usage Example:
from bytewax.connectors.kafka.serde import JsonDeserializer, JsonSerializer
# Using deserializer
json_deserializer = JsonDeserializer()
deserialized = op.map("deserialize", kafka_messages, lambda msg: json_deserializer(msg.value))
# Using serializer
json_serializer = JsonSerializer()
serialized = op.map("serialize", events, lambda event: json_serializer(event))

File system integration for reading from and writing to files with various formats.
class FileSource:
def __init__(self, paths: List[str], batch_size: int = 1000, encoding: str = "utf-8"): ...
class FileSink:
def __init__(self, path: str, encoding: str = "utf-8", append: bool = False): ...
class DirSource:
def __init__(self, paths: List[str], pattern: str = "*", recursive: bool = False): ...
class CSVSource:
def __init__(self, paths: List[str], **csv_kwargs): ...

FileSource Parameters:
- paths (List[str]): File paths to read from
- batch_size (int): Lines per batch
- encoding (str): File encoding

FileSink Parameters:
- path (str): Output file path
- encoding (str): File encoding
- append (bool): Append to existing file

Usage Examples:
from bytewax.connectors.files import FileSource, FileSink, CSVSource
# Read from multiple files
file_stream = op.input("files", flow, FileSource([
"/data/logs/app.log",
"/data/logs/error.log"
], batch_size=500))
# Write to file
op.output("file_output", processed_data, FileSink("/output/results.jsonl"))
# Read CSV files
csv_stream = op.input("csv", flow, CSVSource([
"/data/sales.csv",
"/data/inventory.csv"
], delimiter=',', skip_header=True))

Simple connectors for stdin/stdout, useful for command-line tools and debugging.
class StdInSource: ...
class StdOutSink: ...

Usage Examples:
from bytewax.connectors.stdio import StdInSource, StdOutSink
# Read from stdin
stdin_stream = op.input("stdin", flow, StdInSource())
# Write to stdout (useful for debugging)
op.output("stdout", results, StdOutSink())
# Pipeline example: process stdin to stdout
flow = Dataflow("stdin_processor")
lines = op.input("stdin", flow, StdInSource())
processed = op.map("process", lines, lambda line: line.upper().strip())
op.output("stdout", processed, StdOutSink())

Connectors for generating demo data and testing scenarios.
class DemoSource:
def __init__(self, data: Iterable[Any], interval: timedelta = timedelta(seconds=1)): ...
class RandomMetricSource:
def __init__(self, metric_names: List[str], interval: timedelta = timedelta(seconds=1), min_value: float = 0.0, max_value: float = 100.0): ...

DemoSource Parameters:
- data (Iterable): Data to emit cyclically
- interval (timedelta): Time between emissions

RandomMetricSource Parameters:
- metric_names (List[str]): Names of metrics to generate
- interval (timedelta): Generation interval
- min_value, max_value (float): Value range

Usage Examples:
from bytewax.connectors.demo import DemoSource, RandomMetricSource
from datetime import timedelta
# Cycle through demo data
demo_data = [
{"user": "alice", "action": "login"},
{"user": "bob", "action": "purchase"},
{"user": "alice", "action": "logout"}
]
demo_stream = op.input("demo", flow, DemoSource(demo_data, interval=timedelta(seconds=2)))
# Generate random metrics
metrics_stream = op.input("metrics", flow, RandomMetricSource(
metric_names=["cpu_usage", "memory_usage", "disk_io"],
interval=timedelta(milliseconds=500),
min_value=0.0,
max_value=100.0
))

HTTP API Connector:
class HTTPSource(DynamicSource):
    """Dynamic source that polls a JSON HTTP endpoint at a fixed interval."""

    def __init__(self, url, headers=None, poll_interval=timedelta(seconds=10)):
        self.url = url
        self.headers = {} if headers is None else headers
        self.poll_interval = poll_interval

    def build(self, step_id, worker_index, worker_count):
        # Every worker polls the same URL with the same settings; the
        # partition itself carries no per-worker state.
        return HTTPPartition(self.url, self.headers, self.poll_interval)
class HTTPPartition(StatelessSourcePartition):
def __init__(self, url, headers, poll_interval):
self.url = url
self.headers = headers
self.poll_interval = poll_interval
self.last_poll = None
def next_batch(self):
response = requests.get(self.url, headers=self.headers)
if response.ok:
data = response.json()
return data.get('items', [])
return []
def next_awake(self):
now = datetime.now()
if self.last_poll:
return self.last_poll + self.poll_interval
return nowWebSocket Connector:
import websocket
import json
from queue import Queue, Empty
class WebSocketSource(DynamicSource):
    """Dynamic source feeding WebSocket messages into the dataflow.

    One background WebSocketApp is created lazily on the first call to
    `build`; decoded JSON messages flow to partitions through a shared
    thread-safe queue.

    NOTE(review): all partitions read from the same queue, so messages
    are distributed across workers, not broadcast -- confirm this is the
    intended delivery semantics.
    """

    def __init__(self, url):
        self.url = url
        self.message_queue = Queue()  # shared buffer between WS thread and partitions
        self.ws = None  # created lazily in build()

    def build(self, step_id, worker_index, worker_count):
        # Only the first build() call opens the connection; subsequent
        # calls reuse the already-running WebSocketApp.
        if not self.ws:
            self.ws = websocket.WebSocketApp(
                self.url,
                on_message=self._on_message,
                on_error=self._on_error
            )
            # Start WebSocket in background thread
            import threading
            ws_thread = threading.Thread(target=self.ws.run_forever)
            ws_thread.daemon = True  # don't block interpreter shutdown
            ws_thread.start()
        return WebSocketPartition(self.message_queue)

    def _on_message(self, ws, message):
        # Runs on the WebSocket thread: decode JSON and enqueue it.
        try:
            data = json.loads(message)
            self.message_queue.put(data)
        except json.JSONDecodeError:
            # Non-JSON frames are silently dropped.
            pass

    def _on_error(self, ws, error):
        # Best-effort diagnostics; an error does not stop the dataflow.
        print(f"WebSocket error: {error}")
class WebSocketPartition(StatelessSourcePartition):
def __init__(self, message_queue):
self.message_queue = message_queue
def next_batch(self):
messages = []
try:
# Get up to 100 messages without blocking
for _ in range(100):
message = self.message_queue.get_nowait()
messages.append(message)
except Empty:
pass
return messages
def next_awake(self):
# Check again in 100ms if no messages
return datetime.now() + timedelta(milliseconds=100)Database Streaming Connector:
class DatabaseStreamSource(FixedPartitionedSource):
    """Partitioned source that streams rows from a SQL table.

    Rows are split across a fixed number of partitions by taking
    `partition_column` modulo the partition count, so each partition
    reads a disjoint slice of the table.
    """

    # Single source of truth for the partition count. Previously this
    # value appeared twice as unrelated magic numbers: a hard-coded list
    # in list_parts and a bare `4` in build_part.
    _NUM_PARTITIONS = 4

    def __init__(self, connection_string, query, partition_column="id"):
        self.connection_string = connection_string
        self.query = query
        self.partition_column = partition_column

    def list_parts(self):
        # Could partition by date, hash, etc.
        return [f"partition_{i}" for i in range(self._NUM_PARTITIONS)]

    def build_part(self, step_id, for_part, resume_state):
        # Part names are "partition_<n>"; recover the numeric id.
        partition_id = int(for_part.split("_")[1])
        return DatabasePartition(
            self.connection_string,
            self.query,
            self.partition_column,
            partition_id,
            self._NUM_PARTITIONS,
            resume_state
        )
class DatabasePartition(StatefulSourcePartition):
def __init__(self, connection_string, query, partition_column, partition_id, total_partitions, resume_state):
self.connection_string = connection_string
self.base_query = query
self.partition_column = partition_column
self.partition_id = partition_id
self.total_partitions = total_partitions
self.last_id = resume_state or 0
self.connection = None
def next_batch(self):
if not self.connection:
self.connection = psycopg2.connect(self.connection_string)
# Add partitioning and resume logic to query
query = f"""
{self.base_query}
WHERE {self.partition_column} > %s
AND {self.partition_column} %% %s = %s
ORDER BY {self.partition_column}
LIMIT 1000
"""
cursor = self.connection.cursor()
cursor.execute(query, (self.last_id, self.total_partitions, self.partition_id))
rows = cursor.fetchall()
if rows:
# Update last seen ID
self.last_id = max(row[0] for row in rows) # Assume first column is ID
return [dict(zip([desc[0] for desc in cursor.description], row)) for row in rows]
return []
def snapshot(self):
return self.last_id
def close(self):
if self.connection:
self.connection.close()Install with Tessl CLI
npx tessl i tessl/pypi-bytewax