# Topic Operations

Streaming data operations: topic creation, message publishing, message consumption, and topic administration.

## Capabilities

### Topic Client

The topic client manages topics (create, describe, alter, drop) and creates writers and readers for streaming data.

```python { .api }
class TopicClient:
    """Synchronous client for topic administration and for creating writers/readers."""

    def __init__(self, driver: Driver, settings: TopicClientSettings = None):
        """
        Create topic operations client.

        Args:
            driver (Driver): YDB driver instance
            settings (TopicClientSettings, optional): Client configuration
        """

    def create_topic(
        self,
        path: str,
        settings: CreateTopicSettings,
    ):
        """
        Create a new topic.

        Args:
            path (str): Topic path
            settings (CreateTopicSettings): Topic creation configuration
        """

    def describe_topic(
        self,
        path: str,
        settings: DescribeTopicSettings = None,
    ) -> TopicDescription:
        """
        Get topic description and metadata.

        Args:
            path (str): Topic path
            settings (DescribeTopicSettings, optional): Description settings

        Returns:
            TopicDescription: Topic configuration and metadata
        """

    def alter_topic(
        self,
        path: str,
        settings: AlterTopicSettings,
    ):
        """
        Alter topic configuration.

        Args:
            path (str): Topic path
            settings (AlterTopicSettings): Alteration settings
        """

    def drop_topic(
        self,
        path: str,
        settings: DropTopicSettings = None,
    ):
        """
        Delete topic.

        Args:
            path (str): Topic path
            settings (DropTopicSettings, optional): Drop settings
        """

    def writer(
        self,
        topic_path: str,
        producer_id: str = None,
        settings: TopicWriterSettings = None,
    ) -> TopicWriter:
        """
        Create topic writer for publishing messages.

        Args:
            topic_path (str): Topic path to write to
            producer_id (str, optional): Producer identifier
            settings (TopicWriterSettings, optional): Writer configuration

        Returns:
            TopicWriter: Topic writer instance
        """

    def reader(
        self,
        settings: TopicReaderSettings,
    ) -> TopicReader:
        """
        Create topic reader for consuming messages.

        Args:
            settings (TopicReaderSettings): Reader configuration

        Returns:
            TopicReader: Topic reader instance
        """


class TopicClientSettings:
    """Default settings shared by operations issued through a topic client."""

    def __init__(
        self,
        default_compression_codec: TopicCodec = None,
        request_timeout: float = None,
    ):
        """
        Topic client configuration.

        Args:
            default_compression_codec (TopicCodec, optional): Default compression codec
            request_timeout (float, optional): Default request timeout
        """
```

### Topic Writer

Publisher interface for sending messages to topics, with batching and acknowledged delivery.

```python { .api }
class TopicWriter:
    """Synchronous publisher for a single topic; usable as a context manager."""

    def __init__(
        self,
        driver: Driver,
        topic_path: str,
        producer_id: str = None,
        settings: TopicWriterSettings = None,
    ):
        """
        Create topic writer for message publishing.

        Args:
            driver (Driver): YDB driver instance
            topic_path (str): Topic path to write to
            producer_id (str, optional): Producer identifier
            settings (TopicWriterSettings, optional): Writer configuration
        """

    def write(
        self,
        messages: Union[TopicWriterMessage, List[TopicWriterMessage]],
    ) -> TopicWriteResult:
        """
        Write messages to topic.

        Args:
            messages (Union[TopicWriterMessage, List[TopicWriterMessage]]):
                A single message or a list of messages to write

        Returns:
            TopicWriteResult: Write operation result
        """

    def flush(self) -> TopicWriteResult:
        """
        Flush pending messages and wait for acknowledgments.

        Returns:
            TopicWriteResult: Flush operation result
        """

    def close(self, timeout: float = None):
        """
        Close writer and flush pending messages.

        Args:
            timeout (float, optional): Close timeout
        """

    def __enter__(self) -> 'TopicWriter':
        """
        Enter context manager.

        Returns:
            TopicWriter: Writer instance
        """

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context manager and close writer."""

    @property
    def init_info(self) -> TopicWriterInitInfo:
        """Get writer initialization information."""


class TopicWriterMessage:
    """A single outgoing message together with its optional write attributes."""

    def __init__(
        self,
        data: bytes,
        seq_no: int = None,
        created_at: datetime = None,
        message_group_id: str = None,
        metadata: Dict[str, str] = None,
        codec: TopicCodec = None,
    ):
        """
        Message for topic publishing.

        Args:
            data (bytes): Message payload
            seq_no (int, optional): Sequence number for ordering
            created_at (datetime, optional): Message creation timestamp
            message_group_id (str, optional): Message group identifier
            metadata (Dict[str, str], optional): Message metadata
            codec (TopicCodec, optional): Message compression codec
        """

    @property
    def data(self) -> bytes:
        """Message payload."""

    @property
    def seq_no(self) -> int:
        """Message sequence number."""

    @property
    def created_at(self) -> datetime:
        """Message creation timestamp."""

    @property
    def message_group_id(self) -> str:
        """Message group identifier."""

    @property
    def metadata(self) -> Dict[str, str]:
        """Message metadata."""

    @property
    def codec(self) -> TopicCodec:
        """Message compression codec."""


class TopicWriterSettings:
    """Configuration for a topic writer session."""

    def __init__(
        self,
        producer_id: str = None,
        write_session_meta: Dict[str, str] = None,
        codec: TopicCodec = None,
        get_last_seq_no: bool = False,
        update_token_interval: float = None,
        partition_id: int = None,
        message_group_id: str = None,
    ):
        """
        Topic writer configuration.

        Args:
            producer_id (str, optional): Producer identifier
            write_session_meta (Dict[str, str], optional): Session metadata
            codec (TopicCodec, optional): Default compression codec
            get_last_seq_no (bool): Retrieve last sequence number on init
            update_token_interval (float, optional): Token update interval
            partition_id (int, optional): Target partition ID
            message_group_id (str, optional): Default message group ID

        NOTE(review): producer_id can also be passed directly to
        TopicClient.writer() / TopicWriter(); which value wins when both are
        given is not specified here — confirm.
        """


class TopicWriteResult:
    """Outcome of a write or flush: acknowledged messages plus any errors."""

    def __init__(
        self,
        acks: List[TopicWriterAck] = None,
        errors: List[TopicWriterError] = None,
    ):
        """
        Result of topic write operation.

        Args:
            acks (List[TopicWriterAck], optional): Acknowledged messages
            errors (List[TopicWriterError], optional): Write errors
        """

    @property
    def acks(self) -> List[TopicWriterAck]:
        """Acknowledged messages."""

    @property
    def errors(self) -> List[TopicWriterError]:
        """Write errors."""

    @property
    def has_errors(self) -> bool:
        """True if write operation had errors."""


class TopicWriterInitInfo:
    """Information reported when a writer session is initialized."""

    def __init__(
        self,
        topic_path: str,
        producer_id: str,
        last_seq_no: int = None,
        session_id: str = None,
    ):
        """
        Topic writer initialization information.

        Args:
            topic_path (str): Topic path
            producer_id (str): Producer identifier
            last_seq_no (int, optional): Last sequence number
            session_id (str, optional): Write session identifier
        """

    @property
    def topic_path(self) -> str:
        """Topic path."""

    @property
    def producer_id(self) -> str:
        """Producer identifier."""

    @property
    def last_seq_no(self) -> int:
        """Last sequence number."""

    @property
    def session_id(self) -> str:
        """Write session identifier."""
```

### Topic Reader

Consumer interface for reading messages from topics, with automatic partition assignment and offset commits.

```python { .api }
class TopicReader:
    """Synchronous consumer; usable as a context manager and as a message iterator."""

    def __init__(
        self,
        driver: Driver,
        settings: TopicReaderSettings,
    ):
        """
        Create topic reader for message consumption.

        Args:
            driver (Driver): YDB driver instance
            settings (TopicReaderSettings): Reader configuration
        """

    def receive_message(self, timeout: float = None) -> TopicReaderMessage:
        """
        Receive next message from topic.

        Args:
            timeout (float, optional): Receive timeout

        Returns:
            TopicReaderMessage: Received message
        """

    def receive_batch(self, timeout: float = None) -> TopicReaderBatch:
        """
        Receive batch of messages from topic.

        Args:
            timeout (float, optional): Receive timeout

        Returns:
            TopicReaderBatch: Batch of messages
        """

    def commit(self, message: TopicReaderMessage):
        """
        Commit message processing.

        Args:
            message (TopicReaderMessage): Message to commit
        """

    def commit_batch(self, batch: TopicReaderBatch):
        """
        Commit batch processing.

        Args:
            batch (TopicReaderBatch): Batch to commit
        """

    def close(self, timeout: float = None):
        """
        Close reader and release resources.

        Args:
            timeout (float, optional): Close timeout
        """

    def __enter__(self) -> 'TopicReader':
        """Enter context manager."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context manager and close reader."""

    def __iter__(self) -> Iterator[TopicReaderMessage]:
        """
        Iterate over messages.

        Returns:
            Iterator[TopicReaderMessage]: Message iterator
        """


class TopicReaderMessage:
    """A message received from a topic, committable on its own."""

    def __init__(
        self,
        data: bytes,
        seq_no: int,
        created_at: datetime,
        message_group_id: str = None,
        offset: int = None,
        metadata: Dict[str, str] = None,
        partition_session: 'PartitionSession' = None,
    ):
        """
        Message received from topic.

        Args:
            data (bytes): Message payload
            seq_no (int): Message sequence number
            created_at (datetime): Message creation timestamp
            message_group_id (str, optional): Message group identifier
            offset (int, optional): Message offset in partition
            metadata (Dict[str, str], optional): Message metadata
            partition_session (PartitionSession, optional): Source partition session
        """

    @property
    def data(self) -> bytes:
        """Message payload."""

    @property
    def seq_no(self) -> int:
        """Message sequence number."""

    @property
    def created_at(self) -> datetime:
        """Message creation timestamp."""

    @property
    def message_group_id(self) -> str:
        """Message group identifier."""

    @property
    def offset(self) -> int:
        """Message offset in partition."""

    @property
    def metadata(self) -> Dict[str, str]:
        """Message metadata."""

    @property
    def partition_session(self) -> 'PartitionSession':
        """Source partition session."""

    def commit(self):
        """Commit this message."""


class TopicReaderBatch:
    """Messages from a single partition session, committable as a unit."""

    def __init__(
        self,
        messages: List[TopicReaderMessage],
        partition_session: 'PartitionSession',
    ):
        """
        Batch of messages from topic.

        Args:
            messages (List[TopicReaderMessage]): Messages in batch
            partition_session (PartitionSession): Source partition session
        """

    @property
    def messages(self) -> List[TopicReaderMessage]:
        """Messages in batch."""

    @property
    def partition_session(self) -> 'PartitionSession':
        """Source partition session."""

    def commit(self):
        """Commit entire batch."""

    def __iter__(self) -> Iterator[TopicReaderMessage]:
        """Iterate over messages in batch."""

    def __len__(self) -> int:
        """Get number of messages in batch."""


class TopicReaderSettings:
    """Configuration for a topic reader: consumer identity, topics, and limits."""

    def __init__(
        self,
        consumer_name: str,
        topics: List[TopicReaderSelector],
        buffer_size_bytes: int = None,
        max_memory_usage_bytes: int = None,
        max_lag: timedelta = None,
        read_timeout: float = None,
        commit_timeout: float = None,
        with_metadata_fields: List[str] = None,
        decompress_messages: bool = True,
    ):
        """
        Topic reader configuration.

        Args:
            consumer_name (str): Consumer identifier
            topics (List[TopicReaderSelector]): Topics to read from
            buffer_size_bytes (int, optional): Read buffer size
            max_memory_usage_bytes (int, optional): Maximum memory usage
            max_lag (timedelta, optional): Maximum allowed lag
            read_timeout (float, optional): Read operation timeout
            commit_timeout (float, optional): Commit operation timeout
            with_metadata_fields (List[str], optional): Metadata fields to include
            decompress_messages (bool): Automatically decompress messages

        NOTE(review): TopicReaderSelector also accepts a per-topic max_lag;
        how it combines with this reader-wide value is not specified here —
        confirm.
        """


class TopicReaderSelector:
    """Selects a topic (and optionally partitions / start point) for a reader."""

    def __init__(
        self,
        path: str,
        partitions: List[int] = None,
        read_from: datetime = None,
        max_lag: timedelta = None,
    ):
        """
        Topic selection for reader.

        Args:
            path (str): Topic path
            partitions (List[int], optional): Specific partitions to read
            read_from (datetime, optional): Start reading from timestamp
            max_lag (timedelta, optional): Maximum allowed lag for this topic
        """

    @property
    def path(self) -> str:
        """Topic path."""

    @property
    def partitions(self) -> List[int]:
        """Specific partitions to read."""

    @property
    def read_from(self) -> datetime:
        """Start reading from timestamp."""

    @property
    def max_lag(self) -> timedelta:
        """Maximum allowed lag."""
```

### Topic Configuration

Settings and metadata for creating, describing, and altering topics, covering partitioning, retention, codecs, and consumers.

```python { .api }
class TopicDescription:
    """Topic configuration and metadata as returned by describe_topic()."""

    def __init__(
        self,
        path: str,
        partitions_count: int = None,
        retention_period: timedelta = None,
        retention_storage_mb: int = None,
        supported_codecs: List[TopicCodec] = None,
        partition_write_speed_bytes_per_second: int = None,
        partition_write_burst_bytes: int = None,
        attributes: Dict[str, str] = None,
        consumers: List[TopicConsumer] = None,
        metering_mode: TopicMeteringMode = None,
        partition_count_limit: int = None,
    ):
        """
        Topic configuration and metadata.

        Args:
            path (str): Topic path
            partitions_count (int, optional): Number of partitions
            retention_period (timedelta, optional): Message retention period
            retention_storage_mb (int, optional): Storage retention limit in MB
            supported_codecs (List[TopicCodec], optional): Supported compression codecs
            partition_write_speed_bytes_per_second (int, optional): Write speed limit per partition
            partition_write_burst_bytes (int, optional): Write burst limit per partition
            attributes (Dict[str, str], optional): Topic attributes
            consumers (List[TopicConsumer], optional): Topic consumers
            metering_mode (TopicMeteringMode, optional): Metering mode
            partition_count_limit (int, optional): Maximum partition count
        """

    @property
    def path(self) -> str:
        """Topic path."""

    @property
    def partitions_count(self) -> int:
        """Number of partitions."""

    @property
    def retention_period(self) -> timedelta:
        """Message retention period."""

    @property
    def retention_storage_mb(self) -> int:
        """Storage retention limit in MB."""

    @property
    def supported_codecs(self) -> List[TopicCodec]:
        """Supported compression codecs."""

    @property
    def partition_write_speed_bytes_per_second(self) -> int:
        """Write speed limit per partition."""

    @property
    def partition_write_burst_bytes(self) -> int:
        """Write burst limit per partition."""

    @property
    def attributes(self) -> Dict[str, str]:
        """Topic attributes."""

    @property
    def consumers(self) -> List[TopicConsumer]:
        """Topic consumers (the constructor default is None, so guard before iterating)."""

    @property
    def metering_mode(self) -> TopicMeteringMode:
        """Metering mode."""

    @property
    def partition_count_limit(self) -> int:
        """Maximum partition count."""


class CreateTopicSettings:
    """Parameters for TopicClient.create_topic()."""

    def __init__(
        self,
        partitions_count: int = 1,
        retention_period: timedelta = None,
        retention_storage_mb: int = None,
        supported_codecs: List[TopicCodec] = None,
        partition_write_speed_bytes_per_second: int = None,
        partition_write_burst_bytes: int = None,
        attributes: Dict[str, str] = None,
        consumers: List[TopicConsumer] = None,
        metering_mode: TopicMeteringMode = None,
    ):
        """
        Settings for topic creation.

        Args:
            partitions_count (int): Number of partitions to create (default 1)
            retention_period (timedelta, optional): Message retention period
            retention_storage_mb (int, optional): Storage retention limit in MB
            supported_codecs (List[TopicCodec], optional): Allowed compression codecs
            partition_write_speed_bytes_per_second (int, optional): Write speed limit
            partition_write_burst_bytes (int, optional): Write burst limit
            attributes (Dict[str, str], optional): Topic attributes
            consumers (List[TopicConsumer], optional): Initial consumers
            metering_mode (TopicMeteringMode, optional): Billing metering mode
        """


class AlterTopicSettings:
    """Parameters for TopicClient.alter_topic(); unset (None) fields are left unchanged."""

    def __init__(
        self,
        alter_partitions_count: int = None,
        set_retention_period: timedelta = None,
        set_retention_storage_mb: int = None,
        set_supported_codecs: List[TopicCodec] = None,
        set_partition_write_speed_bytes_per_second: int = None,
        set_partition_write_burst_bytes: int = None,
        alter_attributes: Dict[str, str] = None,
        add_consumers: List[TopicConsumer] = None,
        drop_consumers: List[str] = None,
        alter_consumers: List[TopicAlterConsumer] = None,
        set_metering_mode: TopicMeteringMode = None,
    ):
        """
        Settings for topic alteration.

        Args:
            alter_partitions_count (int, optional): New partition count
            set_retention_period (timedelta, optional): New retention period
            set_retention_storage_mb (int, optional): New storage retention limit
            set_supported_codecs (List[TopicCodec], optional): New codec list
            set_partition_write_speed_bytes_per_second (int, optional): New speed limit
            set_partition_write_burst_bytes (int, optional): New burst limit
            alter_attributes (Dict[str, str], optional): Attribute changes
            add_consumers (List[TopicConsumer], optional): Consumers to add
            drop_consumers (List[str], optional): Consumer names to remove
            alter_consumers (List[TopicAlterConsumer], optional): Consumer modifications
            set_metering_mode (TopicMeteringMode, optional): New metering mode

        NOTE(review): "unset fields are left unchanged" is inferred from the
        alter/set/add naming — confirm against the implementation.
        """


class TopicConsumer:
    """A named consumer registered on a topic."""

    def __init__(
        self,
        name: str,
        supported_codecs: List[TopicCodec] = None,
        read_from: datetime = None,
        attributes: Dict[str, str] = None,
        important: bool = False,
    ):
        """
        Topic consumer configuration.

        Args:
            name (str): Consumer name
            supported_codecs (List[TopicCodec], optional): Supported codecs
            read_from (datetime, optional): Start reading from timestamp
            attributes (Dict[str, str], optional): Consumer attributes
            important (bool): Whether consumer is important for retention
        """

    @property
    def name(self) -> str:
        """Consumer name."""

    @property
    def supported_codecs(self) -> List[TopicCodec]:
        """Supported compression codecs."""

    @property
    def read_from(self) -> datetime:
        """Start reading timestamp."""

    @property
    def attributes(self) -> Dict[str, str]:
        """Consumer attributes."""

    @property
    def important(self) -> bool:
        """Whether consumer is important."""


class TopicCodec(enum.Enum):
    """Message compression codecs."""

    RAW = "raw"
    GZIP = "gzip"
    LZOP = "lzop"
    ZSTD = "zstd"


class TopicMeteringMode(enum.Enum):
    """Topic metering modes for billing."""

    UNSPECIFIED = "unspecified"
    RESERVED_CAPACITY = "reserved_capacity"
    REQUEST_UNITS = "request_units"
```

### Async Topic Operations

Asynchronous (asyncio) counterparts of the topic writer, reader, and client.

```python { .api }
class TopicWriterAsyncIO:
    """Asynchronous publisher for a single topic; usable with ``async with``."""

    def __init__(
        self,
        driver: Driver,
        topic_path: str,
        producer_id: str = None,
        settings: TopicWriterSettings = None,
    ):
        """
        Asynchronous topic writer.

        Args:
            driver (Driver): Async YDB driver instance
            topic_path (str): Topic path to write to
            producer_id (str, optional): Producer identifier
            settings (TopicWriterSettings, optional): Writer configuration
        """

    async def __aenter__(self) -> 'TopicWriterAsyncIO':
        """Enter async context manager."""

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit async context manager."""

    async def write(
        self,
        messages: Union[TopicWriterMessage, List[TopicWriterMessage]],
    ) -> TopicWriteResult:
        """
        Write messages asynchronously.

        Args:
            messages (Union[TopicWriterMessage, List[TopicWriterMessage]]):
                A single message or a list of messages to write

        Returns:
            TopicWriteResult: Write operation result
        """

    async def flush(self) -> TopicWriteResult:
        """
        Flush pending messages asynchronously.

        Returns:
            TopicWriteResult: Flush operation result
        """

    async def close(self, timeout: float = None):
        """
        Close writer asynchronously.

        Args:
            timeout (float, optional): Close timeout
        """


class TopicReaderAsyncIO:
    """Asynchronous consumer; usable with ``async with`` and ``async for``."""

    def __init__(
        self,
        driver: Driver,
        settings: TopicReaderSettings,
    ):
        """
        Asynchronous topic reader.

        Args:
            driver (Driver): Async YDB driver instance
            settings (TopicReaderSettings): Reader configuration
        """

    async def __aenter__(self) -> 'TopicReaderAsyncIO':
        """Enter async context manager."""

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit async context manager."""

    async def receive_message(self, timeout: float = None) -> TopicReaderMessage:
        """
        Receive message asynchronously.

        Args:
            timeout (float, optional): Receive timeout

        Returns:
            TopicReaderMessage: Received message
        """

    async def receive_batch(self, timeout: float = None) -> TopicReaderBatch:
        """
        Receive batch asynchronously.

        Args:
            timeout (float, optional): Receive timeout

        Returns:
            TopicReaderBatch: Batch of messages
        """

    def __aiter__(self) -> AsyncIterator[TopicReaderMessage]:
        """Async iterator over messages."""

    async def __anext__(self) -> TopicReaderMessage:
        """Get next message asynchronously."""

    async def close(self, timeout: float = None):
        """
        Close reader asynchronously.

        Args:
            timeout (float, optional): Close timeout
        """


class TopicClientAsyncIO:
    """Asynchronous counterpart of TopicClient's topic administration methods."""

    def __init__(self, driver: Driver, settings: TopicClientSettings = None):
        """
        Asynchronous topic client.

        Args:
            driver (Driver): Async YDB driver instance
            settings (TopicClientSettings, optional): Client configuration
        """

    async def create_topic(self, path: str, settings: CreateTopicSettings):
        """
        Create topic asynchronously.

        Args:
            path (str): Topic path
            settings (CreateTopicSettings): Topic creation configuration
        """

    async def describe_topic(
        self,
        path: str,
        settings: DescribeTopicSettings = None,
    ) -> TopicDescription:
        """
        Describe topic asynchronously.

        Args:
            path (str): Topic path
            settings (DescribeTopicSettings, optional): Description settings

        Returns:
            TopicDescription: Topic configuration and metadata
        """

    async def alter_topic(self, path: str, settings: AlterTopicSettings):
        """
        Alter topic asynchronously.

        Args:
            path (str): Topic path
            settings (AlterTopicSettings): Alteration settings
        """

    async def drop_topic(self, path: str, settings: DropTopicSettings = None):
        """
        Drop topic asynchronously.

        Args:
            path (str): Topic path
            settings (DropTopicSettings, optional): Drop settings
        """
```

## Usage Examples

### Basic Topic Publishing

```python
import json
from datetime import timedelta

import ydb

# Create driver and topic client
driver = ydb.Driver(endpoint="grpc://localhost:2136", database="/local")
driver.wait(fail_fast=True)

topic_client = ydb.TopicClient(driver)

# Create topic
create_settings = ydb.CreateTopicSettings(
    partitions_count=3,
    retention_period=timedelta(days=7),
    supported_codecs=[ydb.TopicCodec.RAW, ydb.TopicCodec.GZIP],
)

topic_client.create_topic("/local/events", create_settings)

# Write messages
with topic_client.writer("/local/events", producer_id="producer-1") as writer:
    # Write single message
    message = ydb.TopicWriterMessage(
        data=b'{"event": "user_login", "user_id": 123}',
        message_group_id="user-123",
    )
    writer.write(message)

    # Write batch of messages; json.dumps avoids hand-escaping braces in f-strings
    messages = [
        ydb.TopicWriterMessage(
            data=json.dumps({"event": "page_view", "page": "/home", "user_id": i}).encode(),
            message_group_id=f"user-{i}",
        )
        for i in range(100, 110)
    ]

    result = writer.write(messages)
    writer.flush()

# Surface write failures instead of only reporting the ack count
if result.has_errors:
    print(f"Batch write reported {len(result.errors)} errors")
print(f"Written {len(result.acks)} messages")
```

### Message Consumption

This and the following synchronous examples reuse `topic_client` from the publishing example above.

```python
import json
from datetime import datetime, timedelta

# Create reader settings (reuses `topic_client` from the publishing example).
# NOTE: consumer_name presumably has to match a consumer registered on the
# topic (see CreateTopicSettings.consumers / AlterTopicSettings.add_consumers).
consumer_settings = ydb.TopicReaderSettings(
    consumer_name="analytics-consumer",
    topics=[
        ydb.TopicReaderSelector(
            path="/local/events",
            read_from=datetime.now() - timedelta(hours=1),
        )
    ],
    buffer_size_bytes=1024 * 1024,  # 1MB buffer
    max_lag=timedelta(minutes=5),
)

# Read messages
with topic_client.reader(consumer_settings) as reader:
    for message in reader:
        try:
            # Process message
            event_data = json.loads(message.data.decode())
            print(f"Processing event: {event_data}")

            # Commit message after successful processing
            message.commit()

        except (json.JSONDecodeError, UnicodeDecodeError):
            # Non-UTF-8 or non-JSON payloads would fail identically on
            # redelivery, so commit them to skip instead of stalling the consumer.
            print(f"Failed to parse message: {message.data}")
            message.commit()
        except Exception as e:
            print(f"Error processing message: {e}")
            # Don't commit - message will be redelivered
            break
```

### Batch Processing

```python
import json

# Process messages in batches for better throughput
with topic_client.reader(consumer_settings) as reader:
    while True:
        try:
            batch = reader.receive_batch(timeout=30.0)

            if not batch.messages:
                continue

            # Decode the batch, skipping payloads that are not valid UTF-8 JSON
            events = []
            for message in batch:
                try:
                    events.append(json.loads(message.data.decode()))
                except (json.JSONDecodeError, UnicodeDecodeError):
                    print(f"Skipping malformed message: {message.data}")

            # Bulk insert to database (process_events_batch is application code)
            if events:
                process_events_batch(events)

            # Commit entire batch
            batch.commit()
            print(f"Processed batch of {len(batch)} messages")

        except ydb.TimeoutError:
            print("No messages received in timeout period")
            continue
        except KeyboardInterrupt:
            print("Shutting down consumer...")
            break
```

### Async Producer and Consumer

```python
import asyncio
import json
from datetime import datetime

import ydb  # `import ydb.aio as ydb_aio` alone does not bind the name `ydb`
import ydb.aio as ydb_aio


async def async_topic_producer():
    """Create a topic and write 1000 messages, flushing every 100."""
    async with ydb_aio.Driver(...) as driver:
        topic_client = ydb.TopicClientAsyncIO(driver)

        # Create topic
        await topic_client.create_topic(
            "/local/async_events",
            ydb.CreateTopicSettings(partitions_count=5),
        )

        # Write messages asynchronously
        async with ydb.TopicWriterAsyncIO(
            driver,
            "/local/async_events",
            producer_id="async-producer",
        ) as writer:

            # Generate and write messages
            for i in range(1000):
                # Use strictly increasing sequence numbers starting at 1
                seq_no = i + 1
                message = ydb.TopicWriterMessage(
                    data=json.dumps({"id": i, "timestamp": datetime.now().isoformat()}).encode(),
                    seq_no=seq_no,
                )

                await writer.write(message)

                # Flush and report after every 100 written messages
                # (the old `i % 100 == 0` fired after the first message).
                if seq_no % 100 == 0:
                    await writer.flush()
                    print(f"Written {seq_no} messages")


async def async_topic_consumer():
    """Read messages forever, processing and committing each one."""
    async with ydb_aio.Driver(...) as driver:
        reader_settings = ydb.TopicReaderSettings(
            consumer_name="async-consumer",
            topics=[ydb.TopicReaderSelector("/local/async_events")],
        )

        async with ydb.TopicReaderAsyncIO(driver, reader_settings) as reader:
            async for message in reader:
                # Process message asynchronously (process_message_async is application code)
                await process_message_async(message.data)
                message.commit()


# Run async operations
asyncio.run(async_topic_producer())
asyncio.run(async_topic_consumer())
```

### Topic Administration

```python
from datetime import datetime, timedelta


def manage_topic_lifecycle():
    """Create, describe, alter, and re-describe a topic.

    Uses the module-level ``driver`` created in the publishing example.
    """
    topic_client = ydb.TopicClient(driver)
    topic_path = "/local/user_events"

    # Create topic with consumers
    consumers = [
        ydb.TopicConsumer(
            name="analytics",
            important=True,
            read_from=datetime.now(),
        ),
        ydb.TopicConsumer(
            name="archival",
            important=False,
        ),
    ]

    create_settings = ydb.CreateTopicSettings(
        partitions_count=10,
        retention_period=timedelta(days=30),
        retention_storage_mb=10000,
        consumers=consumers,
        attributes={"team": "analytics", "env": "prod"},
    )

    topic_client.create_topic(topic_path, create_settings)

    # Describe topic; `consumers` defaults to None in TopicDescription, so guard it
    description = topic_client.describe_topic(topic_path)
    print(f"Topic: {description.path}")
    print(f"Partitions: {description.partitions_count}")
    print(f"Retention: {description.retention_period}")
    print(f"Consumers: {[c.name for c in description.consumers or []]}")

    # Alter topic - grow partitions 10 -> 15, add a consumer, extend retention
    alter_settings = ydb.AlterTopicSettings(
        alter_partitions_count=15,
        add_consumers=[
            ydb.TopicConsumer(name="realtime", important=True),
        ],
        set_retention_period=timedelta(days=45),
    )

    topic_client.alter_topic(topic_path, alter_settings)

    # Verify changes
    updated_description = topic_client.describe_topic(topic_path)
    print(f"Updated partitions: {updated_description.partitions_count}")
    print(f"Updated consumers: {[c.name for c in updated_description.consumers or []]}")


manage_topic_lifecycle()
```

### Error Handling and Monitoring

```python
import time


def robust_topic_processing():
    """Consume messages indefinitely, reconnecting with exponential backoff.

    Uses ``topic_client`` from the publishing example.
    ``process_message_with_timeout`` is application code that returns a truthy
    value on success.
    """
    reader_settings = ydb.TopicReaderSettings(
        consumer_name="robust-consumer",
        topics=[ydb.TopicReaderSelector("/local/events")],
        read_timeout=10.0,
        commit_timeout=5.0,
    )

    retry_count = 0
    max_retries = 3

    while retry_count < max_retries:
        try:
            with topic_client.reader(reader_settings) as reader:
                retry_count = 0  # Reset on successful connection

                while True:
                    try:
                        message = reader.receive_message(timeout=30.0)

                        # Process with timeout; monotonic clock for elapsed time
                        process_start = time.monotonic()
                        result = process_message_with_timeout(message.data, timeout=10.0)
                        process_time = time.monotonic() - process_start

                        if result:
                            message.commit()
                            print(f"Processed message in {process_time:.2f}s")
                        else:
                            print("Processing failed, skipping message")
                            message.commit()  # Skip failed messages

                    except ydb.TimeoutError:
                        print("No messages received, continuing...")
                        continue
                    except ydb.TopicReaderPartitionExpiredError as e:
                        print(f"Partition expired: {e}, reconnecting...")
                        break
                    except ydb.ConnectionError:
                        # Hand connection failures to the outer handler so the
                        # reader is reopened with backoff. Otherwise the generic
                        # handler below would keep polling a dead reader forever.
                        raise
                    except Exception as e:
                        print(f"Unexpected error: {e}")
                        time.sleep(1.0)

        except ydb.ConnectionError:
            retry_count += 1
            if retry_count >= max_retries:
                break  # No point sleeping before giving up
            backoff = min(2 ** retry_count, 30)
            print(f"Connection failed, retrying in {backoff}s... ({retry_count}/{max_retries})")
            time.sleep(backoff)
        except KeyboardInterrupt:
            print("Shutting down gracefully...")
            break

    if retry_count >= max_retries:
        print("Max retries exceeded, giving up")


robust_topic_processing()
```

## Type Definitions

```python { .api }
from typing import Awaitable, Callable, Dict, List

# Type aliases for topic operations
TopicPath = str
ProducerId = str
ConsumerName = str
MessageGroupId = str
PartitionId = int
MessageOffset = int
SequenceNumber = int

# Message handling
MessageData = bytes
MessageMetadata = Dict[str, str]
# "TopicReaderMessage" is quoted as a forward reference so these aliases can be
# defined before (or without importing) the message class.
# NOTE(review): the bool result presumably signals successful processing — confirm.
MessageHandler = Callable[["TopicReaderMessage"], bool]
AsyncMessageHandler = Callable[["TopicReaderMessage"], Awaitable[bool]]

# Batch processing
BatchProcessor = Callable[[List["TopicReaderMessage"]], bool]
AsyncBatchProcessor = Callable[[List["TopicReaderMessage"]], Awaitable[bool]]
```