or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

basic-effects.mdchannels.mdconcurrency-effects.mdhelper-effects.mdindex.mdmiddleware.mdtesting.mdutilities.md

channels.mddocs/

0

# Channels

1

2

Channel system for communication between sagas and external event sources, enabling integration with WebSockets, DOM events, and other async data sources.

3

4

## Capabilities

5

6

### channel

7

8

Creates a channel for inter-saga communication. Channels are useful for connecting sagas to external input sources or for communication between sagas.

9

10

```typescript { .api }

11

/**

12

* Create channel for inter-saga communication

13

* @param buffer - Optional buffer to control message queuing (default: 10 messages, FIFO)

14

* @returns Channel that can be used with take/put effects

15

*/

16

function channel<T extends NotUndefined>(buffer?: Buffer<T>): Channel<T>;

17

18

interface Channel<T> extends TakeableChannel<T>, PuttableChannel<T>, FlushableChannel<T> {

19

take(cb: (message: T | END) => void): void;

20

put(message: T | END): void;

21

flush(cb: (items: T[] | END) => void): void;

22

close(): void;

23

}

24

```

25

26

**Usage Examples:**

27

28

```typescript

29

import { channel, take, put, fork } from "redux-saga/effects";

30

31

function* producer(chan) {

32

for (let i = 0; i < 5; i++) {

33

yield put(chan, `message-${i}`);

34

yield delay(1000);

35

}

36

chan.close();

37

}

38

39

function* consumer(chan) {

40

try {

41

while (true) {

42

const message = yield take(chan);

43

console.log('Received:', message);

44

}

45

} catch (e) {

46

console.log('Channel closed');

47

}

48

}

49

50

function* channelSaga() {

51

const chan = yield call(channel);

52

yield fork(producer, chan);

53

yield fork(consumer, chan);

54

}

55

```

56

57

### eventChannel

58

59

Creates a channel that subscribes to an external event source. The channel will queue incoming events until takers are registered.

60

61

```typescript { .api }

62

/**

63

* Create channel that subscribes to external event source

64

* @param subscribe - Function that subscribes to events and returns unsubscribe function

65

* @param buffer - Optional buffer for queuing events

66

* @returns EventChannel for external event integration

67

*/

68

function eventChannel<T extends NotUndefined>(

69

subscribe: Subscribe<T>,

70

buffer?: Buffer<T>

71

): EventChannel<T>;

72

73

type Subscribe<T> = (cb: (input: T | END) => void) => Unsubscribe;

74

type Unsubscribe = () => void;

75

76

interface EventChannel<T extends NotUndefined> {

77

take(cb: (message: T | END) => void): void;

78

flush(cb: (items: T[] | END) => void): void;

79

close(): void;

80

}

81

```

82

83

**Usage Examples:**

84

85

```typescript

86

import { eventChannel, take, call } from "redux-saga/effects";

87

88

// WebSocket integration

89

function createWebSocketChannel(socket) {

90

return eventChannel(emit => {

91

const onMessage = (event) => emit(JSON.parse(event.data));

92

const onError = (error) => emit(new Error(error.message));

93

const onClose = () => emit(END);

94

95

socket.addEventListener('message', onMessage);

96

socket.addEventListener('error', onError);

97

socket.addEventListener('close', onClose);

98

99

// Return unsubscribe function

100

return () => {

101

socket.removeEventListener('message', onMessage);

102

socket.removeEventListener('error', onError);

103

socket.removeEventListener('close', onClose);

104

};

105

});

106

}

107

108

function* watchWebSocket() {

109

const socket = new WebSocket('ws://localhost:8080');

110

const channel = yield call(createWebSocketChannel, socket);

111

112

try {

113

while (true) {

114

const message = yield take(channel);

115

console.log('WebSocket message:', message);

116

// Handle message

117

}

118

} finally {

119

channel.close();

120

}

121

}

122

123

// DOM event integration

124

function createClickChannel(element) {

125

return eventChannel(emit => {

126

const clickHandler = (event) => emit(event);

127

element.addEventListener('click', clickHandler);

128

129

return () => element.removeEventListener('click', clickHandler);

130

});

131

}

132

133

function* watchClicks() {

134

const button = document.getElementById('my-button');

135

const channel = yield call(createClickChannel, button);

136

137

while (true) {

138

const clickEvent = yield take(channel);

139

console.log('Button clicked!', clickEvent);

140

}

141

}

142

```

143

144

### multicastChannel

145

146

Creates a multicast channel that can have multiple takers. Unlike regular channels, multicast channels deliver messages to all registered takers.

147

148

```typescript { .api }

149

/**

150

* Create multicast channel (delivers to all takers)

151

* @returns MulticastChannel that broadcasts to all takers

152

*/

153

function multicastChannel<T extends NotUndefined>(): MulticastChannel<T>;

154

155

interface MulticastChannel<T extends NotUndefined> {

156

take(cb: (message: T | END) => void, matcher?: Predicate<T>): void;

157

put(message: T | END): void;

158

close(): void;

159

}

160

```

161

162

**Usage Examples:**

163

164

```typescript

165

import { multicastChannel, take, put, fork } from "redux-saga/effects";

166

167

function* subscriber1(chan, name) {

168

while (true) {

169

const message = yield take(chan);

170

console.log(`${name} received:`, message);

171

}

172

}

173

174

function* subscriber2(chan, name) {

175

while (true) {

176

const message = yield take(chan);

177

console.log(`${name} received:`, message);

178

}

179

}

180

181

function* broadcaster(chan) {

182

for (let i = 0; i < 3; i++) {

183

yield put(chan, `broadcast-${i}`);

184

yield delay(1000);

185

}

186

}

187

188

function* multicastExample() {

189

const chan = yield call(multicastChannel);

190

191

// Multiple subscribers will all receive messages

192

yield fork(subscriber1, chan, 'Sub1');

193

yield fork(subscriber2, chan, 'Sub2');

194

yield fork(broadcaster, chan);

195

}

196

```

197

198

### stdChannel

199

200

Creates the standard channel used internally by Redux-Saga for dispatched actions.

201

202

```typescript { .api }

203

/**

204

* Create standard channel (used internally for actions)

205

* @returns MulticastChannel for action dispatching

206

*/

207

function stdChannel<T extends NotUndefined>(): MulticastChannel<T>;

208

```

209

210

### actionChannel

211

212

Creates an effect that queues actions matching a pattern using an event channel. Useful for controlling action processing rate.

213

214

```typescript { .api }

215

/**

216

* Create channel that queues actions matching pattern

217

* @param pattern - Action pattern to match

218

* @param buffer - Optional buffer for controlling queuing

219

* @returns ActionChannelEffect

220

*/

221

function actionChannel(

222

pattern: ActionPattern,

223

buffer?: Buffer<Action>

224

): ActionChannelEffect;

225

```

226

227

**Usage Examples:**

228

229

```typescript

230

import { actionChannel, take, call } from "redux-saga/effects";

231

232

function* handleRequestsOneAtATime() {

233

// Create channel that queues USER_REQUEST actions

234

const requestChan = yield actionChannel('USER_REQUEST');

235

236

while (true) {

237

// Process one request at a time

238

const action = yield take(requestChan);

239

yield call(processUserRequest, action.payload);

240

}

241

}

242

243

// With custom buffer

244

function* handleWithCustomBuffer() {

245

const chan = yield actionChannel('DATA_REQUEST', buffers.sliding(5));

246

247

while (true) {

248

const action = yield take(chan);

249

yield call(processData, action.payload);

250

}

251

}

252

```

253

254

### flush

255

256

Creates an effect that flushes all buffered items from a channel. Returns the flushed items to the saga.

257

258

```typescript { .api }

259

/**

260

* Flush all buffered items from channel

261

* @param channel - Channel to flush

262

* @returns FlushEffect that resolves with flushed items

263

*/

264

function flush<T>(channel: FlushableChannel<T>): FlushEffect<T>;

265

```

266

267

**Usage Examples:**

268

269

```typescript

270

import { actionChannel, flush, take } from "redux-saga/effects";

271

272

function* batchProcessor() {

273

const requestChan = yield actionChannel('BATCH_REQUEST');

274

275

try {

276

while (true) {

277

const action = yield take(requestChan);

278

yield call(processBatchItem, action.payload);

279

}

280

} finally {

281

// Flush remaining items on saga termination

282

const remainingItems = yield flush(requestChan);

283

if (remainingItems.length > 0) {

284

console.log('Processing remaining items:', remainingItems);

285

for (const item of remainingItems) {

286

yield call(processBatchItem, item.payload);

287

}

288

}

289

}

290

}

291

```

292

293

## Buffers

294

295

Redux-Saga provides several buffer types for controlling how channels queue messages:

296

297

```typescript { .api }

298

const buffers: {

299

/** No buffering, messages lost if no takers */

300

none<T>(): Buffer<T>;

301

/** Buffer up to limit, throw on overflow (default: 10) */

302

fixed<T>(limit?: number): Buffer<T>;

303

/** Like fixed but expands dynamically on overflow */

304

expanding<T>(limit?: number): Buffer<T>;

305

/** Like fixed but silently drops messages on overflow */

306

dropping<T>(limit?: number): Buffer<T>;

307

/** Like fixed but drops oldest messages on overflow */

308

sliding<T>(limit?: number): Buffer<T>;

309

};

310

```

311

312

**Buffer Usage Examples:**

313

314

```typescript

315

import { channel, buffers } from "redux-saga";

316

317

function* bufferExamples() {

318

// No buffering - messages lost if no takers

319

const noneChannel = yield call(channel, buffers.none());

320

321

// Fixed buffer - throws error on overflow

322

const fixedChannel = yield call(channel, buffers.fixed(5));

323

324

// Expanding buffer - grows dynamically

325

const expandingChannel = yield call(channel, buffers.expanding(10));

326

327

// Dropping buffer - drops new messages on overflow

328

const droppingChannel = yield call(channel, buffers.dropping(3));

329

330

// Sliding buffer - drops oldest messages on overflow

331

const slidingChannel = yield call(channel, buffers.sliding(3));

332

}

333

```

334

335

## Integration Patterns

336

337

### WebSocket Integration

338

339

```typescript

340

function createWebSocketSaga(url) {

341

return function* () {

342

const socket = new WebSocket(url);

343

const channel = yield call(createWebSocketChannel, socket);

344

345

try {

346

while (true) {

347

const message = yield take(channel);

348

yield put({ type: 'WEBSOCKET_MESSAGE', payload: message });

349

}

350

} catch (error) {

351

yield put({ type: 'WEBSOCKET_ERROR', error });

352

} finally {

353

socket.close();

354

}

355

};

356

}

357

```

358

359

### Server-Sent Events

360

361

```typescript

362

function createSSEChannel(url) {

363

return eventChannel(emit => {

364

const eventSource = new EventSource(url);

365

366

eventSource.onmessage = (event) => {

367

emit(JSON.parse(event.data));

368

};

369

370

eventSource.onerror = (error) => {

371

emit(new Error('SSE connection failed'));

372

};

373

374

return () => eventSource.close();

375

});

376

}

377

```