CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-ydb

Officially supported Python client for YDB distributed SQL database

Overview
Eval results
Files

topic-operations.mddocs/

Topic Operations

Streaming data operations including topic creation, message publishing, message consuming, and topic administration.

Capabilities

Topic Client

The topic client provides comprehensive operations for managing topics and streaming data.

class TopicClient:
    """Synchronous client for topic administration (create/describe/alter/drop)
    and for constructing writer/reader instances bound to a driver."""

    def __init__(self, driver: Driver, settings: TopicClientSettings = None):
        """
        Create topic operations client.

        Args:
            driver (Driver): YDB driver instance
            settings (TopicClientSettings, optional): Client configuration
        """

    def create_topic(
        self,
        path: str,
        settings: CreateTopicSettings
    ):
        """
        Create a new topic.

        Args:
            path (str): Topic path
            settings (CreateTopicSettings): Topic creation configuration
        """

    def describe_topic(
        self,
        path: str,
        settings: DescribeTopicSettings = None
    ) -> TopicDescription:
        """
        Get topic description and metadata.

        Args:
            path (str): Topic path
            settings (DescribeTopicSettings, optional): Description settings

        Returns:
            TopicDescription: Topic configuration and metadata
        """

    def alter_topic(
        self,
        path: str,
        settings: AlterTopicSettings
    ):
        """
        Alter topic configuration.

        Args:
            path (str): Topic path
            settings (AlterTopicSettings): Alteration settings
        """

    def drop_topic(
        self,
        path: str,
        settings: DropTopicSettings = None
    ):
        """
        Delete topic.

        Args:
            path (str): Topic path
            settings (DropTopicSettings, optional): Drop settings
        """

    def writer(
        self,
        topic_path: str,
        producer_id: str = None,
        settings: TopicWriterSettings = None
    ) -> TopicWriter:
        """
        Create topic writer for publishing messages.

        Args:
            topic_path (str): Topic path to write to
            producer_id (str, optional): Producer identifier
            settings (TopicWriterSettings, optional): Writer configuration

        Returns:
            TopicWriter: Topic writer instance
        """

    def reader(
        self,
        settings: TopicReaderSettings
    ) -> TopicReader:
        """
        Create topic reader for consuming messages.

        Args:
            settings (TopicReaderSettings): Reader configuration

        Returns:
            TopicReader: Topic reader instance
        """

class TopicClientSettings:
    """Configuration applied to every operation issued through a TopicClient."""

    def __init__(
        self,
        default_compression_codec: TopicCodec = None,
        request_timeout: float = None
    ):
        """
        Topic client configuration.

        Args:
            default_compression_codec (TopicCodec, optional): Default compression
            request_timeout (float, optional): Default request timeout
        """

Topic Writer

Publisher interface for sending messages to topics with batching and delivery guarantees.

class TopicWriter:
    """Synchronous message publisher for a single topic.

    Usable as a context manager: ``__exit__`` closes the writer, and
    ``close()`` flushes any pending messages before releasing resources.
    """

    def __init__(
        self,
        driver: Driver,
        topic_path: str,
        producer_id: str = None,
        settings: TopicWriterSettings = None
    ):
        """
        Create topic writer for message publishing.

        Args:
            driver (Driver): YDB driver instance
            topic_path (str): Topic path to write to
            producer_id (str, optional): Producer identifier
            settings (TopicWriterSettings, optional): Writer configuration
        """

    def write(
        self,
        messages: Union[TopicWriterMessage, List[TopicWriterMessage]]
    ) -> TopicWriteResult:
        """
        Write messages to topic.

        Args:
            messages (Union[TopicWriterMessage, List[TopicWriterMessage]]): Messages to write

        Returns:
            TopicWriteResult: Write operation result
        """

    def flush(self) -> TopicWriteResult:
        """
        Flush pending messages and wait for acknowledgments.

        Returns:
            TopicWriteResult: Flush operation result
        """

    def close(self, timeout: float = None):
        """
        Close writer and flush pending messages.

        Args:
            timeout (float, optional): Close timeout
        """

    def __enter__(self) -> 'TopicWriter':
        """
        Enter context manager.

        Returns:
            TopicWriter: Writer instance
        """

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Exit context manager and close writer.
        """

    @property
    def init_info(self) -> TopicWriterInitInfo:
        """Get writer initialization information."""

class TopicWriterMessage:
    """Outgoing message: raw ``bytes`` payload plus optional ordering,
    grouping, metadata, and per-message compression settings."""

    def __init__(
        self,
        data: bytes,
        seq_no: int = None,
        created_at: datetime = None,
        message_group_id: str = None,
        metadata: Dict[str, str] = None,
        codec: TopicCodec = None
    ):
        """
        Message for topic publishing.

        Args:
            data (bytes): Message payload
            seq_no (int, optional): Sequence number for ordering
            created_at (datetime, optional): Message creation timestamp
            message_group_id (str, optional): Message group identifier
            metadata (Dict[str, str], optional): Message metadata
            codec (TopicCodec, optional): Message compression codec
        """

    @property
    def data(self) -> bytes:
        """Message payload."""

    @property
    def seq_no(self) -> int:
        """Message sequence number."""

    @property
    def created_at(self) -> datetime:
        """Message creation timestamp."""

    @property
    def message_group_id(self) -> str:
        """Message group identifier."""

    @property
    def metadata(self) -> Dict[str, str]:
        """Message metadata."""

    @property
    def codec(self) -> TopicCodec:
        """Message compression codec."""

class TopicWriterSettings:
    """Per-writer configuration: identity, compression, partition targeting,
    and session behavior."""

    def __init__(
        self,
        producer_id: str = None,
        write_session_meta: Dict[str, str] = None,
        codec: TopicCodec = None,
        get_last_seq_no: bool = False,
        update_token_interval: float = None,
        partition_id: int = None,
        message_group_id: str = None
    ):
        """
        Topic writer configuration.

        Args:
            producer_id (str, optional): Producer identifier
            write_session_meta (Dict[str, str], optional): Session metadata
            codec (TopicCodec, optional): Default compression codec
            get_last_seq_no (bool): Retrieve last sequence number on init
            update_token_interval (float, optional): Token update interval
            partition_id (int, optional): Target partition ID
            message_group_id (str, optional): Default message group ID
        """

class TopicWriteResult:
    """Outcome of a write or flush call: acknowledged messages and any
    per-message errors (``has_errors`` is the quick success check)."""

    def __init__(
        self,
        acks: List[TopicWriterAck] = None,
        errors: List[TopicWriterError] = None
    ):
        """
        Result of topic write operation.

        Args:
            acks (List[TopicWriterAck], optional): Acknowledged messages
            errors (List[TopicWriterError], optional): Write errors
        """

    @property
    def acks(self) -> List[TopicWriterAck]:
        """Acknowledged messages."""

    @property
    def errors(self) -> List[TopicWriterError]:
        """Write errors."""

    @property
    def has_errors(self) -> bool:
        """True if write operation had errors."""

class TopicWriterInitInfo:
    """Write-session facts reported when a writer initializes (exposed via
    ``TopicWriter.init_info``)."""

    def __init__(
        self,
        topic_path: str,
        producer_id: str,
        last_seq_no: int = None,
        session_id: str = None
    ):
        """
        Topic writer initialization information.

        Args:
            topic_path (str): Topic path
            producer_id (str): Producer identifier
            last_seq_no (int, optional): Last sequence number
            session_id (str, optional): Write session identifier
        """

    @property
    def topic_path(self) -> str:
        """Topic path."""

    @property
    def producer_id(self) -> str:
        """Producer identifier."""

    @property
    def last_seq_no(self) -> int:
        """Last sequence number."""

    @property
    def session_id(self) -> str:
        """Write session identifier."""

Topic Reader

Consumer interface for reading messages from topics with automatic partition assignment and offset management.

class TopicReader:
    """Synchronous message consumer.

    Supports single-message and batch receive with explicit commit, plus the
    context-manager and iterator protocols (``for message in reader: ...``).
    """

    def __init__(
        self,
        driver: Driver,
        settings: TopicReaderSettings
    ):
        """
        Create topic reader for message consumption.

        Args:
            driver (Driver): YDB driver instance
            settings (TopicReaderSettings): Reader configuration
        """

    def receive_message(self, timeout: float = None) -> TopicReaderMessage:
        """
        Receive next message from topic.

        Args:
            timeout (float, optional): Receive timeout

        Returns:
            TopicReaderMessage: Received message
        """

    def receive_batch(self, timeout: float = None) -> TopicReaderBatch:
        """
        Receive batch of messages from topic.

        Args:
            timeout (float, optional): Receive timeout

        Returns:
            TopicReaderBatch: Batch of messages
        """

    def commit(self, message: TopicReaderMessage):
        """
        Commit message processing.

        Args:
            message (TopicReaderMessage): Message to commit
        """

    def commit_batch(self, batch: TopicReaderBatch):
        """
        Commit batch processing.

        Args:
            batch (TopicReaderBatch): Batch to commit
        """

    def close(self, timeout: float = None):
        """
        Close reader and release resources.

        Args:
            timeout (float, optional): Close timeout
        """

    def __enter__(self) -> 'TopicReader':
        """Enter context manager."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context manager and close reader."""

    def __iter__(self) -> Iterator[TopicReaderMessage]:
        """
        Iterate over messages.

        Returns:
            Iterator[TopicReaderMessage]: Message iterator
        """

class TopicReaderMessage:
    """Incoming message: payload plus ordering/offset metadata and a
    ``commit()`` shortcut for acknowledging this single message."""

    def __init__(
        self,
        data: bytes,
        seq_no: int,
        created_at: datetime,
        message_group_id: str = None,
        offset: int = None,
        metadata: Dict[str, str] = None,
        partition_session: 'PartitionSession' = None
    ):
        """
        Message received from topic.

        Args:
            data (bytes): Message payload
            seq_no (int): Message sequence number
            created_at (datetime): Message creation timestamp
            message_group_id (str, optional): Message group identifier
            offset (int, optional): Message offset in partition
            metadata (Dict[str, str], optional): Message metadata
            partition_session (PartitionSession, optional): Source partition session
        """

    @property
    def data(self) -> bytes:
        """Message payload."""

    @property
    def seq_no(self) -> int:
        """Message sequence number."""

    @property
    def created_at(self) -> datetime:
        """Message creation timestamp."""

    @property
    def message_group_id(self) -> str:
        """Message group identifier."""

    @property
    def offset(self) -> int:
        """Message offset in partition."""

    @property
    def metadata(self) -> Dict[str, str]:
        """Message metadata."""

    @property
    def partition_session(self) -> 'PartitionSession':
        """Source partition session."""

    def commit(self):
        """Commit this message."""

class TopicReaderBatch:
    """Group of messages from one partition session; iterable, sized, and
    committable as a unit via ``commit()``."""

    def __init__(
        self,
        messages: List[TopicReaderMessage],
        partition_session: 'PartitionSession'
    ):
        """
        Batch of messages from topic.

        Args:
            messages (List[TopicReaderMessage]): Messages in batch
            partition_session (PartitionSession): Source partition session
        """

    @property
    def messages(self) -> List[TopicReaderMessage]:
        """Messages in batch."""

    @property
    def partition_session(self) -> 'PartitionSession':
        """Source partition session."""

    def commit(self):
        """Commit entire batch."""

    def __iter__(self) -> Iterator[TopicReaderMessage]:
        """Iterate over messages in batch."""

    def __len__(self) -> int:
        """Get number of messages in batch."""

class TopicReaderSettings:
    """Reader configuration: consumer identity, topic selection, buffering
    limits, timeouts, and decompression behavior."""

    def __init__(
        self,
        consumer_name: str,
        topics: List[TopicReaderSelector],
        buffer_size_bytes: int = None,
        max_memory_usage_bytes: int = None,
        max_lag: timedelta = None,
        read_timeout: float = None,
        commit_timeout: float = None,
        with_metadata_fields: List[str] = None,
        decompress_messages: bool = True
    ):
        """
        Topic reader configuration.

        Args:
            consumer_name (str): Consumer identifier
            topics (List[TopicReaderSelector]): Topics to read from
            buffer_size_bytes (int, optional): Read buffer size
            max_memory_usage_bytes (int, optional): Maximum memory usage
            max_lag (timedelta, optional): Maximum allowed lag
            read_timeout (float, optional): Read operation timeout
            commit_timeout (float, optional): Commit operation timeout
            with_metadata_fields (List[str], optional): Metadata fields to include
            decompress_messages (bool): Automatically decompress messages
        """

class TopicReaderSelector:
    """Selects one topic for a reader, optionally restricted to specific
    partitions, a starting timestamp, and a per-topic lag limit."""

    def __init__(
        self,
        path: str,
        partitions: List[int] = None,
        read_from: datetime = None,
        max_lag: timedelta = None
    ):
        """
        Topic selection for reader.

        Args:
            path (str): Topic path
            partitions (List[int], optional): Specific partitions to read
            read_from (datetime, optional): Start reading from timestamp
            max_lag (timedelta, optional): Maximum allowed lag for this topic
        """

    @property
    def path(self) -> str:
        """Topic path."""

    @property
    def partitions(self) -> List[int]:
        """Specific partitions to read."""

    @property
    def read_from(self) -> datetime:
        """Start reading from timestamp."""

    @property
    def max_lag(self) -> timedelta:
        """Maximum allowed lag."""

Topic Configuration

Topic creation and management settings with partitioning and retention policies.

class TopicDescription:
    """Snapshot of a topic's configuration and metadata, as returned by
    ``TopicClient.describe_topic``."""

    def __init__(
        self,
        path: str,
        partitions_count: int = None,
        retention_period: timedelta = None,
        retention_storage_mb: int = None,
        supported_codecs: List[TopicCodec] = None,
        partition_write_speed_bytes_per_second: int = None,
        partition_write_burst_bytes: int = None,
        attributes: Dict[str, str] = None,
        consumers: List[TopicConsumer] = None,
        metering_mode: TopicMeteringMode = None,
        partition_count_limit: int = None
    ):
        """
        Topic configuration and metadata.

        Args:
            path (str): Topic path
            partitions_count (int, optional): Number of partitions
            retention_period (timedelta, optional): Message retention period
            retention_storage_mb (int, optional): Storage retention limit in MB
            supported_codecs (List[TopicCodec], optional): Supported compression codecs
            partition_write_speed_bytes_per_second (int, optional): Write speed limit per partition
            partition_write_burst_bytes (int, optional): Write burst limit per partition
            attributes (Dict[str, str], optional): Topic attributes
            consumers (List[TopicConsumer], optional): Topic consumers
            metering_mode (TopicMeteringMode, optional): Metering mode
            partition_count_limit (int, optional): Maximum partition count
        """

    @property
    def path(self) -> str:
        """Topic path."""

    @property
    def partitions_count(self) -> int:
        """Number of partitions."""

    @property
    def retention_period(self) -> timedelta:
        """Message retention period."""

    @property
    def supported_codecs(self) -> List[TopicCodec]:
        """Supported compression codecs."""

    @property
    def consumers(self) -> List[TopicConsumer]:
        """Topic consumers."""

class CreateTopicSettings:
    """Options for ``TopicClient.create_topic``: partitioning, retention,
    codecs, throughput limits, attributes, and initial consumers."""

    def __init__(
        self,
        partitions_count: int = 1,
        retention_period: timedelta = None,
        retention_storage_mb: int = None,
        supported_codecs: List[TopicCodec] = None,
        partition_write_speed_bytes_per_second: int = None,
        partition_write_burst_bytes: int = None,
        attributes: Dict[str, str] = None,
        consumers: List[TopicConsumer] = None,
        metering_mode: TopicMeteringMode = None
    ):
        """
        Settings for topic creation.

        Args:
            partitions_count (int): Number of partitions to create
            retention_period (timedelta, optional): Message retention period
            retention_storage_mb (int, optional): Storage retention limit
            supported_codecs (List[TopicCodec], optional): Allowed compression codecs
            partition_write_speed_bytes_per_second (int, optional): Write speed limit
            partition_write_burst_bytes (int, optional): Write burst limit
            attributes (Dict[str, str], optional): Topic attributes
            consumers (List[TopicConsumer], optional): Initial consumers
            metering_mode (TopicMeteringMode, optional): Billing metering mode
        """

class AlterTopicSettings:
    """Options for ``TopicClient.alter_topic``. Parameters left as ``None``
    leave the corresponding topic setting unchanged."""

    def __init__(
        self,
        alter_partitions_count: int = None,
        set_retention_period: timedelta = None,
        set_retention_storage_mb: int = None,
        set_supported_codecs: List[TopicCodec] = None,
        set_partition_write_speed_bytes_per_second: int = None,
        set_partition_write_burst_bytes: int = None,
        alter_attributes: Dict[str, str] = None,
        add_consumers: List[TopicConsumer] = None,
        drop_consumers: List[str] = None,
        alter_consumers: List[TopicAlterConsumer] = None,
        set_metering_mode: TopicMeteringMode = None
    ):
        """
        Settings for topic alteration.

        Args:
            alter_partitions_count (int, optional): New partition count
            set_retention_period (timedelta, optional): New retention period
            set_retention_storage_mb (int, optional): New storage retention limit
            set_supported_codecs (List[TopicCodec], optional): New codec list
            set_partition_write_speed_bytes_per_second (int, optional): New speed limit
            set_partition_write_burst_bytes (int, optional): New burst limit
            alter_attributes (Dict[str, str], optional): Attribute changes
            add_consumers (List[TopicConsumer], optional): Consumers to add
            drop_consumers (List[str], optional): Consumer names to remove
            alter_consumers (List[TopicAlterConsumer], optional): Consumer modifications
            set_metering_mode (TopicMeteringMode, optional): New metering mode
        """

class TopicConsumer:
    """Named consumer registered on a topic, with optional codec support,
    starting timestamp, attributes, and retention importance."""

    def __init__(
        self,
        name: str,
        supported_codecs: List[TopicCodec] = None,
        read_from: datetime = None,
        attributes: Dict[str, str] = None,
        important: bool = False
    ):
        """
        Topic consumer configuration.

        Args:
            name (str): Consumer name
            supported_codecs (List[TopicCodec], optional): Supported codecs
            read_from (datetime, optional): Start reading from timestamp
            attributes (Dict[str, str], optional): Consumer attributes
            important (bool): Whether consumer is important for retention
        """

    @property
    def name(self) -> str:
        """Consumer name."""

    @property
    def supported_codecs(self) -> List[TopicCodec]:
        """Supported compression codecs."""

    @property
    def read_from(self) -> datetime:
        """Start reading timestamp."""

    @property
    def important(self) -> bool:
        """Whether consumer is important."""

class TopicCodec(enum.Enum):
    """Message compression codecs."""
    RAW = "raw"    # uncompressed payloads
    GZIP = "gzip"  # gzip compression
    LZOP = "lzop"  # lzop compression
    ZSTD = "zstd"  # Zstandard compression

class TopicMeteringMode(enum.Enum):
    """Topic metering modes for billing."""
    UNSPECIFIED = "unspecified"              # server default applies
    RESERVED_CAPACITY = "reserved_capacity"  # bill by provisioned capacity
    REQUEST_UNITS = "request_units"          # bill per request

Async Topic Operations

Asynchronous versions of topic operations for high-performance applications.

class TopicWriterAsyncIO:
    """Asyncio counterpart of ``TopicWriter``; same write/flush/close contract
    with ``await`` and the async context-manager protocol."""

    def __init__(
        self,
        driver: Driver,
        topic_path: str,
        producer_id: str = None,
        settings: TopicWriterSettings = None
    ):
        """
        Asynchronous topic writer.

        Args:
            driver (Driver): Async YDB driver instance
            topic_path (str): Topic path to write to
            producer_id (str, optional): Producer identifier
            settings (TopicWriterSettings, optional): Writer configuration
        """

    async def __aenter__(self) -> 'TopicWriterAsyncIO':
        """Enter async context manager."""

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit async context manager."""

    async def write(
        self,
        messages: Union[TopicWriterMessage, List[TopicWriterMessage]]
    ) -> TopicWriteResult:
        """
        Write messages asynchronously.

        Args:
            messages (Union[TopicWriterMessage, List[TopicWriterMessage]]): Messages to write

        Returns:
            TopicWriteResult: Write operation result
        """

    async def flush(self) -> TopicWriteResult:
        """Flush pending messages asynchronously."""

    async def close(self, timeout: float = None):
        """Close writer asynchronously."""

class TopicReaderAsyncIO:
    """Asyncio counterpart of ``TopicReader``; supports async context-manager
    and async-iterator protocols (``async for message in reader``)."""

    def __init__(
        self,
        driver: Driver,
        settings: TopicReaderSettings
    ):
        """
        Asynchronous topic reader.

        Args:
            driver (Driver): Async YDB driver instance
            settings (TopicReaderSettings): Reader configuration
        """

    async def __aenter__(self) -> 'TopicReaderAsyncIO':
        """Enter async context manager."""

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit async context manager."""

    async def receive_message(self, timeout: float = None) -> TopicReaderMessage:
        """
        Receive message asynchronously.

        Args:
            timeout (float, optional): Receive timeout

        Returns:
            TopicReaderMessage: Received message
        """

    async def receive_batch(self, timeout: float = None) -> TopicReaderBatch:
        """
        Receive batch asynchronously.

        Args:
            timeout (float, optional): Receive timeout

        Returns:
            TopicReaderBatch: Batch of messages
        """

    def __aiter__(self) -> AsyncIterator[TopicReaderMessage]:
        """Async iterator over messages."""

    async def __anext__(self) -> TopicReaderMessage:
        """Get next message asynchronously."""

    async def close(self, timeout: float = None):
        """Close reader asynchronously."""

class TopicClientAsyncIO:
    """Asyncio counterpart of ``TopicClient`` for topic administration."""

    def __init__(self, driver: Driver, settings: TopicClientSettings = None):
        """Asynchronous topic client."""

    async def create_topic(self, path: str, settings: CreateTopicSettings):
        """Create topic asynchronously."""

    async def describe_topic(
        self,
        path: str,
        settings: DescribeTopicSettings = None
    ) -> TopicDescription:
        """Describe topic asynchronously."""

    async def alter_topic(self, path: str, settings: AlterTopicSettings):
        """Alter topic asynchronously."""

    async def drop_topic(self, path: str, settings: DropTopicSettings = None):
        """Drop topic asynchronously."""

Usage Examples

Basic Topic Publishing

import ydb
from datetime import timedelta  # required for retention_period below

# Create driver and topic client
driver = ydb.Driver(endpoint="grpc://localhost:2136", database="/local")
driver.wait(fail_fast=True)

topic_client = ydb.TopicClient(driver)

# Create topic
create_settings = ydb.CreateTopicSettings(
    partitions_count=3,
    retention_period=timedelta(days=7),
    supported_codecs=[ydb.TopicCodec.RAW, ydb.TopicCodec.GZIP]
)

topic_client.create_topic("/local/events", create_settings)

# Write messages (the writer context manager closes and flushes on exit)
with topic_client.writer("/local/events", producer_id="producer-1") as writer:
    # Write single message
    message = ydb.TopicWriterMessage(
        data=b'{"event": "user_login", "user_id": 123}',
        message_group_id="user-123"
    )
    writer.write(message)
    
    # Write batch of messages
    messages = [
        ydb.TopicWriterMessage(
            data=f'{{"event": "page_view", "page": "/home", "user_id": {i}}}'.encode(),
            message_group_id=f"user-{i}"
        )
        for i in range(100, 110)
    ]
    
    result = writer.write(messages)
    writer.flush()
    
    print(f"Written {len(result.acks)} messages")

Message Consumption

import json  # payloads in this example are JSON-encoded
from datetime import datetime, timedelta

# Create consumer (topic_client comes from the publishing example above)
consumer_settings = ydb.TopicReaderSettings(
    consumer_name="analytics-consumer",
    topics=[
        ydb.TopicReaderSelector(
            path="/local/events",
            read_from=datetime.now() - timedelta(hours=1)
        )
    ],
    buffer_size_bytes=1024*1024,  # 1MB buffer
    max_lag=timedelta(minutes=5)
)

# Read messages
with topic_client.reader(consumer_settings) as reader:
    for message in reader:
        try:
            # Process message
            event_data = json.loads(message.data.decode())
            print(f"Processing event: {event_data}")
            
            # Commit message after successful processing
            message.commit()
            
        except json.JSONDecodeError:
            print(f"Failed to parse message: {message.data}")
            # Still commit to skip malformed messages
            message.commit()
        except Exception as e:
            print(f"Error processing message: {e}")
            # Don't commit - message will be redelivered
            break

Batch Processing

import json  # payloads in this example are JSON-encoded

# Process messages in batches for better throughput
# (topic_client and consumer_settings come from the previous examples)
with topic_client.reader(consumer_settings) as reader:
    while True:
        try:
            batch = reader.receive_batch(timeout=30.0)
            
            if not batch.messages:
                continue
                
            # Process batch
            events = []
            for message in batch:
                try:
                    event = json.loads(message.data.decode())
                    events.append(event)
                except json.JSONDecodeError:
                    print(f"Skipping malformed message: {message.data}")
            
            # Bulk insert to database (process_events_batch is an
            # application-defined placeholder)
            if events:
                process_events_batch(events)
                
            # Commit entire batch
            batch.commit()
            print(f"Processed batch of {len(batch)} messages")
            
        except ydb.TimeoutError:
            print("No messages received in timeout period")
            continue
        except KeyboardInterrupt:
            print("Shutting down consumer...")
            break

Async Topic Operations

import asyncio
from datetime import datetime  # used for message timestamps below
import ydb.aio as ydb_aio

async def async_topic_producer():
    async with ydb_aio.Driver(...) as driver:
        topic_client = ydb.TopicClientAsyncIO(driver)
        
        # Create topic
        await topic_client.create_topic(
            "/local/async_events",
            ydb.CreateTopicSettings(partitions_count=5)
        )
        
        # Write messages asynchronously
        async with ydb.TopicWriterAsyncIO(
            driver,
            "/local/async_events",
            producer_id="async-producer"
        ) as writer:
            
            # Generate and write messages
            for i in range(1000):
                message = ydb.TopicWriterMessage(
                    data=f'{{"id": {i}, "timestamp": "{datetime.now().isoformat()}"}}'.encode(),
                    seq_no=i
                )
                
                await writer.write(message)
                
                if i % 100 == 0:
                    await writer.flush()
                    print(f"Written {i+1} messages")

async def async_topic_consumer():
    async with ydb_aio.Driver(...) as driver:
        reader_settings = ydb.TopicReaderSettings(
            consumer_name="async-consumer",
            topics=[ydb.TopicReaderSelector("/local/async_events")]
        )
        
        async with ydb.TopicReaderAsyncIO(driver, reader_settings) as reader:
            async for message in reader:
                # Process message asynchronously
                # (process_message_async is an application-defined placeholder)
                await process_message_async(message.data)
                message.commit()

# Run async operations (producer completes before the consumer starts)
asyncio.run(async_topic_producer())
asyncio.run(async_topic_consumer())

Topic Administration

from datetime import datetime, timedelta  # used for read_from / retention below

def manage_topic_lifecycle():
    # Relies on a module-level `driver` created during earlier setup
    topic_client = ydb.TopicClient(driver)
    topic_path = "/local/user_events"
    
    # Create topic with consumers
    consumers = [
        ydb.TopicConsumer(
            name="analytics",
            important=True,
            read_from=datetime.now()
        ),
        ydb.TopicConsumer(
            name="archival",
            important=False
        )
    ]
    
    create_settings = ydb.CreateTopicSettings(
        partitions_count=10,
        retention_period=timedelta(days=30),
        retention_storage_mb=10000,
        consumers=consumers,
        attributes={"team": "analytics", "env": "prod"}
    )
    
    topic_client.create_topic(topic_path, create_settings)
    
    # Describe topic
    description = topic_client.describe_topic(topic_path)
    print(f"Topic: {description.path}")
    print(f"Partitions: {description.partitions_count}")
    print(f"Retention: {description.retention_period}")
    print(f"Consumers: {[c.name for c in description.consumers]}")
    
    # Alter topic - add partition and consumer
    alter_settings = ydb.AlterTopicSettings(
        alter_partitions_count=15,
        add_consumers=[
            ydb.TopicConsumer(name="realtime", important=True)
        ],
        set_retention_period=timedelta(days=45)
    )
    
    topic_client.alter_topic(topic_path, alter_settings)
    
    # Verify changes
    updated_description = topic_client.describe_topic(topic_path)
    print(f"Updated partitions: {updated_description.partitions_count}")
    print(f"Updated consumers: {[c.name for c in updated_description.consumers]}")

manage_topic_lifecycle()

Error Handling and Monitoring

import time  # used for processing timers and retry backoff below

def robust_topic_processing():
    # Relies on module-level `topic_client` and an application-defined
    # `process_message_with_timeout` helper from earlier setup.
    reader_settings = ydb.TopicReaderSettings(
        consumer_name="robust-consumer",
        topics=[ydb.TopicReaderSelector("/local/events")],
        read_timeout=10.0,
        commit_timeout=5.0
    )
    
    retry_count = 0
    max_retries = 3
    
    while retry_count < max_retries:
        try:
            with topic_client.reader(reader_settings) as reader:
                retry_count = 0  # Reset on successful connection
                
                while True:
                    try:
                        message = reader.receive_message(timeout=30.0)
                        
                        # Process with timeout
                        process_start = time.time()
                        result = process_message_with_timeout(message.data, timeout=10.0)
                        process_time = time.time() - process_start
                        
                        if result:
                            message.commit()
                            print(f"Processed message in {process_time:.2f}s")
                        else:
                            print("Processing failed, skipping message")
                            message.commit()  # Skip failed messages
                            
                    except ydb.TimeoutError:
                        print("No messages received, continuing...")
                        continue
                    except ydb.TopicReaderPartitionExpiredError as e:
                        print(f"Partition expired: {e}, reconnecting...")
                        break
                    except Exception as e:
                        print(f"Unexpected error: {e}")
                        time.sleep(1.0)
                        
        except ydb.ConnectionError as e:
            retry_count += 1
            backoff = min(2 ** retry_count, 30)  # exponential backoff, capped
            print(f"Connection failed, retrying in {backoff}s... ({retry_count}/{max_retries})")
            time.sleep(backoff)
        except KeyboardInterrupt:
            print("Shutting down gracefully...")
            break
    
    if retry_count >= max_retries:
        print("Max retries exceeded, giving up")

robust_topic_processing()

Type Definitions

from typing import Awaitable, Callable, Dict, List  # required by the aliases below

# Type aliases for topic operations
TopicPath = str
ProducerId = str
ConsumerName = str
MessageGroupId = str
PartitionId = int
MessageOffset = int
SequenceNumber = int

# Message handling
MessageData = bytes
MessageMetadata = Dict[str, str]
MessageHandler = Callable[[TopicReaderMessage], bool]
AsyncMessageHandler = Callable[[TopicReaderMessage], Awaitable[bool]]

# Batch processing
BatchProcessor = Callable[[List[TopicReaderMessage]], bool]
AsyncBatchProcessor = Callable[[List[TopicReaderMessage]], Awaitable[bool]]

Install with Tessl CLI

npx tessl i tessl/pypi-ydb

docs

async-operations.md

authentication.md

data-types.md

dbapi-interface.md

driver-connection.md

error-handling.md

index.md

query-service.md

schema-operations.md

sqlalchemy-integration.md

table-operations.md

topic-operations.md

tile.json