A pure-Python, asynchronous interface for the Telegram Bot API, combining a comprehensive API wrapper with a high-level framework for building sophisticated Telegram bots.
—
Additional capabilities include job queues for scheduled tasks, persistence for data storage, rate limiting, conversation management, and webhook configuration.
Schedule and manage recurring or one-time tasks.
class JobQueue:
def __init__(self, scheduler=None): ...
def run_once(
self,
callback: callable,
when: datetime.datetime | datetime.timedelta | int | float,
data: object = None,
name: str = None,
chat_id: int | str = None,
user_id: int = None,
job_kwargs: dict = None
) -> Job: ...
def run_repeating(
self,
callback: callable,
interval: datetime.timedelta | int | float,
first: datetime.datetime | datetime.timedelta | int | float = None,
last: datetime.datetime | datetime.timedelta | int | float = None,
data: object = None,
name: str = None,
chat_id: int | str = None,
user_id: int = None,
job_kwargs: dict = None
) -> Job: ...
def run_daily(
self,
callback: callable,
time: datetime.time,
days: tuple[int, ...] = tuple(range(7)),
data: object = None,
name: str = None,
chat_id: int | str = None,
user_id: int = None,
job_kwargs: dict = None
) -> Job: ...
def run_monthly(
self,
callback: callable,
when: datetime.time,
day: int,
data: object = None,
name: str = None,
chat_id: int | str = None,
user_id: int = None,
job_kwargs: dict = None
) -> Job: ...
def run_custom(
self,
callback: callable,
job_kwargs: dict,
data: object = None,
name: str = None,
chat_id: int | str = None,
user_id: int = None
) -> Job: ...
def get_jobs_by_name(self, name: str) -> tuple[Job, ...]: ...
def jobs(self) -> tuple[Job, ...]: ...
async def start(self) -> None: ...
async def stop(self) -> None: ...
class Job:
callback: callable
data: object
name: str | None
chat_id: int | str | None
user_id: int | None
def schedule_removal(self) -> None: ...
def remove(self) -> None: ...
@property
def removed(self) -> bool: ...
@property
def enabled(self) -> bool: ...
@enabled.setter
def enabled(self, status: bool) -> None: ...
@property
def next_t(self) -> datetime.datetime | None: ...Usage examples:
import datetime
from telegram.ext import ContextTypes
async def send_reminder(context: ContextTypes.DEFAULT_TYPE):
    """Send a reminder message.

    Reads the triggering job from ``context.job`` and posts its ``data`` to
    the job's ``chat_id``.
    """
    job = context.job
    await context.bot.send_message(
        chat_id=job.chat_id,
        text=f"Reminder: {job.data}",
    )
async def daily_backup(context: ContextTypes.DEFAULT_TYPE):
    """Perform daily backup."""
    # Backup logic here
    print("Performing daily backup...")
# Schedule one-time job (in 10 seconds)
job_queue.run_once(
    send_reminder,
    when=10,
    data="Meeting in conference room",
    chat_id=chat_id,
    name="meeting_reminder",
)

# Schedule repeating job (every 30 seconds)
# Keep the returned Job so it can be paused/resumed below.
check_in_job = job_queue.run_repeating(
    send_reminder,
    interval=30,
    first=5,  # Start after 5 seconds
    data="Regular check-in",
    chat_id=chat_id,
)

# Schedule daily job (every day at 9:00 AM)
job_queue.run_daily(
    daily_backup,
    time=datetime.time(9, 0, 0),
    name="daily_backup",
)

# Schedule weekly job (Mondays and Fridays at 2:30 PM)
job_queue.run_daily(
    send_reminder,
    time=datetime.time(14, 30, 0),
    days=(1, 5),  # 0-6 map to Sunday-Saturday, so Monday=1, Friday=5
    data="Weekly team meeting",
    chat_id=chat_id,
)

# Schedule monthly job (15th of each month at noon)
job_queue.run_monthly(
    send_reminder,
    when=datetime.time(12, 0, 0),
    day=15,
    data="Monthly report due",
    chat_id=chat_id,
)

# Remove jobs by name
jobs = job_queue.get_jobs_by_name("meeting_reminder")
for job in jobs:
    job.schedule_removal()

# Disable/enable jobs
# Operate on a job that is still scheduled, not on one just removed above.
check_in_job.enabled = False  # Pause job
check_in_job.enabled = True  # Resume job

# Store and retrieve bot data across restarts.
class BasePersistence:
    """API stub: interface for storing and retrieving bot data across restarts.

    Groups of coroutines cover bot-wide data, per-chat data, per-user data,
    callback data and conversation states; ``flush`` completes the set.
    """

    def __init__(
        self,
        store_data: "PersistenceInput | None" = None,
        update_interval: float = 60,
    ): ...

    # Bot-wide data
    async def get_bot_data(self) -> dict: ...

    async def update_bot_data(self, data: dict) -> None: ...

    async def refresh_bot_data(self, bot_data: dict) -> None: ...

    # Per-chat data, keyed by chat id
    async def get_chat_data(self) -> dict[int, dict]: ...

    async def update_chat_data(self, chat_id: int, data: dict) -> None: ...

    async def refresh_chat_data(self, chat_id: int, chat_data: dict) -> None: ...

    async def drop_chat_data(self, chat_id: int) -> None: ...

    # Per-user data, keyed by user id
    async def get_user_data(self) -> dict[int, dict]: ...

    async def update_user_data(self, user_id: int, data: dict) -> None: ...

    async def refresh_user_data(self, user_id: int, user_data: dict) -> None: ...

    async def drop_user_data(self, user_id: int) -> None: ...

    # Callback data
    async def get_callback_data(self) -> tuple[list[tuple], dict]: ...

    async def update_callback_data(self, data: tuple[list[tuple], dict]) -> None: ...

    # Conversation states
    async def get_conversations(self, name: str) -> dict: ...

    async def update_conversation(self, name: str, key: tuple, new_state: object) -> None: ...

    async def flush(self) -> None: ...
class PicklePersistence(BasePersistence):
    """API stub: file-based persistence stored at ``filepath``."""

    def __init__(
        self,
        filepath: str,
        store_data: "PersistenceInput | None" = None,
        single_file: bool = True,
        on_flush: bool = False,
        update_interval: float = 60,
    ): ...
class DictPersistence(BasePersistence):
    """API stub: memory-based persistence.

    NOTE(review): upstream python-telegram-bot documents this constructor with
    ``*_json`` string parameters rather than dict/tuple values — confirm the
    signature below before relying on it.
    """

    def __init__(
        self,
        bot_data: dict | None = None,
        chat_data: dict | None = None,
        user_data: dict | None = None,
        callback_data: tuple | None = None,
        conversations: dict | None = None,
        store_data: "PersistenceInput | None" = None,
        update_interval: float = 60,
    ): ...
class PersistenceInput:
    """API stub: flags selecting which kinds of data a persistence stores.

    Every flag defaults to ``True``.
    """

    def __init__(
        self,
        bot_data: bool = True,
        chat_data: bool = True,
        user_data: bool = True,
        callback_data: bool = True,
    ): ...


# Usage examples:
from telegram.ext import PicklePersistence, DictPersistence, PersistenceInput
# Pickle persistence (file-based)
persistence = PicklePersistence(filepath="bot_data.pkl")

application = (
    ApplicationBuilder()
    .token("YOUR_BOT_TOKEN")
    .persistence(persistence)
    .build()
)

# Dictionary persistence (memory-based)
persistence = DictPersistence()

# Custom persistence configuration
persistence_input = PersistenceInput(
    bot_data=True,
    chat_data=True,
    user_data=True,
    callback_data=False,  # Don't store callback data
)

persistence = PicklePersistence(
    filepath="bot_data.pkl",
    store_data=persistence_input,
)
# Using persistence in handlers
async def set_user_setting(update, context):
    """Store default notification and timezone settings in ``context.user_data``."""
    user_data = context.user_data
    user_data['notifications'] = True
    user_data['timezone'] = 'UTC'
    await update.message.reply_text("Settings saved!")
async def get_user_setting(update, context):
    """Reply with the stored settings.

    Missing values fall back to notifications off and the ``'UTC'`` timezone.
    """
    user_data = context.user_data
    notifications = user_data.get('notifications', False)
    timezone = user_data.get('timezone', 'UTC')
    await update.message.reply_text(
        f"Notifications: {notifications}, Timezone: {timezone}"
    )
# Bot-wide data
async def update_stats(update, context):
    """Increment the bot-wide message counter and announce every 1000th message."""
    bot_data = context.bot_data
    bot_data.setdefault('message_count', 0)
    bot_data['message_count'] += 1

    if bot_data['message_count'] % 1000 == 0:
        await update.message.reply_text(
            f"Bot has processed {bot_data['message_count']} messages!"
        )
# Chat-specific data
async def set_chat_language(update, context):
    """Set the chat language from the first argument, or report the current one.

    With no arguments, replies with the stored language, defaulting to ``'en'``.
    """
    chat_data = context.chat_data
    args = context.args

    if args:
        chat_data['language'] = args[0]
        await update.message.reply_text(f"Language set to: {args[0]}")
    else:
        current_lang = chat_data.get('language', 'en')
        await update.message.reply_text(f"Current language: {current_lang}")


# Control request rates to avoid hitting Telegram's limits.
class BaseRateLimiter:
    """API stub: interface for rate limiters applied to outgoing requests."""

    async def initialize(self) -> None: ...

    async def shutdown(self) -> None: ...

    async def process_request(
        self,
        callback: callable,
        args: tuple,
        kwargs: dict,
        endpoint: str,
        data: dict,
        rate_limit_args: tuple,
    ) -> object: ...
class AIORateLimiter(BaseRateLimiter):
    """API stub: rate limiter with optional retries.

    NOTE(review): upstream python-telegram-bot documents the constructor as
    taking ``overall_max_rate``, ``overall_time_period``, ``group_max_rate``,
    ``group_time_period`` and ``max_retries``; ``on_retry``/``on_failure`` do
    not appear there — confirm before relying on them.
    """

    def __init__(
        self,
        max_retries: int = 0,
        on_retry: callable = None,
        on_failure: callable = None,
    ): ...


# Usage example:
from telegram.ext import AIORateLimiter
# Create rate limiter
# The upstream constructor accepts only rate settings and max_retries;
# passing on_retry/on_failure callbacks raises TypeError.
rate_limiter = AIORateLimiter(
    overall_max_rate=30,    # requests per overall_time_period, all chats
    overall_time_period=1,  # seconds
    group_max_rate=20,      # requests per group_time_period, per group
    group_time_period=60,   # seconds
    max_retries=3,
)

# Use in application
application = (
    ApplicationBuilder()
    .token("YOUR_BOT_TOKEN")
    .rate_limiter(rate_limiter)
    .build()
)

# Advanced conversation handling beyond basic ConversationHandler.
# Nested conversations
from telegram.ext import ConversationHandler

# States for main conversation
MAIN_MENU, SETTINGS, PROFILE = range(3)

# States for settings sub-conversation
NOTIFICATIONS, LANGUAGE, THEME = range(3, 6)

# Sub-conversation for settings
settings_conv = ConversationHandler(
    entry_points=[
        MessageHandler(filters.Regex('^Settings$'), enter_settings),
    ],
    states={
        NOTIFICATIONS: [MessageHandler(filters.TEXT, handle_notifications)],
        LANGUAGE: [MessageHandler(filters.TEXT, handle_language)],
        THEME: [MessageHandler(filters.TEXT, handle_theme)],
    },
    fallbacks=[
        MessageHandler(filters.Regex('^Back$'), back_to_main),
    ],
    # Map states back to parent conversation
    map_to_parent={
        MAIN_MENU: MAIN_MENU,
        ConversationHandler.END: MAIN_MENU,
    },
)
# Main conversation with nested sub-conversation
main_conv = ConversationHandler(
    entry_points=[
        CommandHandler('start', start),
    ],
    states={
        MAIN_MENU: [
            settings_conv,  # Nested conversation
            MessageHandler(filters.Regex('^Profile$'), enter_profile),
        ],
        PROFILE: [
            MessageHandler(filters.TEXT, handle_profile),
        ],
    },
    fallbacks=[
        CommandHandler('cancel', cancel),
    ],
)
# Per-message conversations
# With per_message=True the conversation is tracked per message, which is only
# known for callback queries (inline-keyboard button presses). Every entry
# point, state handler and fallback must therefore be a CallbackQueryHandler;
# MessageHandler/CommandHandler would never be triggered here.
from telegram.ext import CallbackQueryHandler

per_message_conv = ConversationHandler(
    # The patterns are illustrative callback_data values for the quiz buttons.
    entry_points=[CallbackQueryHandler(start_quiz, pattern='^quiz$')],
    states={
        # Restrict state handlers to answer buttons so 'cancel' reaches fallbacks.
        1: [CallbackQueryHandler(question_1, pattern='^answer:')],
        2: [CallbackQueryHandler(question_2, pattern='^answer:')],
    },
    fallbacks=[CallbackQueryHandler(cancel_quiz, pattern='^cancel$')],
    per_message=True,  # Separate conversation per message
)

# Configure webhook for production deployment.
# Webhook with SSL certificate
application.run_webhook(
    listen="0.0.0.0",
    port=8443,
    url_path="webhook",
    key="private.key",
    cert="cert.pem",
    webhook_url="https://yourdomain.com:8443/webhook",
    drop_pending_updates=True,
)

# Webhook behind reverse proxy
application.run_webhook(
    listen="127.0.0.1",
    port=5000,
    url_path="telegram-webhook",
    webhook_url="https://yourdomain.com/telegram-webhook",
    secret_token="your_secret_token",
)
# Custom webhook handling
from telegram import Update
import json
async def webhook_handler(request):
    """Custom webhook handler.

    Decodes the JSON request body into an ``Update``, hands it to the
    application for processing, and returns a status payload.
    """
    body = await request.body()
    update_dict = json.loads(body)
    update = Update.de_json(update_dict, application.bot)
    await application.process_update(update)
    return {"status": "ok"}


# Comprehensive error handling and logging setup.
import logging
from telegram.error import TelegramError, BadRequest, Forbidden, NetworkError
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)

logger = logging.getLogger(__name__)
# Error handler
async def error_handler(update, context):
    """Handle errors in the application.

    Logs every error with its traceback, then reacts by error type: replies to
    the user on ``BadRequest`` and logs a warning on ``Forbidden`` or
    ``NetworkError``.
    """
    error = context.error
    # exc_info attaches the exception's traceback to the log record.
    logger.error('Update "%s" caused error "%s"', update, error, exc_info=error)

    # BadRequest is tested before NetworkError because upstream defines it as a
    # NetworkError subclass; the more specific branch has to come first.
    if isinstance(error, BadRequest):
        # Handle bad requests (invalid parameters, etc.)
        if update and update.message:
            await update.message.reply_text("Sorry, there was an error with your request.")
    elif isinstance(error, Forbidden):
        # Handle forbidden errors (blocked by user, insufficient permissions)
        logger.warning("Bot was blocked by user or lacks permissions")
    elif isinstance(error, NetworkError):
        # Handle network errors (connection issues, timeouts)
        logger.warning("Network error occurred, retrying...")
    else:
        # Handle other Telegram errors
        logger.error("Unexpected error: %s", error)


application.add_error_handler(error_handler)
# Graceful shutdown
import signal
import asyncio


def signal_handler(signum, frame):
    """Handle shutdown signals by asking the running application to stop."""
    logger.info("Received signal %s, shutting down gracefully...", signum)
    # Application.shutdown() raises RuntimeError while the application is
    # still running. stop_running() ends run_polling()/run_webhook(), which
    # then stop and shut the application down in the right order.
    application.stop_running()


# NOTE: run_polling()/run_webhook() install their own handlers for their
# `stop_signals` (SIGINT, SIGTERM, SIGABRT by default). Pass stop_signals=None
# to them if this handler should stay in charge.
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

# Create custom context classes for additional functionality.
from telegram.ext import CallbackContext, ContextTypes
class CustomContext(CallbackContext):
    """Custom context with additional features."""

    def __init__(self, application, chat_id=None, user_id=None):
        super().__init__(application, chat_id, user_id)
        # log_action() and handlers read these ids, so keep public copies.
        # NOTE(review): upstream CallbackContext does not document public
        # chat_id/user_id attributes — confirm against its reference.
        self.chat_id = chat_id
        self.user_id = user_id
        self._database = None

    @property
    def database(self):
        """Lazy database connection."""
        # Connect on first access only; later accesses reuse the connection.
        if self._database is None:
            self._database = get_database_connection()
        return self._database

    async def log_action(self, action: str):
        """Log user action to database."""
        await self.database.log_user_action(
            user_id=self.user_id,
            chat_id=self.chat_id,
            action=action,
        )
# Configure custom context
context_types = ContextTypes(context=CustomContext)

application = (
    ApplicationBuilder()
    .token("YOUR_BOT_TOKEN")
    .context_types(context_types)
    .build()
)
# Use custom context in handlers
async def custom_handler(update: Update, context: CustomContext):
    """Log the action, then reply with the user's stats from the database."""
    await context.log_action("button_pressed")

    # Access custom database property
    user_stats = await context.database.get_user_stats(context.user_id)
    await update.message.reply_text(f"Your stats: {user_stats}")


# Optimize bot performance for high-load scenarios.
# Concurrent update processing
application = (
    ApplicationBuilder()
    .token("YOUR_BOT_TOKEN")
    .concurrent_updates(True)  # Enable concurrent processing
    .build()
)
# Custom update processor with higher concurrency
from telegram.ext import SimpleUpdateProcessor

update_processor = SimpleUpdateProcessor(max_concurrent_updates=512)

# ApplicationBuilder has no `update_processor()` method; a custom processor is
# passed to `concurrent_updates()`, which accepts a bool, an int, or a
# BaseUpdateProcessor instance.
application = (
    ApplicationBuilder()
    .token("YOUR_BOT_TOKEN")
    .concurrent_updates(update_processor)
    .build()
)
# Connection pooling
from telegram.request import HTTPXRequest

request = HTTPXRequest(connection_pool_size=20)

application = (
    ApplicationBuilder()
    .token("YOUR_BOT_TOKEN")
    .request(request)
    .build()
)
# Callback data caching for inline keyboards
from telegram.ext import CallbackDataCache

# The application creates and owns the CallbackDataCache (reachable afterwards
# as `application.bot.callback_data_cache`). Its constructor requires the bot
# and has no `ttl` argument, so it is not built by hand here; the cache size
# is passed to `arbitrary_callback_data()` instead (True would use the
# default size).
application = (
    ApplicationBuilder()
    .token("YOUR_BOT_TOKEN")
    .arbitrary_callback_data(1024)
    .build()
)

from typing import Any, Callable, Dict, Optional, Union
from datetime import datetime, timedelta, time
# Callable type for job callbacks: takes one CallbackContext argument.
JobCallback = Callable[[CallbackContext], object]
# Callable type for error callbacks: takes an Update and a CallbackContext.
ErrorCallback = Callable[[Update, CallbackContext], None]
class PersistenceInput:
    """Type summary: one boolean flag per kind of persisted data."""

    bot_data: bool
    chat_data: bool
    user_data: bool
    callback_data: bool


# Install with Tessl CLI
npx tessl i tessl/pypi-python-telegram-bot