Officially supported Python client for YDB distributed SQL database
Streaming data operations including topic creation, message publishing, message consuming, and topic administration.
The topic client provides comprehensive operations for managing topics and streaming data.
class TopicClient:
def __init__(self, driver: Driver, settings: TopicClientSettings = None):
"""
Create topic operations client.
Args:
driver (Driver): YDB driver instance
settings (TopicClientSettings, optional): Client configuration
"""
def create_topic(
self,
path: str,
settings: CreateTopicSettings
):
"""
Create a new topic.
Args:
path (str): Topic path
settings (CreateTopicSettings): Topic creation configuration
"""
def describe_topic(
self,
path: str,
settings: DescribeTopicSettings = None
) -> TopicDescription:
"""
Get topic description and metadata.
Args:
path (str): Topic path
settings (DescribeTopicSettings, optional): Description settings
Returns:
TopicDescription: Topic configuration and metadata
"""
def alter_topic(
self,
path: str,
settings: AlterTopicSettings
):
"""
Alter topic configuration.
Args:
path (str): Topic path
settings (AlterTopicSettings): Alteration settings
"""
def drop_topic(
self,
path: str,
settings: DropTopicSettings = None
):
"""
Delete topic.
Args:
path (str): Topic path
settings (DropTopicSettings, optional): Drop settings
"""
def writer(
self,
topic_path: str,
producer_id: str = None,
settings: TopicWriterSettings = None
) -> TopicWriter:
"""
Create topic writer for publishing messages.
Args:
topic_path (str): Topic path to write to
producer_id (str, optional): Producer identifier
settings (TopicWriterSettings, optional): Writer configuration
Returns:
TopicWriter: Topic writer instance
"""
def reader(
self,
settings: TopicReaderSettings
) -> TopicReader:
"""
Create topic reader for consuming messages.
Args:
settings (TopicReaderSettings): Reader configuration
Returns:
TopicReader: Topic reader instance
"""
class TopicClientSettings:
def __init__(
self,
default_compression_codec: TopicCodec = None,
request_timeout: float = None
):
"""
Topic client configuration.
Args:
default_compression_codec (TopicCodec, optional): Default compression
request_timeout (float, optional): Default request timeout
"""Publisher interface for sending messages to topics with batching and delivery guarantees.
class TopicWriter:
def __init__(
self,
driver: Driver,
topic_path: str,
producer_id: str = None,
settings: TopicWriterSettings = None
):
"""
Create topic writer for message publishing.
Args:
driver (Driver): YDB driver instance
topic_path (str): Topic path to write to
producer_id (str, optional): Producer identifier
settings (TopicWriterSettings, optional): Writer configuration
"""
def write(
self,
messages: Union[TopicWriterMessage, List[TopicWriterMessage]]
) -> TopicWriteResult:
"""
Write messages to topic.
Args:
messages (Union[TopicWriterMessage, List[TopicWriterMessage]]): Messages to write
Returns:
TopicWriteResult: Write operation result
"""
def flush(self) -> TopicWriteResult:
"""
Flush pending messages and wait for acknowledgments.
Returns:
TopicWriteResult: Flush operation result
"""
def close(self, timeout: float = None):
"""
Close writer and flush pending messages.
Args:
timeout (float, optional): Close timeout
"""
def __enter__(self) -> 'TopicWriter':
"""
Enter context manager.
Returns:
TopicWriter: Writer instance
"""
def __exit__(self, exc_type, exc_val, exc_tb):
"""
Exit context manager and close writer.
"""
@property
def init_info(self) -> TopicWriterInitInfo:
"""Get writer initialization information."""
class TopicWriterMessage:
def __init__(
self,
data: bytes,
seq_no: int = None,
created_at: datetime = None,
message_group_id: str = None,
metadata: Dict[str, str] = None,
codec: TopicCodec = None
):
"""
Message for topic publishing.
Args:
data (bytes): Message payload
seq_no (int, optional): Sequence number for ordering
created_at (datetime, optional): Message creation timestamp
message_group_id (str, optional): Message group identifier
metadata (Dict[str, str], optional): Message metadata
codec (TopicCodec, optional): Message compression codec
"""
@property
def data(self) -> bytes:
"""Message payload."""
@property
def seq_no(self) -> int:
"""Message sequence number."""
@property
def created_at(self) -> datetime:
"""Message creation timestamp."""
@property
def message_group_id(self) -> str:
"""Message group identifier."""
@property
def metadata(self) -> Dict[str, str]:
"""Message metadata."""
@property
def codec(self) -> TopicCodec:
"""Message compression codec."""
class TopicWriterSettings:
def __init__(
self,
producer_id: str = None,
write_session_meta: Dict[str, str] = None,
codec: TopicCodec = None,
get_last_seq_no: bool = False,
update_token_interval: float = None,
partition_id: int = None,
message_group_id: str = None
):
"""
Topic writer configuration.
Args:
producer_id (str, optional): Producer identifier
write_session_meta (Dict[str, str], optional): Session metadata
codec (TopicCodec, optional): Default compression codec
get_last_seq_no (bool): Retrieve last sequence number on init
update_token_interval (float, optional): Token update interval
partition_id (int, optional): Target partition ID
message_group_id (str, optional): Default message group ID
"""
class TopicWriteResult:
def __init__(
self,
acks: List[TopicWriterAck] = None,
errors: List[TopicWriterError] = None
):
"""
Result of topic write operation.
Args:
acks (List[TopicWriterAck], optional): Acknowledged messages
errors (List[TopicWriterError], optional): Write errors
"""
@property
def acks(self) -> List[TopicWriterAck]:
"""Acknowledged messages."""
@property
def errors(self) -> List[TopicWriterError]:
"""Write errors."""
@property
def has_errors(self) -> bool:
"""True if write operation had errors."""
class TopicWriterInitInfo:
def __init__(
self,
topic_path: str,
producer_id: str,
last_seq_no: int = None,
session_id: str = None
):
"""
Topic writer initialization information.
Args:
topic_path (str): Topic path
producer_id (str): Producer identifier
last_seq_no (int, optional): Last sequence number
session_id (str, optional): Write session identifier
"""
@property
def topic_path(self) -> str:
"""Topic path."""
@property
def producer_id(self) -> str:
"""Producer identifier."""
@property
def last_seq_no(self) -> int:
"""Last sequence number."""
@property
def session_id(self) -> str:
"""Write session identifier."""Consumer interface for reading messages from topics with automatic partition assignment and offset management.
class TopicReader:
def __init__(
self,
driver: Driver,
settings: TopicReaderSettings
):
"""
Create topic reader for message consumption.
Args:
driver (Driver): YDB driver instance
settings (TopicReaderSettings): Reader configuration
"""
def receive_message(self, timeout: float = None) -> TopicReaderMessage:
"""
Receive next message from topic.
Args:
timeout (float, optional): Receive timeout
Returns:
TopicReaderMessage: Received message
"""
def receive_batch(self, timeout: float = None) -> TopicReaderBatch:
"""
Receive batch of messages from topic.
Args:
timeout (float, optional): Receive timeout
Returns:
TopicReaderBatch: Batch of messages
"""
def commit(self, message: TopicReaderMessage):
"""
Commit message processing.
Args:
message (TopicReaderMessage): Message to commit
"""
def commit_batch(self, batch: TopicReaderBatch):
"""
Commit batch processing.
Args:
batch (TopicReaderBatch): Batch to commit
"""
def close(self, timeout: float = None):
"""
Close reader and release resources.
Args:
timeout (float, optional): Close timeout
"""
def __enter__(self) -> 'TopicReader':
"""Enter context manager."""
def __exit__(self, exc_type, exc_val, exc_tb):
"""Exit context manager and close reader."""
def __iter__(self) -> Iterator[TopicReaderMessage]:
"""
Iterate over messages.
Returns:
Iterator[TopicReaderMessage]: Message iterator
"""
class TopicReaderMessage:
def __init__(
self,
data: bytes,
seq_no: int,
created_at: datetime,
message_group_id: str = None,
offset: int = None,
metadata: Dict[str, str] = None,
partition_session: 'PartitionSession' = None
):
"""
Message received from topic.
Args:
data (bytes): Message payload
seq_no (int): Message sequence number
created_at (datetime): Message creation timestamp
message_group_id (str, optional): Message group identifier
offset (int, optional): Message offset in partition
metadata (Dict[str, str], optional): Message metadata
partition_session (PartitionSession, optional): Source partition session
"""
@property
def data(self) -> bytes:
"""Message payload."""
@property
def seq_no(self) -> int:
"""Message sequence number."""
@property
def created_at(self) -> datetime:
"""Message creation timestamp."""
@property
def message_group_id(self) -> str:
"""Message group identifier."""
@property
def offset(self) -> int:
"""Message offset in partition."""
@property
def metadata(self) -> Dict[str, str]:
"""Message metadata."""
@property
def partition_session(self) -> 'PartitionSession':
"""Source partition session."""
def commit(self):
"""Commit this message."""
class TopicReaderBatch:
def __init__(
self,
messages: List[TopicReaderMessage],
partition_session: 'PartitionSession'
):
"""
Batch of messages from topic.
Args:
messages (List[TopicReaderMessage]): Messages in batch
partition_session (PartitionSession): Source partition session
"""
@property
def messages(self) -> List[TopicReaderMessage]:
"""Messages in batch."""
@property
def partition_session(self) -> 'PartitionSession':
"""Source partition session."""
def commit(self):
"""Commit entire batch."""
def __iter__(self) -> Iterator[TopicReaderMessage]:
"""Iterate over messages in batch."""
def __len__(self) -> int:
"""Get number of messages in batch."""
class TopicReaderSettings:
def __init__(
self,
consumer_name: str,
topics: List[TopicReaderSelector],
buffer_size_bytes: int = None,
max_memory_usage_bytes: int = None,
max_lag: timedelta = None,
read_timeout: float = None,
commit_timeout: float = None,
with_metadata_fields: List[str] = None,
decompress_messages: bool = True
):
"""
Topic reader configuration.
Args:
consumer_name (str): Consumer identifier
topics (List[TopicReaderSelector]): Topics to read from
buffer_size_bytes (int, optional): Read buffer size
max_memory_usage_bytes (int, optional): Maximum memory usage
max_lag (timedelta, optional): Maximum allowed lag
read_timeout (float, optional): Read operation timeout
commit_timeout (float, optional): Commit operation timeout
with_metadata_fields (List[str], optional): Metadata fields to include
decompress_messages (bool): Automatically decompress messages
"""
class TopicReaderSelector:
def __init__(
self,
path: str,
partitions: List[int] = None,
read_from: datetime = None,
max_lag: timedelta = None
):
"""
Topic selection for reader.
Args:
path (str): Topic path
partitions (List[int], optional): Specific partitions to read
read_from (datetime, optional): Start reading from timestamp
max_lag (timedelta, optional): Maximum allowed lag for this topic
"""
@property
def path(self) -> str:
"""Topic path."""
@property
def partitions(self) -> List[int]:
"""Specific partitions to read."""
@property
def read_from(self) -> datetime:
"""Start reading from timestamp."""
@property
def max_lag(self) -> timedelta:
"""Maximum allowed lag."""Topic creation and management settings with partitioning and retention policies.
class TopicDescription:
def __init__(
self,
path: str,
partitions_count: int = None,
retention_period: timedelta = None,
retention_storage_mb: int = None,
supported_codecs: List[TopicCodec] = None,
partition_write_speed_bytes_per_second: int = None,
partition_write_burst_bytes: int = None,
attributes: Dict[str, str] = None,
consumers: List[TopicConsumer] = None,
metering_mode: TopicMeteringMode = None,
partition_count_limit: int = None
):
"""
Topic configuration and metadata.
Args:
path (str): Topic path
partitions_count (int, optional): Number of partitions
retention_period (timedelta, optional): Message retention period
retention_storage_mb (int, optional): Storage retention limit in MB
supported_codecs (List[TopicCodec], optional): Supported compression codecs
partition_write_speed_bytes_per_second (int, optional): Write speed limit per partition
partition_write_burst_bytes (int, optional): Write burst limit per partition
attributes (Dict[str, str], optional): Topic attributes
consumers (List[TopicConsumer], optional): Topic consumers
metering_mode (TopicMeteringMode, optional): Metering mode
partition_count_limit (int, optional): Maximum partition count
"""
@property
def path(self) -> str:
"""Topic path."""
@property
def partitions_count(self) -> int:
"""Number of partitions."""
@property
def retention_period(self) -> timedelta:
"""Message retention period."""
@property
def supported_codecs(self) -> List[TopicCodec]:
"""Supported compression codecs."""
@property
def consumers(self) -> List[TopicConsumer]:
"""Topic consumers."""
class CreateTopicSettings:
def __init__(
self,
partitions_count: int = 1,
retention_period: timedelta = None,
retention_storage_mb: int = None,
supported_codecs: List[TopicCodec] = None,
partition_write_speed_bytes_per_second: int = None,
partition_write_burst_bytes: int = None,
attributes: Dict[str, str] = None,
consumers: List[TopicConsumer] = None,
metering_mode: TopicMeteringMode = None
):
"""
Settings for topic creation.
Args:
partitions_count (int): Number of partitions to create
retention_period (timedelta, optional): Message retention period
retention_storage_mb (int, optional): Storage retention limit
supported_codecs (List[TopicCodec], optional): Allowed compression codecs
partition_write_speed_bytes_per_second (int, optional): Write speed limit
partition_write_burst_bytes (int, optional): Write burst limit
attributes (Dict[str, str], optional): Topic attributes
consumers (List[TopicConsumer], optional): Initial consumers
metering_mode (TopicMeteringMode, optional): Billing metering mode
"""
class AlterTopicSettings:
def __init__(
self,
alter_partitions_count: int = None,
set_retention_period: timedelta = None,
set_retention_storage_mb: int = None,
set_supported_codecs: List[TopicCodec] = None,
set_partition_write_speed_bytes_per_second: int = None,
set_partition_write_burst_bytes: int = None,
alter_attributes: Dict[str, str] = None,
add_consumers: List[TopicConsumer] = None,
drop_consumers: List[str] = None,
alter_consumers: List[TopicAlterConsumer] = None,
set_metering_mode: TopicMeteringMode = None
):
"""
Settings for topic alteration.
Args:
alter_partitions_count (int, optional): New partition count
set_retention_period (timedelta, optional): New retention period
set_retention_storage_mb (int, optional): New storage retention limit
set_supported_codecs (List[TopicCodec], optional): New codec list
set_partition_write_speed_bytes_per_second (int, optional): New speed limit
set_partition_write_burst_bytes (int, optional): New burst limit
alter_attributes (Dict[str, str], optional): Attribute changes
add_consumers (List[TopicConsumer], optional): Consumers to add
drop_consumers (List[str], optional): Consumer names to remove
alter_consumers (List[TopicAlterConsumer], optional): Consumer modifications
set_metering_mode (TopicMeteringMode, optional): New metering mode
"""
class TopicConsumer:
def __init__(
self,
name: str,
supported_codecs: List[TopicCodec] = None,
read_from: datetime = None,
attributes: Dict[str, str] = None,
important: bool = False
):
"""
Topic consumer configuration.
Args:
name (str): Consumer name
supported_codecs (List[TopicCodec], optional): Supported codecs
read_from (datetime, optional): Start reading from timestamp
attributes (Dict[str, str], optional): Consumer attributes
important (bool): Whether consumer is important for retention
"""
@property
def name(self) -> str:
"""Consumer name."""
@property
def supported_codecs(self) -> List[TopicCodec]:
"""Supported compression codecs."""
@property
def read_from(self) -> datetime:
"""Start reading timestamp."""
@property
def important(self) -> bool:
"""Whether consumer is important."""
class TopicCodec(enum.Enum):
"""Message compression codecs."""
RAW = "raw"
GZIP = "gzip"
LZOP = "lzop"
ZSTD = "zstd"
class TopicMeteringMode(enum.Enum):
"""Topic metering modes for billing."""
UNSPECIFIED = "unspecified"
RESERVED_CAPACITY = "reserved_capacity"
REQUEST_UNITS = "request_units"Asynchronous versions of topic operations for high-performance applications.
class TopicWriterAsyncIO:
def __init__(
self,
driver: Driver,
topic_path: str,
producer_id: str = None,
settings: TopicWriterSettings = None
):
"""
Asynchronous topic writer.
Args:
driver (Driver): Async YDB driver instance
topic_path (str): Topic path to write to
producer_id (str, optional): Producer identifier
settings (TopicWriterSettings, optional): Writer configuration
"""
async def __aenter__(self) -> 'TopicWriterAsyncIO':
"""Enter async context manager."""
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Exit async context manager."""
async def write(
self,
messages: Union[TopicWriterMessage, List[TopicWriterMessage]]
) -> TopicWriteResult:
"""
Write messages asynchronously.
Args:
messages (Union[TopicWriterMessage, List[TopicWriterMessage]]): Messages to write
Returns:
TopicWriteResult: Write operation result
"""
async def flush(self) -> TopicWriteResult:
"""Flush pending messages asynchronously."""
async def close(self, timeout: float = None):
"""Close writer asynchronously."""
class TopicReaderAsyncIO:
def __init__(
self,
driver: Driver,
settings: TopicReaderSettings
):
"""
Asynchronous topic reader.
Args:
driver (Driver): Async YDB driver instance
settings (TopicReaderSettings): Reader configuration
"""
async def __aenter__(self) -> 'TopicReaderAsyncIO':
"""Enter async context manager."""
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Exit async context manager."""
async def receive_message(self, timeout: float = None) -> TopicReaderMessage:
"""
Receive message asynchronously.
Args:
timeout (float, optional): Receive timeout
Returns:
TopicReaderMessage: Received message
"""
async def receive_batch(self, timeout: float = None) -> TopicReaderBatch:
"""
Receive batch asynchronously.
Args:
timeout (float, optional): Receive timeout
Returns:
TopicReaderBatch: Batch of messages
"""
def __aiter__(self) -> AsyncIterator[TopicReaderMessage]:
"""Async iterator over messages."""
async def __anext__(self) -> TopicReaderMessage:
"""Get next message asynchronously."""
async def close(self, timeout: float = None):
"""Close reader asynchronously."""
class TopicClientAsyncIO:
def __init__(self, driver: Driver, settings: TopicClientSettings = None):
"""Asynchronous topic client."""
async def create_topic(self, path: str, settings: CreateTopicSettings):
"""Create topic asynchronously."""
async def describe_topic(
self,
path: str,
settings: DescribeTopicSettings = None
) -> TopicDescription:
"""Describe topic asynchronously."""
async def alter_topic(self, path: str, settings: AlterTopicSettings):
"""Alter topic asynchronously."""
async def drop_topic(self, path: str, settings: DropTopicSettings = None):
"""Drop topic asynchronously."""import ydb
# Create driver and topic client
driver = ydb.Driver(endpoint="grpc://localhost:2136", database="/local")
driver.wait(fail_fast=True)
topic_client = ydb.TopicClient(driver)
# Create topic
create_settings = ydb.CreateTopicSettings(
partitions_count=3,
retention_period=timedelta(days=7),
supported_codecs=[ydb.TopicCodec.RAW, ydb.TopicCodec.GZIP]
)
topic_client.create_topic("/local/events", create_settings)
# Write messages
with topic_client.writer("/local/events", producer_id="producer-1") as writer:
# Write single message
message = ydb.TopicWriterMessage(
data=b'{"event": "user_login", "user_id": 123}',
message_group_id="user-123"
)
writer.write(message)
# Write batch of messages
messages = [
ydb.TopicWriterMessage(
data=f'{{"event": "page_view", "page": "/home", "user_id": {i}}}'.encode(),
message_group_id=f"user-{i}"
)
for i in range(100, 110)
]
result = writer.write(messages)
writer.flush()
print(f"Written {len(result.acks)} messages")# Create consumer
consumer_settings = ydb.TopicReaderSettings(
consumer_name="analytics-consumer",
topics=[
ydb.TopicReaderSelector(
path="/local/events",
read_from=datetime.now() - timedelta(hours=1)
)
],
buffer_size_bytes=1024*1024, # 1MB buffer
max_lag=timedelta(minutes=5)
)
# Read messages
with topic_client.reader(consumer_settings) as reader:
for message in reader:
try:
# Process message
event_data = json.loads(message.data.decode())
print(f"Processing event: {event_data}")
# Commit message after successful processing
message.commit()
except json.JSONDecodeError:
print(f"Failed to parse message: {message.data}")
# Still commit to skip malformed messages
message.commit()
except Exception as e:
print(f"Error processing message: {e}")
# Don't commit - message will be redelivered
break# Process messages in batches for better throughput
with topic_client.reader(consumer_settings) as reader:
while True:
try:
batch = reader.receive_batch(timeout=30.0)
if not batch.messages:
continue
# Process batch
events = []
for message in batch:
try:
event = json.loads(message.data.decode())
events.append(event)
except json.JSONDecodeError:
print(f"Skipping malformed message: {message.data}")
# Bulk insert to database
if events:
process_events_batch(events)
# Commit entire batch
batch.commit()
print(f"Processed batch of {len(batch)} messages")
except ydb.TimeoutError:
print("No messages received in timeout period")
continue
except KeyboardInterrupt:
print("Shutting down consumer...")
breakimport asyncio
import ydb.aio as ydb_aio
async def async_topic_producer():
async with ydb_aio.Driver(...) as driver:
topic_client = ydb.TopicClientAsyncIO(driver)
# Create topic
await topic_client.create_topic(
"/local/async_events",
ydb.CreateTopicSettings(partitions_count=5)
)
# Write messages asynchronously
async with ydb.TopicWriterAsyncIO(
driver,
"/local/async_events",
producer_id="async-producer"
) as writer:
# Generate and write messages
for i in range(1000):
message = ydb.TopicWriterMessage(
data=f'{{"id": {i}, "timestamp": "{datetime.now().isoformat()}"}}'.encode(),
seq_no=i
)
await writer.write(message)
if i % 100 == 0:
await writer.flush()
print(f"Written {i+1} messages")
async def async_topic_consumer():
async with ydb_aio.Driver(...) as driver:
reader_settings = ydb.TopicReaderSettings(
consumer_name="async-consumer",
topics=[ydb.TopicReaderSelector("/local/async_events")]
)
async with ydb.TopicReaderAsyncIO(driver, reader_settings) as reader:
async for message in reader:
# Process message asynchronously
await process_message_async(message.data)
message.commit()
# Run async operations
asyncio.run(async_topic_producer())
asyncio.run(async_topic_consumer())def manage_topic_lifecycle():
topic_client = ydb.TopicClient(driver)
topic_path = "/local/user_events"
# Create topic with consumers
consumers = [
ydb.TopicConsumer(
name="analytics",
important=True,
read_from=datetime.now()
),
ydb.TopicConsumer(
name="archival",
important=False
)
]
create_settings = ydb.CreateTopicSettings(
partitions_count=10,
retention_period=timedelta(days=30),
retention_storage_mb=10000,
consumers=consumers,
attributes={"team": "analytics", "env": "prod"}
)
topic_client.create_topic(topic_path, create_settings)
# Describe topic
description = topic_client.describe_topic(topic_path)
print(f"Topic: {description.path}")
print(f"Partitions: {description.partitions_count}")
print(f"Retention: {description.retention_period}")
print(f"Consumers: {[c.name for c in description.consumers]}")
# Alter topic - add partition and consumer
alter_settings = ydb.AlterTopicSettings(
alter_partitions_count=15,
add_consumers=[
ydb.TopicConsumer(name="realtime", important=True)
],
set_retention_period=timedelta(days=45)
)
topic_client.alter_topic(topic_path, alter_settings)
# Verify changes
updated_description = topic_client.describe_topic(topic_path)
print(f"Updated partitions: {updated_description.partitions_count}")
print(f"Updated consumers: {[c.name for c in updated_description.consumers]}")
manage_topic_lifecycle()def robust_topic_processing():
reader_settings = ydb.TopicReaderSettings(
consumer_name="robust-consumer",
topics=[ydb.TopicReaderSelector("/local/events")],
read_timeout=10.0,
commit_timeout=5.0
)
retry_count = 0
max_retries = 3
while retry_count < max_retries:
try:
with topic_client.reader(reader_settings) as reader:
retry_count = 0 # Reset on successful connection
while True:
try:
message = reader.receive_message(timeout=30.0)
# Process with timeout
process_start = time.time()
result = process_message_with_timeout(message.data, timeout=10.0)
process_time = time.time() - process_start
if result:
message.commit()
print(f"Processed message in {process_time:.2f}s")
else:
print("Processing failed, skipping message")
message.commit() # Skip failed messages
except ydb.TimeoutError:
print("No messages received, continuing...")
continue
except ydb.TopicReaderPartitionExpiredError as e:
print(f"Partition expired: {e}, reconnecting...")
break
except Exception as e:
print(f"Unexpected error: {e}")
time.sleep(1.0)
except ydb.ConnectionError as e:
retry_count += 1
backoff = min(2 ** retry_count, 30)
print(f"Connection failed, retrying in {backoff}s... ({retry_count}/{max_retries})")
time.sleep(backoff)
except KeyboardInterrupt:
print("Shutting down gracefully...")
break
if retry_count >= max_retries:
print("Max retries exceeded, giving up")
robust_topic_processing()# Type aliases for topic operations
TopicPath = str
ProducerId = str
ConsumerName = str
MessageGroupId = str
PartitionId = int
MessageOffset = int
SequenceNumber = int
# Message handling
MessageData = bytes
MessageMetadata = Dict[str, str]
MessageHandler = Callable[[TopicReaderMessage], bool]
AsyncMessageHandler = Callable[[TopicReaderMessage], Awaitable[bool]]
# Batch processing
BatchProcessor = Callable[[List[TopicReaderMessage]], bool]
AsyncBatchProcessor = Callable[[List[TopicReaderMessage]], Awaitable[bool]]