0
# Scheduled Tasks
1
2
Create and manage cron jobs for automated execution of assistants on threads or with dynamic thread creation. Supports timezone handling, webhook notifications, and flexible scheduling.
3
4
## Capabilities
5
6
### Cron Job Creation
7
8
Create scheduled tasks for automated assistant execution with flexible scheduling and configuration options.
9
10
```python { .api }
11
from collections.abc import Mapping
12
from typing import Any
13
from langgraph_sdk.schema import (
14
Cron, Config, Context, All, QueryParamTypes
15
)
16
17
# Via client.crons
18
async def create(
19
assistant_id: str,
20
*,
21
schedule: str,
22
input: Mapping[str, Any] | None = None,
23
metadata: Mapping[str, Any] | None = None,
24
config: Config | None = None,
25
context: Context | None = None,
26
checkpoint_during: bool | None = None,
27
interrupt_before: All | list[str] | None = None,
28
interrupt_after: All | list[str] | None = None,
29
webhook: str | None = None,
30
webhook_mode: str | None = None,
31
headers: Mapping[str, str] | None = None,
32
params: QueryParamTypes | None = None,
33
) -> Cron:
34
"""
35
Create a cron job that will create a new thread for each run.
36
37
Args:
38
assistant_id: The assistant ID or graph name to cron.
39
schedule: The schedule to run the assistant on.
40
input: The input to the assistant.
41
metadata: The metadata to add to the runs.
42
config: The config to use for the runs.
43
context: The context to add to the runs.
44
checkpoint_during: Whether to checkpoint during the run.
45
interrupt_before: Nodes to interrupt immediately before they run.
46
interrupt_after: Nodes to interrupt immediately after they run.
47
webhook: Webhook to call after the run is done.
48
webhook_mode: Mode to call the webhook. Options are "GET" and "POST".
49
headers: Optional custom headers to include with the request.
50
params: Optional query parameters to include with the request.
51
52
Returns:
53
Cron: The created cron job.
54
"""
55
56
async def create_for_thread(
57
thread_id: str,
58
assistant_id: str,
59
*,
60
schedule: str,
61
input: Mapping[str, Any] | None = None,
62
metadata: Mapping[str, Any] | None = None,
63
config: Config | None = None,
64
context: Context | None = None,
65
checkpoint_during: bool | None = None,
66
interrupt_before: All | list[str] | None = None,
67
interrupt_after: All | list[str] | None = None,
68
webhook: str | None = None,
69
webhook_mode: str | None = None,
70
headers: Mapping[str, str] | None = None,
71
params: QueryParamTypes | None = None,
72
) -> Cron:
73
"""
74
Create a cron job that will run on a specific thread.
75
76
Args:
77
thread_id: The thread ID to cron.
78
assistant_id: The assistant ID or graph name to cron.
79
schedule: The schedule to run the assistant on.
80
input: The input to the assistant.
81
metadata: The metadata to add to the runs.
82
config: The config to use for the runs.
83
context: The context to add to the runs.
84
checkpoint_during: Whether to checkpoint during the run.
85
interrupt_before: Nodes to interrupt immediately before they run.
86
interrupt_after: Nodes to interrupt immediately after they run.
87
webhook: Webhook to call after the run is done.
88
webhook_mode: Mode to call the webhook. Options are "GET" and "POST".
89
headers: Optional custom headers to include with the request.
90
params: Optional query parameters to include with the request.
91
92
Returns:
93
Cron: The created cron job.
94
"""
95
```
96
97
### Cron Job Management
98
99
Search, list, count, and delete scheduled tasks with filtering capabilities.
100
101
```python { .api }
102
from langgraph_sdk.schema import (
103
CronSelectField, CronSortBy, SortOrder, QueryParamTypes
104
)
105
106
async def search(
107
*,
108
assistant_id: str | None = None,
109
thread_id: str | None = None,
110
limit: int = 10,
111
offset: int = 0,
112
sort_by: CronSortBy | None = None,
113
sort_order: SortOrder | None = None,
114
select: list[CronSelectField] | None = None,
115
headers: Mapping[str, str] | None = None,
116
params: QueryParamTypes | None = None,
117
) -> list[Cron]:
118
"""
119
List cron jobs.
120
121
Args:
122
assistant_id: Assistant ID to filter by.
123
thread_id: Thread ID to filter by.
124
limit: Limit the number of cron jobs to return.
125
offset: Offset to start from.
126
sort_by: Field to sort by.
127
sort_order: Order to sort by.
128
select: Fields to include in the response.
129
headers: Optional custom headers to include with the request.
130
params: Optional query parameters to include with the request.
131
132
Returns:
133
list[Cron]: List of cron jobs.
134
"""
135
136
async def count(
137
*,
138
assistant_id: str | None = None,
139
thread_id: str | None = None,
140
headers: Mapping[str, str] | None = None,
141
params: QueryParamTypes | None = None,
142
) -> int:
143
"""
144
Count cron jobs matching filters.
145
146
Args:
147
assistant_id: Assistant ID to filter by.
148
thread_id: Thread ID to filter by.
149
headers: Optional custom headers to include with the request.
150
params: Optional query parameters to include with the request.
151
152
Returns:
153
int: Number of crons matching the criteria.
154
"""
155
156
async def delete(
157
cron_id: str,
158
*,
159
headers: Mapping[str, str] | None = None,
160
params: QueryParamTypes | None = None,
161
) -> None:
162
"""
163
Delete a cron job.
164
165
Args:
166
cron_id: The cron ID to delete.
167
headers: Optional custom headers to include with the request.
168
params: Optional query parameters to include with the request.
169
"""
170
```
171
172
## Types
173
174
```python { .api }
175
class Cron(TypedDict):
176
"""Scheduled task definition."""
177
cron_id: str
178
thread_id: str
179
assistant_id: str
180
schedule: str
181
timezone: str
182
created_at: str
183
updated_at: str
184
metadata: dict
185
config: Config
186
input: dict
187
next_run_time: str
188
last_run_time: str
189
enabled: bool
190
191
CronSelectField = Literal[
192
"cron_id", "thread_id", "assistant_id", "schedule",
193
"timezone", "created_at", "updated_at", "metadata",
194
"config", "input", "next_run_time", "last_run_time", "enabled"
195
]
196
197
CronSortBy = Literal["created_at", "updated_at", "next_run_time", "last_run_time"]
198
```
199
200
## Usage Examples
201
202
### Creating Scheduled Tasks
203
204
```python
205
# Daily report generation
206
daily_report = await client.crons.create(
207
assistant_id="report-assistant",
208
schedule="0 9 * * *", # 9 AM daily
209
input={"report_type": "daily", "email_list": ["admin@company.com"]},
210
config={"timeout": 600},
211
metadata={"purpose": "daily_report", "owner": "operations"},
212
timezone="America/New_York",
213
webhook="https://myapp.com/webhooks/report-complete"
214
)
215
216
# Hourly data processing for specific thread
217
hourly_processor = await client.crons.create_for_thread(
218
thread_id="data-thread-123",
219
assistant_id="data-processor",
220
schedule="0 * * * *", # Every hour
221
input={"source": "api", "format": "json"},
222
config={"batch_size": 1000},
223
metadata={"environment": "production"}
224
)
225
226
# Weekly cleanup task
227
cleanup_job = await client.crons.create(
228
assistant_id="cleanup-assistant",
229
schedule="0 2 * * 0", # 2 AM on Sundays
230
input={"retention_days": 30, "dry_run": False},
231
timezone="UTC",
232
on_completion="keep" # Keep run records for audit
233
)
234
```
235
236
### Managing Scheduled Tasks
237
238
```python
239
# List all cron jobs for an assistant
240
assistant_jobs = await client.crons.search(
241
assistant_id="report-assistant",
242
limit=50
243
)
244
245
# Find jobs for a specific thread
246
thread_jobs = await client.crons.search(
247
thread_id="data-thread-123"
248
)
249
250
# Get total job count
251
total_jobs = await client.crons.count()
252
253
# Delete a scheduled task
254
await client.crons.delete("cron-456")
255
```
256
257
### Schedule Patterns
258
259
```python
260
# Common cron schedule patterns
261
262
# Every minute
263
await client.crons.create(
264
assistant_id="monitoring-assistant",
265
schedule="* * * * *"
266
)
267
268
# Every 15 minutes
269
await client.crons.create(
270
assistant_id="health-check-assistant",
271
schedule="*/15 * * * *"
272
)
273
274
# Daily at 3:30 AM
275
await client.crons.create(
276
assistant_id="backup-assistant",
277
schedule="30 3 * * *"
278
)
279
280
# Weekly on Monday at 9 AM
281
await client.crons.create(
282
assistant_id="weekly-report-assistant",
283
schedule="0 9 * * 1"
284
)
285
286
# Monthly on the 1st at midnight
287
await client.crons.create(
288
assistant_id="monthly-billing-assistant",
289
schedule="0 0 1 * *"
290
)
291
292
# Weekdays at 6 PM
293
await client.crons.create(
294
assistant_id="daily-summary-assistant",
295
schedule="0 18 * * 1-5"
296
)
297
```
298
299
### Timezone Handling
300
301
```python
302
# Schedule with specific timezone
303
tokyo_job = await client.crons.create(
304
assistant_id="tokyo-report-assistant",
305
schedule="0 9 * * *", # 9 AM Tokyo time
306
timezone="Asia/Tokyo",
307
input={"region": "APAC"}
308
)
309
310
# Multiple timezone jobs for global operation
311
timezones = ["UTC", "America/New_York", "Europe/London", "Asia/Tokyo"]
312
for tz in timezones:
313
await client.crons.create(
314
assistant_id="regional-assistant",
315
schedule="0 8 * * *", # 8 AM local time
316
timezone=tz,
317
input={"timezone": tz, "region": tz.split("/")[-1]}
318
)
319
```
320
321
### Webhook Integration
322
323
```python
324
# Job with webhook notifications
325
webhook_job = await client.crons.create(
326
assistant_id="critical-task-assistant",
327
schedule="0 */6 * * *", # Every 6 hours
328
input={"task": "system_health_check"},
329
webhook="https://monitoring.company.com/webhooks/cron-complete",
330
metadata={"priority": "critical", "alert_on_failure": True}
331
)
332
333
# Webhook payload will include:
334
# {
335
# "cron_id": "cron-123",
336
# "run_id": "run-456",
337
# "status": "success|error|timeout",
338
# "metadata": {...},
339
# "completed_at": "2023-12-01T12:00:00Z"
340
# }
341
```
342
343
### Error Handling and Monitoring
344
345
```python
346
# Create jobs with error handling configuration
347
robust_job = await client.crons.create(
348
assistant_id="data-sync-assistant",
349
schedule="0 2 * * *",
350
input={"source": "external_api", "retries": 3},
351
config={"timeout": 1800, "retry_policy": "exponential_backoff"},
352
multitask_strategy="enqueue", # Queue if previous run still active
353
webhook="https://alerts.company.com/cron-status"
354
)
355
356
# Monitor scheduled tasks
357
all_jobs = await client.crons.search(limit=100)
358
for job in all_jobs:
359
next_run = job.get("next_run_time")
360
last_run = job.get("last_run_time")
361
enabled = job.get("enabled", True)
362
363
print(f"Job {job['cron_id']}: next={next_run}, last={last_run}, enabled={enabled}")
364
365
if not enabled:
366
print(f"Warning: Job {job['cron_id']} is disabled")
367
```