or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

assistants.md auth.md client.md crons.md index.md react.md runs.md store.md threads.md

docs/runs.md

0

# Runs Management and Streaming

1

2

The RunsClient provides comprehensive run execution and monitoring capabilities for LangGraph applications. It enables real-time streaming of run execution, batch processing, run lifecycle management, and flexible stream modes for different data requirements. Runs represent individual executions of LangGraph assistants within thread contexts.

3

4

## Core Functionality

5

6

The RunsClient supports:

7

8

- **Streaming Execution**: Real-time streaming of run execution with various stream modes

9

- **Run Lifecycle**: Create, get, list, cancel, and delete run operations

10

- **Batch Processing**: Create multiple runs simultaneously for parallel execution

11

- **Stream Management**: Join existing streams and manage streaming sessions

12

- **Wait Operations**: Block until run completion with configurable timeouts

13

- **Type Safety**: Full TypeScript support with generic types for state and events

14

15

## RunsClient API

16

17

```typescript { .api }

18

class RunsClient<TStateType = DefaultValues, TUpdateType = TStateType, TCustomEventType = unknown> extends BaseClient {

19

/**

20

* Stream run execution with real-time updates and events

21

* @param threadId - Thread ID for execution context (null for new thread)

22

* @param assistantId - Assistant ID to execute

23

* @param payload - Stream configuration and input data

24

* @returns TypedAsyncGenerator for streaming events

25

*/

26

stream<

27

TStreamMode extends StreamMode[] = ["values"],

28

TSubgraphs extends boolean = false

29

>(

30

threadId: string | null,

31

assistantId: string,

32

payload?: RunsStreamPayload

33

): TypedAsyncGenerator<TStreamMode, TSubgraphs, TStateType, TUpdateType, TCustomEventType>;

34

35

/**

36

* Create a new run execution

37

* @param threadId - Thread ID for execution context

38

* @param assistantId - Assistant ID to execute

39

* @param payload - Run configuration and input data

40

* @returns Promise resolving to created run

41

*/

42

create(threadId: string, assistantId: string, payload?: RunsCreatePayload): Promise<Run>;

43

44

/**

45

* Create multiple runs in parallel for batch processing

46

* @param payloads - Array of run creation configurations with assistant IDs

47

* @returns Promise resolving to array of created runs

48

*/

49

createBatch(payloads: (RunsCreatePayload & { assistantId: string })[]): Promise<Run[]>;

50

51

/**

52

* Wait for run completion and return final state values

53

* @param threadId - Thread ID (null for new thread)

54

* @param assistantId - Assistant ID to execute

55

* @param payload - Wait configuration

56

* @returns Promise resolving to final thread state values

57

*/

58

wait(

59

threadId: string | null,

60

assistantId: string,

61

payload?: RunsWaitPayload

62

): Promise<ThreadState["values"]>;

63

64

/**

65

* List all runs for a specific thread with pagination

66

* @param threadId - Thread ID to list runs for

67

* @param options - Listing and filtering options

68

* @returns Promise resolving to array of runs

69

*/

70

list(threadId: string, options?: ListRunsOptions): Promise<Run[]>;

71

72

/**

73

* Get specific run by ID

74

* @param threadId - Thread ID containing the run

75

* @param runId - Run ID to retrieve

76

* @returns Promise resolving to run details

77

*/

78

get(threadId: string, runId: string): Promise<Run>;

79

80

/**

81

* Cancel a running execution

82

* @param threadId - Thread ID containing the run

83

* @param runId - Run ID to cancel

84

* @param wait - Whether to wait for cancellation to complete

85

* @param action - Cancellation action type

86

* @returns Promise resolving when cancellation completes

87

*/

88

cancel(

89

threadId: string,

90

runId: string,

91

wait?: boolean,

92

action?: CancelAction

93

): Promise<void>;

94

95

/**

96

* Wait for run to complete execution

97

* @param threadId - Thread ID containing the run

98

* @param runId - Run ID to wait for

99

* @param options - Join options including signal for cancellation

100

* @returns Promise resolving when run completes

101

*/

102

join(

103

threadId: string,

104

runId: string,

105

options?: { signal?: AbortSignal }

106

): Promise<void>;

107

108

/**

109

* Join an existing streaming run to receive ongoing events

110

* @param threadId - Thread ID (null if not specified)

111

* @param runId - Run ID to join

112

* @param options - Stream joining options

113

* @returns AsyncGenerator for streaming events

114

*/

115

joinStream<TStreamMode extends StreamMode[] = ["values"]>(

116

threadId: string | null,

117

runId: string,

118

options?: JoinStreamOptions<TStreamMode>

119

): AsyncGenerator<StreamEvent<TStreamMode, TStateType, TUpdateType, TCustomEventType>>;

120

121

/**

122

* Delete a run permanently

123

* @param threadId - Thread ID containing the run

124

* @param runId - Run ID to delete

125

* @returns Promise resolving when deletion completes

126

*/

127

delete(threadId: string, runId: string): Promise<void>;

128

}

129

```

130

131

## Core Types

132

133

### Run Interface

134

135

```typescript { .api }

136

interface Run {

137

/** Unique run identifier */

138

run_id: string;

139

/** Thread ID for execution context */

140

thread_id: string;

141

/** Assistant ID being executed */

142

assistant_id: string;

143

/** Run creation timestamp */

144

created_at: string;

145

/** Run last update timestamp */

146

updated_at: string;

147

/** Current run status */

148

status: RunStatus;

149

/** Run metadata */

150

metadata: Metadata;

151

/** Run configuration */

152

config: Config;

153

/** Run kwargs */

154

kwargs: Record<string, any>;

155

/** Multitask strategy */

156

multitask_strategy: MultitaskStrategy;

157

}

158

159

type RunStatus = "pending" | "running" | "error" | "success" | "timeout" | "interrupted";

160

161

type CancelAction = "interrupt" | "rollback";

162

163

type MultitaskStrategy = "reject" | "interrupt" | "rollback" | "enqueue";

164

```

165

166

### Stream Types

167

168

```typescript { .api }

169

type StreamMode =

170

| "values" // Current state values

171

| "updates" // State updates

172

| "messages" // Message updates

173

| "events" // All execution events

174

| "debug" // Debug information

175

| "metadata" // Metadata updates

176

| "custom"; // Custom events

177

178

// Stream Event Types

179

interface ValuesStreamEvent<ValuesType = DefaultValues> {

180

event: "values";

181

data: ValuesType;

182

timestamp: string;

183

tags?: string[];

184

}

185

186

interface UpdatesStreamEvent<UpdateType = DefaultValues> {

187

event: "updates";

188

data: Record<string, UpdateType>;

189

timestamp: string;

190

tags?: string[];

191

}

192

193

interface MessagesStreamEvent {

194

event: "messages";

195

data: Message[];

196

timestamp: string;

197

tags?: string[];

198

}

199

200

interface EventsStreamEvent {

201

event: "events";

202

data: {

203

event: string;

204

name: string;

205

data: Record<string, any>;

206

tags: string[];

207

};

208

timestamp: string;

209

}

210

211

interface DebugStreamEvent {

212

event: "debug";

213

data: {

214

type: string;

215

timestamp: string;

216

step: number;

217

payload: Record<string, any>;

218

};

219

timestamp: string;

220

}

221

222

interface MetadataStreamEvent {

223

event: "metadata";

224

data: Metadata;

225

timestamp: string;

226

}

227

228

interface CustomStreamEvent<TCustomEventType = unknown> {

229

event: "custom";

230

data: TCustomEventType;

231

timestamp: string;

232

}

233

234

type StreamEvent<

235

TStreamMode extends StreamMode[] = ["values"],

236

TStateType = DefaultValues,

237

TUpdateType = TStateType,

238

TCustomEventType = unknown

239

> =

240

| (TStreamMode extends readonly ("values" | "values" | "values")[] ? ValuesStreamEvent<TStateType> : never)

241

| (TStreamMode extends readonly ("updates" | "updates" | "updates")[] ? UpdatesStreamEvent<TUpdateType> : never)

242

| (TStreamMode extends readonly ("messages" | "messages" | "messages")[] ? MessagesStreamEvent : never)

243

| (TStreamMode extends readonly ("events" | "events" | "events")[] ? EventsStreamEvent : never)

244

| (TStreamMode extends readonly ("debug" | "debug" | "debug")[] ? DebugStreamEvent : never)

245

| (TStreamMode extends readonly ("metadata" | "metadata" | "metadata")[] ? MetadataStreamEvent : never)

246

| (TStreamMode extends readonly ("custom" | "custom" | "custom")[] ? CustomStreamEvent<TCustomEventType> : never);

247

```

248

249

### TypedAsyncGenerator

250

251

```typescript { .api }

252

type TypedAsyncGenerator<

253

TStreamMode extends StreamMode[] = ["values"],

254

TSubgraphs extends boolean = false,

255

TStateType = DefaultValues,

256

TUpdateType = TStateType,

257

TCustomEventType = unknown

258

> = AsyncGenerator<StreamEvent<TStreamMode, TStateType, TUpdateType, TCustomEventType>>;

259

```

260

261

## Payload Types

262

263

### Stream and Create Payloads

264

265

```typescript { .api }

266

interface RunsStreamPayload {

267

/** Input data for the run */

268

input?: Record<string, any>;

269

/** Run configuration */

270

config?: Config;

271

/** Stream modes to enable */

272

streamMode?: StreamMode | StreamMode[];

273

/** Enable subgraph streaming */

274

streamSubgraphs?: boolean;

275

/** Run metadata */

276

metadata?: Metadata;

277

/** Multitask strategy */

278

multitaskStrategy?: MultitaskStrategy;

279

/** Feedback configuration */

280

feedbackKeys?: string[];

281

/** Interrupt after nodes */

282

interruptAfter?: string[];

283

/** Interrupt before nodes */

284

interruptBefore?: string[];

285

/** Webhook configuration */

286

webhook?: string;

287

/** Request kwargs */

288

kwargs?: Record<string, any>;

289

}

290

291

interface RunsCreatePayload {

292

/** Input data for the run */

293

input?: Record<string, any>;

294

/** Run configuration */

295

config?: Config;

296

/** Run metadata */

297

metadata?: Metadata;

298

/** Multitask strategy */

299

multitaskStrategy?: MultitaskStrategy;

300

/** Webhook configuration */

301

webhook?: string;

302

/** Interrupt after nodes */

303

interruptAfter?: string[];

304

/** Interrupt before nodes */

305

interruptBefore?: string[];

306

/** Request kwargs */

307

kwargs?: Record<string, any>;

308

}

309

310

interface RunsWaitPayload extends RunsCreatePayload {

311

/** Maximum wait time in seconds */

312

timeout?: number;

313

}

314

```

315

316

### Query and Options

317

318

```typescript { .api }

319

interface ListRunsOptions {

320

/** Maximum runs to return */

321

limit?: number;

322

/** Pagination offset */

323

offset?: number;

324

/** Filter by run status */

325

status?: RunStatus;

326

/** Filter by metadata */

327

metadata?: Record<string, any>;

328

/** Created after timestamp */

329

createdAfter?: string;

330

/** Created before timestamp */

331

createdBefore?: string;

332

}

333

334

interface JoinStreamOptions<TStreamMode extends StreamMode[] = ["values"]> {

335

/** Stream modes to join */

336

streamMode?: TStreamMode;

337

/** Enable subgraph streaming */

338

streamSubgraphs?: boolean;

339

/** Abort signal for cancellation */

340

signal?: AbortSignal;

341

}

342

```

343

344

## Usage Examples

345

346

### Basic Streaming

347

348

```typescript

349

import { Client } from "@langchain/langgraph-sdk";

350

351

const client = new Client({

352

apiUrl: "https://api.langgraph.example.com",

353

apiKey: "your-api-key"

354

});

355

356

// Stream a simple run with values

357

const stream = client.runs.stream(

358

"thread_123",

359

"assistant_456",

360

{

361

input: { message: "Hello, how can you help me?" },

362

streamMode: "values"

363

}

364

);

365

366

for await (const chunk of stream) {

367

if (chunk.event === "values") {

368

console.log("Current state:", chunk.data);

369

}

370

}

371

```

372

373

### Multi-Mode Streaming

374

375

```typescript

376

// Stream with multiple modes for comprehensive monitoring

377

const multiModeStream = client.runs.stream(

378

"thread_123",

379

"assistant_456",

380

{

381

input: {

382

message: "Analyze this data",

383

data: [1, 2, 3, 4, 5]

384

},

385

streamMode: ["values", "updates", "messages", "debug"],

386

streamSubgraphs: true,

387

metadata: { session_id: "session_789" }

388

}

389

);

390

391

for await (const chunk of multiModeStream) {

392

switch (chunk.event) {

393

case "values":

394

console.log("📊 State values:", chunk.data);

395

break;

396

397

case "updates":

398

console.log("🔄 State updates:", chunk.data);

399

break;

400

401

case "messages":

402

console.log("💬 Messages:", chunk.data);

403

break;

404

405

case "debug":

406

console.log("🐛 Debug info:", chunk.data);

407

break;

408

}

409

410

console.log("⏰ Timestamp:", chunk.timestamp);

411

if (chunk.tags) {

412

console.log("🏷️ Tags:", chunk.tags);

413

}

414

}

415

```

416

417

### Run Creation and Management

418

419

```typescript

420

// Create a run without streaming

421

const run = await client.runs.create(

422

"thread_123",

423

"assistant_456",

424

{

425

input: { query: "What's the weather like?" },

426

config: {

427

configurable: {

428

temperature: 0.7,

429

max_tokens: 1000

430

}

431

},

432

metadata: { priority: "high" },

433

multitaskStrategy: "interrupt"

434

}

435

);

436

437

console.log("Created run:", run.run_id);

438

console.log("Status:", run.status);

439

440

// Wait for completion

441

await client.runs.join("thread_123", run.run_id);

442

443

// Get updated run status

444

const updatedRun = await client.runs.get("thread_123", run.run_id);

445

console.log("Final status:", updatedRun.status);

446

```

447

448

### Batch Processing

449

450

```typescript

451

// Create multiple runs in parallel

452

const batchPayloads = [

453

{

454

assistantId: "assistant_456",

455

input: { task: "analyze_data_1" },

456

config: { tags: ["batch", "data_analysis"] }

457

},

458

{

459

assistantId: "assistant_789",

460

input: { task: "generate_report_1" },

461

config: { tags: ["batch", "reporting"] }

462

},

463

{

464

assistantId: "assistant_456",

465

input: { task: "analyze_data_2" },

466

config: { tags: ["batch", "data_analysis"] }

467

}

468

];

469

470

const batchRuns = await client.runs.createBatch(batchPayloads);

471

472

console.log(`Created ${batchRuns.length} runs in batch`);

473

474

// Monitor all runs

475

const runPromises = batchRuns.map(run =>

476

client.runs.join("thread_123", run.run_id)

477

);

478

479

await Promise.all(runPromises);

480

console.log("All batch runs completed");

481

```

482

483

### Wait Operations

484

485

```typescript

486

// Wait for completion with timeout

487

try {

488

const result = await client.runs.wait(

489

"thread_123",

490

"assistant_456",

491

{

492

input: { message: "Process this request" },

493

timeout: 30, // 30 seconds timeout

494

config: {

495

recursion_limit: 50

496

}

497

}

498

);

499

500

console.log("Final result:", result);

501

} catch (error) {

502

if (error.name === "TimeoutError") {

503

console.log("Run timed out");

504

} else {

505

console.error("Run failed:", error);

506

}

507

}

508

```

509

510

### Stream Joining and Cancellation

511

512

```typescript

513

// Join an existing stream

514

const existingStream = client.runs.joinStream(

515

"thread_123",

516

"run_456",

517

{

518

streamMode: ["values", "messages"],

519

streamSubgraphs: false

520

}

521

);

522

523

// Monitor with cancellation support

524

const abortController = new AbortController();

525

526

// Set timeout for stream monitoring

527

setTimeout(() => {

528

abortController.abort();

529

console.log("Stream monitoring cancelled");

530

}, 30000);

531

532

try {

533

for await (const chunk of existingStream) {

534

if (abortController.signal.aborted) {

535

break;

536

}

537

538

console.log("Stream event:", chunk.event);

539

console.log("Data:", chunk.data);

540

}

541

} catch (error) {

542

if (error.name === "AbortError") {

543

console.log("Stream was cancelled");

544

}

545

}

546

547

// Cancel a run if needed

548

await client.runs.cancel("thread_123", "run_456", true, "interrupt");

549

```

550

551

### Advanced Streaming with Custom Events

552

553

```typescript

554

interface CustomEvent {

555

type: "progress" | "warning" | "metric";

556

payload: Record<string, any>;

557

component: string;

558

}

559

560

// Stream with custom event handling

561

const customStream = client.runs.stream<

562

["values", "custom"],

563

false,

564

{ messages: Message[]; progress: number },

565

{ messages: Message[]; progress: number },

566

CustomEvent

567

>(

568

"thread_123",

569

"assistant_456",

570

{

571

input: { task: "complex_analysis" },

572

streamMode: ["values", "custom"],

573

metadata: { enable_custom_events: true }

574

}

575

);

576

577

for await (const chunk of customStream) {

578

switch (chunk.event) {

579

case "values":

580

console.log("Progress:", chunk.data.progress);

581

console.log("Messages count:", chunk.data.messages.length);

582

break;

583

584

case "custom":

585

const customEvent = chunk.data;

586

console.log(`Custom ${customEvent.type} from ${customEvent.component}:`);

587

console.log(customEvent.payload);

588

589

if (customEvent.type === "warning") {

590

console.warn("⚠️ Warning received:", customEvent.payload);

591

}

592

break;

593

}

594

}

595

```

596

597

### Error Handling and Recovery

598

599

```typescript

600

async function robustStreamProcessing() {

601

const maxRetries = 3;

602

let retryCount = 0;

603

604

while (retryCount < maxRetries) {

605

try {

606

const stream = client.runs.stream(

607

"thread_123",

608

"assistant_456",

609

{

610

input: { message: "Process this carefully" },

611

streamMode: ["values", "debug"],

612

config: {

613

tags: [`attempt_${retryCount + 1}`]

614

}

615

}

616

);

617

618

for await (const chunk of stream) {

619

if (chunk.event === "values") {

620

console.log("Processing:", chunk.data);

621

} else if (chunk.event === "debug") {

622

console.log("Debug:", chunk.data.payload);

623

}

624

}

625

626

console.log("Stream completed successfully");

627

break;

628

629

} catch (error) {

630

retryCount++;

631

console.error(`Attempt ${retryCount} failed:`, error);

632

633

if (retryCount >= maxRetries) {

634

console.error("All retry attempts exhausted");

635

throw error;

636

}

637

638

// Exponential backoff

639

const delay = Math.pow(2, retryCount) * 1000;

640

console.log(`Retrying in ${delay}ms...`);

641

await new Promise(resolve => setTimeout(resolve, delay));

642

}

643

}

644

}

645

646

// Run listing and cleanup

647

async function manageRuns() {

648

// List recent runs

649

const runs = await client.runs.list("thread_123", {

650

limit: 50,

651

status: "success",

652

createdAfter: new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString()

653

});

654

655

console.log(`Found ${runs.length} successful runs from last 24h`);

656

657

// Clean up old failed runs

658

const failedRuns = await client.runs.list("thread_123", {

659

status: "error",

660

createdBefore: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000).toISOString()

661

});

662

663

for (const run of failedRuns) {

664

try {

665

await client.runs.delete("thread_123", run.run_id);

666

console.log(`Deleted failed run: ${run.run_id}`);

667

} catch (error) {

668

console.error(`Failed to delete run ${run.run_id}:`, error);

669

}

670

}

671

}

672

```

673

674

The RunsClient provides comprehensive run execution capabilities with real-time streaming, flexible event handling, and robust lifecycle management, making it the core component for executing LangGraph assistants and monitoring their progress.