# Cluster Pipeline

The ClusterPipeline class provides cluster-aware command batching with automatic routing to appropriate nodes while respecting Redis Cluster constraints. Unlike standard Redis pipelining, cluster pipelines have limitations due to the distributed nature of Redis Cluster.

## Capabilities

### ClusterPipeline Class

Cluster-aware pipeline for batching commands with automatic node routing and cluster constraint handling.

```python { .api }
class ClusterPipeline(RedisCluster):
    def __init__(self, connection_pool, result_callbacks=None,
                 response_callbacks=None, startup_nodes=None,
                 read_from_replicas=False, cluster_down_retry_attempts=3):
        """
        Initialize cluster pipeline.

        Parameters:
        - connection_pool (ClusterConnectionPool): Connection pool instance
        - result_callbacks (dict, optional): Custom result processing callbacks
        - response_callbacks (dict, optional): Custom response processing callbacks
        - startup_nodes (List[StartupNode], optional): Startup nodes list
        - read_from_replicas (bool): Enable reading from replica nodes
        - cluster_down_retry_attempts (int): Retry attempts for cluster-down errors
        """
```

### Pipeline Context Management

Use pipelines as context managers for automatic cleanup. Leaving the context resets the pipeline; queued commands still run only when `execute()` is called.

```python { .api }
def __enter__(self):
    """Enter pipeline context."""

def __exit__(self, exc_type, exc_val, exc_tb):
    """Exit pipeline context with automatic reset."""
```

### Command Queuing

Queue commands for batch execution with cluster-aware routing.

```python { .api }
def execute_command(self, *args, **kwargs):
    """
    Queue command for pipeline execution.

    Parameters:
    - *args: Command name and arguments
    - **kwargs: Additional command options

    Returns:
    ClusterPipeline: Pipeline instance for method chaining

    Note: Commands are queued, not executed immediately
    """
```
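
The queue-then-execute behaviour described above can be illustrated without a running cluster. The sketch below uses `RecordingPipeline`, a hypothetical stand-in that is not part of the library: its `execute_command` only records the call and returns the pipeline, which is what makes chaining possible. The attribute name `command_stack` is likewise an assumption made for the illustration.

```python
class RecordingPipeline:
    """Hypothetical stand-in that mimics only the queuing behaviour."""

    def __init__(self):
        self.command_stack = []  # queued (args, kwargs) pairs, in call order

    def execute_command(self, *args, **kwargs):
        # Nothing is sent anywhere: the call is only recorded.
        self.command_stack.append((args, kwargs))
        return self  # returning the pipeline enables method chaining

    def set(self, name, value):
        return self.execute_command("SET", name, value)

    def get(self, name):
        return self.execute_command("GET", name)


pipe = RecordingPipeline()
chained = pipe.set("key1", "value1").get("key1")
queued = [args for args, _ in pipe.command_stack]
print(queued)  # [('SET', 'key1', 'value1'), ('GET', 'key1')]
```

With a real pipeline the same chaining style should apply, with the results becoming available only after `execute()` is called.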

### Pipeline Execution

Execute all queued commands and return results.

```python { .api }
def execute(self, raise_on_error=True):
    """
    Execute all queued pipeline commands.

    Parameters:
    - raise_on_error (bool): Raise exception on command errors

    Returns:
    List[Any]: Results from all executed commands in order

    Raises:
    RedisClusterException: If commands violate cluster constraints
    ResponseError: If individual commands fail and raise_on_error=True
    """

def reset(self):
    """
    Clear all queued commands from pipeline.
    Pipeline can be reused after reset.
    """
```
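
When `raise_on_error=False` is used, failures arrive as exception objects inside the result list, so callers usually need to separate them from successful results. The following is a minimal sketch of that step. `split_results` is a hypothetical helper, and the result list is simulated with a built-in `ValueError` where a real pipeline would return a `ResponseError`.

```python
def split_results(commands, results):
    """Pair each queued command label with its result and separate failures."""
    succeeded, failed = [], []
    for command, result in zip(commands, results):
        if isinstance(result, Exception):
            failed.append((command, result))
        else:
            succeeded.append((command, result))
    return succeeded, failed


# Simulated return value of pipe.execute(raise_on_error=False)
commands = ["SET a 1", "INCR b", "GET a"]
results = [True, ValueError("value is not an integer"), "1"]

succeeded, failed = split_results(commands, results)
for command, error in failed:
    print(f"{command} failed: {error}")
```

Keeping a parallel list of command labels is one simple way to report which command failed, since the result list itself carries only positions.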

### Supported Commands

Limited set of Redis commands that work in cluster pipeline mode.

```python { .api }
def delete(self, *names):
    """
    Queue key deletion (single key only in cluster mode).

    Parameters:
    - *names (str): Key name (only one key allowed)

    Returns:
    ClusterPipeline: Pipeline instance

    Note: Multi-key delete not supported in cluster pipeline
    """

# Standard Redis commands that work in pipeline
def get(self, name): ...
def set(self, name, value, **kwargs): ...
def incr(self, name, amount=1): ...
def decr(self, name, amount=1): ...
def hget(self, name, key): ...
def hset(self, name, key=None, value=None, mapping=None): ...
def lpush(self, name, *values): ...
def rpush(self, name, *values): ...
def lpop(self, name, count=None): ...
def rpop(self, name, count=None): ...
def sadd(self, name, *values): ...
def srem(self, name, *values): ...
def zadd(self, name, mapping, **kwargs): ...
def zrem(self, name, *values): ...
```

### Blocked Commands

Many Redis commands are blocked in cluster pipeline mode due to cluster constraints.

```python { .api }
# These commands raise RedisClusterException when used in pipeline:
# - Multi-key operations: mget, mset, del with multiple keys
# - Cross-slot operations: smove, rpoplpush, brpoplpush
# - Transactions: multi, exec, discard, watch, unwatch
# - Pub/sub operations: publish, subscribe, unsubscribe
# - Lua scripts with multiple keys
# - Server management commands
```
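
A common workaround for the blocked multi-key reads is to queue one single-key command per key. The sketch below shows the idea for `mget`. `pipelined_mget` is a hypothetical helper that relies only on the `get` and `execute` methods documented above, and `StubPipeline` is an in-memory stand-in so the example runs without a cluster.

```python
def pipelined_mget(pipe, keys):
    """Fetch several keys with one GET per key instead of a single MGET."""
    keys = list(keys)
    for key in keys:
        pipe.get(key)  # single-key command, so it can be routed per key
    return dict(zip(keys, pipe.execute()))


class StubPipeline:
    """Hypothetical in-memory stand-in so the sketch runs without a cluster."""

    def __init__(self, data):
        self._data = data
        self._queued = []

    def get(self, name):
        self._queued.append(name)
        return self

    def execute(self, raise_on_error=True):
        results = [self._data.get(name) for name in self._queued]
        self._queued = []
        return results


values = pipelined_mget(StubPipeline({"a": "1", "b": "2"}), ["a", "b", "missing"])
print(values)  # {'a': '1', 'b': '2', 'missing': None}
```

With a real cluster, the helper would be called as `pipelined_mget(rc.pipeline(), keys)`. Unlike `MGET`, the individual reads are not atomic as a group.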

## Pipeline Limitations

### Cluster Constraints

Redis Cluster pipeline operations have several important limitations:

1. **Single Key Operations**: Most pipelined operations must operate on single keys
2. **Same Slot Requirement**: Multi-key operations must hash to the same slot
3. **No Transactions**: MULTI/EXEC transactions are not supported
4. **No Cross-Slot Commands**: Commands spanning multiple slots are blocked
5. **Limited Lua Scripts**: Scripts with multiple keys must use the same slot

### Error Handling

Pipeline execution can fail at various points due to cluster constraints.

```python { .api }
# Exception types during pipeline execution:
# - RedisClusterException: Cluster constraint violations
# - ResponseError: Individual command failures
# - ConnectionError: Node connectivity issues
# - MovedError/AskError: Slot migration during execution
```
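
Connectivity errors are often transient, so one option is to retry the whole batch. The sketch below is a generic retry wrapper under two assumptions: the pipeline's queue is empty again after `execute()`, so every attempt must rebuild it, and the caller passes in the exception classes worth retrying. `run_with_retry` and `flaky_pipeline` are hypothetical names, and the built-in `ConnectionError` is used only as a placeholder for the library's own exception.

```python
import time


def run_with_retry(run_pipeline, retry_on, attempts=3, delay=0.0):
    """Call run_pipeline() until it succeeds or attempts are exhausted.

    run_pipeline must queue its commands and call execute() itself, so each
    retry starts from a freshly built queue.
    """
    for attempt in range(1, attempts + 1):
        try:
            return run_pipeline()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            time.sleep(delay * attempt)  # simple linear backoff


# Demonstration with a hypothetical flaky callable standing in for a pipeline.
calls = {"count": 0}


def flaky_pipeline():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("node unreachable")  # placeholder exception
    return [True, "value"]


outcome = run_with_retry(flaky_pipeline, retry_on=(ConnectionError,), attempts=3)
print(outcome, calls["count"])  # [True, 'value'] 3
```

Retrying replays every command in the batch, so this pattern suits idempotent commands such as `set` and `get` better than counters such as `incr`.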

## Usage Examples

### Basic Pipeline Usage

```python
from rediscluster import RedisCluster

# decode_responses=True returns str values instead of bytes
rc = RedisCluster(
    startup_nodes=[{"host": "127.0.0.1", "port": "7000"}],
    decode_responses=True,
)

# Context manager (recommended)
with rc.pipeline() as pipe:
    pipe.set("key1", "value1")
    pipe.set("key2", "value2")
    pipe.get("key1")
    pipe.incr("counter")
    results = pipe.execute()

# The last value is 1 only if "counter" did not exist before
print(results)  # [True, True, 'value1', 1]
```

### Manual Pipeline Management

```python
# Manual pipeline creation and cleanup
pipe = rc.pipeline()

try:
    pipe.set("user:1:name", "Alice")
    pipe.set("user:1:email", "alice@example.com")
    pipe.hset("user:1:profile", mapping={"age": 30, "city": "New York"})
    pipe.get("user:1:name")
    pipe.hgetall("user:1:profile")

    results = pipe.execute()
    # hset returns the number of fields added (2 for a new hash)
    print(f"Set results: {results[:3]}")  # [True, True, 2]
    print(f"Name: {results[3]}")          # Alice
    print(f"Profile: {results[4]}")       # {'age': '30', 'city': 'New York'}

finally:
    pipe.reset()  # Clean up queued commands
```

### Error Handling

```python
from rediscluster.exceptions import RedisClusterException

pipe = rc.pipeline()

try:
    pipe.set("valid_key", "value")
    pipe.get("valid_key")
    pipe.set("non_numeric_key", "abc")
    pipe.incr("non_numeric_key")  # Fails: the stored value is not an integer

    # Execute with error handling: failures are returned as exception objects
    results = pipe.execute(raise_on_error=False)

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Command {i} failed: {result}")
        else:
            print(f"Command {i} result: {result}")

except RedisClusterException as e:
    print(f"Cluster constraint violation: {e}")

finally:
    pipe.reset()
```

### Working with Hash Slots

```python
# Multi-key commands require all keys to live in the same hash slot
import binascii

def get_slot(key):
    """Calculate the Redis Cluster hash slot for a key."""
    data = key.encode()
    # Hash tags: if the key has a non-empty {...} section, only that is hashed
    start = data.find(b"{")
    if start != -1:
        end = data.find(b"}", start + 1)
        if end > start + 1:
            data = data[start + 1:end]
    # Redis Cluster uses CRC16 (XMODEM variant) modulo 16384
    return binascii.crc_hqx(data, 0) % 16384

# A shared hash tag forces both keys into the same slot
key1 = "{user:123}:profile"
key2 = "{user:123}:settings"

if get_slot(key1) == get_slot(key2):
    print("Keys are in same slot - can pipeline together")
    with rc.pipeline() as pipe:
        pipe.hset(key1, "name", "John")
        pipe.hset(key2, "theme", "dark")
        pipe.hget(key1, "name")
        pipe.hget(key2, "theme")
        results = pipe.execute()
else:
    print("Keys in different slots - use separate commands")
```

### Pipeline Performance Optimization

```python
# Batch similar operations for better performance
keys_to_set = [f"batch:key:{i}" for i in range(100)]
values = [f"value_{i}" for i in range(100)]

# Process in chunks to avoid large pipeline buildup
chunk_size = 50
for i in range(0, len(keys_to_set), chunk_size):
    chunk_keys = keys_to_set[i:i+chunk_size]
    chunk_values = values[i:i+chunk_size]

    with rc.pipeline() as pipe:
        for key, value in zip(chunk_keys, chunk_values):
            pipe.set(key, value)

        results = pipe.execute()
        print(f"Set {len(results)} keys in chunk {i//chunk_size + 1}")
```
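
The slicing above needs the full key list in memory and a known length. For streams or generators, the same chunking can be done lazily. The sketch below uses a hypothetical `chunked` helper built on `itertools.islice`; it is independent of the pipeline itself.

```python
from itertools import islice


def chunked(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


pairs = [(f"batch:key:{i}", f"value_{i}") for i in range(5)]
chunks = list(chunked(pairs, 2))
print([len(chunk) for chunk in chunks])  # [2, 2, 1]
```

Each yielded chunk can then be written inside its own `with rc.pipeline() as pipe:` block, exactly as in the loop above.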

### Read-from-Replicas Pipeline

```python
# Pipeline with replica reads for load distribution
rc_with_replicas = RedisCluster(
    startup_nodes=[{"host": "127.0.0.1", "port": "7000"}],
    read_from_replicas=True
)

with rc_with_replicas.pipeline(read_from_replicas=True) as pipe:
    # Read operations can go to replicas
    pipe.get("readonly_key1")
    pipe.get("readonly_key2")
    pipe.hgetall("readonly_hash")

    read_results = pipe.execute()
    print(f"Read results: {read_results}")
```

### Cluster-Specific Pipeline Patterns

```python
# Pattern: Single-key operations work reliably
def pipeline_single_key_operations(rc, key_prefix, count):
    """Pipeline operations on keys with same prefix."""
    with rc.pipeline() as pipe:
        for i in range(count):
            key = f"{key_prefix}:{i}"
            pipe.set(key, f"value_{i}")
            pipe.expire(key, 3600)  # 1 hour TTL

        results = pipe.execute()
        set_results = results[::2]      # Even positions (set operations)
        expire_results = results[1::2]  # Odd positions (expire operations)

        return set_results, expire_results

# Pattern: Avoiding multi-key operations
def safe_multi_key_pipeline(rc, operations):
    """Execute multi-key operations safely by grouping by slot."""
    from collections import defaultdict

    # Group operations by calculated slot
    # (get_slot is the helper from "Working with Hash Slots")
    slot_groups = defaultdict(list)
    for op in operations:
        slot = get_slot(op['key'])
        slot_groups[slot].append(op)

    # Results come back grouped by slot, not in the original input order
    all_results = []
    for slot, ops in slot_groups.items():
        with rc.pipeline() as pipe:
            for op in ops:
                getattr(pipe, op['command'])(op['key'], *op.get('args', []))

            results = pipe.execute()
            all_results.extend(results)

    return all_results
```
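
Grouping by slot changes the order in which results come back. If callers need results aligned with the input list, the original index of each operation can be carried through the grouping. The sketch below shows one way to do that. `run_grouped_in_order` is a hypothetical helper, and both `slot_of` and `run_group` are supplied by the caller; the demonstration uses trivial stand-ins rather than a real cluster.

```python
from collections import defaultdict


def run_grouped_in_order(operations, slot_of, run_group):
    """Run operations grouped by slot, returning results in input order.

    slot_of(key) returns a group id; run_group(ops) returns one result per op.
    """
    groups = defaultdict(list)
    for index, op in enumerate(operations):
        groups[slot_of(op["key"])].append((index, op))

    ordered = [None] * len(operations)
    for indexed_ops in groups.values():
        results = run_group([op for _, op in indexed_ops])
        # Write each result back to the position of its original operation
        for (index, _), result in zip(indexed_ops, results):
            ordered[index] = result
    return ordered


# Demonstration with hypothetical stand-ins: group by first character and
# "execute" by echoing each key in upper case.
ops = [{"key": "b1"}, {"key": "a1"}, {"key": "b2"}]
ordered = run_grouped_in_order(
    ops,
    slot_of=lambda key: key[0],
    run_group=lambda group: [op["key"].upper() for op in group],
)
print(ordered)  # ['B1', 'A1', 'B2']
```

In practice `slot_of` would be a slot calculation and `run_group` would queue the group on a pipeline and return `pipe.execute()`.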