# Cluster Pipeline

The ClusterPipeline class provides cluster-aware command batching with automatic routing to the appropriate nodes while respecting Redis Cluster constraints. Unlike standard Redis pipelining, cluster pipelines have limitations due to the distributed nature of Redis Cluster.

## Capabilities

### ClusterPipeline Class

Cluster-aware pipeline for batching commands with automatic node routing and cluster constraint handling.

```python { .api }
class ClusterPipeline(RedisCluster):
    def __init__(self, connection_pool, result_callbacks=None,
                 response_callbacks=None, startup_nodes=None,
                 read_from_replicas=False, cluster_down_retry_attempts=3):
        """
        Initialize cluster pipeline.

        Parameters:
        - connection_pool (ClusterConnectionPool): Connection pool instance
        - result_callbacks (dict, optional): Custom result processing callbacks
        - response_callbacks (dict, optional): Custom response processing callbacks
        - startup_nodes (List[StartupNode], optional): Startup nodes list
        - read_from_replicas (bool): Enable reading from replica nodes
        - cluster_down_retry_attempts (int): Retry attempts for cluster-down errors
        """
```

### Pipeline Context Management

Use pipelines as context managers for automatic execution and cleanup.

```python { .api }
def __enter__(self):
    """Enter pipeline context."""

def __exit__(self, exc_type, exc_val, exc_tb):
    """Exit pipeline context with automatic reset."""
```
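The reset-on-exit behavior can be pictured with a minimal stand-in class; this is purely illustrative (the class name and `queue` method are invented for the sketch), not the library's implementation:

```python
class PipelineSketch:
    """Toy model of a pipeline's context-manager behavior."""

    def __init__(self):
        self.command_stack = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.reset()  # queued-but-unexecuted commands are discarded
        return False  # do not swallow exceptions

    def queue(self, *command):
        self.command_stack.append(command)

    def reset(self):
        self.command_stack.clear()

pipe = PipelineSketch()
with pipe:
    pipe.queue("SET", "k", "v")
print(pipe.command_stack)  # [] - reset on exit
```

The point of the pattern: whether or not `execute()` was called inside the block, leftover queued commands never leak into a later reuse of the pipeline object.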

### Command Queuing

Queue commands for batch execution with cluster-aware routing.

```python { .api }
def execute_command(self, *args, **kwargs):
    """
    Queue command for pipeline execution.

    Parameters:
    - *args: Command name and arguments
    - **kwargs: Additional command options

    Returns:
    ClusterPipeline: Pipeline instance for method chaining

    Note: Commands are queued, not executed immediately
    """
```

### Pipeline Execution

Execute all queued commands and return results.

```python { .api }
def execute(self, raise_on_error=True):
    """
    Execute all queued pipeline commands.

    Parameters:
    - raise_on_error (bool): Raise exception on command errors

    Returns:
    List[Any]: Results from all executed commands, in queue order

    Raises:
    RedisClusterException: If commands violate cluster constraints
    ResponseError: If individual commands fail and raise_on_error=True
    """

def reset(self):
    """
    Clear all queued commands from the pipeline.

    The pipeline can be reused after reset.
    """
```

### Supported Commands

Limited set of Redis commands that work in cluster pipeline mode.

```python { .api }
def delete(self, *names):
    """
    Queue key deletion (single key only in cluster mode).

    Parameters:
    - *names (str): Key name (only one key allowed)

    Returns:
    ClusterPipeline: Pipeline instance

    Note: Multi-key delete not supported in cluster pipeline
    """

# Standard Redis commands that work in pipeline
def get(self, name): ...
def set(self, name, value, **kwargs): ...
def incr(self, name, amount=1): ...
def decr(self, name, amount=1): ...
def hget(self, name, key): ...
def hset(self, name, key=None, value=None, mapping=None): ...
def lpush(self, name, *values): ...
def rpush(self, name, *values): ...
def lpop(self, name, count=None): ...
def rpop(self, name, count=None): ...
def sadd(self, name, *values): ...
def srem(self, name, *values): ...
def zadd(self, name, mapping, **kwargs): ...
def zrem(self, name, *values): ...
```

### Blocked Commands

Many Redis commands are blocked in cluster pipeline mode due to cluster constraints.

```python { .api }
# These commands raise RedisClusterException when used in a pipeline:
# - Multi-key operations: mget, mset, del with multiple keys
# - Cross-slot operations: smove, rpoplpush, brpoplpush
# - Transactions: multi, exec, discard, watch, unwatch
# - Pub/sub operations: publish, subscribe, unsubscribe
# - Lua scripts with multiple keys
# - Server management commands
```

## Pipeline Limitations

### Cluster Constraints

Redis Cluster pipeline operations have several important limitations:

1. **Single Key Operations**: Most pipelined operations must operate on single keys
2. **Same Slot Requirement**: Multi-key operations must hash to the same slot
3. **No Transactions**: MULTI/EXEC transactions not supported
4. **No Cross-Slot Commands**: Commands spanning multiple slots blocked
5. **Limited Lua Scripts**: Scripts with multiple keys must use the same slot
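The same-slot requirement can be satisfied with hash tags: when a key contains a `{...}` section, Redis Cluster hashes only that section, so keys sharing a tag always land in the same slot. A minimal sketch of the slot calculation, assuming the standard hash-tag rules (Redis uses CRC16/XMODEM, which Python's `binascii.crc_hqx` implements):

```python
import binascii

def key_slot(key: str) -> int:
    """Compute the Redis Cluster slot for a key, honoring {hash tags}."""
    start = key.find("{")
    if start != -1:
        end = key.find("}", start + 1)
        if end > start + 1:  # tag must be non-empty
            key = key[start + 1:end]  # hash only the tag contents
    # Redis Cluster slot = CRC16/XMODEM of the key, modulo 16384
    return binascii.crc_hqx(key.encode(), 0) % 16384

# Keys sharing the {user:1} tag hash to the same slot, so same-slot
# multi-key constraints are satisfied for them.
print(key_slot("{user:1}:name") == key_slot("{user:1}:email"))  # True
```

Designing key names around hash tags is the usual way to keep related keys co-located on one node so they can be batched safely.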

### Error Handling

Pipeline execution can fail at various points due to cluster constraints.

```python { .api }
# Exception types during pipeline execution:
# - RedisClusterException: Cluster constraint violations
# - ResponseError: Individual command failures
# - ConnectionError: Node connectivity issues
# - MovedError/AskError: Slot migration during execution
```

## Usage Examples

### Basic Pipeline Usage

```python
from rediscluster import RedisCluster

rc = RedisCluster(
    startup_nodes=[{"host": "127.0.0.1", "port": "7000"}],
    decode_responses=True,  # return str instead of bytes
)

# Context manager (recommended)
with rc.pipeline() as pipe:
    pipe.set("key1", "value1")
    pipe.set("key2", "value2")
    pipe.get("key1")
    pipe.incr("counter")
    results = pipe.execute()

print(results)  # [True, True, 'value1', 1] (1 assumes "counter" was new)
```

### Manual Pipeline Management

```python
# Manual pipeline creation and cleanup
pipe = rc.pipeline()

try:
    pipe.set("user:1:name", "Alice")
    pipe.set("user:1:email", "alice@example.com")
    pipe.hset("user:1:profile", mapping={"age": 30, "city": "New York"})
    pipe.get("user:1:name")
    pipe.hgetall("user:1:profile")

    results = pipe.execute()
    print(f"Set results: {results[:3]}")  # [True, True, 2] (two new hash fields)
    print(f"Name: {results[3]}")          # Alice
    print(f"Profile: {results[4]}")       # {'age': '30', 'city': 'New York'}
finally:
    pipe.reset()  # Clean up queued commands
```

### Error Handling

```python
from rediscluster.exceptions import RedisClusterException

pipe = rc.pipeline()

try:
    pipe.set("valid_key", "value")
    pipe.get("valid_key")
    pipe.incr("non_numeric_key")  # fails if this key holds a non-integer value

    # Execute with error handling
    results = pipe.execute(raise_on_error=False)

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Command {i} failed: {result}")
        else:
            print(f"Command {i} result: {result}")

except RedisClusterException as e:
    print(f"Cluster constraint violation: {e}")

finally:
    pipe.reset()
```

### Working with Hash Slots

```python
# Pipeline commands must respect slot constraints
import binascii

def get_slot(key):
    """Calculate the Redis Cluster slot for a key (CRC16/XMODEM mod 16384)."""
    # hashlib has no CRC16; binascii.crc_hqx implements the CRC16 variant
    # Redis uses. Hash-tag handling is omitted here for brevity.
    return binascii.crc_hqx(key.encode(), 0) % 16384

# Keys that hash to the same slot can be used together
key1 = "user:123:profile"
key2 = "user:123:settings"

if get_slot(key1) == get_slot(key2):
    print("Keys are in same slot - can pipeline together")
    with rc.pipeline() as pipe:
        pipe.hset(key1, "name", "John")
        pipe.hset(key2, "theme", "dark")
        pipe.hget(key1, "name")
        pipe.hget(key2, "theme")
        results = pipe.execute()
else:
    print("Keys in different slots - use separate commands")
```

### Pipeline Performance Optimization

```python
# Batch similar operations for better performance
keys_to_set = [f"batch:key:{i}" for i in range(100)]
values = [f"value_{i}" for i in range(100)]

# Process in chunks to avoid large pipeline buildup
chunk_size = 50
for i in range(0, len(keys_to_set), chunk_size):
    chunk_keys = keys_to_set[i:i + chunk_size]
    chunk_values = values[i:i + chunk_size]

    with rc.pipeline() as pipe:
        for key, value in zip(chunk_keys, chunk_values):
            pipe.set(key, value)

        results = pipe.execute()
        print(f"Set {len(results)} keys in chunk {i // chunk_size + 1}")
```
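The chunking arithmetic above can be isolated into a small reusable generator (`chunked` is a hypothetical helper for illustration, not part of the library):

```python
def chunked(items, size):
    """Yield successive lists of at most `size` items."""
    items = list(items)
    for i in range(0, len(items), size):
        yield items[i:i + size]

print(list(chunked(range(5), 2)))  # [[0, 1], [2, 3], [4]]
```

With such a helper, each chunk maps directly to one `with rc.pipeline() as pipe:` block, keeping per-pipeline memory bounded regardless of the total key count.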

### Read-from-Replicas Pipeline

```python
# Pipeline with replica reads for load distribution
rc_with_replicas = RedisCluster(
    startup_nodes=[{"host": "127.0.0.1", "port": "7000"}],
    read_from_replicas=True
)

with rc_with_replicas.pipeline(read_from_replicas=True) as pipe:
    # Read operations can go to replicas
    pipe.get("readonly_key1")
    pipe.get("readonly_key2")
    pipe.hgetall("readonly_hash")

    read_results = pipe.execute()
    print(f"Read results: {read_results}")
```

### Cluster-Specific Pipeline Patterns

```python
# Pattern: Single-key operations work reliably
def pipeline_single_key_operations(rc, key_prefix, count):
    """Pipeline operations on keys with the same prefix."""
    with rc.pipeline() as pipe:
        for i in range(count):
            key = f"{key_prefix}:{i}"
            pipe.set(key, f"value_{i}")
            pipe.expire(key, 3600)  # 1 hour TTL

        results = pipe.execute()
        set_results = results[::2]      # even-indexed results (set operations)
        expire_results = results[1::2]  # odd-indexed results (expire operations)

    return set_results, expire_results

# Pattern: Avoiding cross-slot multi-key operations
def safe_multi_key_pipeline(rc, operations):
    """Execute multi-key operations safely by grouping keys by slot."""
    from collections import defaultdict

    # Group operations by calculated slot
    # (get_slot is defined in the hash-slot example above)
    slot_groups = defaultdict(list)
    for op in operations:
        slot = get_slot(op['key'])
        slot_groups[slot].append(op)

    all_results = []
    for slot, ops in slot_groups.items():
        with rc.pipeline() as pipe:
            for op in ops:
                getattr(pipe, op['command'])(op['key'], *op.get('args', []))

            results = pipe.execute()
        all_results.extend(results)

    return all_results
```
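The even/odd slicing in `pipeline_single_key_operations` relies on results coming back in queue order; it can be checked in isolation on a plain list standing in for pipeline output:

```python
# Simulated pipeline output: set/expire replies interleaved per key
results = [True, 1, True, 1, True, 1]

set_results = results[::2]      # positions 0, 2, 4 -> set replies
expire_results = results[1::2]  # positions 1, 3, 5 -> expire replies

print(set_results)     # [True, True, True]
print(expire_results)  # [1, 1, 1]
```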