0
# Application Framework
1
2
The Application framework provides a high-level interface for building and managing Telegram bots. It handles update processing, handler management, job scheduling, persistence, and bot lifecycle operations.
3
4
## Capabilities
5
6
### Application Builder
7
8
Create and configure Application instances with fluent builder pattern.
9
10
```python { .api }
11
class ApplicationBuilder:
12
def token(self, token: str) -> 'ApplicationBuilder': ...
13
def bot(self, bot: Bot) -> 'ApplicationBuilder': ...
14
def updater(self, updater: Updater) -> 'ApplicationBuilder': ...
15
def update_processor(self, update_processor: BaseUpdateProcessor) -> 'ApplicationBuilder': ...
16
def job_queue(self, job_queue: JobQueue) -> 'ApplicationBuilder': ...
17
def persistence(self, persistence: BasePersistence) -> 'ApplicationBuilder': ...
18
def defaults(self, defaults: Defaults) -> 'ApplicationBuilder': ...
19
def context_types(self, context_types: ContextTypes) -> 'ApplicationBuilder': ...
20
def rate_limiter(self, rate_limiter: BaseRateLimiter) -> 'ApplicationBuilder': ...
21
def post_init(self, post_init: callable) -> 'ApplicationBuilder': ...
22
def post_shutdown(self, post_shutdown: callable) -> 'ApplicationBuilder': ...
23
def build(self) -> 'Application': ...
24
```
25
26
Usage example:
27
28
```python
29
from telegram.ext import ApplicationBuilder, PicklePersistence
30
31
# Basic application
32
application = ApplicationBuilder().token("YOUR_BOT_TOKEN").build()
33
34
# Advanced configuration
35
persistence = PicklePersistence(filepath="bot_data.pkl")
36
application = (
37
ApplicationBuilder()
38
.token("YOUR_BOT_TOKEN")
39
.persistence(persistence)
40
.concurrent_updates(True)
41
.build()
42
)
43
```
44
45
### Application Management
46
47
Main Application class for managing bot operations, handlers, and lifecycle.
48
49
```python { .api }
50
class Application:
51
bot: Bot
52
updater: Updater
53
job_queue: JobQueue
54
persistence: BasePersistence | None
55
56
def add_handler(self, handler: BaseHandler, group: int = 0) -> None: ...
57
def add_handlers(self, handlers: list[BaseHandler], group: int = 0) -> None: ...
58
def remove_handler(self, handler: BaseHandler, group: int = 0) -> bool: ...
59
60
def add_error_handler(self, callback: callable) -> None: ...
61
def remove_error_handler(self, callback: callable) -> None: ...
62
63
async def process_update(self, update: Update) -> None: ...
64
65
def run_polling(
66
self,
67
poll_interval: float = 0.0,
68
timeout: int = 10,
69
bootstrap_retries: int = -1,
70
read_timeout: float = 2.0,
71
write_timeout: float = 2.0,
72
connect_timeout: float = 2.0,
73
pool_timeout: float = 1.0,
74
allowed_updates: list[str] = None,
75
drop_pending_updates: bool = None,
76
close_loop: bool = True
77
) -> None: ...
78
79
def run_webhook(
80
self,
81
listen: str = "127.0.0.1",
82
port: int = 80,
83
url_path: str = "",
84
cert: str = None,
85
key: str = None,
86
bootstrap_retries: int = -1,
87
webhook_url: str = None,
88
allowed_updates: list[str] = None,
89
drop_pending_updates: bool = None,
90
ip_address: str = None,
91
max_connections: int = 40,
92
secret_token: str = None,
93
close_loop: bool = True
94
) -> None: ...
95
96
async def initialize(self) -> None: ...
97
async def start(self) -> None: ...
98
async def stop(self) -> None: ...
99
async def shutdown(self) -> None: ...
100
101
async def __aenter__(self) -> 'Application': ...
102
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...
103
```
104
105
Usage example:
106
107
```python
108
from telegram.ext import Application, CommandHandler
109
110
async def start(update, context):
111
await update.message.reply_text("Hello!")
112
113
# Create and run application
114
application = Application.builder().token("YOUR_BOT_TOKEN").build()
115
application.add_handler(CommandHandler("start", start))
116
117
# Run with polling
118
application.run_polling()
119
120
# Or use as async context manager
121
async def main():
122
async with application:
123
await application.start()
124
await application.updater.start_polling()
125
# Keep running...
126
await application.updater.stop()
127
await application.stop()
128
```
129
130
### Extended Bot
131
132
Enhanced Bot class with additional features and integration with Application framework.
133
134
```python { .api }
135
class ExtBot(Bot):
136
def __init__(
137
self,
138
token: str,
139
defaults: Defaults = None,
140
callback_data_cache: CallbackDataCache = None,
141
arbitrary_callback_data: bool = False,
142
**kwargs
143
): ...
144
145
async def get_business_connection(self, business_connection_id: str) -> BusinessConnection: ...
146
147
async def send_paid_media(
148
self,
149
chat_id: int | str,
150
star_count: int,
151
media: list[InputPaidMedia],
152
**kwargs
153
) -> Message: ...
154
155
async def copy_messages(
156
self,
157
chat_id: int | str,
158
from_chat_id: int | str,
159
message_ids: list[int],
160
**kwargs
161
) -> list[MessageId]: ...
162
163
async def forward_messages(
164
self,
165
chat_id: int | str,
166
from_chat_id: int | str,
167
message_ids: list[int],
168
**kwargs
169
) -> list[MessageId]: ...
170
```
171
172
### Updater and Update Processing
173
174
Manage incoming updates from Telegram using polling or webhooks.
175
176
```python { .api }
177
class Updater:
178
bot: Bot
179
update_queue: asyncio.Queue
180
181
def __init__(self, bot: Bot, update_queue: asyncio.Queue): ...
182
183
async def start_polling(
184
self,
185
poll_interval: float = 0.0,
186
timeout: int = 10,
187
bootstrap_retries: int = -1,
188
read_timeout: float = 2.0,
189
write_timeout: float = 2.0,
190
connect_timeout: float = 2.0,
191
pool_timeout: float = 1.0,
192
allowed_updates: list[str] = None,
193
drop_pending_updates: bool = None
194
) -> asyncio.Queue: ...
195
196
async def start_webhook(
197
self,
198
listen: str = "127.0.0.1",
199
port: int = 80,
200
url_path: str = "",
201
cert: str = None,
202
key: str = None,
203
bootstrap_retries: int = -1,
204
webhook_url: str = None,
205
allowed_updates: list[str] = None,
206
drop_pending_updates: bool = None,
207
ip_address: str = None,
208
max_connections: int = 40,
209
secret_token: str = None
210
) -> asyncio.Queue: ...
211
212
async def stop(self) -> None: ...
213
async def shutdown(self) -> None: ...
214
215
class BaseUpdateProcessor:
216
async def process_update(self, update: Update, application: 'Application') -> None: ...
217
218
class SimpleUpdateProcessor(BaseUpdateProcessor):
219
def __init__(self, max_concurrent_updates: int = 256): ...
220
```
221
222
### Context Types
223
224
Configure custom context types for handlers and callbacks.
225
226
```python { .api }
227
class ContextTypes:
228
def __init__(
229
self,
230
context: type = None,
231
bot_data: type = None,
232
chat_data: type = None,
233
user_data: type = None
234
): ...
235
236
context: type
237
bot_data: type
238
chat_data: type
239
user_data: type
240
```
241
242
### Defaults Configuration
243
244
Set default values for Bot methods and Application behavior.
245
246
```python { .api }
247
class Defaults:
248
def __init__(
249
self,
250
parse_mode: str = None,
251
disable_notification: bool = None,
252
disable_web_page_preview: bool = None,
253
allow_sending_without_reply: bool = None,
254
protect_content: bool = None,
255
tzinfo: datetime.tzinfo = None,
256
block: bool = None,
257
quote: bool = None
258
): ...
259
```
260
261
Usage example:
262
263
```python
264
from telegram.ext import Defaults
265
from telegram.constants import ParseMode
266
267
defaults = Defaults(
268
parse_mode=ParseMode.HTML,
269
disable_notification=True,
270
tzinfo=datetime.timezone.utc
271
)
272
273
application = (
274
ApplicationBuilder()
275
.token("YOUR_BOT_TOKEN")
276
.defaults(defaults)
277
.build()
278
)
279
```
280
281
### Application Exception Handling
282
283
Built-in exception handling with custom error handlers.
284
285
```python { .api }
286
class ApplicationHandlerStop(Exception):
287
def __init__(self, state: object = None): ...
288
289
state: object
290
```
291
292
Usage example:
293
294
```python
295
from telegram.error import TelegramError
296
from telegram.ext import ApplicationHandlerStop
297
298
async def error_handler(update, context):
299
"""Log errors and handle them gracefully."""
300
print(f"Update {update} caused error {context.error}")
301
302
# Stop processing further handlers for this update
303
raise ApplicationHandlerStop
304
305
async def command_handler(update, context):
306
try:
307
# Some operation that might fail
308
result = await risky_operation()
309
await update.message.reply_text(f"Success: {result}")
310
except ValueError as e:
311
await update.message.reply_text("Invalid input!")
312
raise ApplicationHandlerStop # Stop processing
313
except Exception as e:
314
await update.message.reply_text("Something went wrong!")
315
raise # Let error handler deal with it
316
317
application.add_error_handler(error_handler)
318
```
319
320
## Types
321
322
```python { .api }
323
from typing import Any, Callable, Dict, List, Optional, Union
324
325
CallbackType = Callable[[Update, CallbackContext], object]
326
ErrorHandlerType = Callable[[Update, CallbackContext], None]
327
JobCallbackType = Callable[[CallbackContext], object]
328
329
class CallbackContext:
330
bot: Bot
331
update_queue: asyncio.Queue
332
job_queue: JobQueue
333
user_data: dict
334
chat_data: dict
335
bot_data: dict
336
args: list[str]
337
error: Exception | None
338
job: 'Job' | None
339
340
async def refresh_data(self) -> None: ...
341
def drop_callback_data(self, callback_query: CallbackQuery) -> None: ...
342
```