CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-telethon

Full-featured Telegram client library for Python 3

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

event-system.mddocs/

Event System

Real-time update handling with event decorators, filters, and handlers for messages, edits, deletions, and other Telegram updates in Telethon.

Capabilities

Event Handler Registration

Register and manage event handlers for real-time updates.

def on(self, event):
    """
    Decorator to register event handlers.
    
    Parameters:
    - event: Event class or instance (NewMessage, MessageEdited, etc.)
    
    Usage:
    @client.on(events.NewMessage)
    async def handler(event):
        await event.respond('Hello!')
    """

def add_event_handler(
    self,
    callback,
    event=None
):
    """
    Add an event handler function.
    
    Parameters:
    - callback: Async function to handle events
    - event: Event filter (None for all events)
    
    Usage:
    async def my_handler(event):
        print(f"Got event: {event}")
    
    client.add_event_handler(my_handler, events.NewMessage)
    """

def remove_event_handler(
    self,
    callback,
    event=None
) -> int:
    """
    Remove an event handler.
    
    Parameters:
    - callback: Handler function to remove
    - event: Specific event type to remove (None for all)
    
    Returns:
    int: Number of handlers removed
    """

def list_event_handlers(self) -> List[Tuple[Callable, EventBuilder]]:
    """
    List all registered event handlers.
    
    Returns:
    List of (callback, event) tuples
    """

Event Loop Management

Control the event loop and update processing.

def run_until_disconnected(self):
    """
    Run the client until disconnected.
    
    Blocks the current thread and processes events.
    Use in synchronous code or as final call in async main().
    
    Usage:
    client.start()
    client.run_until_disconnected()
    """

async def catch_up(self):
    """
    Catch up on missed updates since last connection.
    
    Processes updates that occurred while client was offline.
    Requires catch_up=True in client constructor.
    """

async def set_receive_updates(self, receive_updates: bool):
    """
    Enable or disable receiving updates.
    
    Parameters:
    - receive_updates: Whether to process updates
    """

Message Events

Handle new, edited, and deleted messages.

class NewMessage:
    """
    Event for new incoming messages.
    
    Attributes:
    - message: The new message object
    - chat: Chat where message was sent
    - sender: User who sent the message
    """
    def __init__(
        self,
        chats=None,
        *,
        blacklist_chats: bool = False,
        func=None,
        pattern=None,
        from_users=None,
        forwards=None,
        incoming: bool = None,
        outgoing: bool = None
    ):
        """
        Filter for new messages.
        
        Parameters:
        - chats: Specific chats to monitor (None for all)
        - blacklist_chats: Exclude instead of include specified chats
        - func: Custom filter function
        - pattern: Regex pattern to match message text
        - from_users: Only messages from these users
        - forwards: Include forwarded messages
        - incoming: Only incoming messages
        - outgoing: Only outgoing messages
        """

class MessageEdited:
    """
    Event for message edits.
    
    Attributes same as NewMessage.
    """
    def __init__(self, **kwargs):
        """Parameters same as NewMessage."""

class MessageDeleted:
    """
    Event for message deletions.
    
    Attributes:
    - deleted_ids: List of deleted message IDs
    - peer: Chat where messages were deleted
    """
    def __init__(self, chats=None, *, blacklist_chats: bool = False):
        """
        Filter for message deletions.
        
        Parameters:
        - chats: Specific chats to monitor
        - blacklist_chats: Exclude specified chats
        """

class MessageRead:
    """
    Event for message read status changes.
    
    Attributes:
    - message_ids: IDs of messages marked as read
    - peer: Chat where messages were read
    - max_id: Highest message ID read
    """
    def __init__(self, chats=None, *, blacklist_chats: bool = False):
        """Parameters same as MessageDeleted."""

Interactive Events

Handle button clicks and inline queries.

class CallbackQuery:
    """
    Event for inline button clicks.
    
    Attributes:
    - query: The callback query object
    - data: Button callback data
    - message: Message containing the button
    - sender: User who clicked the button
    """
    def __init__(
        self,
        chats=None,
        *,
        blacklist_chats: bool = False,
        func=None,
        data=None,
        pattern=None
    ):
        """
        Filter for callback queries.
        
        Parameters:
        - chats: Specific chats to monitor
        - blacklist_chats: Exclude specified chats
        - func: Custom filter function
        - data: Specific callback data to match
        - pattern: Regex pattern for callback data
        """

class InlineQuery:
    """
    Event for inline bot queries.
    
    Attributes:
    - query: The inline query object
    - text: Query text from user
    - user: User making the query
    - offset: Pagination offset
    """
    def __init__(
        self,
        *,
        users=None,
        blacklist_users: bool = False,
        func=None,
        pattern=None
    ):
        """
        Filter for inline queries.
        
        Parameters:
        - users: Specific users to monitor
        - blacklist_users: Exclude specified users
        - func: Custom filter function
        - pattern: Regex pattern for query text
        """

Chat Activity Events

Monitor chat actions and user status changes.

class ChatAction:
    """
    Event for chat actions (typing, online status, etc.).
    
    Attributes:
    - action: The specific action (typing, online, etc.)
    - user: User performing the action
    - chat: Chat where action occurred
    """
    def __init__(
        self,
        chats=None,
        *,
        blacklist_chats: bool = False,
        func=None
    ):
        """
        Filter for chat actions.
        
        Parameters:
        - chats: Specific chats to monitor
        - blacklist_chats: Exclude specified chats
        - func: Custom filter function
        """

class UserUpdate:
    """
    Event for user status changes (online, offline, etc.).
    
    Attributes:
    - user: User whose status changed
    - status: New user status
    """
    def __init__(
        self,
        *,
        users=None,
        blacklist_users: bool = False,
        func=None
    ):
        """
        Filter for user updates.
        
        Parameters:
        - users: Specific users to monitor
        - blacklist_users: Exclude specified users
        - func: Custom filter function
        """

Album Events

Handle grouped media messages.

class Album:
    """
    Event for grouped media messages (albums).
    
    Attributes:
    - messages: List of messages in the album
    - grouped_id: Album group identifier
    """
    def __init__(
        self,
        chats=None,
        *,
        blacklist_chats: bool = False,
        func=None
    ):
        """
        Filter for album messages.
        
        Parameters same as NewMessage.
        """

Raw Events

Handle low-level Telegram updates.

class Raw:
    """
    Event for raw Telegram updates.
    
    Provides access to unprocessed updates from Telegram.
    
    Attributes:
    - update: Raw update object from Telegram
    """
    def __init__(
        self,
        *,
        types=None,
        func=None
    ):
        """
        Filter for raw updates.
        
        Parameters:
        - types: Specific update types to handle
        - func: Custom filter function
        """

Event Control

Control event propagation and handling.

class StopPropagation(Exception):
    """
    Exception to stop processing further event handlers.
    
    Raise this in an event handler to prevent other handlers
    from processing the same event.
    
    Usage:
    @client.on(events.NewMessage)
    async def first_handler(event):
        if event.text.startswith('/stop'):
            await event.respond('Stopping here!')
            raise StopPropagation
    
    @client.on(events.NewMessage)
    async def second_handler(event):
        # This won't run if first handler raises StopPropagation
        await event.respond('This might not be sent')
    """

Standalone Event Registration

Register events without a client instance.

def register(event=None):
    """
    Decorator to register event handlers without a client.
    
    Parameters:
    - event: Event class or instance
    
    Usage:
    @events.register(events.NewMessage)
    async def handler(event):
        await event.respond('Hello!')
    
    # Later, add to client:
    client.add_event_handler(handler)
    """

def unregister(callback, event=None):
    """
    Remove registered event handlers.
    
    Parameters:
    - callback: Handler function to remove
    - event: Specific event type (None for all)
    
    Returns:
    int: Number of handlers removed
    """

def is_handler(callback) -> bool:
    """
    Check if a function is registered as an event handler.
    
    Parameters:
    - callback: Function to check
    
    Returns:
    bool: True if registered as handler
    """

def list(callback) -> List:
    """
    List event builders registered to a callback.
    
    Parameters:
    - callback: Handler function
    
    Returns:
    List of event builders
    """

Usage Examples

Basic Event Handling

import asyncio
import re
from telethon import TelegramClient, events

async def basic_events():
    client = TelegramClient('session', api_id, api_hash)
    
    # Simple message handler
    @client.on(events.NewMessage)
    async def message_handler(event):
        print(f"New message: {event.text}")
        await event.respond(f"You said: {event.text}")
    
    # Pattern-based handler
    @client.on(events.NewMessage(pattern=r'/start'))
    async def start_handler(event):
        await event.respond('Hello! I am a bot.')
    
    # Handler for specific chats
    @client.on(events.NewMessage(chats=['username', 'chat_id']))
    async def chat_handler(event):
        await event.respond('Message in monitored chat!')
    
    await client.start()
    await client.run_until_disconnected()

asyncio.run(basic_events())

Advanced Message Filtering

import re
from telethon import events

async def advanced_filtering():
    client = TelegramClient('session', api_id, api_hash)
    
    # Command pattern matching
    @client.on(events.NewMessage(pattern=r'/echo (.+)'))
    async def echo_command(event):
        text_to_echo = event.pattern_match.group(1)
        await event.respond(f"Echo: {text_to_echo}")
    
    # Custom filter function
    def is_admin_message(event):
        return event.sender_id in [admin_id_1, admin_id_2]
    
    @client.on(events.NewMessage(func=is_admin_message))
    async def admin_handler(event):
        await event.respond("Admin command received!")
    
    # Multiple conditions
    @client.on(events.NewMessage(
        chats=[group_chat_id],
        from_users=[specific_user_id],
        incoming=True
    ))
    async def specific_handler(event):
        await event.respond("Message from specific user in group!")
    
    # Forwarded messages only
    @client.on(events.NewMessage(forwards=True))
    async def forward_handler(event):
        await event.respond("This is a forwarded message!")
    
    await client.start()
    await client.run_until_disconnected()

Interactive Events

from telethon import Button, events

async def interactive_events():
    client = TelegramClient('session', api_id, api_hash)
    
    # Send message with buttons
    @client.on(events.NewMessage(pattern='/menu'))
    async def menu_handler(event):
        buttons = [
            [Button.inline('Option 1', b'opt1')],
            [Button.inline('Option 2', b'opt2')],
            [Button.inline('Help', b'help')]
        ]
        await event.respond('Choose an option:', buttons=buttons)
    
    # Handle button clicks
    @client.on(events.CallbackQuery(data=b'opt1'))
    async def option1_handler(event):
        await event.answer('You chose Option 1!')
        await event.edit('Option 1 selected ✓')
    
    @client.on(events.CallbackQuery(data=b'opt2'))
    async def option2_handler(event):
        await event.answer('You chose Option 2!')
        await event.edit('Option 2 selected ✓')
    
    @client.on(events.CallbackQuery(data=b'help'))
    async def help_handler(event):
        await event.answer('This is help text!', alert=True)
    
    # Pattern matching for callback data
    @client.on(events.CallbackQuery(pattern=rb'user_(\d+)'))
    async def user_callback(event):
        user_id = int(event.pattern_match.group(1))
        await event.answer(f'User ID: {user_id}')
    
    await client.start()
    await client.run_until_disconnected()

Event Lifecycle Management

async def event_lifecycle():
    client = TelegramClient('session', api_id, api_hash)
    
    # Handler with stop propagation
    @client.on(events.NewMessage(pattern='/stop'))
    async def stop_handler(event):
        await event.respond('Stopping here!')
        raise events.StopPropagation
    
    @client.on(events.NewMessage)
    async def catch_all_handler(event):
        # This won't run for /stop messages
        await event.respond('Catch-all handler')
    
    # Dynamic handler addition
    async def dynamic_handler(event):
        await event.respond('Dynamic handler!')
    
    # Add handler programmatically
    client.add_event_handler(
        dynamic_handler, 
        events.NewMessage(pattern='/dynamic')
    )
    
    # Remove handler after some time
    await asyncio.sleep(60)  # Wait 1 minute
    removed = client.remove_event_handler(dynamic_handler)
    print(f"Removed {removed} handlers")
    
    # List all handlers
    handlers = client.list_event_handlers()
    print(f"Active handlers: {len(handlers)}")
    
    await client.start()
    await client.run_until_disconnected()

Album and Media Events

async def media_events():
    client = TelegramClient('session', api_id, api_hash)
    
    # Handle grouped media (albums)
    @client.on(events.Album)
    async def album_handler(event):
        await event.respond(
            f'Received album with {len(event.messages)} items!'
        )
        
        # Process each item in album
        for message in event.messages:
            if message.photo:
                print(f"Photo in album: {message.id}")
            elif message.video:
                print(f"Video in album: {message.id}")
    
    # Handle message edits
    @client.on(events.MessageEdited)
    async def edit_handler(event):
        await event.respond(f'Message {event.id} was edited!')
    
    # Handle message deletions
    @client.on(events.MessageDeleted)
    async def delete_handler(event):
        print(f'Messages deleted: {event.deleted_ids}')
        # Note: Can't respond to deleted messages
    
    # Handle read status
    @client.on(events.MessageRead)
    async def read_handler(event):
        print(f'Messages read up to ID: {event.max_id}')
    
    await client.start()
    await client.run_until_disconnected()

Chat Activity Monitoring

async def activity_monitoring():
    client = TelegramClient('session', api_id, api_hash)
    
    # Monitor typing indicators
    @client.on(events.ChatAction)
    async def typing_handler(event):
        if hasattr(event.action, 'typing') and event.action.typing:
            user = await event.get_user()
            print(f'{user.first_name} is typing...')
    
    # Monitor user status changes
    @client.on(events.UserUpdate)
    async def status_handler(event):
        user = await event.get_user()
        status = event.status
        
        if hasattr(status, 'online') and status.online:
            print(f'{user.first_name} came online')
        elif hasattr(status, 'offline'):
            print(f'{user.first_name} went offline')
    
    await client.start()
    await client.run_until_disconnected()

Raw Event Handling

from telethon.tl.types import UpdateNewMessage, UpdateMessageEdited

async def raw_events():
    client = TelegramClient('session', api_id, api_hash)
    
    # Handle specific raw update types
    @client.on(events.Raw(types=[UpdateNewMessage]))
    async def raw_message_handler(event):
        update = event.update
        message = update.message
        print(f"Raw message: {message.message}")
    
    # Handle all raw updates
    @client.on(events.Raw)
    async def all_updates_handler(event):
        update_type = type(event.update).__name__
        print(f"Received update: {update_type}")
    
    await client.start()
    await client.run_until_disconnected()

Types

from typing import Union, List, Optional, Callable, Pattern, Any
import re
from telethon.tl import types
from telethon import custom

EventHandler = Callable[[Any], None]
FilterFunction = Callable[[Any], bool]
PatternType = Union[str, Pattern[str], Pattern[bytes]]
EntityFilter = Union[int, str, List[Union[int, str]]]

class EventBuilder:
    """Base class for event filters"""
    
class EventCommon:
    """Common event attributes and methods"""
    client: TelegramClient
    original_update: types.Updates
    
    async def get_chat(self):
        """Get the chat where the event occurred"""
        
    async def get_sender(self):
        """Get the user who triggered the event"""
        
    async def get_input_chat(self):
        """Get InputPeer for the chat"""
        
    async def get_input_sender(self):
        """Get InputPeer for the sender"""

Install with Tessl CLI

npx tessl i tessl/pypi-telethon

docs

authentication-sessions.md

chat-management.md

client-management.md

error-handling.md

event-system.md

file-handling.md

index.md

message-operations.md

utilities-helpers.md

tile.json