Modern and fully asynchronous framework for Telegram Bot API
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Core classes for bot instantiation, API communication, update processing, and event routing. These form the foundation of any aiogram bot application.
The Bot class serves as the primary interface to the Telegram Bot API, handling HTTP communication, automatic retries, session management, and method execution.
class Bot:
def __init__(
self,
token: str,
parse_mode: str | None = None,
disable_web_page_preview: bool | None = None,
protect_content: bool | None = None,
session: BaseSession | None = None,
**kwargs
):
"""
Initialize a Bot instance.
Parameters:
- token: Bot token from @BotFather
- parse_mode: Default parse mode (HTML, Markdown, MarkdownV2)
- disable_web_page_preview: Disable link previews by default
- protect_content: Protect content from forwarding/saving by default
- session: Custom HTTP session (defaults to AiohttpSession)
"""
async def get_me(self) -> User:
"""Get basic info about the bot"""
async def close(self) -> bool:
"""Close the bot instance and clean up resources"""
async def get_updates(
self,
offset: int | None = None,
limit: int | None = None,
timeout: int | None = None,
allowed_updates: list[str] | None = None
) -> list[Update]:
"""Get updates using long polling"""
@property
def id(self) -> int | None:
"""Bot's unique identifier (available after first API call)"""
@property
def username(self) -> str | None:
"""Bot's username (available after first API call)"""
@property
def first_name(self) -> str | None:
"""Bot's first name (available after first API call)"""The Dispatcher is the root router that manages the entire update processing pipeline, including middleware execution, handler resolution, and FSM state management.
class Dispatcher(Router):
def __init__(
self,
*,
storage: BaseStorage | None = None,
fsm_strategy: FSMStrategy = FSMStrategy.CHAT,
events_isolation: BaseEventIsolation | None = None,
disable_fsm: bool = False,
**kwargs
):
"""
Initialize the Dispatcher.
Parameters:
- storage: FSM storage backend (defaults to MemoryStorage)
- fsm_strategy: FSM isolation strategy (CHAT, USER_IN_CHAT, GLOBAL_USER)
- events_isolation: Event isolation mechanism
- disable_fsm: Disable FSM functionality entirely
"""
async def start_polling(
self,
bot: Bot,
*,
polling_timeout: int = 10,
handle_as_tasks: bool = True,
backoff_config: BackoffConfig = BackoffConfig(),
**kwargs
) -> None:
"""
Start polling for updates from Telegram.
Parameters:
- bot: Bot instance to use for polling
- polling_timeout: Timeout for long polling requests
- handle_as_tasks: Handle updates as asyncio tasks
- backoff_config: Backoff configuration for retries
"""
async def feed_update(self, bot: Bot, update: Update, **kwargs) -> Any:
"""Process a single update through the middleware and handler pipeline"""
def include_router(self, router: Router) -> None:
"""Include a child router in the dispatcher"""
def include_routers(self, *routers: Router) -> None:
"""Include multiple child routers"""
@property
def storage(self) -> BaseStorage:
"""Get the configured FSM storage backend"""Routers organize handlers and middleware into logical groups, supporting nested routing structures for modular bot development.
class Router:
def __init__(self, *, name: str | None = None):
"""
Initialize a Router.
Parameters:
- name: Optional name for the router (useful for debugging)
"""
def include_router(self, router: Router) -> None:
"""Include a child router"""
def include_routers(self, *routers: Router) -> None:
"""Include multiple child routers"""
# Event observers for handler registration
@property
def message(self) -> HandlerObject[Message]:
"""Message update observer"""
@property
def edited_message(self) -> HandlerObject[Message]:
"""Edited message update observer"""
@property
def channel_post(self) -> HandlerObject[Message]:
"""Channel post update observer"""
@property
def edited_channel_post(self) -> HandlerObject[Message]:
"""Edited channel post update observer"""
@property
def inline_query(self) -> HandlerObject[InlineQuery]:
"""Inline query update observer"""
@property
def chosen_inline_result(self) -> HandlerObject[ChosenInlineResult]:
"""Chosen inline result update observer"""
@property
def callback_query(self) -> HandlerObject[CallbackQuery]:
"""Callback query update observer"""
@property
def shipping_query(self) -> HandlerObject[ShippingQuery]:
"""Shipping query update observer"""
@property
def pre_checkout_query(self) -> HandlerObject[PreCheckoutQuery]:
"""Pre-checkout query update observer"""
@property
def poll(self) -> HandlerObject[Poll]:
"""Poll update observer"""
@property
def poll_answer(self) -> HandlerObject[PollAnswer]:
"""Poll answer update observer"""
@property
def my_chat_member(self) -> HandlerObject[ChatMemberUpdated]:
"""My chat member update observer"""
@property
def chat_member(self) -> HandlerObject[ChatMemberUpdated]:
"""Chat member update observer"""
@property
def chat_join_request(self) -> HandlerObject[ChatJoinRequest]:
"""Chat join request update observer"""
@property
def message_reaction(self) -> HandlerObject[MessageReactionUpdated]:
"""Message reaction update observer"""
@property
def message_reaction_count(self) -> HandlerObject[MessageReactionCountUpdated]:
"""Message reaction count update observer"""
@property
def chat_boost(self) -> HandlerObject[ChatBoostUpdated]:
"""Chat boost update observer"""
@property
def removed_chat_boost(self) -> HandlerObject[ChatBoostRemoved]:
"""Removed chat boost update observer"""
@property
def deleted_business_messages(self) -> HandlerObject[BusinessMessagesDeleted]:
"""Deleted business messages update observer"""
@property
def business_connection(self) -> HandlerObject[BusinessConnection]:
"""Business connection update observer"""
@property
def edited_business_message(self) -> HandlerObject[Message]:
"""Edited business message update observer"""
@property
def business_message(self) -> HandlerObject[Message]:
"""Business message update observer"""
@property
def purchased_paid_media(self) -> HandlerObject[PaidMediaPurchased]:
"""Purchased paid media update observer"""
@property
def error(self) -> HandlerObject[ErrorEvent]:
"""Error event observer"""
@property
def startup(self) -> HandlerObject[StartupEvent]:
"""Startup event observer"""
@property
def shutdown(self) -> HandlerObject[ShutdownEvent]:
"""Shutdown event observer"""Base middleware class for creating custom middleware components that process updates and API calls.
class BaseMiddleware:
"""Base class for creating middleware"""
async def __call__(
self,
handler: Callable[[TelegramObject, dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: dict[str, Any]
) -> Any:
"""
Middleware execution method.
Parameters:
- handler: Next handler in the chain
- event: Telegram event object
- data: Handler context data
Returns:
Result from handler execution
"""import asyncio
from aiogram import Bot, Dispatcher, Router
from aiogram.types import Message
from aiogram.filters import Command
# Create bot and dispatcher
bot = Bot(token="YOUR_BOT_TOKEN")
dp = Dispatcher()
# Create router for organizing handlers
main_router = Router()
@main_router.message(Command("start"))
async def start_command(message: Message):
await message.answer("Welcome to the bot!")
# Include router in dispatcher
dp.include_router(main_router)
# Start the bot
async def main():
await dp.start_polling(bot)
if __name__ == "__main__":
asyncio.run(main())from aiogram import Bot, Dispatcher
from aiogram.client.session.aiohttp import AiohttpSession
from aiogram.client.telegram import TelegramAPIServer
import aiohttp
# Create custom session with proxy
session = AiohttpSession(
connector=aiohttp.TCPConnector(limit=100),
timeout=aiohttp.ClientTimeout(total=30)
)
# Create bot with custom API server
bot = Bot(
token="YOUR_BOT_TOKEN",
session=session,
server=TelegramAPIServer.from_base("https://api.telegram.org")
)
dp = Dispatcher()from aiogram import Dispatcher, Router
# Main dispatcher
dp = Dispatcher()
# Feature-specific routers
admin_router = Router(name="admin")
user_router = Router(name="user")
callback_router = Router(name="callbacks")
# Include routers with hierarchy
dp.include_routers(admin_router, user_router, callback_router)
# Routers can also include other routers
feature_router = Router(name="feature")
feature_router.include_router(callback_router)
user_router.include_router(feature_router)from aiogram import BaseMiddleware
from aiogram.types import TelegramObject
from typing import Callable, Dict, Any, Awaitable
class LoggingMiddleware(BaseMiddleware):
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any]
) -> Any:
print(f"Processing {type(event).__name__}")
result = await handler(event, data)
print(f"Finished processing {type(event).__name__}")
return result
# Register middleware
dp.message.middleware(LoggingMiddleware())class HandlerObject[T: TelegramObject]:
"""Handler registration interface for specific event types"""
def __call__(self, *filters: Filter) -> Callable:
"""Decorator for registering handlers with filters"""
def register(
self,
callback: Callable,
*filters: Filter,
flags: dict[str, Any] | None = None
) -> None:
"""Register a handler programmatically"""
def filter(self, custom_filter: Filter) -> HandlerObject[T]:
"""Add a filter to the handler object"""
def middleware(self, middleware: BaseMiddleware) -> None:
"""Register middleware for this handler type"""class ErrorEvent:
"""Error event for exception handling"""
exception: Exception
update: Update
class StartupEvent:
"""Startup event fired when dispatcher starts"""
pass
class ShutdownEvent:
"""Shutdown event fired when dispatcher stops"""
passInstall with Tessl CLI
npx tessl i tessl/pypi-aiogram