# Functional API

The Functional API provides a decorator-based approach to building LangGraph workflows using the `@entrypoint` and `@task` decorators. This enables more natural Python code with automatic parallelization, while maintaining all the benefits of LangGraph's stateful execution model.

**Important:** Functions decorated with `@entrypoint()` return a `Pregel` (compiled graph) object, not a callable function. Use `.invoke()`, `.stream()`, or other `Pregel` methods to execute the workflow.

## Imports

```python
from langgraph.func import entrypoint, task
```

## Capabilities

### Task Decorator

Defines a LangGraph task that can be called from within entrypoints or `StateGraph` nodes. Tasks return futures that enable parallel execution.

```python { .api }
def task(
    func=None,
    *,
    name=None,
    retry_policy=None,
    cache_policy=None,
):
    """
    Decorator to define a LangGraph task.

    Tasks can be called from within entrypoint functions or StateGraph nodes.
    When called, they return a future (SyncAsyncFuture) that enables parallel
    execution. The actual execution happens when the future is awaited or
    when the containing function returns.

    Parameters:
        func: Optional[Callable] - Function to wrap as a task
        name: Optional[str] - Custom name for the task (default: func.__name__)
        retry_policy: Optional[RetryPolicy | Sequence[RetryPolicy]]
            Configuration for retrying the task on failure
        cache_policy: Optional[CachePolicy]
            Configuration for caching task results

    Returns:
        _TaskFunction - Wrapped task function that returns futures when called

    Deprecated Parameters:
        retry: Deprecated in v0.5.0; use retry_policy instead

    Usage:
        @task
        def process_item(item: dict) -> dict:
            return {"result": item["value"] * 2}

        # With parameters
        @task(name="custom_task", retry_policy=RetryPolicy(max_attempts=5))
        def retry_task(data: str) -> str:
            return data.upper()
    """
```
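The future-based calling convention that `@task` provides behaves much like the standard library's `concurrent.futures`: calling the function returns immediately with a future, and `.result()` blocks until the work is done. The stdlib-only sketch below (no LangGraph involved) illustrates that same call-now, resolve-later shape:

```python
from concurrent.futures import ThreadPoolExecutor

def process_item(item: dict) -> dict:
    return {"result": item["value"] * 2}

with ThreadPoolExecutor() as pool:
    # submit() returns immediately with a future, just as calling a
    # @task-decorated function does inside an entrypoint.
    future = pool.submit(process_item, {"value": 21})

    # .result() blocks until the work completes.
    result = future.result()  # == {"result": 42}
```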
### `_TaskFunction` Class

Wrapper class for task functions that manages execution and caching.

```python { .api }
class _TaskFunction:
    """
    Wrapper for task functions created by the @task decorator.

    Type Parameters:
        P: Parameter types of the wrapped function
        T: Return type of the wrapped function
    """

    def __call__(self, *args, **kwargs):
        """
        Execute the task.

        Returns:
            SyncAsyncFuture[T] - Future representing the pending task execution.
            Can be awaited (async) or resolved with .result() (sync).
        """

    def clear_cache(self, cache):
        """
        Clear the cache for this task.

        Parameters:
            cache: Cache backend to clear

        Returns:
            None
        """

    async def aclear_cache(self, cache):
        """
        Asynchronously clear the cache for this task.

        Parameters:
            cache: Cache backend to clear
        """
```
### Entrypoint Decorator

Defines a LangGraph workflow using the functional API. The decorated function becomes a `Pregel` graph with automatic state management and task parallelization.

```python { .api }
class entrypoint:
    """
    Decorator to define a LangGraph workflow using the functional API.

    The decorated function becomes a Pregel graph. Tasks called within the
    function are automatically parallelized. The function can access runtime
    context and manage state implicitly.

    Parameters:
        checkpointer: Optional[BaseCheckpointSaver]
            Checkpointer for persisting state across runs
        store: Optional[BaseStore]
            Store for persistent cross-thread memory
        cache: Optional cache backend for task results
        context_schema: Optional type for run-scoped context
        cache_policy: Optional[CachePolicy]
            Default cache policy for all tasks
        retry_policy: Optional[RetryPolicy | Sequence[RetryPolicy]]
            Default retry policy for all tasks

    Deprecated Parameters:
        config_schema: Deprecated in v1.0.0; use context_schema instead
        retry: Deprecated in v0.5.0; use retry_policy instead

    Returns:
        Callable that wraps the function as a Pregel graph
    """

    def __init__(
        self,
        checkpointer=None,
        store=None,
        cache=None,
        context_schema=None,
        cache_policy=None,
        retry_policy=None,
    ): ...

    def __call__(self, func):
        """
        Wrap the function as a Pregel graph.

        Parameters:
            func: Callable - Function to wrap as a workflow

        Returns:
            Pregel - Compiled graph object with methods like invoke(), stream(),
            get_state(), etc. The decorated function is NOT directly callable;
            you must use graph methods to execute it.

        Example:
            @entrypoint()
            def my_workflow(input: dict) -> dict:
                return {"result": input["value"] * 2}

            # Use .invoke() to execute
            result = my_workflow.invoke({"value": 5})  # Returns {"result": 10}
        """
```
### Entrypoint Final Class

Data class for returning a different value to the caller than what gets saved to state.

```python { .api }
class entrypoint.final:
    """
    Return value wrapper for entrypoint functions.

    Allows returning a different value to the caller than what gets
    saved to the graph state/checkpoint.

    Type Parameters:
        R: Type of the return value
        S: Type of the saved value

    Usage:
        @entrypoint()
        def workflow(input: dict) -> entrypoint.final:
            # Do work...
            return entrypoint.final(
                value="Success!",
                save={"state": "saved"},
            )
    """

    value: Any  # Value returned to caller
    save: Any   # Value saved to state
```
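The value/save split can be pictured with a plain dataclass: one payload goes back to the caller, the other into the persisted state. The sketch below is a hypothetical stand-in (the `Final` class and `run` helper are invented for illustration), not the real `entrypoint.final` implementation:

```python
from dataclasses import dataclass
from typing import Any

@dataclass
class Final:
    """Hypothetical stand-in for entrypoint.final: a pair of payloads."""
    value: Any  # returned to the caller
    save: Any   # persisted to state

def run(workflow_result: Final, state: dict) -> Any:
    # Only `save` is persisted; only `value` is handed back.
    state["checkpoint"] = workflow_result.save
    return workflow_result.value

state: dict = {}
out = run(Final(value="Success!", save={"state": "saved"}), state)
# out == "Success!"; state["checkpoint"] == {"state": "saved"}
```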
## Usage Examples

### Basic Task Usage

```python
from langgraph.func import task

@task
def process_data(data: dict) -> dict:
    """Process a single data item."""
    return {
        "id": data["id"],
        "result": data["value"] * 2,
    }

@task
def aggregate_results(results: list[dict]) -> dict:
    """Aggregate processed results."""
    total = sum(r["result"] for r in results)
    return {"total": total, "count": len(results)}
```
### Basic Entrypoint

```python
from langgraph.func import entrypoint, task

@task
def process_item(item: str) -> str:
    return item.upper()

@entrypoint()
def workflow(input: dict) -> dict:
    """Simple workflow that processes items."""
    items = input["items"]

    # Call tasks - each call returns a future
    processed = [process_item(item) for item in items]

    # Access results - automatically waits for completion
    results = [p.result() for p in processed]

    return {"processed": results}

# Use like any Pregel graph
result = workflow.invoke({"items": ["a", "b", "c"]})
# result == {"processed": ["A", "B", "C"]}
```
### Parallel Task Execution

```python
from langgraph.func import entrypoint, task

@task
def fetch_user(user_id: int) -> dict:
    # Simulated API call
    return {"id": user_id, "name": f"User {user_id}"}

@task
def fetch_orders(user_id: int) -> list[dict]:
    # Simulated API call
    return [{"order_id": i, "user_id": user_id} for i in range(3)]

@entrypoint()
def get_user_data(input: dict) -> dict:
    """Fetch user and orders in parallel."""
    user_id = input["user_id"]

    # Both tasks execute in parallel
    user_future = fetch_user(user_id)
    orders_future = fetch_orders(user_id)

    # Wait for both to complete
    user = user_future.result()
    orders = orders_future.result()

    return {
        "user": user,
        "orders": orders,
    }

result = get_user_data.invoke({"user_id": 123})
```
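The parallelism pattern above (start every future before resolving any of them) is the same one used with a stdlib thread pool: both simulated API calls are in flight before the first `.result()` blocks, so their latencies overlap. A LangGraph-free sketch:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fetch_user(user_id: int) -> dict:
    time.sleep(0.05)  # simulated API latency
    return {"id": user_id, "name": f"User {user_id}"}

def fetch_orders(user_id: int) -> list[dict]:
    time.sleep(0.05)  # simulated API latency
    return [{"order_id": i, "user_id": user_id} for i in range(3)]

with ThreadPoolExecutor() as pool:
    # Start both calls before resolving either, so the two
    # ~50 ms sleeps run concurrently rather than back to back.
    user_future = pool.submit(fetch_user, 123)
    orders_future = pool.submit(fetch_orders, 123)
    user = user_future.result()
    orders = orders_future.result()
```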
### With Retry and Cache Policies

```python
from langgraph.func import entrypoint, task
from langgraph.types import RetryPolicy, CachePolicy

# Task with retry policy
@task(
    retry_policy=RetryPolicy(
        max_attempts=3,
        initial_interval=1.0,
        backoff_factor=2.0,
        retry_on=Exception,
    )
)
def unreliable_api_call(data: dict) -> dict:
    """Task that might fail and should be retried."""
    # API call that might fail
    return {"result": data["value"]}

# Task with cache policy
@task(
    cache_policy=CachePolicy(
        key_func=lambda data: data["id"],
        ttl=3600,  # Cache for 1 hour
    )
)
def expensive_computation(data: dict) -> dict:
    """Expensive task that should be cached."""
    # Complex computation
    return {"computed": data["value"] ** 2}

@entrypoint(
    retry_policy=RetryPolicy(max_attempts=2),  # Default for all tasks
    cache_policy=CachePolicy(key_func=str),    # Default cache
)
def workflow(input: dict) -> dict:
    """Workflow with retry and cache policies."""
    # Tasks inherit the default policies but can override them
    result1 = unreliable_api_call(input)
    result2 = expensive_computation(input)

    return {
        "api_result": result1.result(),
        "computed": result2.result(),
    }
```
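One way to picture what `RetryPolicy(max_attempts, initial_interval, backoff_factor)` asks the runtime to do is a plain retry loop with exponential backoff. The stdlib-only sketch below mirrors those semantics; it is an illustration, not LangGraph's actual retry code:

```python
import time

def call_with_retry(fn, *, max_attempts=3, initial_interval=1.0, backoff_factor=2.0):
    """Plain-Python sketch of retry-with-exponential-backoff semantics."""
    interval = initial_interval
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise  # attempts exhausted: surface the error
            time.sleep(interval)
            interval *= backoff_factor  # back off before the next attempt

# A flaky function that fails twice, then succeeds.
calls = {"n": 0}

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = call_with_retry(flaky, max_attempts=3, initial_interval=0.01)
# result == "ok" after two retries
```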
### With Checkpointing

```python
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import MemorySaver

@task
def step1(data: dict) -> dict:
    return {"step1_done": True, "value": data["value"] + 1}

@task
def step2(data: dict) -> dict:
    return {"step2_done": True, "value": data["value"] * 2}

# Create a checkpointer for persistence
checkpointer = MemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(input: dict) -> dict:
    """Workflow with checkpointing."""
    result1 = step1(input).result()
    result2 = step2(result1).result()

    return result2

# Use a thread_id for persistence
config = {"configurable": {"thread_id": "session-1"}}
result = workflow.invoke({"value": 5}, config)

# Can resume from the checkpoint
state = workflow.get_state(config)
```
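Conceptually, a checkpointer keys saved state by `thread_id`, so a later run on the same thread picks up where the previous one stopped. The dict-based sketch below is a loose, hypothetical analogy (the `checkpoints` dict and `run_workflow` helper are invented), not how `MemorySaver` is implemented:

```python
# Hypothetical in-memory "checkpointer": a dict keyed by thread_id.
checkpoints: dict[str, dict] = {}

def run_workflow(input: dict, thread_id: str) -> dict:
    # Resume from the last saved state for this thread, if any.
    state = checkpoints.get(thread_id, {"value": input["value"]})
    state = {"value": state["value"] + 1}  # step1
    state = {"value": state["value"] * 2}  # step2
    checkpoints[thread_id] = state         # persist after the run
    return state

first = run_workflow({"value": 5}, "session-1")   # (5 + 1) * 2 == 12
second = run_workflow({"value": 0}, "session-1")  # resumes from the saved 12
```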
### Using Entrypoint Final

```python
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import MemorySaver

@task
def process(data: dict) -> dict:
    return {"processed": data["value"] * 2}

@entrypoint(checkpointer=MemorySaver())
def workflow(input: dict) -> entrypoint.final:
    """Workflow that returns a different value than what's saved."""
    result = process(input).result()

    # Return a user-friendly message, but save the full state
    return entrypoint.final(
        value={"message": "Processing complete!"},
        save={"full_result": result, "timestamp": "2024-01-01"},
    )

config = {"configurable": {"thread_id": "1"}}

# The caller receives `value`
result = workflow.invoke({"value": 10}, config)
# result == {"message": "Processing complete!"}

# But the checkpoint contains `save`
state = workflow.get_state(config)
# state.values == {"full_result": ..., "timestamp": "2024-01-01"}
```
### Async Entrypoint

```python
from langgraph.func import entrypoint, task

@task
async def async_fetch(url: str) -> dict:
    """Async task."""
    # Async operation
    return {"url": url, "data": "fetched"}

@entrypoint()
async def async_workflow(input: dict) -> dict:
    """Async workflow."""
    urls = input["urls"]

    # Launch parallel async tasks
    futures = [async_fetch(url) for url in urls]

    # Await the results
    results = [await f for f in futures]

    return {"results": results}

# Use the async invoke (from within an async context)
result = await async_workflow.ainvoke({"urls": ["url1", "url2"]})
```
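The create-all-then-await pattern in the async workflow is the same one `asyncio.gather` expresses directly: all coroutines are created before any is awaited, so they run concurrently. A stdlib-only sketch:

```python
import asyncio

async def async_fetch(url: str) -> dict:
    await asyncio.sleep(0)  # stand-in for real async I/O
    return {"url": url, "data": "fetched"}

async def main() -> dict:
    urls = ["url1", "url2"]
    # Create all coroutines first, then await them together so they overlap.
    results = await asyncio.gather(*(async_fetch(u) for u in urls))
    return {"results": list(results)}

result = asyncio.run(main())
# result == {"results": [{"url": "url1", "data": "fetched"},
#                        {"url": "url2", "data": "fetched"}]}
```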
### Accessing Runtime Context

```python
from langgraph.func import entrypoint, task
from langgraph.runtime import get_runtime

@task
def process_with_context(data: dict) -> dict:
    """Task that accesses the runtime context."""
    runtime = get_runtime()

    # Access context, store, stream_writer
    if runtime.context:
        # Use context data
        pass

    if runtime.store:
        # Access the store
        pass

    return {"processed": data["value"]}

@entrypoint()
def workflow(input: dict) -> dict:
    """Workflow with runtime context."""
    runtime = get_runtime()

    # Access the previous return value (for multi-step flows)
    if runtime.previous:
        # Use the previous value
        pass

    result = process_with_context(input).result()
    return result
```
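Ambient accessors like `get_runtime()` are conventionally built on `contextvars` in Python: the framework sets a context variable before calling user code, and the accessor reads it back. The sketch below is a hypothetical illustration of that pattern (every name in it is invented), not LangGraph's implementation:

```python
import contextvars
from typing import Any

# A context variable holding the current "runtime" for this execution.
_runtime: contextvars.ContextVar[dict] = contextvars.ContextVar("runtime")

def get_runtime() -> dict:
    """Read the ambient runtime set by the framework."""
    return _runtime.get()

def run_with_context(fn, context: dict[str, Any]):
    """Framework side: install the runtime, call user code, clean up."""
    token = _runtime.set({"context": context, "previous": None})
    try:
        return fn()
    finally:
        _runtime.reset(token)

def node():
    # User code reads the ambient runtime without it being passed in.
    runtime = get_runtime()
    return runtime["context"]["user"]

result = run_with_context(node, {"user": "alice"})
# result == "alice"
```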
## Notes

- Tasks execute lazily - actual execution happens when results are accessed or when the entrypoint function returns
- Tasks called within an entrypoint automatically run in parallel when possible
- Both sync and async variants are supported
- Tasks can call other tasks, creating nested parallelism
- Entrypoints inherit all Pregel methods (`invoke`, `stream`, `get_state`, etc.)
- The functional API provides automatic state management - no need to explicitly define a state schema