CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-faust

Python stream processing library that ports Kafka Streams to Python for building distributed systems and real-time data pipelines

Pending
Overview
Eval results
Files

docs/topics-channels.md

Topics and Channels

Topic and channel management for message distribution in Faust applications. Topics represent Kafka topics with configuration, partitioning, and serialization support, while channels provide a generic interface for sending and receiving messages with flexible routing capabilities.

Capabilities

Topic Management

Kafka topic interface for publishing and subscribing to message streams. Topics provide type-safe message handling with configurable partitioning, serialization, and Kafka-specific settings like replication factor and retention policies.

class Topic:
    # requires: from typing import Any, Callable, Optional
    """A Kafka topic bound to a Faust app, carrying typed keys/values with
    Kafka-level settings (partitions, retention, compaction, replication).

    NOTE(review): ``App``, ``Schema``, ``Stream`` and ``FutureMessage`` are
    declared elsewhere in the package; they are written as string (forward)
    references so this stub is importable on its own.
    """

    def __init__(
        self,
        app: "App",
        *,
        topic: str,
        key_type: Optional[type] = None,
        value_type: Optional[type] = None,
        key_serializer: Optional[str] = None,
        value_serializer: Optional[str] = None,
        partitions: Optional[int] = None,
        retention: Optional[int] = None,
        compacting: Optional[bool] = None,
        deleting: Optional[bool] = None,
        replicas: Optional[int] = None,
        acks: bool = True,
        delivery_guarantee: str = 'at_least_once',
        maxsize: Optional[int] = None,
        root: Optional["Topic"] = None,
        config: Optional[dict] = None,
        **kwargs
    ):
        """
        Create a new Kafka topic.

        Args:
            app: The Faust application instance
            topic: Topic name in Kafka
            key_type: Type for message keys (for serialization)
            value_type: Type for message values (for serialization)
            key_serializer: Serializer name for keys
            value_serializer: Serializer name for values
            partitions: Number of topic partitions
            retention: Message retention time in seconds
            compacting: Enable log compaction
            deleting: Enable log deletion
            replicas: Replication factor
            acks: Whether to wait for broker acknowledgment of produced
                messages (presumably — confirm against the faust docs)
            delivery_guarantee: Message delivery guarantee
            maxsize: Maximum queue size
            root: Parent topic for derived topics
            config: Additional Kafka topic configuration
        """

    async def send(
        self,
        key: Any = None,
        value: Any = None,
        *,
        partition: Optional[int] = None,
        timestamp: Optional[float] = None,
        headers: Optional[dict] = None,
        schema: Optional["Schema"] = None,
        key_serializer: Optional[str] = None,
        value_serializer: Optional[str] = None,
        callback: Optional[Callable] = None,
        force: bool = False
    ) -> "FutureMessage":
        """
        Send message to topic asynchronously.

        Args:
            key: Message key
            value: Message value
            partition: Target partition (optional)
            timestamp: Message timestamp
            headers: Message headers
            schema: Custom schema for serialization
            key_serializer: Override key serializer
            value_serializer: Override value serializer
            callback: Callback for send completion
            force: Force send even if app not started

        Returns:
            Future representing the send operation
        """

    def send_soon(
        self,
        key: Any = None,
        value: Any = None,
        *,
        partition: Optional[int] = None,
        timestamp: Optional[float] = None,
        headers: Optional[dict] = None,
        schema: Optional["Schema"] = None,
        key_serializer: Optional[str] = None,
        value_serializer: Optional[str] = None,
        callback: Optional[Callable] = None,
        force: bool = False,
        eager_partitioning: Optional[bool] = None
    ) -> "FutureMessage":
        """
        Send message to topic without waiting for completion.

        Args:
            key: Message key
            value: Message value
            partition: Target partition (optional)
            timestamp: Message timestamp
            headers: Message headers
            schema: Custom schema for serialization
            key_serializer: Override key serializer
            value_serializer: Override value serializer
            callback: Callback for send completion
            force: Force send even if app not started
            eager_partitioning: Determine partition immediately

        Returns:
            Future representing the send operation
        """

    def stream(self, **kwargs) -> "Stream":
        """
        Create a stream that consumes from this topic.

        Args:
            **kwargs: Stream configuration options

        Returns:
            Stream instance for processing messages
        """

    def events(self, **kwargs) -> "Stream":
        """
        Create an event stream that consumes from this topic.

        Args:
            **kwargs: Stream configuration options

        Returns:
            Stream instance with Event objects
        """

    def get_partition_key(self, key: Any, partition: Optional[int] = None) -> int:
        """
        Get partition number for a given key.

        Args:
            key: Message key
            partition: Explicit partition override

        Returns:
            Partition number for the key
        """

    @property
    def name(self) -> str:
        """Topic name in Kafka."""

    @property
    def key_type(self) -> Optional[type]:
        """Type for message keys."""

    @property
    def value_type(self) -> Optional[type]:
        """Type for message values."""

    @property
    def partitions(self) -> Optional[int]:
        """Number of topic partitions."""

    @property
    def config(self) -> dict:
        """Kafka topic configuration."""

Channel Interface

Generic communication channel interface for sending and receiving messages. Channels provide a unified abstraction over different transport mechanisms and can be used independently of Kafka topics for in-memory message passing or custom routing.

class Channel:
    # requires: from typing import Any, Callable, Optional
    """Transport-agnostic communication channel for typed messages.

    NOTE(review): ``App``, ``Schema`` and ``Stream`` are declared elsewhere
    in the package; they are written as string (forward) references so this
    stub is importable on its own.
    """

    def __init__(
        self,
        app: "App",
        *,
        key_type: Optional[type] = None,
        value_type: Optional[type] = None,
        maxsize: Optional[int] = None,
        **kwargs
    ):
        """
        Create a new communication channel.

        Args:
            app: The Faust application instance
            key_type: Type for message keys
            value_type: Type for message values
            maxsize: Maximum queue size
        """

    async def send(
        self,
        value: Any = None,
        *,
        key: Any = None,
        partition: Optional[int] = None,
        timestamp: Optional[float] = None,
        headers: Optional[dict] = None,
        schema: Optional["Schema"] = None,
        key_serializer: Optional[str] = None,
        value_serializer: Optional[str] = None,
        callback: Optional[Callable] = None,
        force: bool = False
    ) -> Any:
        """
        Send message to channel asynchronously.

        Args:
            value: Message value
            key: Message key (optional)
            partition: Target partition (optional)
            timestamp: Message timestamp
            headers: Message headers
            schema: Custom schema for serialization
            key_serializer: Override key serializer
            value_serializer: Override value serializer
            callback: Callback for send completion
            force: Force send even if app not started

        Returns:
            Result of the send operation
        """

    def send_soon(
        self,
        value: Any = None,
        *,
        key: Any = None,
        partition: Optional[int] = None,
        timestamp: Optional[float] = None,
        headers: Optional[dict] = None,
        schema: Optional["Schema"] = None,
        key_serializer: Optional[str] = None,
        value_serializer: Optional[str] = None,
        callback: Optional[Callable] = None,
        force: bool = False,
        eager_partitioning: Optional[bool] = None
    ) -> Any:
        """
        Send message to channel without waiting for completion.

        Args:
            value: Message value
            key: Message key (optional)
            partition: Target partition (optional)
            timestamp: Message timestamp
            headers: Message headers
            schema: Custom schema for serialization
            key_serializer: Override key serializer
            value_serializer: Override value serializer
            callback: Callback for send completion
            force: Force send even if app not started
            eager_partitioning: Determine partition immediately

        Returns:
            Future representing the send operation
        """

    def stream(self, **kwargs) -> "Stream":
        """
        Create a stream that consumes from this channel.

        Args:
            **kwargs: Stream configuration options

        Returns:
            Stream instance for processing messages
        """

    def events(self, **kwargs) -> "Stream":
        """
        Create an event stream that consumes from this channel.

        Args:
            **kwargs: Stream configuration options

        Returns:
            Stream instance with Event objects
        """

    @property
    def key_type(self) -> Optional[type]:
        """Type for message keys."""

    @property
    def value_type(self) -> Optional[type]:
        """Type for message values."""

    @property
    def maxsize(self) -> Optional[int]:
        """Maximum queue size."""

Topic Configuration

Advanced topic configuration and management utilities for Kafka-specific features and optimizations.

# requires: from typing import Optional
def create_topic(
    app: "App",
    topic: str,
    *,
    partitions: Optional[int] = None,
    replication_factor: Optional[int] = None,
    config: Optional[dict] = None,
    **kwargs
) -> "Topic":
    """
    Create a topic with specific configuration.

    Args:
        app: Faust application instance
        topic: Topic name
        partitions: Number of partitions
        replication_factor: Replication factor
        config: Kafka topic configuration
        **kwargs: Extra options forwarded to the Topic constructor

    Returns:
        Configured Topic instance
    """

class TopicManager:
    """Administrative lifecycle management for Kafka topics
    (create / delete / list) on the cluster the app is connected to.
    """

    def __init__(self, app: "App"):
        """
        Topic lifecycle management.

        Args:
            app: Faust application instance
        """

    async def create_topic(
        self,
        topic: str,
        partitions: int,
        replication_factor: int,
        **config
    ) -> None:
        """
        Create topic in Kafka cluster.

        Args:
            topic: Topic name
            partitions: Number of partitions
            replication_factor: Replication factor
            **config: Additional topic configuration
        """

    async def delete_topic(self, topic: str) -> None:
        """
        Delete topic from Kafka cluster.

        Args:
            topic: Topic name to delete
        """

    async def list_topics(self) -> list:
        """
        List all topics in Kafka cluster.

        Returns:
            List of topic names
        """

Usage Examples

Basic Topic Usage

import faust

app = faust.App('my-app', broker='kafka://localhost:9092')

# Define a topic with type annotations
orders_topic = app.topic('orders', value_type=dict)

# Send messages to topic
# (the timer fires every 5 seconds while the worker is running)
@app.timer(interval=5.0)
async def produce_orders():
    order = {'id': 123, 'product': 'widget', 'quantity': 5}
    await orders_topic.send(key='order-123', value=order)

# Consume from topic
# The agent iterates over decoded message values (dicts here).
@app.agent(orders_topic)
async def process_orders(orders):
    async for order in orders:
        print(f"Processing order: {order}")

Advanced Topic Configuration

# Topic with custom configuration
events_topic = app.topic(
    'events',
    key_type=str,
    value_type=dict,
    partitions=8,
    retention=86400,  # 24 hours
    compacting=True,
    config={
        'cleanup.policy': 'compact',
        'segment.bytes': 104857600,  # 100MB
        'min.cleanable.dirty.ratio': 0.1
    }
)

# Send with custom serialization
# NOTE(review): ``await`` is only valid inside an async function (agent,
# timer, task) — this snippet assumes such a context.  ``time`` must be
# imported by the surrounding module.
await events_topic.send(
    key='user-123',
    value={'event': 'login', 'timestamp': time.time()},
    headers={'source': 'web', 'version': '1.0'},
    value_serializer='json'
)

Channel-based Communication

# In-memory channel for internal communication
notifications_channel = app.channel(value_type=dict)

@app.agent(notifications_channel)
async def handle_notifications(notifications):
    async for notification in notifications:
        print(f"Notification: {notification['message']}")

# Send to channel from anywhere in the application
# (``time`` must be imported by the surrounding module)
async def send_notification(message: str, user_id: str):
    await notifications_channel.send({
        'message': message,
        'user_id': user_id,
        'timestamp': time.time()
    })

Topic Partitioning

# Custom partitioning logic
user_events_topic = app.topic('user-events', key_type=str, value_type=dict)

async def send_user_event(user_id: str, event_data: dict):
    # Ensure events for the same user go to the same partition.
    # NOTE(review): ``partitions`` may be None before the topic has been
    # declared on the broker — confirm before relying on this.  Also note
    # ``hash()`` is salted per process for str keys (PYTHONHASHSEED), so
    # this mapping is not stable across workers.
    partition = hash(user_id) % user_events_topic.partitions

    await user_events_topic.send(
        key=user_id,
        value=event_data,
        partition=partition
    )

Type Interfaces

from typing import Protocol, Optional, Dict, Any, Callable, AsyncIterator

class TopicT(Protocol):
    """Structural (duck-typed) interface for Topic-like objects."""
    
    name: str                   # topic name in Kafka
    key_type: Optional[type]    # type used to (de)serialize keys
    value_type: Optional[type]  # type used to (de)serialize values
    partitions: int             # number of topic partitions
    
    async def send(
        self,
        key: Any = None,
        value: Any = None,
        *,
        partition: Optional[int] = None,
        **kwargs
    ) -> Any: ...
    
    def stream(self, **kwargs) -> 'StreamT': ...

class ChannelT(Protocol):
    """Structural (duck-typed) interface for Channel-like objects."""
    
    key_type: Optional[type]    # type used to (de)serialize keys
    value_type: Optional[type]  # type used to (de)serialize values
    maxsize: Optional[int]      # maximum queue size, if bounded
    
    async def send(
        self,
        value: Any = None,
        *,
        key: Any = None,
        **kwargs
    ) -> Any: ...
    
    def stream(self, **kwargs) -> 'StreamT': ...

Install with Tessl CLI

npx tessl i tessl/pypi-faust

docs

authentication.md

cli-framework.md

core-application.md

data-management.md

index.md

monitoring.md

serialization.md

stream-processing.md

topics-channels.md

windowing.md

worker-management.md

tile.json