0
# Advanced Features
1
2
Additional capabilities including job queues for scheduled tasks, persistence for data storage, rate limiting, conversation management, and webhook configuration.
3
4
## Capabilities
5
6
### Job Queue
7
8
Schedule and manage recurring or one-time tasks.
9
10
```python { .api }
11
class JobQueue:
12
def __init__(self, scheduler=None): ...
13
14
def run_once(
15
self,
16
callback: callable,
17
when: datetime.datetime | datetime.timedelta | int | float,
18
data: object = None,
19
name: str = None,
20
chat_id: int | str = None,
21
user_id: int = None,
22
job_kwargs: dict = None
23
) -> Job: ...
24
25
def run_repeating(
26
self,
27
callback: callable,
28
interval: datetime.timedelta | int | float,
29
first: datetime.datetime | datetime.timedelta | int | float = None,
30
last: datetime.datetime | datetime.timedelta | int | float = None,
31
data: object = None,
32
name: str = None,
33
chat_id: int | str = None,
34
user_id: int = None,
35
job_kwargs: dict = None
36
) -> Job: ...
37
38
def run_daily(
39
self,
40
callback: callable,
41
time: datetime.time,
42
days: tuple[int, ...] = tuple(range(7)),
43
data: object = None,
44
name: str = None,
45
chat_id: int | str = None,
46
user_id: int = None,
47
job_kwargs: dict = None
48
) -> Job: ...
49
50
def run_monthly(
51
self,
52
callback: callable,
53
when: datetime.time,
54
day: int,
55
data: object = None,
56
name: str = None,
57
chat_id: int | str = None,
58
user_id: int = None,
59
job_kwargs: dict = None
60
) -> Job: ...
61
62
def run_custom(
63
self,
64
callback: callable,
65
job_kwargs: dict,
66
data: object = None,
67
name: str = None,
68
chat_id: int | str = None,
69
user_id: int = None
70
) -> Job: ...
71
72
def get_jobs_by_name(self, name: str) -> tuple[Job, ...]: ...
73
def jobs(self) -> tuple[Job, ...]: ...
74
75
async def start(self) -> None: ...
76
async def stop(self) -> None: ...
77
78
class Job:
79
callback: callable
80
data: object
81
name: str | None
82
chat_id: int | str | None
83
user_id: int | None
84
85
def schedule_removal(self) -> None: ...
86
def remove(self) -> None: ...
87
88
@property
89
def removed(self) -> bool: ...
90
@property
91
def enabled(self) -> bool: ...
92
@enabled.setter
93
def enabled(self, status: bool) -> None: ...
94
95
@property
96
def next_t(self) -> datetime.datetime | None: ...
97
```
98
99
Usage examples:
100
101
```python
102
import datetime
103
from telegram.ext import ContextTypes
104
105
async def send_reminder(context: ContextTypes.DEFAULT_TYPE):
106
"""Send a reminder message."""
107
job = context.job
108
await context.bot.send_message(
109
chat_id=job.chat_id,
110
text=f"Reminder: {job.data}"
111
)
112
113
async def daily_backup(context: ContextTypes.DEFAULT_TYPE):
114
"""Perform daily backup."""
115
# Backup logic here
116
print("Performing daily backup...")
117
118
# Schedule one-time job (in 10 seconds)
119
job_queue.run_once(
120
send_reminder,
121
when=10,
122
data="Meeting in conference room",
123
chat_id=chat_id,
124
name="meeting_reminder"
125
)
126
127
# Schedule repeating job (every 30 seconds)
128
job_queue.run_repeating(
129
send_reminder,
130
interval=30,
131
first=5, # Start after 5 seconds
132
data="Regular check-in",
133
chat_id=chat_id
134
)
135
136
# Schedule daily job (every day at 9:00 AM)
137
job_queue.run_daily(
138
daily_backup,
139
time=datetime.time(9, 0, 0),
140
name="daily_backup"
141
)
142
143
# Schedule weekly job (Mondays and Fridays at 2:30 PM)
144
job_queue.run_daily(
145
send_reminder,
146
time=datetime.time(14, 30, 0),
147
days=(0, 4), # Monday=0, Friday=4
148
data="Weekly team meeting",
149
chat_id=chat_id
150
)
151
152
# Schedule monthly job (15th of each month at noon)
153
job_queue.run_monthly(
154
send_reminder,
155
when=datetime.time(12, 0, 0),
156
day=15,
157
data="Monthly report due",
158
chat_id=chat_id
159
)
160
161
# Remove jobs by name
162
jobs = job_queue.get_jobs_by_name("meeting_reminder")
163
for job in jobs:
164
job.remove()
165
166
# Disable/enable jobs
167
job.enabled = False # Pause job
168
job.enabled = True # Resume job
169
```
170
171
### Persistence
172
173
Store and retrieve bot data across restarts.
174
175
```python { .api }
176
class BasePersistence:
177
def __init__(self, store_data: PersistenceInput = None, update_interval: float = 60): ...
178
179
async def get_bot_data(self) -> dict: ...
180
async def update_bot_data(self, data: dict) -> None: ...
181
async def refresh_bot_data(self, bot_data: dict) -> None: ...
182
183
async def get_chat_data(self) -> dict[int, dict]: ...
184
async def update_chat_data(self, chat_id: int, data: dict) -> None: ...
185
async def refresh_chat_data(self, chat_id: int, chat_data: dict) -> None: ...
186
async def drop_chat_data(self, chat_id: int) -> None: ...
187
188
async def get_user_data(self) -> dict[int, dict]: ...
189
async def update_user_data(self, user_id: int, data: dict) -> None: ...
190
async def refresh_user_data(self, user_id: int, user_data: dict) -> None: ...
191
async def drop_user_data(self, user_id: int) -> None: ...
192
193
async def get_callback_data(self) -> tuple[list[tuple], dict]: ...
194
async def update_callback_data(self, data: tuple[list[tuple], dict]) -> None: ...
195
196
async def get_conversations(self, name: str) -> dict: ...
197
async def update_conversation(self, name: str, key: tuple, new_state: object) -> None: ...
198
199
async def flush(self) -> None: ...
200
201
class PicklePersistence(BasePersistence):
202
def __init__(
203
self,
204
filepath: str,
205
store_data: PersistenceInput = None,
206
single_file: bool = True,
207
on_flush: bool = False,
208
update_interval: float = 60
209
): ...
210
211
class DictPersistence(BasePersistence):
212
def __init__(
213
self,
214
bot_data: dict = None,
215
chat_data: dict = None,
216
user_data: dict = None,
217
callback_data: tuple = None,
218
conversations: dict = None,
219
store_data: PersistenceInput = None,
220
update_interval: float = 60
221
): ...
222
223
class PersistenceInput:
224
def __init__(
225
self,
226
bot_data: bool = True,
227
chat_data: bool = True,
228
user_data: bool = True,
229
callback_data: bool = True
230
): ...
231
```
232
233
Usage examples:
234
235
```python
236
from telegram.ext import PicklePersistence, DictPersistence, PersistenceInput
237
238
# Pickle persistence (file-based)
239
persistence = PicklePersistence(filepath="bot_data.pkl")
240
241
application = (
242
ApplicationBuilder()
243
.token("YOUR_BOT_TOKEN")
244
.persistence(persistence)
245
.build()
246
)
247
248
# Dictionary persistence (memory-based)
249
persistence = DictPersistence()
250
251
# Custom persistence configuration
252
persistence_input = PersistenceInput(
253
bot_data=True,
254
chat_data=True,
255
user_data=True,
256
callback_data=False # Don't store callback data
257
)
258
persistence = PicklePersistence(
259
filepath="bot_data.pkl",
260
store_data=persistence_input
261
)
262
263
# Using persistence in handlers
264
async def set_user_setting(update, context):
265
user_data = context.user_data
266
user_data['notifications'] = True
267
user_data['timezone'] = 'UTC'
268
await update.message.reply_text("Settings saved!")
269
270
async def get_user_setting(update, context):
271
user_data = context.user_data
272
notifications = user_data.get('notifications', False)
273
timezone = user_data.get('timezone', 'UTC')
274
await update.message.reply_text(
275
f"Notifications: {notifications}, Timezone: {timezone}"
276
)
277
278
# Bot-wide data
279
async def update_stats(update, context):
280
bot_data = context.bot_data
281
bot_data.setdefault('message_count', 0)
282
bot_data['message_count'] += 1
283
284
if bot_data['message_count'] % 1000 == 0:
285
await update.message.reply_text(
286
f"Bot has processed {bot_data['message_count']} messages!"
287
)
288
289
# Chat-specific data
290
async def set_chat_language(update, context):
291
chat_data = context.chat_data
292
args = context.args
293
if args:
294
chat_data['language'] = args[0]
295
await update.message.reply_text(f"Language set to: {args[0]}")
296
else:
297
current_lang = chat_data.get('language', 'en')
298
await update.message.reply_text(f"Current language: {current_lang}")
299
```
300
301
### Rate Limiting
302
303
Control request rates to avoid hitting Telegram's limits.
304
305
```python { .api }
306
class BaseRateLimiter:
307
async def initialize(self) -> None: ...
308
async def shutdown(self) -> None: ...
309
async def process_request(
310
self,
311
callback: callable,
312
args: tuple,
313
kwargs: dict,
314
endpoint: str,
315
data: dict,
316
rate_limit_args: tuple
317
) -> object: ...
318
319
class AIORateLimiter(BaseRateLimiter):
320
def __init__(
321
self,
322
max_retries: int = 0,
323
on_retry: callable = None,
324
on_failure: callable = None
325
): ...
326
```
327
328
Usage example:
329
330
```python
331
from telegram.ext import AIORateLimiter
332
333
# Create rate limiter
334
rate_limiter = AIORateLimiter(
335
max_retries=3,
336
on_retry=lambda retry_state: print(f"Retrying request {retry_state}"),
337
on_failure=lambda failure_state: print(f"Request failed: {failure_state}")
338
)
339
340
# Use in application
341
application = (
342
ApplicationBuilder()
343
.token("YOUR_BOT_TOKEN")
344
.rate_limiter(rate_limiter)
345
.build()
346
)
347
```
348
349
### Conversation Management
350
351
Advanced conversation handling beyond basic ConversationHandler.
352
353
```python
354
# Nested conversations
355
from telegram.ext import ConversationHandler
356
357
# States for main conversation
358
MAIN_MENU, SETTINGS, PROFILE = range(3)
359
360
# States for settings sub-conversation
361
NOTIFICATIONS, LANGUAGE, THEME = range(3, 6)
362
363
# Sub-conversation for settings
364
settings_conv = ConversationHandler(
365
entry_points=[MessageHandler(filters.Regex('^Settings$'), enter_settings)],
366
states={
367
NOTIFICATIONS: [MessageHandler(filters.TEXT, handle_notifications)],
368
LANGUAGE: [MessageHandler(filters.TEXT, handle_language)],
369
THEME: [MessageHandler(filters.TEXT, handle_theme)],
370
},
371
fallbacks=[MessageHandler(filters.Regex('^Back$'), back_to_main)],
372
map_to_parent={
373
# Map states back to parent conversation
374
MAIN_MENU: MAIN_MENU,
375
ConversationHandler.END: MAIN_MENU,
376
}
377
)
378
379
# Main conversation with nested sub-conversation
380
main_conv = ConversationHandler(
381
entry_points=[CommandHandler('start', start)],
382
states={
383
MAIN_MENU: [
384
settings_conv, # Nested conversation
385
MessageHandler(filters.Regex('^Profile$'), enter_profile),
386
],
387
PROFILE: [MessageHandler(filters.TEXT, handle_profile)],
388
},
389
fallbacks=[CommandHandler('cancel', cancel)],
390
)
391
392
# Per-message conversations
393
per_message_conv = ConversationHandler(
394
entry_points=[CommandHandler('quiz', start_quiz)],
395
states={
396
1: [MessageHandler(filters.TEXT, question_1)],
397
2: [MessageHandler(filters.TEXT, question_2)],
398
},
399
fallbacks=[CommandHandler('cancel', cancel_quiz)],
400
per_message=True, # Separate conversation per message
401
)
402
```
403
404
### Webhook Configuration
405
406
Configure webhook for production deployment.
407
408
```python
409
# Webhook with SSL certificate
410
application.run_webhook(
411
listen="0.0.0.0",
412
port=8443,
413
url_path="webhook",
414
key="private.key",
415
cert="cert.pem",
416
webhook_url="https://yourdomain.com:8443/webhook",
417
drop_pending_updates=True
418
)
419
420
# Webhook behind reverse proxy
421
application.run_webhook(
422
listen="127.0.0.1",
423
port=5000,
424
url_path="telegram-webhook",
425
webhook_url="https://yourdomain.com/telegram-webhook",
426
secret_token="your_secret_token"
427
)
428
429
# Custom webhook handling
430
from telegram import Update
431
import json
432
433
async def webhook_handler(request):
434
"""Custom webhook handler."""
435
body = await request.body()
436
update_dict = json.loads(body)
437
update = Update.de_json(update_dict, application.bot)
438
await application.process_update(update)
439
return {"status": "ok"}
440
```
441
442
### Error Handling and Logging
443
444
Comprehensive error handling and logging setup.
445
446
```python
447
import logging
448
from telegram.error import TelegramError, BadRequest, Forbidden, NetworkError
449
450
# Configure logging
451
logging.basicConfig(
452
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
453
level=logging.INFO
454
)
455
logger = logging.getLogger(__name__)
456
457
# Error handler
458
async def error_handler(update, context):
459
"""Handle errors in the application."""
460
logger.error('Update "%s" caused error "%s"', update, context.error)
461
462
if isinstance(context.error, BadRequest):
463
# Handle bad requests (invalid parameters, etc.)
464
if update and update.message:
465
await update.message.reply_text("Sorry, there was an error with your request.")
466
467
elif isinstance(context.error, Forbidden):
468
# Handle forbidden errors (blocked by user, insufficient permissions)
469
logger.warning("Bot was blocked by user or lacks permissions")
470
471
elif isinstance(context.error, NetworkError):
472
# Handle network errors (connection issues, timeouts)
473
logger.warning("Network error occurred, retrying...")
474
475
else:
476
# Handle other Telegram errors
477
logger.error("Unexpected error: %s", context.error)
478
479
application.add_error_handler(error_handler)
480
481
# Graceful shutdown
482
import signal
483
import asyncio
484
485
def signal_handler(signum, frame):
486
"""Handle shutdown signals."""
487
logger.info("Received signal %s, shutting down gracefully...", signum)
488
asyncio.create_task(application.shutdown())
489
490
signal.signal(signal.SIGINT, signal_handler)
491
signal.signal(signal.SIGTERM, signal_handler)
492
```
493
494
### Custom Context Types
495
496
Create custom context classes for additional functionality.
497
498
```python
499
from telegram.ext import CallbackContext, ContextTypes
500
501
class CustomContext(CallbackContext):
502
"""Custom context with additional features."""
503
504
def __init__(self, application, chat_id=None, user_id=None):
505
super().__init__(application, chat_id, user_id)
506
self._database = None
507
508
@property
509
def database(self):
510
"""Lazy database connection."""
511
if self._database is None:
512
self._database = get_database_connection()
513
return self._database
514
515
async def log_action(self, action: str):
516
"""Log user action to database."""
517
await self.database.log_user_action(
518
user_id=self.user_id,
519
chat_id=self.chat_id,
520
action=action
521
)
522
523
# Configure custom context
524
context_types = ContextTypes(context=CustomContext)
525
application = (
526
ApplicationBuilder()
527
.token("YOUR_BOT_TOKEN")
528
.context_types(context_types)
529
.build()
530
)
531
532
# Use custom context in handlers
533
async def custom_handler(update: Update, context: CustomContext):
534
await context.log_action("button_pressed")
535
# Access custom database property
536
user_stats = await context.database.get_user_stats(context.user_id)
537
await update.message.reply_text(f"Your stats: {user_stats}")
538
```
539
540
### Performance Optimization
541
542
Optimize bot performance for high-load scenarios.
543
544
```python
545
# Concurrent update processing
546
application = (
547
ApplicationBuilder()
548
.token("YOUR_BOT_TOKEN")
549
.concurrent_updates(True) # Enable concurrent processing
550
.build()
551
)
552
553
# Custom update processor with higher concurrency
554
from telegram.ext import SimpleUpdateProcessor
555
556
update_processor = SimpleUpdateProcessor(max_concurrent_updates=512)
557
application = (
558
ApplicationBuilder()
559
.token("YOUR_BOT_TOKEN")
560
.update_processor(update_processor)
561
.build()
562
)
563
564
# Connection pooling
565
from telegram.request import HTTPXRequest
566
567
request = HTTPXRequest(connection_pool_size=20)
568
application = (
569
ApplicationBuilder()
570
.token("YOUR_BOT_TOKEN")
571
.request(request)
572
.build()
573
)
574
575
# Callback data caching for inline keyboards
576
from telegram.ext import CallbackDataCache
577
578
callback_data_cache = CallbackDataCache(maxsize=1024, ttl=3600)
579
application = (
580
ApplicationBuilder()
581
.token("YOUR_BOT_TOKEN")
582
.arbitrary_callback_data(True)
583
.build()
584
)
585
```
586
587
## Types
588
589
```python { .api }
590
from typing import Any, Callable, Dict, Optional, Union
591
from datetime import datetime, timedelta, time
592
593
JobCallback = Callable[[CallbackContext], object]
594
ErrorCallback = Callable[[Update, CallbackContext], None]
595
596
class PersistenceInput:
597
bot_data: bool
598
chat_data: bool
599
user_data: bool
600
callback_data: bool
601
```