# State Management & Channels

Channel system for managing state communication and synchronization between graph nodes with various storage patterns and aggregation strategies.

## Capabilities

### Base Channel System

Foundation for all state management providing the core channel interface for value storage and updates.

```typescript { .api }
/**
 * Base class for all channel implementations
 */
abstract class BaseChannel {
  /** Update channel with new values */
  abstract update(values: any[]): any;

  /** Get current channel value */
  abstract get(): any;

  /** Create checkpoint of current state */
  abstract checkpoint(): any;

  /** Restore from checkpoint */
  abstract fromCheckpoint(checkpoint: any): BaseChannel;
}

/**
 * Binary operator for value aggregation
 */
type BinaryOperator<ValueType, UpdateType = ValueType> = (
  left: ValueType,
  right: UpdateType
) => ValueType;
```

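To make the four abstract methods concrete, here is a minimal sketch of a custom channel that counts how many updates it has received. `UpdateCounterChannel` is a hypothetical name, and the `BaseChannel` declared in the block is a local stand-in that mirrors the signature listed above rather than the library's own class.

```typescript
// Local stand-in mirroring the interface listed above (assumption: not imported from the library).
abstract class BaseChannel {
  abstract update(values: any[]): any;
  abstract get(): any;
  abstract checkpoint(): any;
  abstract fromCheckpoint(checkpoint: any): BaseChannel;
}

// Hypothetical channel that counts every value written to it.
class UpdateCounterChannel extends BaseChannel {
  private count = 0;

  update(values: unknown[]): number {
    // Each value in the batch counts as one update.
    this.count += values.length;
    return this.count;
  }

  get(): number {
    return this.count;
  }

  checkpoint(): number {
    // The counter itself is the whole persisted state.
    return this.count;
  }

  fromCheckpoint(checkpoint: number): UpdateCounterChannel {
    // Build a fresh channel and seed it with the saved count.
    const restored = new UpdateCounterChannel();
    restored.count = checkpoint;
    return restored;
  }
}

const counter = new UpdateCounterChannel();
counter.update(["a", "b"]);
counter.update(["c"]);
const snapshot = counter.checkpoint();
counter.update(["d"]);
const restored = counter.fromCheckpoint(snapshot);
console.log(counter.get(), restored.get()); // 4 3
```

The restored channel reflects the state at the time of the snapshot, while the original keeps accumulating. This is the round trip that `checkpoint()` and `fromCheckpoint()` are meant to support.
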
### LastValue Channel

Channel that stores the most recent value, replacing previous values with each update.

```typescript { .api }
/**
 * Channel storing only the most recent value
 */
type LastValue<T> = T;

class LastValueChannel<T> extends BaseChannel {
  constructor(defaultValue?: T);
  update(values: T[]): T;
  get(): T;
}
```

**Usage Example:**

```typescript
// For storing current user session
const userChannel = new LastValueChannel<string>();
userChannel.update(["alice"]);
userChannel.update(["bob"]); // Replaces "alice"
console.log(userChannel.get()); // "bob"
```

### Topic Channel

Channel that maintains a complete history of all messages, ideal for conversation logs and audit trails.

```typescript { .api }
/**
 * Channel storing message history
 */
type Topic<T> = T[];

class TopicChannel<T> extends BaseChannel {
  constructor();
  update(values: T[]): T[];
  get(): T[];
}
```

**Usage Example:**

```typescript
// For chat message history
const messageChannel = new TopicChannel<string>();
messageChannel.update(["Hello"]);
messageChannel.update(["How are you?"]);
console.log(messageChannel.get()); // ["Hello", "How are you?"]
```

### Binary Operator Aggregate

Channel that uses binary operations to combine values, providing flexible aggregation patterns.

```typescript { .api }
/**
 * Channel using binary operators for value aggregation
 */
class BinaryOperatorAggregate<ValueType, UpdateType> extends BaseChannel {
  constructor(
    operator: BinaryOperator<ValueType, UpdateType>,
    defaultValue?: ValueType
  );
  update(values: UpdateType[]): ValueType;
  get(): ValueType;
}
```

**Usage Examples:**

```typescript
// Sum aggregation
const sumChannel = new BinaryOperatorAggregate<number, number>(
  (a, b) => a + b,
  0
);
sumChannel.update([10, 20, 30]); // Result: 60

// Object merging (later keys overwrite earlier ones)
const configChannel = new BinaryOperatorAggregate<
  Record<string, any>,
  Record<string, any>
>(
  (existing, update) => ({ ...existing, ...update }),
  {}
);
configChannel.update([{ theme: "dark" }, { lang: "en" }]);
// Result: { theme: "dark", lang: "en" }

// Array concatenation
const listChannel = new BinaryOperatorAggregate<string[], string[]>(
  (existing, newItems) => existing.concat(newItems),
  []
);
listChannel.update([["a"], ["b", "c"]]);
// Result: ["a", "b", "c"]
```

### Barrier Channels

Synchronization channels that wait for specific conditions before proceeding, enabling complex coordination patterns.

```typescript { .api }
/**
 * Dynamic barrier that waits for variable conditions
 */
type DynamicBarrierValue = any;

class DynamicBarrierChannel extends BaseChannel {
  constructor(condition: (values: any[]) => boolean);
  update(values: any[]): any;
  get(): any;
}

/**
 * Named barrier waiting for specific named values
 */
type NamedBarrierValue = any;

class NamedBarrierChannel extends BaseChannel {
  constructor(names: string[]);
  update(values: any[]): any;
  get(): any;
}

/**
 * Barrier waiting for named channel values
 */
type WaitForNames = any;

class WaitForNamesChannel extends BaseChannel {
  constructor(names: string[]);
  update(values: any[]): any;
  get(): any;
}
```

**Usage Example:**

```typescript
// Wait for multiple async operations
const barrier = new NamedBarrierChannel(["auth", "data", "config"]);

// Each operation reports completion
barrier.update([{ name: "auth", value: userToken }]);
barrier.update([{ name: "data", value: userData }]);
barrier.update([{ name: "config", value: appConfig }]);

// Now barrier.get() returns combined result
```

### Ephemeral Channels

Temporary storage channels that don't persist across checkpoints, useful for intermediate computations.

```typescript { .api }
/**
 * Temporary storage that doesn't persist in checkpoints
 */
type EphemeralValue<T> = T;

class EphemeralChannel<T> extends BaseChannel {
  constructor(defaultValue?: T);
  update(values: T[]): T;
  get(): T;
  checkpoint(): null; // Returns null - not persisted
}
```

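**Usage Example:**

The non-persistence described above can be illustrated with a small sketch. The class below is a local stand-in with the listed method names, not the library's implementation. Its internals are assumptions made for illustration: the last value in a batch wins, and `fromCheckpoint` returns an empty channel.

```typescript
// Stand-in for illustration only (assumption: behavior inferred from the description above).
class EphemeralChannel<T> {
  private value: T | undefined;

  constructor(defaultValue?: T) {
    this.value = defaultValue;
  }

  update(values: T[]): T | undefined {
    // Assumption: when several values arrive in one batch, the last one wins.
    if (values.length > 0) {
      this.value = values[values.length - 1];
    }
    return this.value;
  }

  get(): T | undefined {
    return this.value;
  }

  checkpoint(): null {
    // Nothing is persisted for an ephemeral channel.
    return null;
  }

  fromCheckpoint(_checkpoint: null): EphemeralChannel<T> {
    // Restoring always yields an empty channel, since nothing was saved.
    return new EphemeralChannel<T>();
  }
}

// Scratch space for intermediate scores within a single step
const scratch = new EphemeralChannel<number[]>();
scratch.update([[0.12, 0.87, 0.33]]);

const saved = scratch.checkpoint();
const resumed = scratch.fromCheckpoint(saved);

console.log(scratch.get()); // [0.12, 0.87, 0.33]
console.log(saved); // null
console.log(resumed.get()); // undefined
```

Because the value is absent after a restore, anything stored in an ephemeral channel should be cheap to recompute.
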
### Any Value Channel

Generic channel that can store any type of value without type constraints.

```typescript { .api }
/**
 * Channel storing any type of value
 */
type AnyValue = any;

class AnyValueChannel extends BaseChannel {
  constructor();
  update(values: any[]): any;
  get(): any;
}
```

## Channel Utilities

### Channel Operations

Utility functions for common channel operations and transformations.

```typescript { .api }
/**
 * Channel utility class for common operations
 */
class Channel {
  /** Create subscription to multiple channels */
  static subscribeTo(
    channels: string | string[],
    options?: { key?: string; tags?: string[] }
  ): PregelNode;

  /** Create write operations to channels */
  static writeTo(
    channels: string[],
    writes?: Record<string, any>
  ): ChannelWrite;
}

interface ChannelWrite {
  channels: string[];
  values: any[];
}

interface PregelNode {
  channels: string[];
  mapper?: (values: any[]) => any;
}
```

**Usage Examples:**

```typescript
// Subscribe to multiple channels
const subscriber = Channel.subscribeTo(
  ["user_input", "system_state"],
  { key: "combined_input" }
);

// Write to multiple channels
const writer = Channel.writeTo(
  ["output", "log"],
  { output: result, log: `Processed at ${Date.now()}` }
);
```

## State Reducers

Common patterns for state reduction in channels and annotations.

```typescript { .api }
/** Common reducer patterns */
type StateReducer<T, U = T> = (current: T, update: U) => T;

// Built-in reducers
const addMessages: StateReducer<string[], string[]> = (x, y) => x.concat(y);
const replaceValue: StateReducer<any, any> = (x, y) => y;
const mergeObjects: StateReducer<Record<string, any>, Record<string, any>> =
  (x, y) => ({ ...x, ...y });
const sumNumbers: StateReducer<number, number> = (x, y) => x + y;
const maxValue: StateReducer<number, number> = (x, y) => Math.max(x, y);
```

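**Usage Example:**

A reducer is applied once per incoming update, folding each update into the current value. The sketch below shows that fold in plain TypeScript. `applyUpdates` is a hypothetical helper written for illustration, and the reducers are redeclared locally so the block stands alone.

```typescript
type StateReducer<T, U = T> = (current: T, update: U) => T;

// Local copies of three of the reducers listed above.
const addMessages: StateReducer<string[], string[]> = (x, y) => x.concat(y);
const maxValue: StateReducer<number, number> = (x, y) => Math.max(x, y);
const mergeObjects: StateReducer<
  Record<string, unknown>,
  Record<string, unknown>
> = (x, y) => ({ ...x, ...y });

// Hypothetical helper: fold a batch of updates into the current value, left to right.
function applyUpdates<T, U>(
  reducer: StateReducer<T, U>,
  current: T,
  updates: U[]
): T {
  return updates.reduce<T>((acc, update) => reducer(acc, update), current);
}

const messages = applyUpdates(addMessages, ["Hello"], [
  ["How are you?"],
  ["Fine, thanks"],
]);
const peak = applyUpdates(maxValue, 0, [3, 11, 7]);
const settings = applyUpdates(mergeObjects, { theme: "light" }, [
  { theme: "dark" },
  { lang: "en" },
]);

console.log(messages); // ["Hello", "How are you?", "Fine, thanks"]
console.log(peak); // 11
console.log(settings); // { theme: "dark", lang: "en" }
```

Order matters for non-commutative reducers: `mergeObjects` lets later updates overwrite earlier keys, whereas `maxValue` gives the same result in any order.
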
## Advanced Channel Patterns

### Multi-Channel Coordination

```typescript
import { Annotation, StateGraph } from "@langchain/langgraph";

// Coordinate multiple channels with barriers
const workflow = new StateGraph(Annotation.Root({
  // Append-only log of status messages
  tasks: Annotation<string[]>({
    reducer: (x, y) => x.concat(y),
    default: () => [],
  }),
  // Completion flags keyed by task name, merged on each update
  completed: Annotation<Record<string, boolean>>({
    reducer: (x, y) => ({ ...x, ...y }),
    default: () => ({}),
  }),
}))
  .addNode("coordinator", async (state) => {
    const allCompleted = ["task1", "task2", "task3"]
      .every(task => state.completed[task]);

    if (allCompleted) {
      return { tasks: ["All tasks completed"] };
    }
    return { tasks: ["Waiting for tasks..."] };
  });
```

### Channel-Based State Machines

```typescript
import { Annotation } from "@langchain/langgraph";

// State machine using channels
const stateMachine = Annotation.Root({
  state: Annotation<string>({
    reducer: (current, next) => next, // Always take new state
    default: () => "idle",
  }),
  data: Annotation<any>({
    reducer: (current, update) => ({ ...current, ...update }), // Shallow merge
    default: () => ({}),
  }),
});
```
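
The `state` reducer above accepts any new value, so validating transitions is left to the caller. The sketch below shows one way to add that check in plain TypeScript, without the library. The phase names, the `allowed` table, and `applyUpdate` are hypothetical and exist only for illustration.

```typescript
type Phase = "idle" | "running" | "done";

interface MachineState {
  state: Phase;
  data: Record<string, unknown>;
}

// Hypothetical transition table: which phases may follow each phase.
const allowed: Record<Phase, Phase[]> = {
  idle: ["running"],
  running: ["done", "idle"],
  done: [],
};

// Replace `state` and shallow-merge `data`, rejecting transitions not in the table.
function applyUpdate(
  current: MachineState,
  update: Partial<MachineState>
): MachineState {
  const next = update.state ?? current.state;
  if (next !== current.state && !allowed[current.state].includes(next)) {
    throw new Error(`Invalid transition: ${current.state} -> ${next}`);
  }
  return { state: next, data: { ...current.data, ...(update.data ?? {}) } };
}

let machine: MachineState = { state: "idle", data: {} };
machine = applyUpdate(machine, { state: "running", data: { jobId: 42 } });
machine = applyUpdate(machine, { state: "done", data: { ok: true } });

console.log(machine); // { state: "done", data: { jobId: 42, ok: true } }
```

Keeping the transition table outside the reducer keeps the reducer itself trivial, at the cost of having to call the check wherever updates are produced.
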