Faust is a Python stream processing library that ports Kafka Streams to Python for building distributed systems and real-time data pipelines.
—
Topic and channel management for message distribution in Faust applications. Topics represent Kafka topics with configuration, partitioning, and serialization support, while channels provide a generic interface for sending and receiving messages with flexible routing capabilities.
Kafka topic interface for publishing and subscribing to message streams. Topics provide type-safe message handling with configurable partitioning, serialization, and Kafka-specific settings like replication factor and retention policies.
class Topic:
    """Kafka topic for publishing and subscribing to typed message streams.

    A ``Topic`` couples a Kafka topic name with optional key/value types,
    serializers, and Kafka-level settings (partitions, retention,
    compaction, replication).

    Project types (``App``, ``Schema``, ``FutureMessage``, ``Stream``) are
    referenced as string annotations so this definition does not require
    them to be imported at class-creation time (PEP 484 forward references).
    """

    def __init__(
        self,
        app: 'App',
        *,
        topic: str,
        key_type: 'Optional[type]' = None,
        value_type: 'Optional[type]' = None,
        key_serializer: 'Optional[str]' = None,
        value_serializer: 'Optional[str]' = None,
        partitions: 'Optional[int]' = None,
        retention: 'Optional[int]' = None,
        compacting: 'Optional[bool]' = None,
        deleting: 'Optional[bool]' = None,
        replicas: 'Optional[int]' = None,
        acks: bool = True,
        delivery_guarantee: str = 'at_least_once',
        maxsize: 'Optional[int]' = None,
        root: 'Optional[Topic]' = None,
        config: 'Optional[dict]' = None,
        **kwargs
    ):
        """
        Create a new Kafka topic.

        Args:
            app: The Faust application instance
            topic: Topic name in Kafka
            key_type: Type for message keys (for serialization)
            value_type: Type for message values (for serialization)
            key_serializer: Serializer name for keys
            value_serializer: Serializer name for values
            partitions: Number of topic partitions
            retention: Message retention time in seconds
            compacting: Enable log compaction
            deleting: Enable log deletion
            replicas: Replication factor
            acks: Acknowledgment level
            delivery_guarantee: Message delivery guarantee
            maxsize: Maximum queue size
            root: Parent topic for derived topics
            config: Additional Kafka topic configuration
        """

    async def send(
        self,
        key: 'Any' = None,
        value: 'Any' = None,
        *,
        partition: 'Optional[int]' = None,
        timestamp: 'Optional[float]' = None,
        headers: 'Optional[dict]' = None,
        schema: 'Optional[Schema]' = None,
        key_serializer: 'Optional[str]' = None,
        value_serializer: 'Optional[str]' = None,
        callback: 'Optional[Callable]' = None,
        force: bool = False
    ) -> 'FutureMessage':
        """
        Send message to topic asynchronously.

        Args:
            key: Message key
            value: Message value
            partition: Target partition (optional)
            timestamp: Message timestamp
            headers: Message headers
            schema: Custom schema for serialization
            key_serializer: Override key serializer
            value_serializer: Override value serializer
            callback: Callback for send completion
            force: Force send even if app not started

        Returns:
            Future representing the send operation
        """

    def send_soon(
        self,
        key: 'Any' = None,
        value: 'Any' = None,
        *,
        partition: 'Optional[int]' = None,
        timestamp: 'Optional[float]' = None,
        headers: 'Optional[dict]' = None,
        schema: 'Optional[Schema]' = None,
        key_serializer: 'Optional[str]' = None,
        value_serializer: 'Optional[str]' = None,
        callback: 'Optional[Callable]' = None,
        force: bool = False,
        eager_partitioning: 'Optional[bool]' = None
    ) -> 'FutureMessage':
        """
        Send message to topic without waiting for completion.

        Args:
            key: Message key
            value: Message value
            partition: Target partition (optional)
            timestamp: Message timestamp
            headers: Message headers
            schema: Custom schema for serialization
            key_serializer: Override key serializer
            value_serializer: Override value serializer
            callback: Callback for send completion
            force: Force send even if app not started
            eager_partitioning: Determine partition immediately

        Returns:
            Future representing the send operation
        """

    def stream(self, **kwargs) -> 'Stream':
        """
        Create a stream that consumes from this topic.

        Args:
            **kwargs: Stream configuration options

        Returns:
            Stream instance for processing messages
        """

    def events(self, **kwargs) -> 'Stream':
        """
        Create an event stream that consumes from this topic.

        Args:
            **kwargs: Stream configuration options

        Returns:
            Stream instance with Event objects
        """

    def get_partition_key(self, key: 'Any', partition: 'Optional[int]' = None) -> int:
        """
        Get partition number for a given key.

        Args:
            key: Message key
            partition: Explicit partition override

        Returns:
            Partition number for the key
        """

    @property
    def name(self) -> str:
        """Topic name in Kafka."""

    @property
    def key_type(self) -> type:
        """Type for message keys."""

    @property
    def value_type(self) -> type:
        """Type for message values."""

    @property
    def partitions(self) -> int:
        """Number of topic partitions."""

    @property
    def config(self) -> dict:
        """Kafka topic configuration."""


# Generic communication channel interface for sending and receiving messages.
# Channels provide a unified abstraction over different transport mechanisms
# and can be used independently of Kafka topics for in-memory message passing
# or custom routing.
class Channel:
    """Generic communication channel for sending and receiving messages.

    Provides a transport-agnostic interface; unlike :class:`Topic`, a
    channel is not tied to Kafka and can back in-memory message passing
    or custom routing.

    Project types (``App``, ``Schema``, ``Stream``) are referenced as
    string annotations so this definition does not require them to be
    imported at class-creation time (PEP 484 forward references).
    """

    def __init__(
        self,
        app: 'App',
        *,
        key_type: 'Optional[type]' = None,
        value_type: 'Optional[type]' = None,
        maxsize: 'Optional[int]' = None,
        **kwargs
    ):
        """
        Create a new communication channel.

        Args:
            app: The Faust application instance
            key_type: Type for message keys
            value_type: Type for message values
            maxsize: Maximum queue size
        """

    async def send(
        self,
        value: 'Any' = None,
        *,
        key: 'Any' = None,
        partition: 'Optional[int]' = None,
        timestamp: 'Optional[float]' = None,
        headers: 'Optional[dict]' = None,
        schema: 'Optional[Schema]' = None,
        key_serializer: 'Optional[str]' = None,
        value_serializer: 'Optional[str]' = None,
        callback: 'Optional[Callable]' = None,
        force: bool = False
    ) -> 'Any':
        """
        Send message to channel asynchronously.

        Args:
            value: Message value
            key: Message key (optional)
            partition: Target partition (optional)
            timestamp: Message timestamp
            headers: Message headers
            schema: Custom schema for serialization
            key_serializer: Override key serializer
            value_serializer: Override value serializer
            callback: Callback for send completion
            force: Force send even if app not started

        Returns:
            Result of the send operation
        """

    def send_soon(
        self,
        value: 'Any' = None,
        *,
        key: 'Any' = None,
        partition: 'Optional[int]' = None,
        timestamp: 'Optional[float]' = None,
        headers: 'Optional[dict]' = None,
        schema: 'Optional[Schema]' = None,
        key_serializer: 'Optional[str]' = None,
        value_serializer: 'Optional[str]' = None,
        callback: 'Optional[Callable]' = None,
        force: bool = False,
        eager_partitioning: 'Optional[bool]' = None
    ) -> 'Any':
        """
        Send message to channel without waiting for completion.

        Args:
            value: Message value
            key: Message key (optional)
            partition: Target partition (optional)
            timestamp: Message timestamp
            headers: Message headers
            schema: Custom schema for serialization
            key_serializer: Override key serializer
            value_serializer: Override value serializer
            callback: Callback for send completion
            force: Force send even if app not started
            eager_partitioning: Determine partition immediately

        Returns:
            Future representing the send operation
        """

    def stream(self, **kwargs) -> 'Stream':
        """
        Create a stream that consumes from this channel.

        Args:
            **kwargs: Stream configuration options

        Returns:
            Stream instance for processing messages
        """

    def events(self, **kwargs) -> 'Stream':
        """
        Create an event stream that consumes from this channel.

        Args:
            **kwargs: Stream configuration options

        Returns:
            Stream instance with Event objects
        """

    @property
    def key_type(self) -> type:
        """Type for message keys."""

    @property
    def value_type(self) -> type:
        """Type for message values."""

    @property
    def maxsize(self) -> int:
        """Maximum queue size."""


# Advanced topic configuration and management utilities for Kafka-specific
# features and optimizations.
def create_topic(
    app: 'App',
    topic: str,
    *,
    partitions: 'Optional[int]' = None,
    replication_factor: 'Optional[int]' = None,
    config: 'Optional[dict]' = None,
    **kwargs
) -> 'Topic':
    """
    Create a topic with specific configuration.

    Project types (``App``, ``Topic``) are referenced as string
    annotations (PEP 484 forward references) so this signature does not
    require them to be in scope at definition time.

    Args:
        app: Faust application instance
        topic: Topic name
        partitions: Number of partitions
        replication_factor: Replication factor
        config: Kafka topic configuration

    Returns:
        Configured Topic instance
    """
class TopicManager:
    """Topic lifecycle management: create, delete, and list Kafka topics."""

    def __init__(self, app: 'App'):
        """
        Topic lifecycle management.

        Args:
            app: Faust application instance
        """

    async def create_topic(
        self,
        topic: str,
        partitions: int,
        replication_factor: int,
        **config
    ) -> None:
        """
        Create topic in Kafka cluster.

        Args:
            topic: Topic name
            partitions: Number of partitions
            replication_factor: Replication factor
            **config: Additional topic configuration
        """

    async def delete_topic(self, topic: str) -> None:
        """
        Delete topic from Kafka cluster.

        Args:
            topic: Topic name to delete
        """

    async def list_topics(self) -> list:
        """
        List all topics in Kafka cluster.

        Returns:
            List of topic names
        """


import faust
app = faust.App('my-app', broker='kafka://localhost:9092')

# Define a topic with type annotations
orders_topic = app.topic('orders', value_type=dict)

# Send messages to topic
@app.timer(interval=5.0)
async def produce_orders():
    order = {'id': 123, 'product': 'widget', 'quantity': 5}
    await orders_topic.send(key='order-123', value=order)

# Consume from topic
@app.agent(orders_topic)
async def process_orders(orders):
    async for order in orders:
        print(f"Processing order: {order}")

# Topic with custom configuration
events_topic = app.topic(
    'events',
    key_type=str,
    value_type=dict,
    partitions=8,
    retention=86400,  # 24 hours
    compacting=True,
    config={
        'cleanup.policy': 'compact',
        'segment.bytes': 104857600,  # 100MB
        'min.cleanable.dirty.ratio': 0.1
    }
)

# Send with custom serialization.
# NOTE(review): ``await`` at module level is only valid inside a coroutine
# (or an async-aware REPL); in application code call this from an async
# function such as an agent or timer.
await events_topic.send(
    key='user-123',
    value={'event': 'login', 'timestamp': time.time()},
    headers={'source': 'web', 'version': '1.0'},
    value_serializer='json'
)

# In-memory channel for internal communication
notifications_channel = app.channel(value_type=dict)

@app.agent(notifications_channel)
async def handle_notifications(notifications):
    async for notification in notifications:
        print(f"Notification: {notification['message']}")

# Send to channel from anywhere in the application
async def send_notification(message: str, user_id: str):
    await notifications_channel.send({
        'message': message,
        'user_id': user_id,
        'timestamp': time.time()
    })

# Custom partitioning logic
user_events_topic = app.topic('user-events', key_type=str, value_type=dict)

async def send_user_event(user_id: str, event_data: dict):
    # Ensure events for the same user go to the same partition
    partition = hash(user_id) % user_events_topic.partitions
    await user_events_topic.send(
        key=user_id,
        value=event_data,
        partition=partition
    )

from typing import Protocol, Optional, Dict, Any, Callable, AsyncIterator
class TopicT(Protocol):
    """Type interface for Topic.

    Structural (duck-typed) protocol: any object exposing these
    attributes and methods is accepted where a ``TopicT`` is expected.
    """

    # Topic name in Kafka.
    name: str
    # Optional key/value types used for (de)serialization.
    key_type: Optional[type]
    value_type: Optional[type]
    # Number of topic partitions.
    partitions: int

    async def send(
        self,
        key: Any = None,
        value: Any = None,
        *,
        partition: Optional[int] = None,
        **kwargs
    ) -> Any:
        """Send a message to the topic asynchronously."""
        ...

    def stream(self, **kwargs) -> 'StreamT':
        """Create a stream that consumes from this topic."""
        ...
class ChannelT(Protocol):
    """Type interface for Channel.

    Structural (duck-typed) protocol: any object exposing these
    attributes and methods is accepted where a ``ChannelT`` is expected.
    """

    # Optional key/value types used for (de)serialization.
    key_type: Optional[type]
    value_type: Optional[type]
    # Maximum queue size, if bounded.
    maxsize: Optional[int]

    async def send(
        self,
        value: Any = None,
        *,
        key: Any = None,
        **kwargs
    ) -> Any:
        """Send a message to the channel asynchronously."""
        ...

    def stream(self, **kwargs) -> 'StreamT':
        """Create a stream that consumes from this channel."""
        ...


# Install with Tessl CLI
npx tessl i tessl/pypi-faust