Saga middleware for Redux to handle side effects using ES6 generators
—
Channel system for communication between sagas and external event sources, enabling integration with WebSockets, DOM events, and other async data sources.
Creates a channel for inter-saga communication. Channels are useful for connecting sagas to external input sources or for communication between sagas.
/**
* Create channel for inter-saga communication
* @typeParam T - Type of the messages carried by the channel; constrained to NotUndefined
* @param buffer - Optional buffer to control message queuing (default: 10 messages, FIFO)
* @returns Channel that can be used with take/put effects
*/
function channel<T extends NotUndefined>(buffer?: Buffer<T>): Channel<T>;
/**
 * Channel returned by `channel()`.
 *
 * The type parameter carries the same `NotUndefined` constraint as the
 * `channel()` factory and the other channel interfaces.
 */
interface Channel<T extends NotUndefined>
  extends TakeableChannel<T>, PuttableChannel<T>, FlushableChannel<T> {
  /** Register a callback that receives a message of type `T` or `END`. */
  take(cb: (message: T | END) => void): void;
  /** Put a message (or `END`) on the channel. */
  put(message: T | END): void;
  /** Register a callback that receives the flushed items, or `END`. */
  flush(cb: (items: T[] | END) => void): void;
  /** Close the channel. */
  close(): void;
}

Usage Examples:
// Channel factories are exported from "redux-saga"; effect creators from "redux-saga/effects".
import { channel } from "redux-saga";
import { call, delay, fork, put, take } from "redux-saga/effects";
/**
 * Example producer: puts five numbered messages on `chan`, waiting one second
 * after each, then closes the channel.
 *
 * @param chan - Channel to put messages on
 */
function* producer(chan) {
  const total = 5;
  let sent = 0;
  while (sent < total) {
    yield put(chan, `message-${sent}`);
    yield delay(1000);
    sent += 1;
  }
  // Close the channel once every message has been sent
  chan.close();
}
/**
 * Example consumer: logs every message taken from `chan`.
 *
 * Taking from a closed channel terminates the generator instead of throwing,
 * so the shutdown message is logged from `finally` rather than `catch`.
 * `finally` also runs if the saga is cancelled or fails; real errors still
 * propagate to the caller instead of being reported as a channel close.
 *
 * @param chan - Channel to take messages from
 */
function* consumer(chan) {
  try {
    while (true) {
      const message = yield take(chan);
      console.log('Received:', message);
    }
  } finally {
    console.log('Channel closed');
  }
}
/**
 * Example root saga: creates one channel and forks a producer and a consumer
 * that share it.
 */
function* channelSaga() {
  const sharedChannel = yield call(channel);
  // Producer is forked first, then the consumer, both on the same channel
  yield fork(producer, sharedChannel);
  yield fork(consumer, sharedChannel);
}

Creates a channel that subscribes to an external event source. The channel will queue incoming events until takers are registered.
/**
* Create channel that subscribes to external event source
* @typeParam T - Type of the events emitted into the channel; constrained to NotUndefined
* @param subscribe - Function that subscribes to events and returns unsubscribe function
* @param buffer - Optional buffer for queuing events
* @returns EventChannel for external event integration
*/
function eventChannel<T extends NotUndefined>(
subscribe: Subscribe<T>,
buffer?: Buffer<T>
): EventChannel<T>;
/** Subscriber passed to eventChannel: receives an emit callback (accepting T or END) and returns an Unsubscribe function. */
type Subscribe<T> = (cb: (input: T | END) => void) => Unsubscribe;
/** Zero-argument function returned by a Subscribe function. */
type Unsubscribe = () => void;
/** Channel returned by eventChannel; declares take, flush and close, but no put. */
interface EventChannel<T extends NotUndefined> {
/** Register a callback that receives a message of type T or END. */
take(cb: (message: T | END) => void): void;
/** Register a callback that receives the flushed items, or END. */
flush(cb: (items: T[] | END) => void): void;
/** Close the channel. */
close(): void;
}Usage Examples:
// eventChannel and END are exported from "redux-saga"; effect creators from "redux-saga/effects".
import { eventChannel, END } from "redux-saga";
import { call, take } from "redux-saga/effects";
// WebSocket integration
/**
 * Wrap a WebSocket in an event channel.
 *
 * Each incoming frame is parsed as JSON and emitted as a message. Failures
 * (socket errors and frames that are not valid JSON) are emitted as `Error`
 * objects, so takers should check `message instanceof Error`. `END` is
 * emitted when the socket closes.
 *
 * @param socket - WebSocket (or compatible event target) to listen on
 * @returns Event channel whose unsubscribe function removes the listeners registered here
 */
function createWebSocketChannel(socket) {
  return eventChannel(emit => {
    const onMessage = (event) => {
      let payload;
      try {
        payload = JSON.parse(event.data);
      } catch (parseError) {
        // A malformed frame must not throw out of the event listener
        emit(parseError instanceof Error ? parseError : new Error(String(parseError)));
        return;
      }
      emit(payload);
    };
    // Browser WebSocket `error` events are plain Events without a `message`,
    // so fall back to a fixed description instead of an empty Error.
    const onError = (error) =>
      emit(new Error((error && error.message) || 'WebSocket error'));
    const onClose = () => emit(END);
    socket.addEventListener('message', onMessage);
    socket.addEventListener('error', onError);
    socket.addEventListener('close', onClose);
    // Return unsubscribe function
    return () => {
      socket.removeEventListener('message', onMessage);
      socket.removeEventListener('error', onError);
      socket.removeEventListener('close', onClose);
    };
  });
}
/**
 * Example saga: opens a WebSocket, logs every message taken from its channel,
 * and closes the channel when the saga exits for any reason.
 */
function* watchWebSocket() {
  const ws = new WebSocket('ws://localhost:8080');
  const socketChannel = yield call(createWebSocketChannel, ws);
  try {
    for (;;) {
      const incoming = yield take(socketChannel);
      console.log('WebSocket message:', incoming);
      // Handle message
    }
  } finally {
    socketChannel.close();
  }
}
// DOM event integration
/**
 * Wrap an element's `click` events in an event channel.
 *
 * @param element - Event target to attach the click listener to
 * @returns Event channel that emits each click event; its unsubscribe
 *          function removes the listener
 */
function createClickChannel(element) {
  const subscribe = (emit) => {
    const onClick = (event) => emit(event);
    element.addEventListener('click', onClick);
    const unsubscribe = () => element.removeEventListener('click', onClick);
    return unsubscribe;
  };
  return eventChannel(subscribe);
}
/**
 * Example saga: logs every click on the element with id `my-button`.
 *
 * The channel is closed in `finally`, matching `watchWebSocket`, so the click
 * listener is detached when the saga is cancelled or fails.
 */
function* watchClicks() {
  const button = document.getElementById('my-button');
  const channel = yield call(createClickChannel, button);
  try {
    while (true) {
      const clickEvent = yield take(channel);
      console.log('Button clicked!', clickEvent);
    }
  } finally {
    // Without this the DOM listener would stay attached after the saga ends
    channel.close();
  }
}

Creates a multicast channel that can have multiple takers. Unlike regular channels, multicast channels deliver messages to all registered takers.
/**
* Create multicast channel (delivers to all takers)
* @typeParam T - Type of the messages carried by the channel; constrained to NotUndefined
* @returns MulticastChannel that broadcasts to all takers
*/
function multicastChannel<T extends NotUndefined>(): MulticastChannel<T>;
/** Channel returned by multicastChannel and stdChannel; declares take, put and close. */
interface MulticastChannel<T extends NotUndefined> {
/** Register a callback that receives a message of type T or END; accepts an optional matcher predicate. */
take(cb: (message: T | END) => void, matcher?: Predicate<T>): void;
/** Put a message (or END) on the channel. */
put(message: T | END): void;
/** Close the channel. */
close(): void;
}Usage Examples:
// multicastChannel is exported from "redux-saga"; effect creators from "redux-saga/effects".
import { multicastChannel } from "redux-saga";
import { call, delay, fork, put, take } from "redux-saga/effects";
/**
 * Example subscriber: logs every message taken from `chan`, prefixed with `name`.
 *
 * @param chan - Channel to take messages from
 * @param name - Label used in the log output
 */
function* subscriber1(chan, name) {
  for (;;) {
    const received = yield take(chan);
    console.log(`${name} received:`, received);
  }
}
/**
 * Second example subscriber: logs every message taken from `chan`, prefixed with `name`.
 *
 * @param chan - Channel to take messages from
 * @param name - Label used in the log output
 */
function* subscriber2(chan, name) {
  for (;;) {
    const received = yield take(chan);
    console.log(`${name} received:`, received);
  }
}
/**
 * Example broadcaster: puts three numbered messages on `chan`, waiting one
 * second after each.
 *
 * @param chan - Channel to put messages on
 */
function* broadcaster(chan) {
  const total = 3;
  let sent = 0;
  while (sent < total) {
    yield put(chan, `broadcast-${sent}`);
    yield delay(1000);
    sent += 1;
  }
}
/**
 * Example root saga: creates a multicast channel, forks two subscribers on it,
 * then forks the broadcaster.
 */
function* multicastExample() {
  const hub = yield call(multicastChannel);
  // Multiple subscribers will all receive messages
  yield fork(subscriber1, hub, 'Sub1');
  yield fork(subscriber2, hub, 'Sub2');
  yield fork(broadcaster, hub);
}

Creates the standard channel used internally by Redux-Saga for dispatched actions.
/**
* Create standard channel (used internally for actions)
* @typeParam T - Type of the messages carried by the channel; constrained to NotUndefined
* @returns MulticastChannel for action dispatching
*/
function stdChannel<T extends NotUndefined>(): MulticastChannel<T>;Creates an effect that queues actions matching a pattern using an event channel. Useful for controlling action processing rate.
/**
* Create channel that queues actions matching pattern
* @param pattern - Action pattern to match
* @param buffer - Optional buffer for controlling queuing
* @returns ActionChannelEffect; yielding it from a saga produces the channel to take from
*/
function actionChannel(
pattern: ActionPattern,
buffer?: Buffer<Action>
): ActionChannelEffect;Usage Examples:
// buffers is exported from "redux-saga"; effect creators from "redux-saga/effects".
import { buffers } from "redux-saga";
import { actionChannel, call, take } from "redux-saga/effects";
/**
 * Example saga: queues `USER_REQUEST` actions on an action channel and
 * processes them strictly one after another.
 */
function* handleRequestsOneAtATime() {
  // Create channel that queues USER_REQUEST actions
  const pendingRequests = yield actionChannel('USER_REQUEST');
  for (;;) {
    // Process one request at a time
    const request = yield take(pendingRequests);
    const payload = request.payload;
    yield call(processUserRequest, payload);
  }
}
// With custom buffer
/**
 * Example saga: queues `DATA_REQUEST` actions through a sliding buffer of
 * size 5 and processes them one at a time.
 */
function* handleWithCustomBuffer() {
  const slidingBuffer = buffers.sliding(5);
  const dataRequests = yield actionChannel('DATA_REQUEST', slidingBuffer);
  for (;;) {
    const request = yield take(dataRequests);
    yield call(processData, request.payload);
  }
}

Creates an effect that flushes all buffered items from a channel. Returns the flushed items to the saga.
/**
* Flush all buffered items from channel
* @typeParam T - Type of the items buffered in the channel
* @param channel - Channel to flush
* @returns FlushEffect that resolves with flushed items
*/
function flush<T>(channel: FlushableChannel<T>): FlushEffect<T>;Usage Examples:
// `call` is used by the example below and must be imported alongside the other effects.
import { actionChannel, call, flush, take } from "redux-saga/effects";
/**
 * Example saga: processes `BATCH_REQUEST` actions one at a time and, when the
 * saga terminates, flushes the channel and processes whatever was still queued.
 */
function* batchProcessor() {
  const batchQueue = yield actionChannel('BATCH_REQUEST');
  try {
    for (;;) {
      const request = yield take(batchQueue);
      yield call(processBatchItem, request.payload);
    }
  } finally {
    // Flush remaining items on saga termination
    const leftovers = yield flush(batchQueue);
    if (leftovers.length > 0) {
      console.log('Processing remaining items:', leftovers);
      for (let index = 0; index < leftovers.length; index += 1) {
        yield call(processBatchItem, leftovers[index].payload);
      }
    }
  }
}

Redux-Saga provides several buffer types for controlling how channels queue messages:
/** Factories for the built-in buffer types; each one returns a Buffer<T>. */
const buffers: {
/** No buffering, messages lost if no takers */
none<T>(): Buffer<T>;
/** Buffer up to limit, throw on overflow (default limit: 10) */
fixed<T>(limit?: number): Buffer<T>;
/** Like fixed but expands dynamically on overflow */
expanding<T>(limit?: number): Buffer<T>;
/** Like fixed but silently drops messages on overflow */
dropping<T>(limit?: number): Buffer<T>;
/** Like fixed but drops oldest messages on overflow */
sliding<T>(limit?: number): Buffer<T>;
};Buffer Usage Examples:
// Channel factory and buffers come from "redux-saga"; the `call` effect from "redux-saga/effects".
import { channel, buffers } from "redux-saga";
import { call } from "redux-saga/effects";
/**
 * Example saga: creates one channel with each built-in buffer type.
 * The channels are created for illustration only and are not used further.
 */
function* bufferExamples() {
  // No buffering - messages lost if no takers
  const noneChannel = yield call(channel, buffers.none());
  // Fixed buffer - throws error on overflow
  const fixedChannel = yield call(channel, buffers.fixed(5));
  // Expanding buffer - grows dynamically
  const expandingChannel = yield call(channel, buffers.expanding(10));
  // Dropping buffer - drops new messages on overflow
  const droppingChannel = yield call(channel, buffers.dropping(3));
  // Sliding buffer - drops oldest messages on overflow
  const slidingChannel = yield call(channel, buffers.sliding(3));
}

/**
 * Build a saga that forwards messages from a WebSocket at `url` to the store.
 *
 * `createWebSocketChannel` reports socket errors by emitting `Error` objects
 * as ordinary channel messages, so they are detected with `instanceof Error`
 * and dispatched as `WEBSOCKET_ERROR`; everything else is dispatched as
 * `WEBSOCKET_MESSAGE`. The `catch` still covers exceptions thrown inside the
 * saga itself.
 *
 * @param url - WebSocket endpoint to connect to
 * @returns Generator function suitable for `fork`/`call`
 */
function createWebSocketSaga(url) {
  return function* () {
    const socket = new WebSocket(url);
    const channel = yield call(createWebSocketChannel, socket);
    try {
      while (true) {
        const message = yield take(channel);
        if (message instanceof Error) {
          // Channel failures arrive as messages, not as thrown exceptions
          yield put({ type: 'WEBSOCKET_ERROR', error: message });
        } else {
          yield put({ type: 'WEBSOCKET_MESSAGE', payload: message });
        }
      }
    } catch (error) {
      yield put({ type: 'WEBSOCKET_ERROR', error });
    } finally {
      // Close the channel first so its listeners are removed, then the socket
      channel.close();
      socket.close();
    }
  };
}

/**
 * Wrap a Server-Sent Events stream in an event channel.
 *
 * Each message's `data` is parsed as JSON and emitted; an `Error` is emitted
 * when the EventSource reports an error.
 *
 * @param url - SSE endpoint to connect to
 * @returns Event channel whose unsubscribe function closes the EventSource
 */
function createSSEChannel(url) {
  return eventChannel(emit => {
    const eventSource = new EventSource(url);
    eventSource.onmessage = (event) => {
      emit(JSON.parse(event.data));
    };
    eventSource.onerror = () => {
      emit(new Error('SSE connection failed'));
    };
    return () => eventSource.close();
  });
}

Install with Tessl CLI
npx tessl i tessl/npm-redux-saga