or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

channels.mdconfiguration.mderrors.mdfunctional-api.mdindex.mdmessage-graph.mdpregel.mdstate-graph.mdtypes-primitives.md

functional-api.mddocs/

0

# Functional API

1

2

The Functional API provides a decorator-based approach to building LangGraph workflows using `@entrypoint` and `@task` decorators. This enables more natural Python code with automatic parallelization, while maintaining all the benefits of LangGraph's stateful execution model.

3

4

**Important:** Functions decorated with `@entrypoint()` return a `Pregel` (compiled graph) object, not a callable function. Use `.invoke()`, `.stream()`, or other Pregel methods to execute the workflow.

5

6

## Imports

7

8

```python

9

from langgraph.func import entrypoint, task

10

```

11

12

## Capabilities

13

14

### Task Decorator

15

16

Defines a LangGraph task that can be called from within entrypoints or StateGraphs. Tasks return futures that enable parallel execution.

17

18

```python { .api }

19

def task(

20

func=None,

21

*,

22

name=None,

23

retry_policy=None,

24

cache_policy=None

25

):

26

"""

27

Decorator to define a LangGraph task.

28

29

Tasks can be called from within entrypoint functions or StateGraph nodes.

30

When called, they return a future (SyncAsyncFuture) that enables parallel

31

execution. The actual execution happens when the future is awaited or

32

when the containing function returns.

33

34

Parameters:

35

func: Optional[Callable] - Function to wrap as a task

36

name: Optional[str] - Custom name for the task (default: func.__name__)

37

retry_policy: Optional[RetryPolicy | Sequence[RetryPolicy]]

38

Configuration for retrying the task on failure

39

cache_policy: Optional[CachePolicy]

40

Configuration for caching task results

41

42

Returns:

43

_TaskFunction - Wrapped task function that returns futures when called

44

45

Deprecated Parameters:

46

retry: Deprecated in v0.5.0, use retry_policy instead

47

48

Usage:

49

@task

50

def process_item(item: dict) -> dict:

51

return {"result": item["value"] * 2}

52

53

# With parameters

54

@task(name="custom_task", retry_policy=RetryPolicy(max_attempts=5))

55

def retry_task(data: str) -> str:

56

return data.upper()

57

"""

58

```

59

60

### TaskFunction Class

61

62

Wrapper class for task functions that manages execution and caching.

63

64

```python { .api }

65

class _TaskFunction:

66

"""

67

Wrapper for task functions created by the @task decorator.

68

69

Type Parameters:

70

P: Parameter types of the wrapped function

71

T: Return type of the wrapped function

72

"""

73

74

def __call__(self, *args, **kwargs):

75

"""

76

Execute the task.

77

78

Returns:

79

SyncAsyncFuture[T] - Future representing the pending task execution.

80

Can be awaited (async) or accessed directly (sync).

81

"""

82

83

def clear_cache(self, cache):

84

"""

85

Clear cache for this task.

86

87

Parameters:

88

cache: Cache backend to clear

89

90

Returns:

91

None

92

"""

93

94

def aclear_cache(self, cache):

95

"""

96

Asynchronously clear cache for this task.

97

98

Parameters:

99

cache: Cache backend to clear

100

101

Returns:

102

Coroutine that clears the cache

103

"""

104

```

105

106

### Entrypoint Decorator

107

108

Defines a LangGraph workflow using the functional API. The decorated function becomes a Pregel graph with automatic state management and task parallelization.

109

110

```python { .api }

111

class entrypoint:

112

"""

113

Decorator to define a LangGraph workflow using the functional API.

114

115

The decorated function becomes a Pregel graph. Tasks called within the

116

function are automatically parallelized. The function can access runtime

117

context and manage state implicitly.

118

119

Parameters:

120

checkpointer: Optional[BaseCheckpointSaver]

121

Checkpointer for persisting state across runs

122

store: Optional[BaseStore]

123

Store for persistent cross-thread memory

124

cache: Optional cache backend for task results

125

context_schema: Optional type for run-scoped context

126

cache_policy: Optional[CachePolicy]

127

Default cache policy for all tasks

128

retry_policy: Optional[RetryPolicy | Sequence[RetryPolicy]]

129

Default retry policy for all tasks

130

131

Deprecated Parameters:

132

config_schema: Deprecated in v1.0.0, use context_schema instead

133

retry: Deprecated in v0.5.0, use retry_policy instead

134

135

Returns:

136

Callable that wraps the function as a Pregel graph

137

"""

138

139

def __init__(

140

self,

141

checkpointer=None,

142

store=None,

143

cache=None,

144

context_schema=None,

145

cache_policy=None,

146

retry_policy=None

147

): ...

148

149

def __call__(self, func):

150

"""

151

Wrap the function as a Pregel graph.

152

153

Parameters:

154

func: Callable - Function to wrap as a workflow

155

156

Returns:

157

Pregel - Compiled graph object with methods like invoke(), stream(),

158

get_state(), etc. The decorated function is NOT directly callable;

159

you must use graph methods to execute it.

160

161

Example:

162

@entrypoint()

163

def my_workflow(input: dict) -> dict:

164

return {"result": input["value"] * 2}

165

166

# Use .invoke() to execute

167

result = my_workflow.invoke({"value": 5}) # Returns {"result": 10}

168

"""

169

```

170

171

### Entrypoint Final Class

172

173

Data class for returning different values from what gets saved to state.

174

175

```python { .api }

176

class entrypoint.final:

177

"""

178

Return value wrapper for entrypoint functions.

179

180

Allows returning a different value to the caller than what gets

181

saved to the graph state/checkpoint.

182

183

Type Parameters:

184

R: Type of the return value

185

S: Type of the saved value

186

187

Usage:

188

@entrypoint()

189

def workflow(input: dict) -> entrypoint.final:

190

# Do work...

191

return entrypoint.final(

192

value="Success!",

193

save={"state": "saved"}

194

)

195

"""

196

197

value: Any # Value returned to caller

198

save: Any # Value saved to state

199

```

200

201

## Usage Examples

202

203

### Basic Task Usage

204

205

```python

206

from langgraph.func import task

207

208

@task

209

def process_data(data: dict) -> dict:

210

"""Process a single data item."""

211

return {

212

"id": data["id"],

213

"result": data["value"] * 2

214

}

215

216

@task

217

def aggregate_results(results: list[dict]) -> dict:

218

"""Aggregate processed results."""

219

total = sum(r["result"] for r in results)

220

return {"total": total, "count": len(results)}

221

```

222

223

### Basic Entrypoint

224

225

```python

226

from langgraph.func import entrypoint, task

227

228

@task

229

def process_item(item: str) -> str:

230

return item.upper()

231

232

@entrypoint()

233

def workflow(input: dict) -> dict:

234

"""Simple workflow that processes items."""

235

items = input["items"]

236

237

# Call tasks - returns futures

238

processed = [process_item(item) for item in items]

239

240

# Access results - automatically waits for completion

241

results = [p.result() for p in processed]

242

243

return {"processed": results}

244

245

# Use like any Pregel graph

246

result = workflow.invoke({"items": ["a", "b", "c"]})

247

# result == {"processed": ["A", "B", "C"]}

248

```

249

250

### Parallel Task Execution

251

252

```python

253

from langgraph.func import entrypoint, task

254

255

@task

256

def fetch_user(user_id: int) -> dict:

257

# Simulated API call

258

return {"id": user_id, "name": f"User {user_id}"}

259

260

@task

261

def fetch_orders(user_id: int) -> list[dict]:

262

# Simulated API call

263

return [{"order_id": i, "user_id": user_id} for i in range(3)]

264

265

@entrypoint()

266

def get_user_data(input: dict) -> dict:

267

"""Fetch user and orders in parallel."""

268

user_id = input["user_id"]

269

270

# Both tasks execute in parallel

271

user_future = fetch_user(user_id)

272

orders_future = fetch_orders(user_id)

273

274

# Wait for both to complete

275

user = user_future.result()

276

orders = orders_future.result()

277

278

return {

279

"user": user,

280

"orders": orders

281

}

282

283

result = get_user_data.invoke({"user_id": 123})

284

```

285

286

### With Retry and Cache Policies

287

288

```python

289

from langgraph.func import entrypoint, task

290

from langgraph.types import RetryPolicy, CachePolicy

291

292

# Task with retry policy

293

@task(

294

retry_policy=RetryPolicy(

295

max_attempts=3,

296

initial_interval=1.0,

297

backoff_factor=2.0,

298

retry_on=Exception

299

)

300

)

301

def unreliable_api_call(data: dict) -> dict:

302

"""Task that might fail and should be retried."""

303

# API call that might fail

304

return {"result": data["value"]}

305

306

# Task with cache policy

307

@task(

308

cache_policy=CachePolicy(

309

key_func=lambda data: data["id"],

310

ttl=3600 # Cache for 1 hour

311

)

312

)

313

def expensive_computation(data: dict) -> dict:

314

"""Expensive task that should be cached."""

315

# Complex computation

316

return {"computed": data["value"] ** 2}

317

318

@entrypoint(

319

retry_policy=RetryPolicy(max_attempts=2), # Default for all tasks

320

cache_policy=CachePolicy(key_func=str) # Default cache

321

)

322

def workflow(input: dict) -> dict:

323

"""Workflow with retry and cache policies."""

324

# Tasks inherit default policies but can override

325

result1 = unreliable_api_call(input)

326

result2 = expensive_computation(input)

327

328

return {

329

"api_result": result1.result(),

330

"computed": result2.result()

331

}

332

```

333

334

### With Checkpointing

335

336

```python

337

from langgraph.func import entrypoint, task

338

from langgraph.checkpoint.memory import MemorySaver

339

340

@task

341

def step1(data: dict) -> dict:

342

return {"step1_done": True, "value": data["value"] + 1}

343

344

@task

345

def step2(data: dict) -> dict:

346

return {"step2_done": True, "value": data["value"] * 2}

347

348

# Create checkpointer for persistence

349

checkpointer = MemorySaver()

350

351

@entrypoint(checkpointer=checkpointer)

352

def workflow(input: dict) -> dict:

353

"""Workflow with checkpointing."""

354

result1 = step1(input).result()

355

result2 = step2(result1).result()

356

357

return result2

358

359

# Use with thread_id for persistence

360

config = {"configurable": {"thread_id": "session-1"}}

361

result = workflow.invoke({"value": 5}, config)

362

363

# Can resume from checkpoint

364

state = workflow.get_state(config)

365

```

366

367

### Using Entrypoint Final

368

369

```python

370

from langgraph.func import entrypoint, task

371

372

@task

373

def process(data: dict) -> dict:

374

return {"processed": data["value"] * 2}

375

376

@entrypoint()

377

def workflow(input: dict) -> entrypoint.final:

378

"""

379

Workflow that returns different value than what's saved.

380

"""

381

result = process(input).result()

382

383

# Return user-friendly message, but save full state

384

return entrypoint.final(

385

value={"message": "Processing complete!"},

386

save={"full_result": result, "timestamp": "2024-01-01"}

387

)

388

389

# Caller receives the value

390

result = workflow.invoke({"value": 10})

391

# result == {"message": "Processing complete!"}

392

393

# But checkpoint contains save

394

config = {"configurable": {"thread_id": "1"}}

395

result = workflow.invoke({"value": 10}, config)

396

state = workflow.get_state(config)

397

# state.values == {"full_result": ..., "timestamp": "2024-01-01"}

398

```

399

400

### Async Entrypoint

401

402

```python

403

from langgraph.func import entrypoint, task

404

405

@task

406

async def async_fetch(url: str) -> dict:

407

"""Async task."""

408

# Async operation

409

return {"url": url, "data": "fetched"}

410

411

@entrypoint()

412

async def async_workflow(input: dict) -> dict:

413

"""Async workflow."""

414

urls = input["urls"]

415

416

# Launch parallel async tasks

417

futures = [async_fetch(url) for url in urls]

418

419

# Await results

420

results = [await f for f in futures]

421

422

return {"results": results}

423

424

# Use async invoke

425

result = await async_workflow.ainvoke({"urls": ["url1", "url2"]})

426

```

427

428

### Accessing Runtime Context

429

430

```python

431

from langgraph.func import entrypoint, task

432

from langgraph.runtime import get_runtime

433

434

@task

435

def process_with_context(data: dict) -> dict:

436

"""Task that accesses runtime context."""

437

runtime = get_runtime()

438

439

# Access context, store, stream_writer

440

if runtime.context:

441

# Use context data

442

pass

443

444

if runtime.store:

445

# Access store

446

pass

447

448

return {"processed": data["value"]}

449

450

@entrypoint()

451

def workflow(input: dict) -> dict:

452

"""Workflow with runtime context."""

453

runtime = get_runtime()

454

455

# Access previous return value (for multi-step flows)

456

if runtime.previous:

457

# Use previous value

458

pass

459

460

result = process_with_context(input).result()

461

return result

462

```

463

464

## Notes

465

466

- Tasks execute lazily - actual execution happens when results are accessed or when the entrypoint function returns

467

- Tasks called within an entrypoint automatically run in parallel when possible

468

- Both sync and async variants are supported

469

- Tasks can call other tasks, creating nested parallelism

470

- Entrypoints inherit all Pregel methods (invoke, stream, get_state, etc.)

471

- The functional API provides automatic state management - no need to explicitly define state schema

472