CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-faust

Python stream processing library that ports Kafka Streams to Python for building distributed systems and real-time data pipelines

Pending
Overview
Eval results
Files

docs/stream-processing.md

Stream Processing

Stream processing components for consuming and transforming data streams in real-time. Includes agents for processing message streams, stream transformation operations, and event handling for building reactive data processing pipelines.

Capabilities

Agent Class

Stream processing agents that consume from channels or topics. Agents are async functions decorated with @app.agent() that automatically handle message consumption, acknowledgment, error handling, and scaling.

class Agent:
    """Stream processing agent: an async function bound to an input
    channel/topic, with fire-and-forget (cast), delivery (send) and
    RPC-style (ask) entry points plus start/stop lifecycle control.

    NOTE(review): this is a documented API stub — every method body is a
    docstring only; the real behavior lives in the faust implementation.
    """

    def __init__(
        self,
        fun,
        *,
        channel=None,
        name: str = None,  # implicitly optional; PEP 484 prefers Optional[str]
        concurrency: int = 1,
        sink: list = None,
        # NOTE(review): 'callable' is the builtin function, not a valid typing
        # annotation — typing.Callable is the correct form here.
        on_error: callable = None,
        supervisor_strategy: str = None,
        help: str = None,  # parameter name shadows the 'help' builtin in this scope
        **kwargs
    ):
        """
        Stream processing agent.
        
        Args:
            fun: Async function to process stream
            channel: Channel or topic to consume from
            name: Agent name
            concurrency: Number of concurrent instances
            sink: Channels to forward results to
            on_error: Error handler function
            supervisor_strategy: Error recovery strategy
            help: Help text
        """

    async def send(
        self,
        value=None,
        *,
        key=None,
        partition: int = None
    ):
        """
        Send message to agent's channel.
        
        Args:
            value: Message value
            key: Message key
            partition: Target partition
        """

    async def ask(
        self,
        value=None,
        *,
        key=None,
        partition: int = None,
        reply_to: str = None,
        correlation_id: str = None
    ):
        """
        Send message and wait for reply (RPC-style).
        
        Args:
            value: Message value
            key: Message key
            partition: Target partition
            reply_to: Reply topic
            correlation_id: Request correlation ID
            
        Returns:
            Reply message
        """

    def cast(
        self,
        value=None,
        *,
        key=None,
        partition: int = None
    ):
        """
        Send message without waiting (fire-and-forget).
        
        Args:
            value: Message value
            key: Message key
            partition: Target partition
        """

    async def start(self):
        """Start the agent."""

    async def stop(self):
        """Stop the agent."""

    def cancel(self):
        """Cancel the agent."""

    # NOTE(review): the three properties below reuse the names of __init__
    # keyword arguments (channel, concurrency, help); in a real implementation
    # the backing state would need private attribute names to avoid clashing.
    @property
    def channel(self):
        """Agent's input channel."""

    @property
    def concurrency(self) -> int:
        """Number of concurrent instances."""

    @property
    def help(self) -> str:
        """Help text for CLI."""

Usage Example:

# Basic agent: decorating an async generator-style function with
# @app.agent binds it to the 'orders' topic as its input channel.
@app.agent(app.topic('orders'))
async def process_orders(orders):
    async for order in orders:
        print(f'Processing order: {order}')

# Agent with RPC support
@app.agent(app.topic('calculations'))
async def calculator(calculations):
    async for calc in calculations:
        result = perform_calculation(calc.operation, calc.values)
        # NOTE(review): this calls .send() on the iterated value itself,
        # which only works if the stream yields event-like objects carrying
        # reply_to/correlation_id — confirm against the faust RPC docs.
        await calc.send(
            calc.reply_to,
            key=calc.correlation_id,
            value=result
        )

# Sending to agents
# NOTE: top-level await — assumes an async context (another agent or task).
await process_orders.send({'id': 1, 'amount': 100})
result = await calculator.ask({'operation': 'sum', 'values': [1, 2, 3]})
calculator.cast({'operation': 'multiply', 'values': [4, 5]})

Stream Class

Stream processing interface providing transformation operations on data streams. Streams are async iterators that can be chained with functional programming operations.

class Stream:
    """Async-iterable view over a channel with chainable, functional
    transformation operations (filter/map/group_by/take/...), each of
    which returns a new derived stream.

    NOTE(review): documented API stub — method bodies are docstrings only.
    """

    def __init__(self, channel, **kwargs):
        """
        Create stream from channel.
        
        Args:
            channel: Input channel
            **kwargs: Stream options
        """

    def __aiter__(self):
        """Async iterator interface."""

    async def __anext__(self):
        """Get next item from stream."""

    def items(self):
        """
        Iterate over (key, value) pairs.
        
        Returns:
            Stream of (key, value) tuples
        """

    def filter(self, fun):
        """
        Filter stream items based on predicate.
        
        Args:
            fun: Predicate function (item) -> bool
            
        Returns:
            Filtered stream
        """

    def map(self, fun):
        """
        Transform stream items.
        
        Args:
            fun: Transform function (item) -> new_item
            
        Returns:
            Transformed stream
        """

    def group_by(
        self,
        key,
        *,
        name: str = None,
        topic: str = None
    ):
        """
        Group stream by key function.
        
        Args:
            key: Key extraction function
            name: Group name
            topic: Intermediate topic for grouping
            
        Returns:
            Grouped stream
        """

    def take(
        self,
        max_: int,
        *,
        within: float = None
    ):
        """
        Take at most N items from stream.
        
        Args:
            max_: Maximum number of items
            within: Time window in seconds
            
        Returns:
            Limited stream
        """

    def rate_limit(
        self,
        rate: float,
        *,
        per: float = 1.0,
        within: float = None
    ):
        """
        Rate limit stream processing.
        
        Args:
            rate: Maximum rate (items per second)
            per: Time period for rate calculation
            within: Rate limit window
            
        Returns:
            Rate-limited stream
        """

    def buffer(
        self,
        size: int,
        *,
        timeout: float = None
    ):
        """
        Buffer stream items.
        
        Args:
            size: Buffer size
            timeout: Buffer flush timeout
            
        Returns:
            Buffered stream
        """

    def through(self, channel, **kwargs):
        """
        Route stream through another channel.
        
        Args:
            channel: Target channel
            **kwargs: Routing options
            
        Returns:
            Routed stream
        """

    def echo(self, *args, **kwargs):
        """
        Echo stream items to stdout.
        
        Returns:
            Echo stream
        """

    def join(self, *streams, **kwargs):
        """
        Join with other streams.
        
        Args:
            *streams: Streams to join with
            **kwargs: Join options
            
        Returns:
            Joined stream
        """

    def combine(self, *streams, **kwargs):
        """
        Combine with other streams.
        
        Args:
            *streams: Streams to combine
            **kwargs: Combine options
            
        Returns:
            Combined stream
        """

    def concat(self, *streams, **kwargs):
        """
        Concatenate with other streams.
        
        Args:
            *streams: Streams to concatenate
            **kwargs: Concat options
            
        Returns:
            Concatenated stream
        """

    def tee(self, *streams, **kwargs):
        """
        Split stream to multiple outputs.
        
        Args:
            *streams: Output streams
            **kwargs: Tee options
            
        Returns:
            Teed stream
        """

Usage Example:

# Basic stream processing
@app.agent(app.topic('numbers'))
async def process_numbers(stream):
    async for number in stream:
        print(f'Number: {number}')

# Stream transformations: filter/map return derived streams, so the
# calls chain before iteration begins.
@app.agent(app.topic('raw-data'))
async def transform_data(stream):
    async for item in stream.filter(lambda x: x.is_valid).map(lambda x: x.processed_value):
        await save_processed_item(item)

# Stream grouping and aggregation
@app.agent(app.topic('events'))
async def aggregate_events(stream):
    # NOTE(review): in faust, group_by with a lambda key normally requires
    # an explicit name= for the repartition topic — confirm before copying.
    async for user_id, events in stream.group_by(lambda event: event.user_id):
        count = 0
        async for event in events:
            count += 1
            if count >= 10:
                await send_alert(user_id, count)
                count = 0

# Rate limiting and buffering
@app.agent(app.topic('api-calls'))
async def rate_limited_processing(stream):
    async for batch in stream.rate_limit(100.0).buffer(50, timeout=5.0):
        await process_batch(batch)

Event Class

Event container representing a single message in a stream with metadata, acknowledgment capabilities, and forwarding operations.

class Event:
    """Container for a single stream message: payload (key/value/headers),
    metadata (underlying message, timestamp), acknowledgment (ack/reject)
    and re-routing (send/forward) operations.

    NOTE(review): documented API stub — method bodies are docstrings only.
    """

    def __init__(
        self,
        key=None,
        value=None,
        headers: dict = None,  # implicitly optional; PEP 484 prefers Optional[dict]
        message=None,
        timestamp: float = None
    ):
        """
        Stream event container.
        
        Args:
            key: Event key
            value: Event value
            headers: Event headers
            message: Underlying message object
            timestamp: Event timestamp
        """

    def ack(self):
        """Acknowledge event processing."""

    def reject(self):
        """Reject event (negative acknowledgment)."""

    async def send(
        self,
        channel,
        key=None,
        value=None,
        partition: int = None,
        timestamp: float = None,
        headers: dict = None,
        **kwargs
    ):
        """
        Send new event to channel.
        
        Args:
            channel: Target channel
            key: Event key (defaults to current key)
            value: Event value (defaults to current value)
            partition: Target partition
            timestamp: Event timestamp
            headers: Event headers
            **kwargs: Additional options
        """

    async def forward(
        self,
        channel,
        *,
        key=None,
        value=None,
        partition: int = None,
        timestamp: float = None,
        headers: dict = None,
        **kwargs
    ):
        """
        Forward event to another channel.
        
        Args:
            channel: Target channel
            key: Override key
            value: Override value
            partition: Target partition
            timestamp: Override timestamp
            headers: Additional headers
            **kwargs: Additional options
        """

    # NOTE(review): these read-only properties reuse the names of __init__
    # parameters; a real implementation would store private attributes.
    @property
    def key(self):
        """Event key."""

    @property
    def value(self):
        """Event value."""

    @property
    def headers(self) -> dict:
        """Event headers."""

    @property
    def message(self):
        """Underlying message object."""

    @property
    def timestamp(self) -> float:
        """Event timestamp (Unix timestamp)."""

Usage Example:

# Working with events
@app.agent(app.topic('transactions'))
async def process_transactions(stream):
    # NOTE(review): .events() is not declared on the Stream class documented
    # above — presumably it yields Event wrappers instead of bare values;
    # confirm against the faust Stream API.
    async for event in stream.events():
        try:
            # Process the transaction
            result = await process_transaction(event.value)
            
            # Forward successful results
            await event.forward(
                success_topic,
                value=result,
                headers={'processed_at': time.time()}
            )
            
            # Acknowledge processing
            event.ack()
            
        except ProcessingError as exc:
            # Forward to error topic
            await event.forward(
                error_topic,
                value={'error': str(exc), 'original': event.value}
            )
            event.ack()  # Still ack to avoid reprocessing
            
        except FatalError:
            # Reject for reprocessing
            event.reject()

# Creating custom events
async def send_notification(user_id, message):
    event = Event(
        key=user_id,
        value=message,
        headers={'type': 'notification', 'priority': 'high'},
        timestamp=time.time()
    )
    await event.send(notifications_topic)

Current Event Access

Function to access the currently processing event within an agent context.

def current_event() -> Event:
    """
    Get the currently processing event.
    
    Returns:
        Current event instance
        
    Raises:
        RuntimeError: If called outside agent context
    """
    # NOTE(review): documented stub — as written the body is only this
    # docstring, so calling it returns None rather than raising; the real
    # behavior (including the RuntimeError) lives in faust.current_event.

Usage Example:

@app.agent(app.topic('orders'))
async def process_orders(orders):
    async for order in orders:
        # Get current event for metadata access; must be called while an
        # event is being processed inside the agent, per the docs above.
        event = faust.current_event()
        
        # Log processing with event metadata
        print(f'Processing order {order.id} from partition {event.message.partition}')
        
        # Forward based on event headers
        if event.headers.get('priority') == 'high':
            await event.forward(priority_processing_topic)
        else:
            await event.forward(normal_processing_topic)
        
        event.ack()

Type Interfaces

from typing import Protocol, AsyncIterator

class AgentT(Protocol):
    """Structural type for stream processing agents.

    Any object exposing send/ask/cast with these signatures satisfies
    this protocol.
    """

    async def send(self, value=None, *, key=None, partition=None):
        """Deliver a message to the agent's input channel."""

    async def ask(self, value=None, *, key=None, **kwargs):
        """Deliver a message and await an RPC-style reply."""

    def cast(self, value=None, *, key=None, partition=None):
        """Deliver a message without awaiting a result (fire-and-forget)."""

class StreamT(Protocol):
    """Structural type for data streams.

    A stream is asynchronously iterable and supports chainable
    filter/map/group_by transformations.
    """

    def __aiter__(self) -> AsyncIterator:
        """Return the asynchronous iterator over stream items."""

    def filter(self, fun):
        """Return a derived stream keeping only items where fun(item) is truthy."""

    def map(self, fun):
        """Return a derived stream of fun(item) for each item."""

    def group_by(self, key, **kwargs):
        """Return a derived stream repartitioned by the given key function."""

class EventT(Protocol):
    """Structural type for stream events.

    Carries the message payload plus metadata, and supports
    acknowledgment and re-routing to other channels.
    """

    key: object       # partitioning key
    value: object     # message payload
    headers: dict     # transport headers
    timestamp: float  # Unix timestamp of the message

    def ack(self):
        """Mark the event as successfully processed."""

    def reject(self):
        """Negatively acknowledge the event."""

    async def send(self, channel, **kwargs):
        """Publish a new event to the given channel."""

    async def forward(self, channel, **kwargs):
        """Re-publish this event to the given channel."""

Install with Tessl CLI

npx tessl i tessl/pypi-faust

docs

authentication.md

cli-framework.md

core-application.md

data-management.md

index.md

monitoring.md

serialization.md

stream-processing.md

topics-channels.md

windowing.md

worker-management.md

tile.json