0
# Pipelining & Transactions
1
2
ioredis provides powerful command batching and atomic transaction capabilities through pipelining and MULTI/EXEC transactions. These features enable high-performance batch operations and consistent data manipulation.
3
4
## Capabilities
5
6
### Pipelining
7
8
Pipeline multiple commands for execution in a single network round-trip, dramatically improving performance for batch operations.
9
10
```typescript { .api }
11
// Create pipeline
12
pipeline(commands?: unknown[][]): ChainableCommander;
13
14
interface ChainableCommander {
15
// All Redis commands available for chaining
16
get(key: RedisKey): ChainableCommander;
17
set(key: RedisKey, value: RedisValue): ChainableCommander;
18
hget(key: RedisKey, field: string): ChainableCommander;
19
hset(key: RedisKey, field: string, value: RedisValue): ChainableCommander;
20
// ... all other Redis commands
21
22
// Execute pipeline
23
exec(callback?: Callback<Array<[Error | null, any]>>): Promise<Array<[Error | null, any]>>;
24
execBuffer(callback?: Callback<Array<[Error | null, any]>>): Promise<Array<[Error | null, any]>>;
25
26
// Pipeline properties
27
readonly length: number;
28
readonly isPipeline: true;
29
}
30
```
31
32
**Usage Examples:**
33
34
```typescript
35
import Redis from "ioredis";
36
37
const redis = new Redis();
38
39
// Basic pipeline
40
const pipeline = redis.pipeline();
41
pipeline.set("key1", "value1");
42
pipeline.set("key2", "value2");
43
pipeline.get("key1");
44
pipeline.get("key2");
45
46
const results = await pipeline.exec();
47
console.log(results);
48
// [
49
// [null, "OK"], // set key1
50
// [null, "OK"], // set key2
51
// [null, "value1"], // get key1
52
// [null, "value2"] // get key2
53
// ]
54
55
// Chained pipeline
56
const results2 = await redis
57
.pipeline()
58
.set("user:123", "Alice")
59
.hset("user:123:profile", "email", "alice@example.com")
60
.hset("user:123:profile", "age", "25")
61
.exec();
62
63
// Pipeline with predefined commands
64
const commands = [
65
["set", "key1", "value1"],
66
["set", "key2", "value2"],
67
["mget", "key1", "key2"]
68
];
69
const results3 = await redis.pipeline(commands).exec();
70
```
71
72
### Error Handling in Pipelines
73
74
Handle individual command errors within pipeline execution.
75
76
```typescript { .api }
77
// Pipeline results format
78
type PipelineResult = Array<[Error | null, any]>;
79
```
80
81
**Usage Examples:**
82
83
```typescript
84
const pipeline = redis.pipeline();
85
pipeline.set("valid_key", "value");
86
pipeline.get("nonexistent_key");
87
pipeline.hget("wrong_type_key", "field"); // Error if key is not a hash
88
89
const results = await pipeline.exec();
90
91
results.forEach(([error, result], index) => {
92
if (error) {
93
console.error(`Command ${index} failed:`, error.message);
94
} else {
95
console.log(`Command ${index} result:`, result);
96
}
97
});
98
```
99
100
### Transactions (MULTI/EXEC)
101
102
Execute multiple commands atomically using Redis transactions with optimistic locking support.
103
104
```typescript { .api }
105
// Start transaction
106
multi(options?: { pipeline?: boolean }): ChainableCommander | Promise<"OK">;
107
108
// Transaction methods (same interface as pipeline)
109
interface ChainableCommander {
110
// All Redis commands available
111
// ... Redis commands
112
113
// Transaction control
114
exec(callback?: Callback<Array<[Error | null, any]> | null>): Promise<Array<[Error | null, any]> | null>;
115
discard(): Promise<"OK">;
116
117
// Watch keys for optimistic locking (before MULTI)
118
watch(...keys: RedisKey[]): Promise<"OK">;
119
unwatch(): Promise<"OK">;
120
}
121
```
122
123
**Usage Examples:**
124
125
```typescript
126
// Basic transaction
127
const transaction = redis.multi();
128
transaction.set("account:1:balance", "100");
129
transaction.set("account:2:balance", "200");
130
transaction.incrby("account:1:balance", -50);
131
transaction.incrby("account:2:balance", 50);
132
133
const results = await transaction.exec();
134
if (results) {
135
console.log("Transaction completed successfully");
136
} else {
137
console.log("Transaction was discarded");
138
}
139
140
// Optimistic locking with WATCH
141
await redis.watch("balance");
142
const currentBalance = parseInt(await redis.get("balance") || "0");
143
144
if (currentBalance >= 10) {
145
const transaction = redis.multi();
146
transaction.decrby("balance", 10);
147
transaction.incrby("purchases", 1);
148
149
const results = await transaction.exec();
150
if (results) {
151
console.log("Purchase successful");
152
} else {
153
console.log("Balance was modified, transaction cancelled");
154
}
155
} else {
156
await redis.unwatch();
157
console.log("Insufficient balance");
158
}
159
```
160
161
### Auto-Pipelining
162
163
Automatic command batching for improved performance without manual pipeline management.
164
165
```typescript { .api }
166
interface RedisOptions {
167
enableAutoPipelining?: boolean;
168
autoPipeliningIgnoredCommands?: string[];
169
}
170
171
// Redis instance properties
172
readonly autoPipelineQueueSize: number;
173
```
174
175
**Usage Examples:**
176
177
```typescript
178
const redis = new Redis({
179
enableAutoPipelining: true,
180
autoPipeliningIgnoredCommands: ["subscribe", "psubscribe"]
181
});
182
183
// These commands will be automatically batched
184
const promise1 = redis.get("key1");
185
const promise2 = redis.get("key2");
186
const promise3 = redis.set("key3", "value3");
187
188
// All resolve when the batch executes
189
const [value1, value2, result3] = await Promise.all([promise1, promise2, promise3]);
190
191
// Check queue size
192
console.log(`Commands in auto-pipeline queue: ${redis.autoPipelineQueueSize}`);
193
```
194
195
## Advanced Usage
196
197
### Pipeline with Callbacks
198
199
Use callbacks with pipeline operations for specific use cases.
200
201
```typescript { .api }
202
pipeline().exec(callback?: Callback<Array<[Error | null, any]>>): Promise<Array<[Error | null, any]>>;
203
```
204
205
```typescript
206
const pipeline = redis.pipeline();
207
pipeline.set("key1", "value1");
208
pipeline.get("key1");
209
210
pipeline.exec((err, results) => {
211
if (err) {
212
console.error("Pipeline error:", err);
213
return;
214
}
215
216
results.forEach(([cmdErr, result], index) => {
217
if (cmdErr) {
218
console.error(`Command ${index} error:`, cmdErr);
219
} else {
220
console.log(`Command ${index}:`, result);
221
}
222
});
223
});
224
```
225
226
### Transaction Rollback Detection
227
228
Detect when transactions are rolled back due to WATCH key modifications.
229
230
```typescript
231
async function transferMoney(fromAccount: string, toAccount: string, amount: number) {
232
await redis.watch(fromAccount, toAccount);
233
234
const fromBalance = parseInt(await redis.get(fromAccount) || "0");
235
const toBalance = parseInt(await redis.get(toAccount) || "0");
236
237
if (fromBalance < amount) {
238
await redis.unwatch();
239
throw new Error("Insufficient funds");
240
}
241
242
const transaction = redis.multi();
243
transaction.set(fromAccount, fromBalance - amount);
244
transaction.set(toAccount, toBalance + amount);
245
246
const results = await transaction.exec();
247
248
if (results === null) {
249
throw new Error("Transaction failed - accounts were modified during transfer");
250
}
251
252
return { success: true, fromBalance: fromBalance - amount, toBalance: toBalance + amount };
253
}
254
255
// Usage
256
try {
257
const result = await transferMoney("account:alice", "account:bob", 50);
258
console.log("Transfer successful:", result);
259
} catch (error) {
260
console.error("Transfer failed:", error.message);
261
}
262
```
263
264
### Buffer Support in Pipelines
265
266
Handle binary data in pipeline operations.
267
268
```typescript { .api }
269
// Buffer variants
270
execBuffer(callback?: Callback<Array<[Error | null, any]>>): Promise<Array<[Error | null, any]>>;
271
```
272
273
```typescript
274
const pipeline = redis.pipeline();
275
pipeline.set(Buffer.from("binary_key"), Buffer.from("binary_value"));
276
pipeline.get(Buffer.from("binary_key"));
277
278
const results = await pipeline.execBuffer();
279
const [, [, binaryValue]] = results;
280
console.log("Binary value:", binaryValue); // Buffer
281
```
282
283
### Transaction with Custom Logic
284
285
Implement complex business logic with transactions and conditional execution.
286
287
```typescript
288
async function processOrder(userId: string, productId: string, quantity: number) {
289
const userKey = `user:${userId}`;
290
const productKey = `product:${productId}`;
291
const orderKey = `order:${Date.now()}`;
292
293
// Watch relevant keys
294
await redis.watch(userKey, productKey);
295
296
// Get current state
297
const [userCredits, productStock] = await Promise.all([
298
redis.hget(userKey, "credits").then(c => parseInt(c || "0")),
299
redis.hget(productKey, "stock").then(s => parseInt(s || "0"))
300
]);
301
302
const productPrice = parseInt(await redis.hget(productKey, "price") || "0");
303
const totalCost = productPrice * quantity;
304
305
// Validate business rules
306
if (userCredits < totalCost) {
307
await redis.unwatch();
308
throw new Error("Insufficient credits");
309
}
310
311
if (productStock < quantity) {
312
await redis.unwatch();
313
throw new Error("Insufficient stock");
314
}
315
316
// Execute transaction
317
const transaction = redis.multi();
318
transaction.hincrby(userKey, "credits", -totalCost);
319
transaction.hincrby(productKey, "stock", -quantity);
320
transaction.hset(orderKey, {
321
userId,
322
productId,
323
quantity: quantity.toString(),
324
totalCost: totalCost.toString(),
325
timestamp: Date.now().toString()
326
});
327
328
const results = await transaction.exec();
329
330
if (results === null) {
331
throw new Error("Order processing failed - data was modified during processing");
332
}
333
334
return {
335
orderId: orderKey,
336
remainingCredits: userCredits - totalCost,
337
remainingStock: productStock - quantity
338
};
339
}
340
```
341
342
## Performance Considerations
343
344
### Pipeline vs Auto-Pipelining
345
346
Choose the appropriate batching strategy based on your use case.
347
348
```typescript
349
// Manual pipeline - full control, explicit batching
350
const pipeline = redis.pipeline();
351
for (let i = 0; i < 1000; i++) {
352
pipeline.set(`key:${i}`, `value:${i}`);
353
}
354
const results = await pipeline.exec();
355
356
// Auto-pipelining - automatic batching, simpler code
357
const promises = [];
358
for (let i = 0; i < 1000; i++) {
359
promises.push(redis.set(`key:${i}`, `value:${i}`));
360
}
361
const results2 = await Promise.all(promises);
362
```
363
364
### Transaction Performance
365
366
Minimize transaction scope and duration for better performance.
367
368
```typescript
369
// Good - minimal transaction scope
370
await redis.watch("counter");
371
const current = await redis.get("counter");
372
const transaction = redis.multi();
373
transaction.set("counter", parseInt(current) + 1);
374
await transaction.exec();
375
376
// Better - use atomic operations when possible
377
await redis.incr("counter");
378
```
379
380
## Types
381
382
```typescript { .api }
383
type PipelineResult = Array<[Error | null, any]>;
384
type Callback<T> = (err?: Error | null, result?: T) => void;
385
type RedisKey = string | Buffer;
386
type RedisValue = string | Buffer | number;
387
```