or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced-features.md · application-framework.md · bot-api.md · files.md · filters.md · handlers.md · index.md · keyboards.md · telegram-types.md

docs/advanced-features.md

0

# Advanced Features

1

2

This page covers additional capabilities: job queues for scheduled tasks, persistence for data storage, rate limiting, conversation management, webhook configuration, error handling and logging, custom context types, and performance optimization.

3

4

## Capabilities

5

6

### Job Queue

7

8

Schedule and manage recurring or one-time tasks.

9

10

```python { .api }

11

class JobQueue:

12

def __init__(self, scheduler=None): ...

13

14

def run_once(

15

self,

16

callback: callable,

17

when: datetime.datetime | datetime.timedelta | int | float,

18

data: object = None,

19

name: str = None,

20

chat_id: int | str = None,

21

user_id: int = None,

22

job_kwargs: dict = None

23

) -> Job: ...

24

25

def run_repeating(

26

self,

27

callback: callable,

28

interval: datetime.timedelta | int | float,

29

first: datetime.datetime | datetime.timedelta | int | float = None,

30

last: datetime.datetime | datetime.timedelta | int | float = None,

31

data: object = None,

32

name: str = None,

33

chat_id: int | str = None,

34

user_id: int = None,

35

job_kwargs: dict = None

36

) -> Job: ...

37

38

def run_daily(

39

self,

40

callback: callable,

41

time: datetime.time,

42

days: tuple[int, ...] = tuple(range(7)),

43

data: object = None,

44

name: str = None,

45

chat_id: int | str = None,

46

user_id: int = None,

47

job_kwargs: dict = None

48

) -> Job: ...

49

50

def run_monthly(

51

self,

52

callback: callable,

53

when: datetime.time,

54

day: int,

55

data: object = None,

56

name: str = None,

57

chat_id: int | str = None,

58

user_id: int = None,

59

job_kwargs: dict = None

60

) -> Job: ...

61

62

def run_custom(

63

self,

64

callback: callable,

65

job_kwargs: dict,

66

data: object = None,

67

name: str = None,

68

chat_id: int | str = None,

69

user_id: int = None

70

) -> Job: ...

71

72

def get_jobs_by_name(self, name: str) -> tuple[Job, ...]: ...

73

def jobs(self) -> tuple[Job, ...]: ...

74

75

async def start(self) -> None: ...

76

async def stop(self) -> None: ...

77

78

class Job:

79

callback: callable

80

data: object

81

name: str | None

82

chat_id: int | str | None

83

user_id: int | None

84

85

def schedule_removal(self) -> None: ...

86

def remove(self) -> None: ...

87

88

@property

89

def removed(self) -> bool: ...

90

@property

91

def enabled(self) -> bool: ...

92

@enabled.setter

93

def enabled(self, status: bool) -> None: ...

94

95

@property

96

def next_t(self) -> datetime.datetime | None: ...

97

```

98

99

Usage examples:

100

101

```python

102

import datetime

103

from telegram.ext import ContextTypes

104

105

async def send_reminder(context: ContextTypes.DEFAULT_TYPE):

106

"""Send a reminder message."""

107

job = context.job

108

await context.bot.send_message(

109

chat_id=job.chat_id,

110

text=f"Reminder: {job.data}"

111

)

112

113

async def daily_backup(context: ContextTypes.DEFAULT_TYPE):

114

"""Perform daily backup."""

115

# Backup logic here

116

print("Performing daily backup...")

117

118

# Schedule one-time job (in 10 seconds)

119

job_queue.run_once(

120

send_reminder,

121

when=10,

122

data="Meeting in conference room",

123

chat_id=chat_id,

124

name="meeting_reminder"

125

)

126

127

# Schedule repeating job (every 30 seconds)

128

job_queue.run_repeating(

129

send_reminder,

130

interval=30,

131

first=5, # Start after 5 seconds

132

data="Regular check-in",

133

chat_id=chat_id

134

)

135

136

# Schedule daily job (every day at 9:00 AM)

137

job_queue.run_daily(

138

daily_backup,

139

time=datetime.time(9, 0, 0),

140

name="daily_backup"

141

)

142

143

# Schedule weekly job (Mondays and Fridays at 2:30 PM)

144

job_queue.run_daily(

145

send_reminder,

146

time=datetime.time(14, 30, 0),

147

days=(1, 5), # Monday=1, Friday=5 (0 is Sunday in v20+)

148

data="Weekly team meeting",

149

chat_id=chat_id

150

)

151

152

# Schedule monthly job (15th of each month at noon)

153

job_queue.run_monthly(

154

send_reminder,

155

when=datetime.time(12, 0, 0),

156

day=15,

157

data="Monthly report due",

158

chat_id=chat_id

159

)

160

161

# Remove jobs by name

162

jobs = job_queue.get_jobs_by_name("meeting_reminder")

163

for job in jobs:

164

job.remove()

165

166

# Disable/enable jobs

167

job.enabled = False # Pause job

168

job.enabled = True # Resume job

169

```

170

171

### Persistence

172

173

Store and retrieve bot data across restarts.

174

175

```python { .api }

176

class BasePersistence:

177

def __init__(self, store_data: PersistenceInput = None, update_interval: float = 60): ...

178

179

async def get_bot_data(self) -> dict: ...

180

async def update_bot_data(self, data: dict) -> None: ...

181

async def refresh_bot_data(self, bot_data: dict) -> None: ...

182

183

async def get_chat_data(self) -> dict[int, dict]: ...

184

async def update_chat_data(self, chat_id: int, data: dict) -> None: ...

185

async def refresh_chat_data(self, chat_id: int, chat_data: dict) -> None: ...

186

async def drop_chat_data(self, chat_id: int) -> None: ...

187

188

async def get_user_data(self) -> dict[int, dict]: ...

189

async def update_user_data(self, user_id: int, data: dict) -> None: ...

190

async def refresh_user_data(self, user_id: int, user_data: dict) -> None: ...

191

async def drop_user_data(self, user_id: int) -> None: ...

192

193

async def get_callback_data(self) -> tuple[list[tuple], dict]: ...

194

async def update_callback_data(self, data: tuple[list[tuple], dict]) -> None: ...

195

196

async def get_conversations(self, name: str) -> dict: ...

197

async def update_conversation(self, name: str, key: tuple, new_state: object) -> None: ...

198

199

async def flush(self) -> None: ...

200

201

class PicklePersistence(BasePersistence):

202

def __init__(

203

self,

204

filepath: str,

205

store_data: PersistenceInput = None,

206

single_file: bool = True,

207

on_flush: bool = False,

208

update_interval: float = 60

209

): ...

210

211

class DictPersistence(BasePersistence):

212

def __init__(

213

self,

214

bot_data: dict = None,

215

chat_data: dict = None,

216

user_data: dict = None,

217

callback_data: tuple = None,

218

conversations: dict = None,

219

store_data: PersistenceInput = None,

220

update_interval: float = 60

221

): ...

222

223

class PersistenceInput:

224

def __init__(

225

self,

226

bot_data: bool = True,

227

chat_data: bool = True,

228

user_data: bool = True,

229

callback_data: bool = True

230

): ...

231

```

232

233

Usage examples:

234

235

```python

236

from telegram.ext import PicklePersistence, DictPersistence, PersistenceInput

237

238

# Pickle persistence (file-based)

239

persistence = PicklePersistence(filepath="bot_data.pkl")

240

241

application = (

242

ApplicationBuilder()

243

.token("YOUR_BOT_TOKEN")

244

.persistence(persistence)

245

.build()

246

)

247

248

# Dictionary persistence (memory-based)

249

persistence = DictPersistence()

250

251

# Custom persistence configuration

252

persistence_input = PersistenceInput(

253

bot_data=True,

254

chat_data=True,

255

user_data=True,

256

callback_data=False # Don't store callback data

257

)

258

persistence = PicklePersistence(

259

filepath="bot_data.pkl",

260

store_data=persistence_input

261

)

262

263

# Using persistence in handlers

264

async def set_user_setting(update, context):

265

user_data = context.user_data

266

user_data['notifications'] = True

267

user_data['timezone'] = 'UTC'

268

await update.message.reply_text("Settings saved!")

269

270

async def get_user_setting(update, context):

271

user_data = context.user_data

272

notifications = user_data.get('notifications', False)

273

timezone = user_data.get('timezone', 'UTC')

274

await update.message.reply_text(

275

f"Notifications: {notifications}, Timezone: {timezone}"

276

)

277

278

# Bot-wide data

279

async def update_stats(update, context):

280

bot_data = context.bot_data

281

bot_data.setdefault('message_count', 0)

282

bot_data['message_count'] += 1

283

284

if bot_data['message_count'] % 1000 == 0:

285

await update.message.reply_text(

286

f"Bot has processed {bot_data['message_count']} messages!"

287

)

288

289

# Chat-specific data

290

async def set_chat_language(update, context):

291

chat_data = context.chat_data

292

args = context.args

293

if args:

294

chat_data['language'] = args[0]

295

await update.message.reply_text(f"Language set to: {args[0]}")

296

else:

297

current_lang = chat_data.get('language', 'en')

298

await update.message.reply_text(f"Current language: {current_lang}")

299

```

300

301

### Rate Limiting

302

303

Control request rates to avoid hitting Telegram's limits.

304

305

```python { .api }

306

class BaseRateLimiter:

307

async def initialize(self) -> None: ...

308

async def shutdown(self) -> None: ...

309

async def process_request(

310

self,

311

callback: callable,

312

args: tuple,

313

kwargs: dict,

314

endpoint: str,

315

data: dict,

316

rate_limit_args: tuple

317

) -> object: ...

318

319

class AIORateLimiter(BaseRateLimiter):

320

def __init__(

321

self,

322

max_retries: int = 0,

323

on_retry: callable = None,

324

on_failure: callable = None

325

): ...

326

```

327

328

Usage example:

329

330

```python

331

from telegram.ext import AIORateLimiter

332

333

# Create rate limiter

334

rate_limiter = AIORateLimiter(

335

max_retries=3,

336

on_retry=lambda retry_state: print(f"Retrying request {retry_state}"),

337

on_failure=lambda failure_state: print(f"Request failed: {failure_state}")

338

)

339

340

# Use in application

341

application = (

342

ApplicationBuilder()

343

.token("YOUR_BOT_TOKEN")

344

.rate_limiter(rate_limiter)

345

.build()

346

)

347

```

348

349

### Conversation Management

350

351

Advanced conversation handling that goes beyond basic `ConversationHandler` usage: nested conversations and per-message conversations.

352

353

```python

354

# Nested conversations

355

from telegram.ext import ConversationHandler

356

357

# States for main conversation

358

MAIN_MENU, SETTINGS, PROFILE = range(3)

359

360

# States for settings sub-conversation

361

NOTIFICATIONS, LANGUAGE, THEME = range(3, 6)

362

363

# Sub-conversation for settings

364

settings_conv = ConversationHandler(

365

entry_points=[MessageHandler(filters.Regex('^Settings$'), enter_settings)],

366

states={

367

NOTIFICATIONS: [MessageHandler(filters.TEXT, handle_notifications)],

368

LANGUAGE: [MessageHandler(filters.TEXT, handle_language)],

369

THEME: [MessageHandler(filters.TEXT, handle_theme)],

370

},

371

fallbacks=[MessageHandler(filters.Regex('^Back$'), back_to_main)],

372

map_to_parent={

373

# Map states back to parent conversation

374

MAIN_MENU: MAIN_MENU,

375

ConversationHandler.END: MAIN_MENU,

376

}

377

)

378

379

# Main conversation with nested sub-conversation

380

main_conv = ConversationHandler(

381

entry_points=[CommandHandler('start', start)],

382

states={

383

MAIN_MENU: [

384

settings_conv, # Nested conversation

385

MessageHandler(filters.Regex('^Profile$'), enter_profile),

386

],

387

PROFILE: [MessageHandler(filters.TEXT, handle_profile)],

388

},

389

fallbacks=[CommandHandler('cancel', cancel)],

390

)

391

392

# Per-message conversations

393

per_message_conv = ConversationHandler(

394

entry_points=[CommandHandler('quiz', start_quiz)],

395

states={

396

1: [MessageHandler(filters.TEXT, question_1)],

397

2: [MessageHandler(filters.TEXT, question_2)],

398

},

399

fallbacks=[CommandHandler('cancel', cancel_quiz)],

400

per_message=True, # Separate conversation per message

401

)

402

```

403

404

### Webhook Configuration

405

406

Configure a webhook for production deployment.

407

408

```python

409

# Webhook with SSL certificate

410

application.run_webhook(

411

listen="0.0.0.0",

412

port=8443,

413

url_path="webhook",

414

key="private.key",

415

cert="cert.pem",

416

webhook_url="https://yourdomain.com:8443/webhook",

417

drop_pending_updates=True

418

)

419

420

# Webhook behind reverse proxy

421

application.run_webhook(

422

listen="127.0.0.1",

423

port=5000,

424

url_path="telegram-webhook",

425

webhook_url="https://yourdomain.com/telegram-webhook",

426

secret_token="your_secret_token"

427

)

428

429

# Custom webhook handling

430

from telegram import Update

431

import json

432

433

async def webhook_handler(request):

434

"""Custom webhook handler."""

435

body = await request.body()

436

update_dict = json.loads(body)

437

update = Update.de_json(update_dict, application.bot)

438

await application.process_update(update)

439

return {"status": "ok"}

440

```

441

442

### Error Handling and Logging

443

444

Comprehensive error handling and logging setup.

445

446

```python

447

import logging

448

from telegram.error import TelegramError, BadRequest, Forbidden, NetworkError

449

450

# Configure logging

451

logging.basicConfig(

452

format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',

453

level=logging.INFO

454

)

455

logger = logging.getLogger(__name__)

456

457

# Error handler

458

async def error_handler(update, context):

459

"""Handle errors in the application."""

460

logger.error('Update "%s" caused error "%s"', update, context.error)

461

462

if isinstance(context.error, BadRequest):

463

# Handle bad requests (invalid parameters, etc.)

464

if update and update.message:

465

await update.message.reply_text("Sorry, there was an error with your request.")

466

467

elif isinstance(context.error, Forbidden):

468

# Handle forbidden errors (blocked by user, insufficient permissions)

469

logger.warning("Bot was blocked by user or lacks permissions")

470

471

elif isinstance(context.error, NetworkError):

472

# Handle network errors (connection issues, timeouts)

473

logger.warning("Network error occurred, retrying...")

474

475

else:

476

# Handle other Telegram errors

477

logger.error("Unexpected error: %s", context.error)

478

479

application.add_error_handler(error_handler)

480

481

# Graceful shutdown

482

import signal

483

import asyncio

484

485

def signal_handler(signum, frame):

486

"""Handle shutdown signals."""

487

logger.info("Received signal %s, shutting down gracefully...", signum)

488

asyncio.create_task(application.shutdown())

489

490

signal.signal(signal.SIGINT, signal_handler)

491

signal.signal(signal.SIGTERM, signal_handler)

492

```

493

494

### Custom Context Types

495

496

Create custom context classes for additional functionality.

497

498

```python

499

from telegram.ext import CallbackContext, ContextTypes

500

501

class CustomContext(CallbackContext):

502

"""Custom context with additional features."""

503

504

def __init__(self, application, chat_id=None, user_id=None):

505

super().__init__(application, chat_id, user_id)

506

self._database = None

507

508

@property

509

def database(self):

510

"""Lazy database connection."""

511

if self._database is None:

512

self._database = get_database_connection()

513

return self._database

514

515

async def log_action(self, action: str):

516

"""Log user action to database."""

517

await self.database.log_user_action(

518

user_id=self.user_id,

519

chat_id=self.chat_id,

520

action=action

521

)

522

523

# Configure custom context

524

context_types = ContextTypes(context=CustomContext)

525

application = (

526

ApplicationBuilder()

527

.token("YOUR_BOT_TOKEN")

528

.context_types(context_types)

529

.build()

530

)

531

532

# Use custom context in handlers

533

async def custom_handler(update: Update, context: CustomContext):

534

await context.log_action("button_pressed")

535

# Access custom database property

536

user_stats = await context.database.get_user_stats(context.user_id)

537

await update.message.reply_text(f"Your stats: {user_stats}")

538

```

539

540

### Performance Optimization

541

542

Optimize bot performance for high-load scenarios.

543

544

```python

545

# Concurrent update processing

546

application = (

547

ApplicationBuilder()

548

.token("YOUR_BOT_TOKEN")

549

.concurrent_updates(True) # Enable concurrent processing

550

.build()

551

)

552

553

# Custom update processor with higher concurrency

554

from telegram.ext import SimpleUpdateProcessor

555

556

update_processor = SimpleUpdateProcessor(max_concurrent_updates=512)

557

application = (

558

ApplicationBuilder()

559

.token("YOUR_BOT_TOKEN")

560

.update_processor(update_processor)

561

.build()

562

)

563

564

# Connection pooling

565

from telegram.request import HTTPXRequest

566

567

request = HTTPXRequest(connection_pool_size=20)

568

application = (

569

ApplicationBuilder()

570

.token("YOUR_BOT_TOKEN")

571

.request(request)

572

.build()

573

)

574

575

# Callback data caching for inline keyboards

576

from telegram.ext import CallbackDataCache

577

578

callback_data_cache = CallbackDataCache(maxsize=1024, ttl=3600)

579

application = (

580

ApplicationBuilder()

581

.token("YOUR_BOT_TOKEN")

582

.arbitrary_callback_data(True)

583

.build()

584

)

585

```

586

587

## Types

588

589

```python { .api }

590

from typing import Any, Callable, Dict, Optional, Union

591

from datetime import datetime, timedelta, time

592

593

JobCallback = Callable[[CallbackContext], object]

594

ErrorCallback = Callable[[Update, CallbackContext], None]

595

596

class PersistenceInput:

597

bot_data: bool

598

chat_data: bool

599

user_data: bool

600

callback_data: bool

601

```