A pure Python, asynchronous interface for the Telegram Bot API with comprehensive wrapper and high-level framework for building sophisticated Telegram bots
—
The Application framework provides a high-level interface for building and managing Telegram bots. It handles update processing, handler management, job scheduling, persistence, and bot lifecycle operations.
Create and configure Application instances with fluent builder pattern.
class ApplicationBuilder:
def token(self, token: str) -> 'ApplicationBuilder': ...
def bot(self, bot: Bot) -> 'ApplicationBuilder': ...
def updater(self, updater: Updater) -> 'ApplicationBuilder': ...
def update_processor(self, update_processor: BaseUpdateProcessor) -> 'ApplicationBuilder': ...
def job_queue(self, job_queue: JobQueue) -> 'ApplicationBuilder': ...
def persistence(self, persistence: BasePersistence) -> 'ApplicationBuilder': ...
def defaults(self, defaults: Defaults) -> 'ApplicationBuilder': ...
def context_types(self, context_types: ContextTypes) -> 'ApplicationBuilder': ...
def rate_limiter(self, rate_limiter: BaseRateLimiter) -> 'ApplicationBuilder': ...
def post_init(self, post_init: callable) -> 'ApplicationBuilder': ...
def post_shutdown(self, post_shutdown: callable) -> 'ApplicationBuilder': ...
def build(self) -> 'Application': ...Usage example:
from telegram.ext import ApplicationBuilder, PicklePersistence
# Basic application
application = ApplicationBuilder().token("YOUR_BOT_TOKEN").build()
# Advanced configuration
persistence = PicklePersistence(filepath="bot_data.pkl")
application = (
ApplicationBuilder()
.token("YOUR_BOT_TOKEN")
.persistence(persistence)
.concurrent_updates(True)
.build()
)Main Application class for managing bot operations, handlers, and lifecycle.
class Application:
bot: Bot
updater: Updater
job_queue: JobQueue
persistence: BasePersistence | None
def add_handler(self, handler: BaseHandler, group: int = 0) -> None: ...
def add_handlers(self, handlers: list[BaseHandler], group: int = 0) -> None: ...
def remove_handler(self, handler: BaseHandler, group: int = 0) -> bool: ...
def add_error_handler(self, callback: callable) -> None: ...
def remove_error_handler(self, callback: callable) -> None: ...
async def process_update(self, update: Update) -> None: ...
def run_polling(
self,
poll_interval: float = 0.0,
timeout: int = 10,
bootstrap_retries: int = -1,
read_timeout: float = 2.0,
write_timeout: float = 2.0,
connect_timeout: float = 2.0,
pool_timeout: float = 1.0,
allowed_updates: list[str] = None,
drop_pending_updates: bool = None,
close_loop: bool = True
) -> None: ...
def run_webhook(
self,
listen: str = "127.0.0.1",
port: int = 80,
url_path: str = "",
cert: str = None,
key: str = None,
bootstrap_retries: int = -1,
webhook_url: str = None,
allowed_updates: list[str] = None,
drop_pending_updates: bool = None,
ip_address: str = None,
max_connections: int = 40,
secret_token: str = None,
close_loop: bool = True
) -> None: ...
async def initialize(self) -> None: ...
async def start(self) -> None: ...
async def stop(self) -> None: ...
async def shutdown(self) -> None: ...
async def __aenter__(self) -> 'Application': ...
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...Usage example:
from telegram.ext import Application, CommandHandler
async def start(update, context):
await update.message.reply_text("Hello!")
# Create and run application
application = Application.builder().token("YOUR_BOT_TOKEN").build()
application.add_handler(CommandHandler("start", start))
# Run with polling
application.run_polling()
# Or use as async context manager
async def main():
async with application:
await application.start()
await application.updater.start_polling()
# Keep running...
await application.updater.stop()
await application.stop()Enhanced Bot class with additional features and integration with Application framework.
class ExtBot(Bot):
def __init__(
self,
token: str,
defaults: Defaults = None,
callback_data_cache: CallbackDataCache = None,
arbitrary_callback_data: bool = False,
**kwargs
): ...
async def get_business_connection(self, business_connection_id: str) -> BusinessConnection: ...
async def send_paid_media(
self,
chat_id: int | str,
star_count: int,
media: list[InputPaidMedia],
**kwargs
) -> Message: ...
async def copy_messages(
self,
chat_id: int | str,
from_chat_id: int | str,
message_ids: list[int],
**kwargs
) -> list[MessageId]: ...
async def forward_messages(
self,
chat_id: int | str,
from_chat_id: int | str,
message_ids: list[int],
**kwargs
) -> list[MessageId]: ...Manage incoming updates from Telegram using polling or webhooks.
class Updater:
bot: Bot
update_queue: asyncio.Queue
def __init__(self, bot: Bot, update_queue: asyncio.Queue): ...
async def start_polling(
self,
poll_interval: float = 0.0,
timeout: int = 10,
bootstrap_retries: int = -1,
read_timeout: float = 2.0,
write_timeout: float = 2.0,
connect_timeout: float = 2.0,
pool_timeout: float = 1.0,
allowed_updates: list[str] = None,
drop_pending_updates: bool = None
) -> asyncio.Queue: ...
async def start_webhook(
self,
listen: str = "127.0.0.1",
port: int = 80,
url_path: str = "",
cert: str = None,
key: str = None,
bootstrap_retries: int = -1,
webhook_url: str = None,
allowed_updates: list[str] = None,
drop_pending_updates: bool = None,
ip_address: str = None,
max_connections: int = 40,
secret_token: str = None
) -> asyncio.Queue: ...
async def stop(self) -> None: ...
async def shutdown(self) -> None: ...
class BaseUpdateProcessor:
async def process_update(self, update: Update, application: 'Application') -> None: ...
class SimpleUpdateProcessor(BaseUpdateProcessor):
def __init__(self, max_concurrent_updates: int = 256): ...Configure custom context types for handlers and callbacks.
class ContextTypes:
def __init__(
self,
context: type = None,
bot_data: type = None,
chat_data: type = None,
user_data: type = None
): ...
context: type
bot_data: type
chat_data: type
user_data: typeSet default values for Bot methods and Application behavior.
class Defaults:
def __init__(
self,
parse_mode: str = None,
disable_notification: bool = None,
disable_web_page_preview: bool = None,
allow_sending_without_reply: bool = None,
protect_content: bool = None,
tzinfo: datetime.tzinfo = None,
block: bool = None,
quote: bool = None
): ...Usage example:
from telegram.ext import Defaults
from telegram.constants import ParseMode
defaults = Defaults(
parse_mode=ParseMode.HTML,
disable_notification=True,
tzinfo=datetime.timezone.utc
)
application = (
ApplicationBuilder()
.token("YOUR_BOT_TOKEN")
.defaults(defaults)
.build()
)Built-in exception handling with custom error handlers.
class ApplicationHandlerStop(Exception):
def __init__(self, state: object = None): ...
state: objectUsage example:
from telegram.error import TelegramError
from telegram.ext import ApplicationHandlerStop
async def error_handler(update, context):
"""Log errors and handle them gracefully."""
print(f"Update {update} caused error {context.error}")
# Stop processing further handlers for this update
raise ApplicationHandlerStop
async def command_handler(update, context):
try:
# Some operation that might fail
result = await risky_operation()
await update.message.reply_text(f"Success: {result}")
except ValueError as e:
await update.message.reply_text("Invalid input!")
raise ApplicationHandlerStop # Stop processing
except Exception as e:
await update.message.reply_text("Something went wrong!")
raise # Let error handler deal with it
application.add_error_handler(error_handler)from typing import Any, Callable, Dict, List, Optional, Union
CallbackType = Callable[[Update, CallbackContext], object]
ErrorHandlerType = Callable[[Update, CallbackContext], None]
JobCallbackType = Callable[[CallbackContext], object]
class CallbackContext:
bot: Bot
update_queue: asyncio.Queue
job_queue: JobQueue
user_data: dict
chat_data: dict
bot_data: dict
args: list[str]
error: Exception | None
job: 'Job' | None
async def refresh_data(self) -> None: ...
def drop_callback_data(self, callback_query: CallbackQuery) -> None: ...Install with Tessl CLI
npx tessl i tessl/pypi-python-telegram-bot