Python stream processing library that ports Kafka Streams to Python for building distributed systems and real-time data pipelines
—
Stream processing components for consuming and transforming data streams in real-time. Includes agents for processing message streams, stream transformation operations, and event handling for building reactive data processing pipelines.
Stream processing agents that consume from channels or topics. Agents are async functions decorated with @app.agent() that automatically handle message consumption, acknowledgment, error handling, and scaling.
class Agent:
def __init__(
self,
fun,
*,
channel=None,
name: str = None,
concurrency: int = 1,
sink: list = None,
on_error: callable = None,
supervisor_strategy: str = None,
help: str = None,
**kwargs
):
"""
Stream processing agent.
Args:
fun: Async function to process stream
channel: Channel or topic to consume from
name: Agent name
concurrency: Number of concurrent instances
sink: Channels to forward results to
on_error: Error handler function
supervisor_strategy: Error recovery strategy
help: Help text
"""
async def send(
self,
value=None,
*,
key=None,
partition: int = None
):
"""
Send message to agent's channel.
Args:
value: Message value
key: Message key
partition: Target partition
"""
async def ask(
self,
value=None,
*,
key=None,
partition: int = None,
reply_to: str = None,
correlation_id: str = None
):
"""
Send message and wait for reply (RPC-style).
Args:
value: Message value
key: Message key
partition: Target partition
reply_to: Reply topic
correlation_id: Request correlation ID
Returns:
Reply message
"""
def cast(
self,
value=None,
*,
key=None,
partition: int = None
):
"""
Send message without waiting (fire-and-forget).
Args:
value: Message value
key: Message key
partition: Target partition
"""
async def start(self):
"""Start the agent."""
async def stop(self):
"""Stop the agent."""
def cancel(self):
"""Cancel the agent."""
@property
def channel(self):
"""Agent's input channel."""
@property
def concurrency(self) -> int:
"""Number of concurrent instances."""
@property
def help(self) -> str:
"""Help text for CLI."""Usage Example:
# Basic agent
@app.agent(app.topic('orders'))
async def process_orders(orders):
async for order in orders:
print(f'Processing order: {order}')
# Agent with RPC support
@app.agent(app.topic('calculations'))
async def calculator(calculations):
async for calc in calculations:
result = perform_calculation(calc.operation, calc.values)
await calc.send(
calc.reply_to,
key=calc.correlation_id,
value=result
)
# Sending to agents
await process_orders.send({'id': 1, 'amount': 100})
result = await calculator.ask({'operation': 'sum', 'values': [1, 2, 3]})
calculator.cast({'operation': 'multiply', 'values': [4, 5]})

Stream processing interface providing transformation operations on data streams. Streams are async iterators that can be chained with functional programming operations.
class Stream:
def __init__(self, channel, **kwargs):
"""
Create stream from channel.
Args:
channel: Input channel
**kwargs: Stream options
"""
def __aiter__(self):
"""Async iterator interface."""
async def __anext__(self):
"""Get next item from stream."""
def items(self):
"""
Iterate over (key, value) pairs.
Returns:
Stream of (key, value) tuples
"""
def filter(self, fun):
"""
Filter stream items based on predicate.
Args:
fun: Predicate function (item) -> bool
Returns:
Filtered stream
"""
def map(self, fun):
"""
Transform stream items.
Args:
fun: Transform function (item) -> new_item
Returns:
Transformed stream
"""
def group_by(
self,
key,
*,
name: str = None,
topic: str = None
):
"""
Group stream by key function.
Args:
key: Key extraction function
name: Group name
topic: Intermediate topic for grouping
Returns:
Grouped stream
"""
def take(
self,
max_: int,
*,
within: float = None
):
"""
Take at most N items from stream.
Args:
max_: Maximum number of items
within: Time window in seconds
Returns:
Limited stream
"""
def rate_limit(
self,
rate: float,
*,
per: float = 1.0,
within: float = None
):
"""
Rate limit stream processing.
Args:
rate: Maximum rate (items per second)
per: Time period for rate calculation
within: Rate limit window
Returns:
Rate-limited stream
"""
def buffer(
self,
size: int,
*,
timeout: float = None
):
"""
Buffer stream items.
Args:
size: Buffer size
timeout: Buffer flush timeout
Returns:
Buffered stream
"""
def through(self, channel, **kwargs):
"""
Route stream through another channel.
Args:
channel: Target channel
**kwargs: Routing options
Returns:
Routed stream
"""
def echo(self, *args, **kwargs):
"""
Echo stream items to stdout.
Returns:
Echo stream
"""
def join(self, *streams, **kwargs):
"""
Join with other streams.
Args:
*streams: Streams to join with
**kwargs: Join options
Returns:
Joined stream
"""
def combine(self, *streams, **kwargs):
"""
Combine with other streams.
Args:
*streams: Streams to combine
**kwargs: Combine options
Returns:
Combined stream
"""
def concat(self, *streams, **kwargs):
"""
Concatenate with other streams.
Args:
*streams: Streams to concatenate
**kwargs: Concat options
Returns:
Concatenated stream
"""
def tee(self, *streams, **kwargs):
"""
Split stream to multiple outputs.
Args:
*streams: Output streams
**kwargs: Tee options
Returns:
Teed stream
"""Usage Example:
# Basic stream processing
@app.agent(app.topic('numbers'))
async def process_numbers(stream):
async for number in stream:
print(f'Number: {number}')
# Stream transformations
@app.agent(app.topic('raw-data'))
async def transform_data(stream):
async for item in stream.filter(lambda x: x.is_valid).map(lambda x: x.processed_value):
await save_processed_item(item)
# Stream grouping and aggregation
@app.agent(app.topic('events'))
async def aggregate_events(stream):
async for user_id, events in stream.group_by(lambda event: event.user_id):
count = 0
async for event in events:
count += 1
if count >= 10:
await send_alert(user_id, count)
count = 0
# Rate limiting and buffering
@app.agent(app.topic('api-calls'))
async def rate_limited_processing(stream):
async for batch in stream.rate_limit(100.0).buffer(50, timeout=5.0):
await process_batch(batch)

Event container representing a single message in a stream with metadata, acknowledgment capabilities, and forwarding operations.
class Event:
def __init__(
self,
key=None,
value=None,
headers: dict = None,
message=None,
timestamp: float = None
):
"""
Stream event container.
Args:
key: Event key
value: Event value
headers: Event headers
message: Underlying message object
timestamp: Event timestamp
"""
def ack(self):
"""Acknowledge event processing."""
def reject(self):
"""Reject event (negative acknowledgment)."""
async def send(
self,
channel,
key=None,
value=None,
partition: int = None,
timestamp: float = None,
headers: dict = None,
**kwargs
):
"""
Send new event to channel.
Args:
channel: Target channel
key: Event key (defaults to current key)
value: Event value (defaults to current value)
partition: Target partition
timestamp: Event timestamp
headers: Event headers
**kwargs: Additional options
"""
async def forward(
self,
channel,
*,
key=None,
value=None,
partition: int = None,
timestamp: float = None,
headers: dict = None,
**kwargs
):
"""
Forward event to another channel.
Args:
channel: Target channel
key: Override key
value: Override value
partition: Target partition
timestamp: Override timestamp
headers: Additional headers
**kwargs: Additional options
"""
@property
def key(self):
"""Event key."""
@property
def value(self):
"""Event value."""
@property
def headers(self) -> dict:
"""Event headers."""
@property
def message(self):
"""Underlying message object."""
@property
def timestamp(self) -> float:
"""Event timestamp (Unix timestamp)."""Usage Example:
# Working with events
@app.agent(app.topic('transactions'))
async def process_transactions(stream):
async for event in stream.events():
try:
# Process the transaction
result = await process_transaction(event.value)
# Forward successful results
await event.forward(
success_topic,
value=result,
headers={'processed_at': time.time()}
)
# Acknowledge processing
event.ack()
except ProcessingError as exc:
# Forward to error topic
await event.forward(
error_topic,
value={'error': str(exc), 'original': event.value}
)
event.ack() # Still ack to avoid reprocessing
except FatalError:
# Reject for reprocessing
event.reject()
# Creating custom events
async def send_notification(user_id, message):
event = Event(
key=user_id,
value=message,
headers={'type': 'notification', 'priority': 'high'},
timestamp=time.time()
)
await event.send(notifications_topic)

Function to access the currently processing event within an agent context.
def current_event() -> Event:
"""
Get the currently processing event.
Returns:
Current event instance
Raises:
RuntimeError: If called outside agent context
"""Usage Example:
@app.agent(app.topic('orders'))
async def process_orders(orders):
async for order in orders:
# Get current event for metadata access
event = faust.current_event()
# Log processing with event metadata
print(f'Processing order {order.id} from partition {event.message.partition}')
# Forward based on event headers
if event.headers.get('priority') == 'high':
await event.forward(priority_processing_topic)
else:
await event.forward(normal_processing_topic)
event.ack()

from typing import Protocol, AsyncIterator
class AgentT(Protocol):
"""Type interface for stream processing agents."""
async def send(self, value=None, *, key=None, partition=None): ...
async def ask(self, value=None, *, key=None, **kwargs): ...
def cast(self, value=None, *, key=None, partition=None): ...
class StreamT(Protocol):
"""Type interface for data streams."""
def __aiter__(self) -> AsyncIterator: ...
def filter(self, fun): ...
def map(self, fun): ...
def group_by(self, key, **kwargs): ...
class EventT(Protocol):
"""Type interface for stream events."""
key: object
value: object
headers: dict
timestamp: float
def ack(self): ...
def reject(self): ...
async def send(self, channel, **kwargs): ...
async def forward(self, channel, **kwargs): ...Install with Tessl CLI
npx tessl i tessl/pypi-faust