Full-featured Telegram client library for Python 3
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Telethon handles real-time updates through event decorators, filters, and handlers that cover new messages, edits, deletions, and other Telegram updates.
Register and manage event handlers for real-time updates.
def on(self, event):
    """
    Decorator that registers the decorated coroutine as an event handler.

    Parameters:
    - event: Event class or instance (NewMessage, MessageEdited, etc.)

    Usage:
        @client.on(events.NewMessage)
        async def handler(event):
            await event.respond('Hello!')
    """
def add_event_handler(
    self,
    callback,
    event=None
):
    """
    Add an event handler function.

    Parameters:
    - callback: Async function to handle events
    - event: Event filter (None for all events). A callback that was
      decorated with ``events.register`` can be added without passing
      ``event``, as in ``client.add_event_handler(handler)``.

    Usage:
        async def my_handler(event):
            print(f"Got event: {event}")

        client.add_event_handler(my_handler, events.NewMessage)
    """
def remove_event_handler(
    self,
    callback,
    event=None
) -> int:
    """
    Remove a previously added event handler.

    Parameters:
    - callback: Handler function to remove
    - event: Specific event type to remove (None for all)

    Returns:
        int: Number of handlers removed
    """
def list_event_handlers(self) -> List[Tuple[Callable, EventBuilder]]:
    """
    List all registered event handlers.

    Returns:
        List of (callback, event) tuples
    """


# Control the event loop and update processing.
def run_until_disconnected(self):
    """
    Run the client until disconnected.

    Called from synchronous code, this blocks the current thread and
    processes events. Called from inside a running event loop (for
    example as the final call in an async ``main()``), the result must
    be awaited, as the examples in this document do.

    Usage:
        # Synchronous code
        client.start()
        client.run_until_disconnected()

        # Inside a coroutine
        await client.start()
        await client.run_until_disconnected()
    """
async def catch_up(self):
    """
    Catch up on missed updates since last connection.

    Processes updates that occurred while client was offline.

    NOTE(review): the original text states this "requires catch_up=True
    in client constructor". Telethon's reference describes that flag as
    catching up automatically at start - confirm whether it is really a
    prerequisite for calling this method manually.
    """
async def set_receive_updates(self, receive_updates: bool):
    """
    Enable or disable receiving updates.

    Parameters:
    - receive_updates: Whether to process updates
    """


# Handle new, edited, and deleted messages.
class NewMessage:
    """
    Event for new messages.

    Both incoming and outgoing messages match unless the ``incoming`` or
    ``outgoing`` filter is set.

    Attributes:
    - message: The new message object
    - chat: Chat where message was sent
    - sender: User who sent the message
    """

    def __init__(
        self,
        chats=None,
        *,
        blacklist_chats: bool = False,
        func=None,
        pattern=None,
        from_users=None,
        forwards=None,
        incoming: bool = None,
        outgoing: bool = None
    ):
        """
        Filter for new messages.

        Parameters:
        - chats: Specific chats to monitor (None for all)
        - blacklist_chats: Exclude instead of include specified chats
        - func: Custom filter function; receives the event and returns
          whether it should be handled
        - pattern: Regex pattern to match message text; the match is
          exposed to the handler as ``event.pattern_match``
        - from_users: Only messages from these users
        - forwards: True handles only forwarded messages, False handles
          only non-forwarded messages, None (default) handles both
        - incoming: If True, only incoming messages
        - outgoing: If True, only outgoing messages
        """
class MessageEdited:
    """
    Event for message edits.

    Attributes same as NewMessage.
    """

    def __init__(self, **kwargs):
        """Parameters same as NewMessage (keyword arguments only here)."""
class MessageDeleted:
    """
    Event for message deletions.

    Attributes:
    - deleted_ids: List of deleted message IDs
    - peer: Chat where messages were deleted
    """

    def __init__(self, chats=None, *, blacklist_chats: bool = False):
        """
        Filter for message deletions.

        Parameters:
        - chats: Specific chats to monitor
        - blacklist_chats: Exclude specified chats
        """
class MessageRead:
    """
    Event for message read status changes.

    Attributes:
    - message_ids: IDs of messages marked as read
    - peer: Chat where messages were read
    - max_id: Highest message ID read
    """

    def __init__(self, chats=None, *, blacklist_chats: bool = False):
        """Parameters same as MessageDeleted."""


# Handle button clicks and inline queries.
class CallbackQuery:
    """
    Event for inline button clicks.

    Attributes:
    - query: The callback query object
    - data: Button callback data
    - message: Message containing the button
    - sender: User who clicked the button
    """

    def __init__(
        self,
        chats=None,
        *,
        blacklist_chats: bool = False,
        func=None,
        data=None,
        pattern=None
    ):
        """
        Filter for callback queries.

        Parameters:
        - chats: Specific chats to monitor
        - blacklist_chats: Exclude specified chats
        - func: Custom filter function
        - data: Specific callback data to match (bytes, e.g. b'opt1')
        - pattern: Regex pattern for callback data (bytes pattern,
          e.g. rb'user_(\\d+)')
        """
class InlineQuery:
    """
    Event for inline bot queries.

    Attributes:
    - query: The inline query object
    - text: Query text from user
    - user: User making the query
    - offset: Pagination offset
    """

    def __init__(
        self,
        *,
        users=None,
        blacklist_users: bool = False,
        func=None,
        pattern=None
    ):
        """
        Filter for inline queries.

        Parameters:
        - users: Specific users to monitor
        - blacklist_users: Exclude specified users
        - func: Custom filter function
        - pattern: Regex pattern for query text
        """


# Monitor chat actions and user status changes.
class ChatAction:
    """
    Event for chat service actions.

    Per Telethon's reference this covers actions such as users joining,
    being added, leaving or being removed, title or photo changes, and
    pinned messages. Typing indicators and online status are reported
    through UserUpdate instead.

    Attributes:
    - action: The specific action
      (NOTE(review): Telethon's event exposes ``action_message`` and
      flags such as ``user_joined`` / ``user_left``; confirm that an
      ``action`` attribute actually exists.)
    - user: User performing the action
    - chat: Chat where action occurred
    """

    def __init__(
        self,
        chats=None,
        *,
        blacklist_chats: bool = False,
        func=None
    ):
        """
        Filter for chat actions.

        Parameters:
        - chats: Specific chats to monitor
        - blacklist_chats: Exclude specified chats
        - func: Custom filter function
        """
class UserUpdate:
    """
    Event for user status changes (online, offline, typing, etc.).

    Attributes:
    - user: User whose status changed
    - status: New user status
    """

    def __init__(
        self,
        *,
        users=None,
        blacklist_users: bool = False,
        func=None
    ):
        """
        Filter for user updates.

        Parameters:
        - users: Specific users to monitor
        - blacklist_users: Exclude specified users
        - func: Custom filter function

        NOTE(review): Telethon's reference appears to name these
        parameters ``chats`` / ``blacklist_chats`` for this event -
        verify against the installed version before relying on the
        keyword names.
        """


# Handle grouped media messages.
class Album:
    """
    Event for grouped media messages (albums).

    Attributes:
    - messages: List of messages in the album
    - grouped_id: Album group identifier
    """

    def __init__(
        self,
        chats=None,
        *,
        blacklist_chats: bool = False,
        func=None
    ):
        """
        Filter for album messages.

        Parameters:
        - chats: Specific chats to monitor
        - blacklist_chats: Exclude specified chats
        - func: Custom filter function

        Only these three filters are accepted; the message-specific
        NewMessage filters (pattern, from_users, ...) are not part of
        this signature.
        """


# Handle low-level Telegram updates.
class Raw:
    """
    Event for raw Telegram updates.

    Provides access to unprocessed updates from Telegram.

    Attributes:
    - update: Raw update object from Telegram
    """

    def __init__(
        self,
        *,
        types=None,
        func=None
    ):
        """
        Filter for raw updates.

        Parameters:
        - types: Specific update types to handle
        - func: Custom filter function
        """


# Control event propagation and handling.
class StopPropagation(Exception):
    """
    Exception to stop processing further event handlers.

    Raise this in an event handler to prevent other handlers
    from processing the same event.

    Usage:
        @client.on(events.NewMessage)
        async def first_handler(event):
            if event.text.startswith('/stop'):
                await event.respond('Stopping here!')
                raise StopPropagation

        @client.on(events.NewMessage)
        async def second_handler(event):
            # This won't run if first handler raises StopPropagation
            await event.respond('This might not be sent')
    """


# Register events without a client instance.
def register(event=None):
    """
    Decorator to register event handlers without a client.

    Parameters:
    - event: Event class or instance

    Usage:
        @events.register(events.NewMessage)
        async def handler(event):
            await event.respond('Hello!')

        # Later, add to client:
        client.add_event_handler(handler)
    """
def unregister(callback, event=None):
    """
    Remove registered event handlers from a callback.

    Parameters:
    - callback: Handler function to remove
    - event: Specific event type (None for all)

    Returns:
        int: Number of handlers removed
    """
def is_handler(callback) -> bool:
    """
    Check if a function is registered as an event handler.

    Parameters:
    - callback: Function to check

    Returns:
        bool: True if registered as handler
    """
def list(callback) -> List:
    """
    List event builders registered to a callback.

    The name shadows the built-in ``list`` inside this namespace; it is
    kept as-is because it is the documented public name.

    Parameters:
    - callback: Handler function

    Returns:
        List of event builders
    """


import asyncio
import re
from telethon import TelegramClient, events
async def basic_events():
    """Register three simple NewMessage handlers and run until disconnected.

    ``api_id`` and ``api_hash`` are placeholders for the application's
    Telegram API credentials.
    """
    client = TelegramClient('session', api_id, api_hash)

    # Simple message handler.
    # incoming=True is required here: NewMessage also fires for the
    # account's own outgoing messages, so an unfiltered handler that
    # replies would react to its own reply and echo forever.
    @client.on(events.NewMessage(incoming=True))
    async def message_handler(event):
        print(f"New message: {event.text}")
        await event.respond(f"You said: {event.text}")

    # Pattern-based handler (the reply does not match the pattern, so
    # no extra direction filter is needed).
    @client.on(events.NewMessage(pattern=r'/start'))
    async def start_handler(event):
        await event.respond('Hello! I am a bot.')

    # Handler for specific chats; incoming=True for the same reason as
    # above, since the reply lands in the monitored chat itself.
    @client.on(events.NewMessage(chats=['username', 'chat_id'], incoming=True))
    async def chat_handler(event):
        await event.respond('Message in monitored chat!')

    await client.start()
    await client.run_until_disconnected()


# Guarded so that importing this module does not start the client.
if __name__ == '__main__':
    asyncio.run(basic_events())

import re
from telethon import events
async def advanced_filtering():
    """Show regex capture groups, custom predicates and combined filters.

    ``admin_id_1``, ``admin_id_2``, ``group_chat_id`` and
    ``specific_user_id`` are placeholders to be supplied by the caller.
    """
    client = TelegramClient('session', api_id, api_hash)

    # Command pattern matching: the capture group is read back from
    # event.pattern_match inside the handler.
    @client.on(events.NewMessage(pattern=r'/echo (.+)'))
    async def echo_command(event):
        text_to_echo = event.pattern_match.group(1)
        await event.respond(f"Echo: {text_to_echo}")

    # Custom filter function: receives the event, returns a bool.
    def is_admin_message(event):
        return event.sender_id in (admin_id_1, admin_id_2)

    @client.on(events.NewMessage(func=is_admin_message))
    async def admin_handler(event):
        await event.respond("Admin command received!")

    # Multiple conditions: every filter must match.
    @client.on(
        events.NewMessage(
            chats=[group_chat_id],
            from_users=[specific_user_id],
            incoming=True,
        )
    )
    async def specific_handler(event):
        await event.respond("Message from specific user in group!")

    # Forwarded messages only
    @client.on(events.NewMessage(forwards=True))
    async def forward_handler(event):
        await event.respond("This is a forwarded message!")

    await client.start()
    await client.run_until_disconnected()


from telethon import Button, events
async def interactive_events():
    """Send an inline-button menu and react to the resulting clicks."""
    client = TelegramClient('session', api_id, api_hash)

    # Send message with buttons
    @client.on(events.NewMessage(pattern='/menu'))
    async def menu_handler(event):
        buttons = [
            [Button.inline('Option 1', b'opt1')],
            [Button.inline('Option 2', b'opt2')],
            [Button.inline('Help', b'help')],
        ]
        await event.respond('Choose an option:', buttons=buttons)

    # Handle button clicks
    @client.on(events.CallbackQuery(data=b'opt1'))
    async def option1_handler(event):
        await event.answer('You chose Option 1!')
        await event.edit('Option 1 selected ✓')

    @client.on(events.CallbackQuery(data=b'opt2'))
    async def option2_handler(event):
        await event.answer('You chose Option 2!')
        await event.edit('Option 2 selected ✓')

    @client.on(events.CallbackQuery(data=b'help'))
    async def help_handler(event):
        await event.answer('This is help text!', alert=True)

    # Pattern matching for callback data (bytes pattern, bytes groups)
    @client.on(events.CallbackQuery(pattern=rb'user_(\d+)'))
    async def user_callback(event):
        user_id = int(event.pattern_match.group(1))
        await event.answer(f'User ID: {user_id}')

    await client.start()
    await client.run_until_disconnected()


async def event_lifecycle():
    """Demonstrate StopPropagation and adding/removing handlers at runtime."""
    client = TelegramClient('session', api_id, api_hash)

    # Handler with stop propagation
    @client.on(events.NewMessage(pattern='/stop'))
    async def stop_handler(event):
        await event.respond('Stopping here!')
        raise events.StopPropagation

    # incoming=True keeps this handler from reacting to its own reply
    # (NewMessage also fires for the account's outgoing messages).
    @client.on(events.NewMessage(incoming=True))
    async def catch_all_handler(event):
        # This won't run for /stop messages
        await event.respond('Catch-all handler')

    # Dynamic handler addition
    async def dynamic_handler(event):
        await event.respond('Dynamic handler!')

    # Connect first: a handler that is added and removed before the
    # client is started can never receive an update.
    await client.start()

    # Add handler programmatically
    client.add_event_handler(
        dynamic_handler,
        events.NewMessage(pattern='/dynamic'),
    )

    # Remove handler after some time
    await asyncio.sleep(60)  # Wait 1 minute
    removed = client.remove_event_handler(dynamic_handler)
    print(f"Removed {removed} handlers")

    # List all handlers
    handlers = client.list_event_handlers()
    print(f"Active handlers: {len(handlers)}")

    await client.run_until_disconnected()


async def media_events():
    """Handle albums, message edits, deletions and read receipts."""
    client = TelegramClient('session', api_id, api_hash)

    # Handle grouped media (albums)
    @client.on(events.Album)
    async def album_handler(event):
        await event.respond(
            f'Received album with {len(event.messages)} items!'
        )
        # Process each item in album
        for message in event.messages:
            if message.photo:
                print(f"Photo in album: {message.id}")
            elif message.video:
                print(f"Video in album: {message.id}")

    # Handle message edits
    @client.on(events.MessageEdited)
    async def edit_handler(event):
        await event.respond(f'Message {event.id} was edited!')

    # Handle message deletions
    @client.on(events.MessageDeleted)
    async def delete_handler(event):
        print(f'Messages deleted: {event.deleted_ids}')
        # Note: Can't respond to deleted messages

    # Handle read status
    @client.on(events.MessageRead)
    async def read_handler(event):
        print(f'Messages read up to ID: {event.max_id}')

    await client.start()
    await client.run_until_disconnected()


async def activity_monitoring():
    """Monitor joins, typing indicators and online/offline transitions."""
    # Local import so this example does not depend on imports that
    # appear further down the page.
    from telethon.tl.types import UserStatusOffline

    client = TelegramClient('session', api_id, api_hash)

    # ChatAction reports service actions such as users joining a chat.
    @client.on(events.ChatAction)
    async def join_handler(event):
        if event.user_joined or event.user_added:
            user = await event.get_user()
            print(f'{user.first_name} joined the chat')

    # Typing indicators arrive as UserUpdate events, not ChatAction.
    @client.on(events.UserUpdate)
    async def typing_handler(event):
        if event.typing:
            user = await event.get_user()
            print(f'{user.first_name} is typing...')

    # Monitor user status changes
    @client.on(events.UserUpdate)
    async def status_handler(event):
        status = event.status
        if status is None:
            # Not a status change (for example a typing notification).
            return
        user = await event.get_user()
        if event.online:
            print(f'{user.first_name} came online')
        elif isinstance(status, UserStatusOffline):
            print(f'{user.first_name} went offline')

    await client.start()
    await client.run_until_disconnected()


# The edit update type is named UpdateEditMessage in Telethon's TL
# types; there is no UpdateMessageEdited.
from telethon.tl.types import UpdateNewMessage, UpdateEditMessage
async def raw_events():
    """Subscribe to unprocessed Telegram updates, filtered and unfiltered."""
    client = TelegramClient('session', api_id, api_hash)

    # Handle specific raw update types
    @client.on(events.Raw(types=[UpdateNewMessage]))
    async def raw_message_handler(event):
        update = event.update
        message = update.message
        print(f"Raw message: {message.message}")

    # Handle all raw updates
    @client.on(events.Raw)
    async def all_updates_handler(event):
        update_type = type(event.update).__name__
        print(f"Received update: {update_type}")

    await client.start()
    await client.run_until_disconnected()


from typing import Union, List, Optional, Callable, Pattern, Any
import re
from telethon.tl import types
from telethon import custom
# Signature of an event callback: it receives the event object.
# NOTE(review): every handler shown in this document is `async def`, so
# the effective return type is presumably an awaitable rather than
# None - confirm before using this alias for type checking.
EventHandler = Callable[[Any], None]
# Predicate usable as a `func=` filter: takes the event, returns a bool.
FilterFunction = Callable[[Any], bool]
# Accepted forms of a `pattern=` argument: a regex source string or a
# compiled pattern over str or bytes.
PatternType = Union[str, Pattern[str], Pattern[bytes]]
# A single chat/user reference (integer ID or string such as a
# username) or a list of such references.
EntityFilter = Union[int, str, List[Union[int, str]]]
class EventBuilder:
    """Base class for event filters"""
class EventCommon:
    """Common event attributes and methods"""

    # Client instance the event is associated with.
    client: TelegramClient
    # Update object this event was built from.
    original_update: types.Updates

    async def get_chat(self):
        """Get the chat where the event occurred"""

    async def get_sender(self):
        """Get the user who triggered the event"""

    async def get_input_chat(self):
        """Get InputPeer for the chat"""

    async def get_input_sender(self):
        """Get InputPeer for the sender"""


# Install with Tessl CLI
# npx tessl i tessl/pypi-telethon