or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-operations.md authentication.md data-types.md dbapi-interface.md driver-connection.md error-handling.md index.md query-service.md schema-operations.md sqlalchemy-integration.md table-operations.md topic-operations.md

docs/topic-operations.md

0

# Topic Operations

1

2

Streaming data operations, including topic creation, message publishing, message consumption, and topic administration.

3

4

## Capabilities

5

6

### Topic Client

7

8

The topic client provides comprehensive operations for managing topics and streaming data.

9

10

```python { .api }

11

class TopicClient:

12

def __init__(self, driver: Driver, settings: TopicClientSettings = None):

13

"""

14

Create topic operations client.

15

16

Args:

17

driver (Driver): YDB driver instance

18

settings (TopicClientSettings, optional): Client configuration

19

"""

20

21

def create_topic(

22

self,

23

path: str,

24

settings: CreateTopicSettings

25

):

26

"""

27

Create a new topic.

28

29

Args:

30

path (str): Topic path

31

settings (CreateTopicSettings): Topic creation configuration

32

"""

33

34

def describe_topic(

35

self,

36

path: str,

37

settings: DescribeTopicSettings = None

38

) -> TopicDescription:

39

"""

40

Get topic description and metadata.

41

42

Args:

43

path (str): Topic path

44

settings (DescribeTopicSettings, optional): Description settings

45

46

Returns:

47

TopicDescription: Topic configuration and metadata

48

"""

49

50

def alter_topic(

51

self,

52

path: str,

53

settings: AlterTopicSettings

54

):

55

"""

56

Alter topic configuration.

57

58

Args:

59

path (str): Topic path

60

settings (AlterTopicSettings): Alteration settings

61

"""

62

63

def drop_topic(

64

self,

65

path: str,

66

settings: DropTopicSettings = None

67

):

68

"""

69

Delete topic.

70

71

Args:

72

path (str): Topic path

73

settings (DropTopicSettings, optional): Drop settings

74

"""

75

76

def writer(

77

self,

78

topic_path: str,

79

producer_id: str = None,

80

settings: TopicWriterSettings = None

81

) -> TopicWriter:

82

"""

83

Create topic writer for publishing messages.

84

85

Args:

86

topic_path (str): Topic path to write to

87

producer_id (str, optional): Producer identifier

88

settings (TopicWriterSettings, optional): Writer configuration

89

90

Returns:

91

TopicWriter: Topic writer instance

92

"""

93

94

def reader(

95

self,

96

settings: TopicReaderSettings

97

) -> TopicReader:

98

"""

99

Create topic reader for consuming messages.

100

101

Args:

102

settings (TopicReaderSettings): Reader configuration

103

104

Returns:

105

TopicReader: Topic reader instance

106

"""

107

108

class TopicClientSettings:

109

def __init__(

110

self,

111

default_compression_codec: TopicCodec = None,

112

request_timeout: float = None

113

):

114

"""

115

Topic client configuration.

116

117

Args:

118

default_compression_codec (TopicCodec, optional): Default compression

119

request_timeout (float, optional): Default request timeout

120

"""

121

```

122

123

### Topic Writer

124

125

Publisher interface for sending messages to topics with batching and delivery guarantees.

126

127

```python { .api }

128

class TopicWriter:

129

def __init__(

130

self,

131

driver: Driver,

132

topic_path: str,

133

producer_id: str = None,

134

settings: TopicWriterSettings = None

135

):

136

"""

137

Create topic writer for message publishing.

138

139

Args:

140

driver (Driver): YDB driver instance

141

topic_path (str): Topic path to write to

142

producer_id (str, optional): Producer identifier

143

settings (TopicWriterSettings, optional): Writer configuration

144

"""

145

146

def write(

147

self,

148

messages: Union[TopicWriterMessage, List[TopicWriterMessage]]

149

) -> TopicWriteResult:

150

"""

151

Write messages to topic.

152

153

Args:

154

messages (Union[TopicWriterMessage, List[TopicWriterMessage]]): Messages to write

155

156

Returns:

157

TopicWriteResult: Write operation result

158

"""

159

160

def flush(self) -> TopicWriteResult:

161

"""

162

Flush pending messages and wait for acknowledgments.

163

164

Returns:

165

TopicWriteResult: Flush operation result

166

"""

167

168

def close(self, timeout: float = None):

169

"""

170

Close writer and flush pending messages.

171

172

Args:

173

timeout (float, optional): Close timeout

174

"""

175

176

def __enter__(self) -> 'TopicWriter':

177

"""

178

Enter context manager.

179

180

Returns:

181

TopicWriter: Writer instance

182

"""

183

184

def __exit__(self, exc_type, exc_val, exc_tb):

185

"""

186

Exit context manager and close writer.

187

"""

188

189

@property

190

def init_info(self) -> TopicWriterInitInfo:

191

"""Get writer initialization information."""

192

193

class TopicWriterMessage:

194

def __init__(

195

self,

196

data: bytes,

197

seq_no: int = None,

198

created_at: datetime = None,

199

message_group_id: str = None,

200

metadata: Dict[str, str] = None,

201

codec: TopicCodec = None

202

):

203

"""

204

Message for topic publishing.

205

206

Args:

207

data (bytes): Message payload

208

seq_no (int, optional): Sequence number for ordering

209

created_at (datetime, optional): Message creation timestamp

210

message_group_id (str, optional): Message group identifier

211

metadata (Dict[str, str], optional): Message metadata

212

codec (TopicCodec, optional): Message compression codec

213

"""

214

215

@property

216

def data(self) -> bytes:

217

"""Message payload."""

218

219

@property

220

def seq_no(self) -> int:

221

"""Message sequence number."""

222

223

@property

224

def created_at(self) -> datetime:

225

"""Message creation timestamp."""

226

227

@property

228

def message_group_id(self) -> str:

229

"""Message group identifier."""

230

231

@property

232

def metadata(self) -> Dict[str, str]:

233

"""Message metadata."""

234

235

@property

236

def codec(self) -> TopicCodec:

237

"""Message compression codec."""

238

239

class TopicWriterSettings:

240

def __init__(

241

self,

242

producer_id: str = None,

243

write_session_meta: Dict[str, str] = None,

244

codec: TopicCodec = None,

245

get_last_seq_no: bool = False,

246

update_token_interval: float = None,

247

partition_id: int = None,

248

message_group_id: str = None

249

):

250

"""

251

Topic writer configuration.

252

253

Args:

254

producer_id (str, optional): Producer identifier

255

write_session_meta (Dict[str, str], optional): Session metadata

256

codec (TopicCodec, optional): Default compression codec

257

get_last_seq_no (bool): Retrieve last sequence number on init

258

update_token_interval (float, optional): Token update interval

259

partition_id (int, optional): Target partition ID

260

message_group_id (str, optional): Default message group ID

261

"""

262

263

class TopicWriteResult:

264

def __init__(

265

self,

266

acks: List[TopicWriterAck] = None,

267

errors: List[TopicWriterError] = None

268

):

269

"""

270

Result of topic write operation.

271

272

Args:

273

acks (List[TopicWriterAck], optional): Acknowledged messages

274

errors (List[TopicWriterError], optional): Write errors

275

"""

276

277

@property

278

def acks(self) -> List[TopicWriterAck]:

279

"""Acknowledged messages."""

280

281

@property

282

def errors(self) -> List[TopicWriterError]:

283

"""Write errors."""

284

285

@property

286

def has_errors(self) -> bool:

287

"""True if write operation had errors."""

288

289

class TopicWriterInitInfo:

290

def __init__(

291

self,

292

topic_path: str,

293

producer_id: str,

294

last_seq_no: int = None,

295

session_id: str = None

296

):

297

"""

298

Topic writer initialization information.

299

300

Args:

301

topic_path (str): Topic path

302

producer_id (str): Producer identifier

303

last_seq_no (int, optional): Last sequence number

304

session_id (str, optional): Write session identifier

305

"""

306

307

@property

308

def topic_path(self) -> str:

309

"""Topic path."""

310

311

@property

312

def producer_id(self) -> str:

313

"""Producer identifier."""

314

315

@property

316

def last_seq_no(self) -> int:

317

"""Last sequence number."""

318

319

@property

320

def session_id(self) -> str:

321

"""Write session identifier."""

322

```

323

324

### Topic Reader

325

326

Consumer interface for reading messages from topics with automatic partition assignment and offset management.

327

328

```python { .api }

329

class TopicReader:

330

def __init__(

331

self,

332

driver: Driver,

333

settings: TopicReaderSettings

334

):

335

"""

336

Create topic reader for message consumption.

337

338

Args:

339

driver (Driver): YDB driver instance

340

settings (TopicReaderSettings): Reader configuration

341

"""

342

343

def receive_message(self, timeout: float = None) -> TopicReaderMessage:

344

"""

345

Receive next message from topic.

346

347

Args:

348

timeout (float, optional): Receive timeout

349

350

Returns:

351

TopicReaderMessage: Received message

352

"""

353

354

def receive_batch(self, timeout: float = None) -> TopicReaderBatch:

355

"""

356

Receive batch of messages from topic.

357

358

Args:

359

timeout (float, optional): Receive timeout

360

361

Returns:

362

TopicReaderBatch: Batch of messages

363

"""

364

365

def commit(self, message: TopicReaderMessage):

366

"""

367

Commit message processing.

368

369

Args:

370

message (TopicReaderMessage): Message to commit

371

"""

372

373

def commit_batch(self, batch: TopicReaderBatch):

374

"""

375

Commit batch processing.

376

377

Args:

378

batch (TopicReaderBatch): Batch to commit

379

"""

380

381

def close(self, timeout: float = None):

382

"""

383

Close reader and release resources.

384

385

Args:

386

timeout (float, optional): Close timeout

387

"""

388

389

def __enter__(self) -> 'TopicReader':

390

"""Enter context manager."""

391

392

def __exit__(self, exc_type, exc_val, exc_tb):

393

"""Exit context manager and close reader."""

394

395

def __iter__(self) -> Iterator[TopicReaderMessage]:

396

"""

397

Iterate over messages.

398

399

Returns:

400

Iterator[TopicReaderMessage]: Message iterator

401

"""

402

403

class TopicReaderMessage:

404

def __init__(

405

self,

406

data: bytes,

407

seq_no: int,

408

created_at: datetime,

409

message_group_id: str = None,

410

offset: int = None,

411

metadata: Dict[str, str] = None,

412

partition_session: 'PartitionSession' = None

413

):

414

"""

415

Message received from topic.

416

417

Args:

418

data (bytes): Message payload

419

seq_no (int): Message sequence number

420

created_at (datetime): Message creation timestamp

421

message_group_id (str, optional): Message group identifier

422

offset (int, optional): Message offset in partition

423

metadata (Dict[str, str], optional): Message metadata

424

partition_session (PartitionSession, optional): Source partition session

425

"""

426

427

@property

428

def data(self) -> bytes:

429

"""Message payload."""

430

431

@property

432

def seq_no(self) -> int:

433

"""Message sequence number."""

434

435

@property

436

def created_at(self) -> datetime:

437

"""Message creation timestamp."""

438

439

@property

440

def message_group_id(self) -> str:

441

"""Message group identifier."""

442

443

@property

444

def offset(self) -> int:

445

"""Message offset in partition."""

446

447

@property

448

def metadata(self) -> Dict[str, str]:

449

"""Message metadata."""

450

451

@property

452

def partition_session(self) -> 'PartitionSession':

453

"""Source partition session."""

454

455

def commit(self):

456

"""Commit this message."""

457

458

class TopicReaderBatch:

459

def __init__(

460

self,

461

messages: List[TopicReaderMessage],

462

partition_session: 'PartitionSession'

463

):

464

"""

465

Batch of messages from topic.

466

467

Args:

468

messages (List[TopicReaderMessage]): Messages in batch

469

partition_session (PartitionSession): Source partition session

470

"""

471

472

@property

473

def messages(self) -> List[TopicReaderMessage]:

474

"""Messages in batch."""

475

476

@property

477

def partition_session(self) -> 'PartitionSession':

478

"""Source partition session."""

479

480

def commit(self):

481

"""Commit entire batch."""

482

483

def __iter__(self) -> Iterator[TopicReaderMessage]:

484

"""Iterate over messages in batch."""

485

486

def __len__(self) -> int:

487

"""Get number of messages in batch."""

488

489

class TopicReaderSettings:

490

def __init__(

491

self,

492

consumer_name: str,

493

topics: List[TopicReaderSelector],

494

buffer_size_bytes: int = None,

495

max_memory_usage_bytes: int = None,

496

max_lag: timedelta = None,

497

read_timeout: float = None,

498

commit_timeout: float = None,

499

with_metadata_fields: List[str] = None,

500

decompress_messages: bool = True

501

):

502

"""

503

Topic reader configuration.

504

505

Args:

506

consumer_name (str): Consumer identifier

507

topics (List[TopicReaderSelector]): Topics to read from

508

buffer_size_bytes (int, optional): Read buffer size

509

max_memory_usage_bytes (int, optional): Maximum memory usage

510

max_lag (timedelta, optional): Maximum allowed lag

511

read_timeout (float, optional): Read operation timeout

512

commit_timeout (float, optional): Commit operation timeout

513

with_metadata_fields (List[str], optional): Metadata fields to include

514

decompress_messages (bool): Automatically decompress messages

515

"""

516

517

class TopicReaderSelector:

518

def __init__(

519

self,

520

path: str,

521

partitions: List[int] = None,

522

read_from: datetime = None,

523

max_lag: timedelta = None

524

):

525

"""

526

Topic selection for reader.

527

528

Args:

529

path (str): Topic path

530

partitions (List[int], optional): Specific partitions to read

531

read_from (datetime, optional): Start reading from timestamp

532

max_lag (timedelta, optional): Maximum allowed lag for this topic

533

"""

534

535

@property

536

def path(self) -> str:

537

"""Topic path."""

538

539

@property

540

def partitions(self) -> List[int]:

541

"""Specific partitions to read."""

542

543

@property

544

def read_from(self) -> datetime:

545

"""Start reading from timestamp."""

546

547

@property

548

def max_lag(self) -> timedelta:

549

"""Maximum allowed lag."""

550

```

551

552

### Topic Configuration

553

554

Topic creation and management settings with partitioning and retention policies.

555

556

```python { .api }

557

class TopicDescription:

558

def __init__(

559

self,

560

path: str,

561

partitions_count: int = None,

562

retention_period: timedelta = None,

563

retention_storage_mb: int = None,

564

supported_codecs: List[TopicCodec] = None,

565

partition_write_speed_bytes_per_second: int = None,

566

partition_write_burst_bytes: int = None,

567

attributes: Dict[str, str] = None,

568

consumers: List[TopicConsumer] = None,

569

metering_mode: TopicMeteringMode = None,

570

partition_count_limit: int = None

571

):

572

"""

573

Topic configuration and metadata.

574

575

Args:

576

path (str): Topic path

577

partitions_count (int, optional): Number of partitions

578

retention_period (timedelta, optional): Message retention period

579

retention_storage_mb (int, optional): Storage retention limit in MB

580

supported_codecs (List[TopicCodec], optional): Supported compression codecs

581

partition_write_speed_bytes_per_second (int, optional): Write speed limit per partition

582

partition_write_burst_bytes (int, optional): Write burst limit per partition

583

attributes (Dict[str, str], optional): Topic attributes

584

consumers (List[TopicConsumer], optional): Topic consumers

585

metering_mode (TopicMeteringMode, optional): Metering mode

586

partition_count_limit (int, optional): Maximum partition count

587

"""

588

589

@property

590

def path(self) -> str:

591

"""Topic path."""

592

593

@property

594

def partitions_count(self) -> int:

595

"""Number of partitions."""

596

597

@property

598

def retention_period(self) -> timedelta:

599

"""Message retention period."""

600

601

@property

602

def supported_codecs(self) -> List[TopicCodec]:

603

"""Supported compression codecs."""

604

605

@property

606

def consumers(self) -> List[TopicConsumer]:

607

"""Topic consumers."""

608

609

class CreateTopicSettings:

610

def __init__(

611

self,

612

partitions_count: int = 1,

613

retention_period: timedelta = None,

614

retention_storage_mb: int = None,

615

supported_codecs: List[TopicCodec] = None,

616

partition_write_speed_bytes_per_second: int = None,

617

partition_write_burst_bytes: int = None,

618

attributes: Dict[str, str] = None,

619

consumers: List[TopicConsumer] = None,

620

metering_mode: TopicMeteringMode = None

621

):

622

"""

623

Settings for topic creation.

624

625

Args:

626

partitions_count (int): Number of partitions to create

627

retention_period (timedelta, optional): Message retention period

628

retention_storage_mb (int, optional): Storage retention limit

629

supported_codecs (List[TopicCodec], optional): Allowed compression codecs

630

partition_write_speed_bytes_per_second (int, optional): Write speed limit

631

partition_write_burst_bytes (int, optional): Write burst limit

632

attributes (Dict[str, str], optional): Topic attributes

633

consumers (List[TopicConsumer], optional): Initial consumers

634

metering_mode (TopicMeteringMode, optional): Billing metering mode

635

"""

636

637

class AlterTopicSettings:

638

def __init__(

639

self,

640

alter_partitions_count: int = None,

641

set_retention_period: timedelta = None,

642

set_retention_storage_mb: int = None,

643

set_supported_codecs: List[TopicCodec] = None,

644

set_partition_write_speed_bytes_per_second: int = None,

645

set_partition_write_burst_bytes: int = None,

646

alter_attributes: Dict[str, str] = None,

647

add_consumers: List[TopicConsumer] = None,

648

drop_consumers: List[str] = None,

649

alter_consumers: List[TopicAlterConsumer] = None,

650

set_metering_mode: TopicMeteringMode = None

651

):

652

"""

653

Settings for topic alteration.

654

655

Args:

656

alter_partitions_count (int, optional): New partition count

657

set_retention_period (timedelta, optional): New retention period

658

set_retention_storage_mb (int, optional): New storage retention limit

659

set_supported_codecs (List[TopicCodec], optional): New codec list

660

set_partition_write_speed_bytes_per_second (int, optional): New speed limit

661

set_partition_write_burst_bytes (int, optional): New burst limit

662

alter_attributes (Dict[str, str], optional): Attribute changes

663

add_consumers (List[TopicConsumer], optional): Consumers to add

664

drop_consumers (List[str], optional): Consumer names to remove

665

alter_consumers (List[TopicAlterConsumer], optional): Consumer modifications

666

set_metering_mode (TopicMeteringMode, optional): New metering mode

667

"""

668

669

class TopicConsumer:

670

def __init__(

671

self,

672

name: str,

673

supported_codecs: List[TopicCodec] = None,

674

read_from: datetime = None,

675

attributes: Dict[str, str] = None,

676

important: bool = False

677

):

678

"""

679

Topic consumer configuration.

680

681

Args:

682

name (str): Consumer name

683

supported_codecs (List[TopicCodec], optional): Supported codecs

684

read_from (datetime, optional): Start reading from timestamp

685

attributes (Dict[str, str], optional): Consumer attributes

686

important (bool): Whether consumer is important for retention

687

"""

688

689

@property

690

def name(self) -> str:

691

"""Consumer name."""

692

693

@property

694

def supported_codecs(self) -> List[TopicCodec]:

695

"""Supported compression codecs."""

696

697

@property

698

def read_from(self) -> datetime:

699

"""Start reading timestamp."""

700

701

@property

702

def important(self) -> bool:

703

"""Whether consumer is important."""

704

705

class TopicCodec(enum.Enum):

706

"""Message compression codecs."""

707

RAW = "raw"

708

GZIP = "gzip"

709

LZOP = "lzop"

710

ZSTD = "zstd"

711

712

class TopicMeteringMode(enum.Enum):

713

"""Topic metering modes for billing."""

714

UNSPECIFIED = "unspecified"

715

RESERVED_CAPACITY = "reserved_capacity"

716

REQUEST_UNITS = "request_units"

717

```

718

719

### Async Topic Operations

720

721

Asynchronous versions of topic operations for high-performance applications.

722

723

```python { .api }

724

class TopicWriterAsyncIO:

725

def __init__(

726

self,

727

driver: Driver,

728

topic_path: str,

729

producer_id: str = None,

730

settings: TopicWriterSettings = None

731

):

732

"""

733

Asynchronous topic writer.

734

735

Args:

736

driver (Driver): Async YDB driver instance

737

topic_path (str): Topic path to write to

738

producer_id (str, optional): Producer identifier

739

settings (TopicWriterSettings, optional): Writer configuration

740

"""

741

742

async def __aenter__(self) -> 'TopicWriterAsyncIO':

743

"""Enter async context manager."""

744

745

async def __aexit__(self, exc_type, exc_val, exc_tb):

746

"""Exit async context manager."""

747

748

async def write(

749

self,

750

messages: Union[TopicWriterMessage, List[TopicWriterMessage]]

751

) -> TopicWriteResult:

752

"""

753

Write messages asynchronously.

754

755

Args:

756

messages (Union[TopicWriterMessage, List[TopicWriterMessage]]): Messages to write

757

758

Returns:

759

TopicWriteResult: Write operation result

760

"""

761

762

async def flush(self) -> TopicWriteResult:

763

"""Flush pending messages asynchronously."""

764

765

async def close(self, timeout: float = None):

766

"""Close writer asynchronously."""

767

768

class TopicReaderAsyncIO:

769

def __init__(

770

self,

771

driver: Driver,

772

settings: TopicReaderSettings

773

):

774

"""

775

Asynchronous topic reader.

776

777

Args:

778

driver (Driver): Async YDB driver instance

779

settings (TopicReaderSettings): Reader configuration

780

"""

781

782

async def __aenter__(self) -> 'TopicReaderAsyncIO':

783

"""Enter async context manager."""

784

785

async def __aexit__(self, exc_type, exc_val, exc_tb):

786

"""Exit async context manager."""

787

788

async def receive_message(self, timeout: float = None) -> TopicReaderMessage:

789

"""

790

Receive message asynchronously.

791

792

Args:

793

timeout (float, optional): Receive timeout

794

795

Returns:

796

TopicReaderMessage: Received message

797

"""

798

799

async def receive_batch(self, timeout: float = None) -> TopicReaderBatch:

800

"""

801

Receive batch asynchronously.

802

803

Args:

804

timeout (float, optional): Receive timeout

805

806

Returns:

807

TopicReaderBatch: Batch of messages

808

"""

809

810

def __aiter__(self) -> AsyncIterator[TopicReaderMessage]:

811

"""Async iterator over messages."""

812

813

async def __anext__(self) -> TopicReaderMessage:

814

"""Get next message asynchronously."""

815

816

async def close(self, timeout: float = None):

817

"""Close reader asynchronously."""

818

819

class TopicClientAsyncIO:

820

def __init__(self, driver: Driver, settings: TopicClientSettings = None):

821

"""Asynchronous topic client."""

822

823

async def create_topic(self, path: str, settings: CreateTopicSettings):

824

"""Create topic asynchronously."""

825

826

async def describe_topic(

827

self,

828

path: str,

829

settings: DescribeTopicSettings = None

830

) -> TopicDescription:

831

"""Describe topic asynchronously."""

832

833

async def alter_topic(self, path: str, settings: AlterTopicSettings):

834

"""Alter topic asynchronously."""

835

836

async def drop_topic(self, path: str, settings: DropTopicSettings = None):

837

"""Drop topic asynchronously."""

838

```

839

840

## Usage Examples

841

842

### Basic Topic Publishing

843

844

```python

845

import ydb

846

847

# Create driver and topic client

848

driver = ydb.Driver(endpoint="grpc://localhost:2136", database="/local")

849

driver.wait(fail_fast=True)

850

851

topic_client = ydb.TopicClient(driver)

852

853

# Create topic

854

create_settings = ydb.CreateTopicSettings(

855

partitions_count=3,

856

retention_period=timedelta(days=7),

857

supported_codecs=[ydb.TopicCodec.RAW, ydb.TopicCodec.GZIP]

858

)

859

860

topic_client.create_topic("/local/events", create_settings)

861

862

# Write messages

863

with topic_client.writer("/local/events", producer_id="producer-1") as writer:

864

# Write single message

865

message = ydb.TopicWriterMessage(

866

data=b'{"event": "user_login", "user_id": 123}',

867

message_group_id="user-123"

868

)

869

writer.write(message)

870

871

# Write batch of messages

872

messages = [

873

ydb.TopicWriterMessage(

874

data=f'{{"event": "page_view", "page": "/home", "user_id": {i}}}'.encode(),

875

message_group_id=f"user-{i}"

876

)

877

for i in range(100, 110)

878

]

879

880

result = writer.write(messages)

881

writer.flush()

882

883

print(f"Written {len(result.acks)} messages")

884

```

885

886

### Message Consumption

887

888

```python

889

# Create consumer

890

consumer_settings = ydb.TopicReaderSettings(

891

consumer_name="analytics-consumer",

892

topics=[

893

ydb.TopicReaderSelector(

894

path="/local/events",

895

read_from=datetime.now() - timedelta(hours=1)

896

)

897

],

898

buffer_size_bytes=1024*1024, # 1MB buffer

899

max_lag=timedelta(minutes=5)

900

)

901

902

# Read messages

903

with topic_client.reader(consumer_settings) as reader:

904

for message in reader:

905

try:

906

# Process message

907

event_data = json.loads(message.data.decode())

908

print(f"Processing event: {event_data}")

909

910

# Commit message after successful processing

911

message.commit()

912

913

except json.JSONDecodeError:

914

print(f"Failed to parse message: {message.data}")

915

# Still commit to skip malformed messages

916

message.commit()

917

except Exception as e:

918

print(f"Error processing message: {e}")

919

# Don't commit - message will be redelivered

920

break

921

```

922

923

### Batch Processing

924

925

```python

926

# Process messages in batches for better throughput

927

with topic_client.reader(consumer_settings) as reader:

928

while True:

929

try:

930

batch = reader.receive_batch(timeout=30.0)

931

932

if not batch.messages:

933

continue

934

935

# Process batch

936

events = []

937

for message in batch:

938

try:

939

event = json.loads(message.data.decode())

940

events.append(event)

941

except json.JSONDecodeError:

942

print(f"Skipping malformed message: {message.data}")

943

944

# Bulk insert to database

945

if events:

946

process_events_batch(events)

947

948

# Commit entire batch

949

batch.commit()

950

print(f"Processed batch of {len(batch)} messages")

951

952

except ydb.TimeoutError:

953

print("No messages received in timeout period")

954

continue

955

except KeyboardInterrupt:

956

print("Shutting down consumer...")

957

break

958

```

959

960

### Async Topic Operations

961

962

```python

963

import asyncio

964

import ydb.aio as ydb_aio

965

966

async def async_topic_producer():

967

async with ydb_aio.Driver(...) as driver:

968

topic_client = ydb.TopicClientAsyncIO(driver)

969

970

# Create topic

971

await topic_client.create_topic(

972

"/local/async_events",

973

ydb.CreateTopicSettings(partitions_count=5)

974

)

975

976

# Write messages asynchronously

977

async with ydb.TopicWriterAsyncIO(

978

driver,

979

"/local/async_events",

980

producer_id="async-producer"

981

) as writer:

982

983

# Generate and write messages

984

for i in range(1000):

985

message = ydb.TopicWriterMessage(

986

data=f'{{"id": {i}, "timestamp": "{datetime.now().isoformat()}"}}'.encode(),

987

seq_no=i

988

)

989

990

await writer.write(message)

991

992

if i % 100 == 0:

993

await writer.flush()

994

print(f"Written {i+1} messages")

995

996

async def async_topic_consumer():

997

async with ydb_aio.Driver(...) as driver:

998

reader_settings = ydb.TopicReaderSettings(

999

consumer_name="async-consumer",

1000

topics=[ydb.TopicReaderSelector("/local/async_events")]

1001

)

1002

1003

async with ydb.TopicReaderAsyncIO(driver, reader_settings) as reader:

1004

async for message in reader:

1005

# Process message asynchronously

1006

await process_message_async(message.data)

1007

message.commit()

1008

1009

# Run async operations

1010

asyncio.run(async_topic_producer())

1011

asyncio.run(async_topic_consumer())

1012

```

1013

1014

### Topic Administration

1015

1016

```python

1017

def manage_topic_lifecycle():

1018

topic_client = ydb.TopicClient(driver)

1019

topic_path = "/local/user_events"

1020

1021

# Create topic with consumers

1022

consumers = [

1023

ydb.TopicConsumer(

1024

name="analytics",

1025

important=True,

1026

read_from=datetime.now()

1027

),

1028

ydb.TopicConsumer(

1029

name="archival",

1030

important=False

1031

)

1032

]

1033

1034

create_settings = ydb.CreateTopicSettings(

1035

partitions_count=10,

1036

retention_period=timedelta(days=30),

1037

retention_storage_mb=10000,

1038

consumers=consumers,

1039

attributes={"team": "analytics", "env": "prod"}

1040

)

1041

1042

topic_client.create_topic(topic_path, create_settings)

1043

1044

# Describe topic

1045

description = topic_client.describe_topic(topic_path)

1046

print(f"Topic: {description.path}")

1047

print(f"Partitions: {description.partitions_count}")

1048

print(f"Retention: {description.retention_period}")

1049

print(f"Consumers: {[c.name for c in description.consumers]}")

1050

1051

# Alter topic - raise the partition count, add a consumer, and extend retention

1052

alter_settings = ydb.AlterTopicSettings(

1053

alter_partitions_count=15,

1054

add_consumers=[

1055

ydb.TopicConsumer(name="realtime", important=True)

1056

],

1057

set_retention_period=timedelta(days=45)

1058

)

1059

1060

topic_client.alter_topic(topic_path, alter_settings)

1061

1062

# Verify changes

1063

updated_description = topic_client.describe_topic(topic_path)

1064

print(f"Updated partitions: {updated_description.partitions_count}")

1065

print(f"Updated consumers: {[c.name for c in updated_description.consumers]}")

1066

1067

manage_topic_lifecycle()

1068

```

1069

1070

### Error Handling and Monitoring

1071

1072

```python

1073

def robust_topic_processing():

1074

reader_settings = ydb.TopicReaderSettings(

1075

consumer_name="robust-consumer",

1076

topics=[ydb.TopicReaderSelector("/local/events")],

1077

read_timeout=10.0,

1078

commit_timeout=5.0

1079

)

1080

1081

retry_count = 0

1082

max_retries = 3

1083

1084

while retry_count < max_retries:

1085

try:

1086

with topic_client.reader(reader_settings) as reader:

1087

retry_count = 0 # Reset on successful connection

1088

1089

while True:

1090

try:

1091

message = reader.receive_message(timeout=30.0)

1092

1093

# Process with timeout

1094

process_start = time.time()

1095

result = process_message_with_timeout(message.data, timeout=10.0)

1096

process_time = time.time() - process_start

1097

1098

if result:

1099

message.commit()

1100

print(f"Processed message in {process_time:.2f}s")

1101

else:

1102

print("Processing failed, skipping message")

1103

message.commit() # Skip failed messages

1104

1105

except ydb.TimeoutError:

1106

print("No messages received, continuing...")

1107

continue

1108

except ydb.TopicReaderPartitionExpiredError as e:

1109

print(f"Partition expired: {e}, reconnecting...")

1110

break

1111

except Exception as e:

1112

print(f"Unexpected error: {e}")

1113

time.sleep(1.0)

1114

1115

except ydb.ConnectionError as e:

1116

retry_count += 1

1117

backoff = min(2 ** retry_count, 30)

1118

print(f"Connection failed, retrying in {backoff}s... ({retry_count}/{max_retries})")

1119

time.sleep(backoff)

1120

except KeyboardInterrupt:

1121

print("Shutting down gracefully...")

1122

break

1123

1124

if retry_count >= max_retries:

1125

print("Max retries exceeded, giving up")

1126

1127

robust_topic_processing()

1128

```

1129

1130

## Type Definitions

1131

1132

```python { .api }

1133

# Type aliases for topic operations

1134

TopicPath = str

1135

ProducerId = str

1136

ConsumerName = str

1137

MessageGroupId = str

1138

PartitionId = int

1139

MessageOffset = int

1140

SequenceNumber = int

1141

1142

# Message handling

1143

MessageData = bytes

1144

MessageMetadata = Dict[str, str]

1145

MessageHandler = Callable[[TopicReaderMessage], bool]

1146

AsyncMessageHandler = Callable[[TopicReaderMessage], Awaitable[bool]]

1147

1148

# Batch processing

1149

BatchProcessor = Callable[[List[TopicReaderMessage]], bool]

1150

AsyncBatchProcessor = Callable[[List[TopicReaderMessage]], Awaitable[bool]]

1151

```