CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-python-telegram-bot

A pure Python, asynchronous interface for the Telegram Bot API with comprehensive wrapper and high-level framework for building sophisticated Telegram bots

Pending
Overview
Eval results
Files

advanced-features.mddocs/

Advanced Features

Additional capabilities including job queues for scheduled tasks, persistence for data storage, rate limiting, conversation management, and webhook configuration.

Capabilities

Job Queue

Schedule and manage recurring or one-time tasks.

class JobQueue:
    def __init__(self, scheduler=None): ...
    
    def run_once(
        self,
        callback: callable,
        when: datetime.datetime | datetime.timedelta | int | float,
        data: object = None,
        name: str = None,
        chat_id: int | str = None,
        user_id: int = None,
        job_kwargs: dict = None
    ) -> Job: ...
    
    def run_repeating(
        self,
        callback: callable,
        interval: datetime.timedelta | int | float,
        first: datetime.datetime | datetime.timedelta | int | float = None,
        last: datetime.datetime | datetime.timedelta | int | float = None,
        data: object = None,
        name: str = None,
        chat_id: int | str = None,
        user_id: int = None,
        job_kwargs: dict = None
    ) -> Job: ...
    
    def run_daily(
        self,
        callback: callable,
        time: datetime.time,
        days: tuple[int, ...] = tuple(range(7)),
        data: object = None,
        name: str = None,
        chat_id: int | str = None,
        user_id: int = None,
        job_kwargs: dict = None
    ) -> Job: ...
    
    def run_monthly(
        self,
        callback: callable,
        when: datetime.time,
        day: int,
        data: object = None,
        name: str = None,
        chat_id: int | str = None,
        user_id: int = None,
        job_kwargs: dict = None
    ) -> Job: ...
    
    def run_custom(
        self,
        callback: callable,
        job_kwargs: dict,
        data: object = None,
        name: str = None,
        chat_id: int | str = None,
        user_id: int = None
    ) -> Job: ...
    
    def get_jobs_by_name(self, name: str) -> tuple[Job, ...]: ...
    def jobs(self) -> tuple[Job, ...]: ...
    
    async def start(self) -> None: ...
    async def stop(self) -> None: ...

class Job:
    callback: callable
    data: object
    name: str | None
    chat_id: int | str | None
    user_id: int | None
    
    def schedule_removal(self) -> None: ...
    def remove(self) -> None: ...
    
    @property
    def removed(self) -> bool: ...
    @property
    def enabled(self) -> bool: ...
    @enabled.setter
    def enabled(self, status: bool) -> None: ...
    
    @property
    def next_t(self) -> datetime.datetime | None: ...

Usage examples:

import datetime
from telegram.ext import ContextTypes

async def send_reminder(context: ContextTypes.DEFAULT_TYPE):
    """Send a reminder message."""
    job = context.job
    await context.bot.send_message(
        chat_id=job.chat_id,
        text=f"Reminder: {job.data}"
    )

async def daily_backup(context: ContextTypes.DEFAULT_TYPE):
    """Perform daily backup."""
    # Backup logic here
    print("Performing daily backup...")

# Schedule one-time job (in 10 seconds)
job_queue.run_once(
    send_reminder,
    when=10,
    data="Meeting in conference room",
    chat_id=chat_id,
    name="meeting_reminder"
)

# Schedule repeating job (every 30 seconds)
job_queue.run_repeating(
    send_reminder,
    interval=30,
    first=5,  # Start after 5 seconds
    data="Regular check-in",
    chat_id=chat_id
)

# Schedule daily job (every day at 9:00 AM)
job_queue.run_daily(
    daily_backup,
    time=datetime.time(9, 0, 0),
    name="daily_backup"
)

# Schedule weekly job (Mondays and Fridays at 2:30 PM)  
job_queue.run_daily(
    send_reminder,
    time=datetime.time(14, 30, 0),
    days=(0, 4),  # Monday=0, Friday=4
    data="Weekly team meeting",
    chat_id=chat_id
)

# Schedule monthly job (15th of each month at noon)
job_queue.run_monthly(
    send_reminder,
    when=datetime.time(12, 0, 0),
    day=15,
    data="Monthly report due",
    chat_id=chat_id
)

# Remove jobs by name
jobs = job_queue.get_jobs_by_name("meeting_reminder")
for job in jobs:
    job.remove()

# Disable/enable jobs
job.enabled = False  # Pause job
job.enabled = True   # Resume job

Persistence

Store and retrieve bot data across restarts.

class BasePersistence:
    def __init__(self, store_data: PersistenceInput = None, update_interval: float = 60): ...
    
    async def get_bot_data(self) -> dict: ...
    async def update_bot_data(self, data: dict) -> None: ...
    async def refresh_bot_data(self, bot_data: dict) -> None: ...
    
    async def get_chat_data(self) -> dict[int, dict]: ...
    async def update_chat_data(self, chat_id: int, data: dict) -> None: ...
    async def refresh_chat_data(self, chat_id: int, chat_data: dict) -> None: ...
    async def drop_chat_data(self, chat_id: int) -> None: ...
    
    async def get_user_data(self) -> dict[int, dict]: ...
    async def update_user_data(self, user_id: int, data: dict) -> None: ...
    async def refresh_user_data(self, user_id: int, user_data: dict) -> None: ...
    async def drop_user_data(self, user_id: int) -> None: ...
    
    async def get_callback_data(self) -> tuple[list[tuple], dict]: ...
    async def update_callback_data(self, data: tuple[list[tuple], dict]) -> None: ...
    
    async def get_conversations(self, name: str) -> dict: ...
    async def update_conversation(self, name: str, key: tuple, new_state: object) -> None: ...
    
    async def flush(self) -> None: ...

class PicklePersistence(BasePersistence):
    def __init__(
        self,
        filepath: str,
        store_data: PersistenceInput = None,
        single_file: bool = True,
        on_flush: bool = False,
        update_interval: float = 60
    ): ...

class DictPersistence(BasePersistence):
    def __init__(
        self,
        bot_data: dict = None,
        chat_data: dict = None,
        user_data: dict = None,
        callback_data: tuple = None,
        conversations: dict = None,
        store_data: PersistenceInput = None,
        update_interval: float = 60
    ): ...

class PersistenceInput:
    def __init__(
        self,
        bot_data: bool = True,
        chat_data: bool = True,
        user_data: bool = True,
        callback_data: bool = True
    ): ...

Usage examples:

from telegram.ext import PicklePersistence, DictPersistence, PersistenceInput

# Pickle persistence (file-based)
persistence = PicklePersistence(filepath="bot_data.pkl")

application = (
    ApplicationBuilder()
    .token("YOUR_BOT_TOKEN")
    .persistence(persistence)
    .build()
)

# Dictionary persistence (memory-based)
persistence = DictPersistence()

# Custom persistence configuration
persistence_input = PersistenceInput(
    bot_data=True,
    chat_data=True,
    user_data=True,
    callback_data=False  # Don't store callback data
)
persistence = PicklePersistence(
    filepath="bot_data.pkl",
    store_data=persistence_input
)

# Using persistence in handlers
async def set_user_setting(update, context):
    user_data = context.user_data
    user_data['notifications'] = True
    user_data['timezone'] = 'UTC'
    await update.message.reply_text("Settings saved!")

async def get_user_setting(update, context):
    user_data = context.user_data
    notifications = user_data.get('notifications', False)
    timezone = user_data.get('timezone', 'UTC')
    await update.message.reply_text(
        f"Notifications: {notifications}, Timezone: {timezone}"
    )

# Bot-wide data
async def update_stats(update, context):
    bot_data = context.bot_data
    bot_data.setdefault('message_count', 0)
    bot_data['message_count'] += 1
    
    if bot_data['message_count'] % 1000 == 0:
        await update.message.reply_text(
            f"Bot has processed {bot_data['message_count']} messages!"
        )

# Chat-specific data
async def set_chat_language(update, context):
    chat_data = context.chat_data
    args = context.args
    if args:
        chat_data['language'] = args[0]
        await update.message.reply_text(f"Language set to: {args[0]}")
    else:
        current_lang = chat_data.get('language', 'en')
        await update.message.reply_text(f"Current language: {current_lang}")

Rate Limiting

Control request rates to avoid hitting Telegram's limits.

class BaseRateLimiter:
    async def initialize(self) -> None: ...
    async def shutdown(self) -> None: ...
    async def process_request(
        self,
        callback: callable,
        args: tuple,
        kwargs: dict,
        endpoint: str,
        data: dict,
        rate_limit_args: tuple
    ) -> object: ...

class AIORateLimiter(BaseRateLimiter):
    def __init__(
        self,
        max_retries: int = 0,
        on_retry: callable = None,
        on_failure: callable = None
    ): ...

Usage example:

from telegram.ext import AIORateLimiter

# Create rate limiter
rate_limiter = AIORateLimiter(
    max_retries=3,
    on_retry=lambda retry_state: print(f"Retrying request {retry_state}"),
    on_failure=lambda failure_state: print(f"Request failed: {failure_state}")
)

# Use in application
application = (
    ApplicationBuilder()
    .token("YOUR_BOT_TOKEN")
    .rate_limiter(rate_limiter)
    .build()
)

Conversation Management

Advanced conversation handling beyond basic ConversationHandler.

# Nested conversations
from telegram.ext import ConversationHandler

# States for main conversation
MAIN_MENU, SETTINGS, PROFILE = range(3)

# States for settings sub-conversation  
NOTIFICATIONS, LANGUAGE, THEME = range(3, 6)

# Sub-conversation for settings
settings_conv = ConversationHandler(
    entry_points=[MessageHandler(filters.Regex('^Settings$'), enter_settings)],
    states={
        NOTIFICATIONS: [MessageHandler(filters.TEXT, handle_notifications)],
        LANGUAGE: [MessageHandler(filters.TEXT, handle_language)],
        THEME: [MessageHandler(filters.TEXT, handle_theme)],
    },
    fallbacks=[MessageHandler(filters.Regex('^Back$'), back_to_main)],
    map_to_parent={
        # Map states back to parent conversation
        MAIN_MENU: MAIN_MENU,
        ConversationHandler.END: MAIN_MENU,
    }
)

# Main conversation with nested sub-conversation
main_conv = ConversationHandler(
    entry_points=[CommandHandler('start', start)],
    states={
        MAIN_MENU: [
            settings_conv,  # Nested conversation
            MessageHandler(filters.Regex('^Profile$'), enter_profile),
        ],
        PROFILE: [MessageHandler(filters.TEXT, handle_profile)],
    },
    fallbacks=[CommandHandler('cancel', cancel)],
)

# Per-message conversations
per_message_conv = ConversationHandler(
    entry_points=[CommandHandler('quiz', start_quiz)],
    states={
        1: [MessageHandler(filters.TEXT, question_1)],
        2: [MessageHandler(filters.TEXT, question_2)],
    },
    fallbacks=[CommandHandler('cancel', cancel_quiz)],
    per_message=True,  # Separate conversation per message
)

Webhook Configuration

Configure webhook for production deployment.

# Webhook with SSL certificate
application.run_webhook(
    listen="0.0.0.0",
    port=8443,
    url_path="webhook",
    key="private.key",
    cert="cert.pem",
    webhook_url="https://yourdomain.com:8443/webhook",
    drop_pending_updates=True
)

# Webhook behind reverse proxy
application.run_webhook(
    listen="127.0.0.1",
    port=5000,
    url_path="telegram-webhook",
    webhook_url="https://yourdomain.com/telegram-webhook",
    secret_token="your_secret_token"
)

# Custom webhook handling
from telegram import Update
import json

async def webhook_handler(request):
    """Custom webhook handler."""
    body = await request.body()
    update_dict = json.loads(body)
    update = Update.de_json(update_dict, application.bot)
    await application.process_update(update)
    return {"status": "ok"}

Error Handling and Logging

Comprehensive error handling and logging setup.

import logging
from telegram.error import TelegramError, BadRequest, Forbidden, NetworkError

# Configure logging
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)
logger = logging.getLogger(__name__)

# Error handler
async def error_handler(update, context):
    """Handle errors in the application."""
    logger.error('Update "%s" caused error "%s"', update, context.error)
    
    if isinstance(context.error, BadRequest):
        # Handle bad requests (invalid parameters, etc.)
        if update and update.message:
            await update.message.reply_text("Sorry, there was an error with your request.")
    
    elif isinstance(context.error, Forbidden):
        # Handle forbidden errors (blocked by user, insufficient permissions)
        logger.warning("Bot was blocked by user or lacks permissions")
    
    elif isinstance(context.error, NetworkError):
        # Handle network errors (connection issues, timeouts)
        logger.warning("Network error occurred, retrying...")
    
    else:
        # Handle other Telegram errors
        logger.error("Unexpected error: %s", context.error)

application.add_error_handler(error_handler)

# Graceful shutdown
import signal
import asyncio

def signal_handler(signum, frame):
    """Handle shutdown signals."""
    logger.info("Received signal %s, shutting down gracefully...", signum)
    asyncio.create_task(application.shutdown())

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

Custom Context Types

Create custom context classes for additional functionality.

from telegram.ext import CallbackContext, ContextTypes

class CustomContext(CallbackContext):
    """Custom context with additional features."""
    
    def __init__(self, application, chat_id=None, user_id=None):
        super().__init__(application, chat_id, user_id)
        self._database = None
    
    @property
    def database(self):
        """Lazy database connection."""
        if self._database is None:
            self._database = get_database_connection()
        return self._database
    
    async def log_action(self, action: str):
        """Log user action to database."""
        await self.database.log_user_action(
            user_id=self.user_id,
            chat_id=self.chat_id,
            action=action
        )

# Configure custom context
context_types = ContextTypes(context=CustomContext)
application = (
    ApplicationBuilder()
    .token("YOUR_BOT_TOKEN")
    .context_types(context_types)
    .build()
)

# Use custom context in handlers
async def custom_handler(update: Update, context: CustomContext):
    await context.log_action("button_pressed")
    # Access custom database property
    user_stats = await context.database.get_user_stats(context.user_id)
    await update.message.reply_text(f"Your stats: {user_stats}")

Performance Optimization

Optimize bot performance for high-load scenarios.

# Concurrent update processing
application = (
    ApplicationBuilder()
    .token("YOUR_BOT_TOKEN")
    .concurrent_updates(True)  # Enable concurrent processing
    .build()
)

# Custom update processor with higher concurrency
from telegram.ext import SimpleUpdateProcessor

update_processor = SimpleUpdateProcessor(max_concurrent_updates=512)
application = (
    ApplicationBuilder()
    .token("YOUR_BOT_TOKEN")
    .update_processor(update_processor)
    .build()
)

# Connection pooling
from telegram.request import HTTPXRequest

request = HTTPXRequest(connection_pool_size=20)
application = (
    ApplicationBuilder()
    .token("YOUR_BOT_TOKEN")
    .request(request)
    .build()
)

# Callback data caching for inline keyboards
from telegram.ext import CallbackDataCache

callback_data_cache = CallbackDataCache(maxsize=1024, ttl=3600)
application = (
    ApplicationBuilder()
    .token("YOUR_BOT_TOKEN")
    .arbitrary_callback_data(True)
    .build()
)

Types

from typing import Any, Callable, Dict, Optional, Union
from datetime import datetime, timedelta, time

JobCallback = Callable[[CallbackContext], object]
ErrorCallback = Callable[[Update, CallbackContext], None]

class PersistenceInput:
    bot_data: bool
    chat_data: bool  
    user_data: bool
    callback_data: bool

Install with Tessl CLI

npx tessl i tessl/pypi-python-telegram-bot

docs

advanced-features.md

application-framework.md

bot-api.md

files.md

filters.md

handlers.md

index.md

keyboards.md

telegram-types.md

tile.json