or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cluster.mdcommands.mdconfiguration.mdindex.mdpipelining.mdpubsub.mdredis-client.mdstreaming.md
tile.json

pipelining.mddocs/

0

# Pipelining & Transactions

1

2

ioredis provides powerful command batching and atomic transaction capabilities through pipelining and MULTI/EXEC transactions. These features enable high-performance batch operations and consistent data manipulation.

3

4

## Capabilities

5

6

### Pipelining

7

8

Pipeline multiple commands for execution in a single network round-trip, dramatically improving performance for batch operations.

9

10

```typescript { .api }

11

// Create pipeline

12

pipeline(commands?: unknown[][]): ChainableCommander;

13

14

interface ChainableCommander {

15

// All Redis commands available for chaining

16

get(key: RedisKey): ChainableCommander;

17

set(key: RedisKey, value: RedisValue): ChainableCommander;

18

hget(key: RedisKey, field: string): ChainableCommander;

19

hset(key: RedisKey, field: string, value: RedisValue): ChainableCommander;

20

// ... all other Redis commands

21

22

// Execute pipeline

23

exec(callback?: Callback<Array<[Error | null, any]>>): Promise<Array<[Error | null, any]>>;

24

execBuffer(callback?: Callback<Array<[Error | null, any]>>): Promise<Array<[Error | null, any]>>;

25

26

// Pipeline properties

27

readonly length: number;

28

readonly isPipeline: true;

29

}

30

```

31

32

**Usage Examples:**

33

34

```typescript

35

import Redis from "ioredis";

36

37

const redis = new Redis();

38

39

// Basic pipeline

40

const pipeline = redis.pipeline();

41

pipeline.set("key1", "value1");

42

pipeline.set("key2", "value2");

43

pipeline.get("key1");

44

pipeline.get("key2");

45

46

const results = await pipeline.exec();

47

console.log(results);

48

// [

49

// [null, "OK"], // set key1

50

// [null, "OK"], // set key2

51

// [null, "value1"], // get key1

52

// [null, "value2"] // get key2

53

// ]

54

55

// Chained pipeline

56

const results2 = await redis

57

.pipeline()

58

.set("user:123", "Alice")

59

.hset("user:123:profile", "email", "alice@example.com")

60

.hset("user:123:profile", "age", "25")

61

.exec();

62

63

// Pipeline with predefined commands

64

const commands = [

65

["set", "key1", "value1"],

66

["set", "key2", "value2"],

67

["mget", "key1", "key2"]

68

];

69

const results3 = await redis.pipeline(commands).exec();

70

```

71

72

### Error Handling in Pipelines

73

74

Handle individual command errors within pipeline execution.

75

76

```typescript { .api }

77

// Pipeline results format

78

type PipelineResult = Array<[Error | null, any]>;

79

```

80

81

**Usage Examples:**

82

83

```typescript

84

const pipeline = redis.pipeline();

85

pipeline.set("valid_key", "value");

86

pipeline.get("nonexistent_key");

87

pipeline.hget("wrong_type_key", "field"); // Error if key is not a hash

88

89

const results = await pipeline.exec();

90

91

results.forEach(([error, result], index) => {

92

if (error) {

93

console.error(`Command ${index} failed:`, error.message);

94

} else {

95

console.log(`Command ${index} result:`, result);

96

}

97

});

98

```

99

100

### Transactions (MULTI/EXEC)

101

102

Execute multiple commands atomically using Redis transactions with optimistic locking support.

103

104

```typescript { .api }

105

// Start transaction

106

multi(options?: { pipeline?: boolean }): ChainableCommander | Promise<"OK">;

107

108

// Transaction methods (same interface as pipeline)

109

interface ChainableCommander {

110

// All Redis commands available

111

// ... Redis commands

112

113

// Transaction control

114

exec(callback?: Callback<Array<[Error | null, any]> | null>): Promise<Array<[Error | null, any]> | null>;

115

discard(): Promise<"OK">;

116

117

// Watch keys for optimistic locking (before MULTI)

118

watch(...keys: RedisKey[]): Promise<"OK">;

119

unwatch(): Promise<"OK">;

120

}

121

```

122

123

**Usage Examples:**

124

125

```typescript

126

// Basic transaction

127

const transaction = redis.multi();

128

transaction.set("account:1:balance", "100");

129

transaction.set("account:2:balance", "200");

130

transaction.incrby("account:1:balance", -50);

131

transaction.incrby("account:2:balance", 50);

132

133

const results = await transaction.exec();

134

if (results) {

135

console.log("Transaction completed successfully");

136

} else {

137

console.log("Transaction was discarded");

138

}

139

140

// Optimistic locking with WATCH

141

await redis.watch("balance");

142

const currentBalance = parseInt(await redis.get("balance") || "0");

143

144

if (currentBalance >= 10) {

145

const transaction = redis.multi();

146

transaction.decrby("balance", 10);

147

transaction.incrby("purchases", 1);

148

149

const results = await transaction.exec();

150

if (results) {

151

console.log("Purchase successful");

152

} else {

153

console.log("Balance was modified, transaction cancelled");

154

}

155

} else {

156

await redis.unwatch();

157

console.log("Insufficient balance");

158

}

159

```

160

161

### Auto-Pipelining

162

163

Automatic command batching for improved performance without manual pipeline management.

164

165

```typescript { .api }

166

interface RedisOptions {

167

enableAutoPipelining?: boolean;

168

autoPipeliningIgnoredCommands?: string[];

169

}

170

171

// Redis instance properties

172

readonly autoPipelineQueueSize: number;

173

```

174

175

**Usage Examples:**

176

177

```typescript

178

const redis = new Redis({

179

enableAutoPipelining: true,

180

autoPipeliningIgnoredCommands: ["subscribe", "psubscribe"]

181

});

182

183

// These commands will be automatically batched

184

const promise1 = redis.get("key1");

185

const promise2 = redis.get("key2");

186

const promise3 = redis.set("key3", "value3");

187

188

// All resolve when the batch executes

189

const [value1, value2, result3] = await Promise.all([promise1, promise2, promise3]);

190

191

// Check queue size

192

console.log(`Commands in auto-pipeline queue: ${redis.autoPipelineQueueSize}`);

193

```

194

195

## Advanced Usage

196

197

### Pipeline with Callbacks

198

199

Use callbacks with pipeline operations for specific use cases.

200

201

```typescript { .api }

202

pipeline().exec(callback?: Callback<Array<[Error | null, any]>>): Promise<Array<[Error | null, any]>>;

203

```

204

205

```typescript

206

const pipeline = redis.pipeline();

207

pipeline.set("key1", "value1");

208

pipeline.get("key1");

209

210

pipeline.exec((err, results) => {

211

if (err) {

212

console.error("Pipeline error:", err);

213

return;

214

}

215

216

results.forEach(([cmdErr, result], index) => {

217

if (cmdErr) {

218

console.error(`Command ${index} error:`, cmdErr);

219

} else {

220

console.log(`Command ${index}:`, result);

221

}

222

});

223

});

224

```

225

226

### Transaction Rollback Detection

227

228

Detect when transactions are rolled back due to WATCH key modifications.

229

230

```typescript

231

async function transferMoney(fromAccount: string, toAccount: string, amount: number) {

232

await redis.watch(fromAccount, toAccount);

233

234

const fromBalance = parseInt(await redis.get(fromAccount) || "0");

235

const toBalance = parseInt(await redis.get(toAccount) || "0");

236

237

if (fromBalance < amount) {

238

await redis.unwatch();

239

throw new Error("Insufficient funds");

240

}

241

242

const transaction = redis.multi();

243

transaction.set(fromAccount, fromBalance - amount);

244

transaction.set(toAccount, toBalance + amount);

245

246

const results = await transaction.exec();

247

248

if (results === null) {

249

throw new Error("Transaction failed - accounts were modified during transfer");

250

}

251

252

return { success: true, fromBalance: fromBalance - amount, toBalance: toBalance + amount };

253

}

254

255

// Usage

256

try {

257

const result = await transferMoney("account:alice", "account:bob", 50);

258

console.log("Transfer successful:", result);

259

} catch (error) {

260

console.error("Transfer failed:", error.message);

261

}

262

```

263

264

### Buffer Support in Pipelines

265

266

Handle binary data in pipeline operations.

267

268

```typescript { .api }

269

// Buffer variants

270

execBuffer(callback?: Callback<Array<[Error | null, any]>>): Promise<Array<[Error | null, any]>>;

271

```

272

273

```typescript

274

const pipeline = redis.pipeline();

275

pipeline.set(Buffer.from("binary_key"), Buffer.from("binary_value"));

276

pipeline.get(Buffer.from("binary_key"));

277

278

const results = await pipeline.execBuffer();

279

const [, [, binaryValue]] = results;

280

console.log("Binary value:", binaryValue); // Buffer

281

```

282

283

### Transaction with Custom Logic

284

285

Implement complex business logic with transactions and conditional execution.

286

287

```typescript

288

async function processOrder(userId: string, productId: string, quantity: number) {

289

const userKey = `user:${userId}`;

290

const productKey = `product:${productId}`;

291

const orderKey = `order:${Date.now()}`;

292

293

// Watch relevant keys

294

await redis.watch(userKey, productKey);

295

296

// Get current state

297

const [userCredits, productStock] = await Promise.all([

298

redis.hget(userKey, "credits").then(c => parseInt(c || "0")),

299

redis.hget(productKey, "stock").then(s => parseInt(s || "0"))

300

]);

301

302

const productPrice = parseInt(await redis.hget(productKey, "price") || "0");

303

const totalCost = productPrice * quantity;

304

305

// Validate business rules

306

if (userCredits < totalCost) {

307

await redis.unwatch();

308

throw new Error("Insufficient credits");

309

}

310

311

if (productStock < quantity) {

312

await redis.unwatch();

313

throw new Error("Insufficient stock");

314

}

315

316

// Execute transaction

317

const transaction = redis.multi();

318

transaction.hincrby(userKey, "credits", -totalCost);

319

transaction.hincrby(productKey, "stock", -quantity);

320

transaction.hset(orderKey, {

321

userId,

322

productId,

323

quantity: quantity.toString(),

324

totalCost: totalCost.toString(),

325

timestamp: Date.now().toString()

326

});

327

328

const results = await transaction.exec();

329

330

if (results === null) {

331

throw new Error("Order processing failed - data was modified during processing");

332

}

333

334

return {

335

orderId: orderKey,

336

remainingCredits: userCredits - totalCost,

337

remainingStock: productStock - quantity

338

};

339

}

340

```

341

342

## Performance Considerations

343

344

### Pipeline vs Auto-Pipelining

345

346

Choose the appropriate batching strategy based on your use case.

347

348

```typescript

349

// Manual pipeline - full control, explicit batching

350

const pipeline = redis.pipeline();

351

for (let i = 0; i < 1000; i++) {

352

pipeline.set(`key:${i}`, `value:${i}`);

353

}

354

const results = await pipeline.exec();

355

356

// Auto-pipelining - automatic batching, simpler code

357

const promises = [];

358

for (let i = 0; i < 1000; i++) {

359

promises.push(redis.set(`key:${i}`, `value:${i}`));

360

}

361

const results2 = await Promise.all(promises);

362

```

363

364

### Transaction Performance

365

366

Minimize transaction scope and duration for better performance.

367

368

```typescript

369

// Good - minimal transaction scope

370

await redis.watch("counter");

371

const current = await redis.get("counter");

372

const transaction = redis.multi();

373

transaction.set("counter", parseInt(current) + 1);

374

await transaction.exec();

375

376

// Better - use atomic operations when possible

377

await redis.incr("counter");

378

```

379

380

## Types

381

382

```typescript { .api }

383

type PipelineResult = Array<[Error | null, any]>;

384

type Callback<T> = (err?: Error | null, result?: T) => void;

385

type RedisKey = string | Buffer;

386

type RedisValue = string | Buffer | number;

387

```