Python client for Redis database and key-value store
—
Redis Sentinel provides high availability for Redis deployments through automatic failover, service discovery, and monitoring. Sentinel acts as a distributed system that monitors Redis master and replica instances and manages failover when needed.
Redis Sentinel client for connecting to Redis with automatic failover and service discovery.
class Sentinel:
def __init__(
self,
sentinels: List[Tuple[str, int]],
min_other_sentinels: int = 0,
sentinel_kwargs: Optional[Dict[str, Any]] = None,
password: Optional[str] = None,
db: int = 0,
decode_responses: bool = False,
**kwargs
): ...
def discover_master(self, service_name: str) -> Tuple[str, int]: ...
def discover_slaves(self, service_name: str) -> List[Tuple[str, int]]: ...
def master_for(
self,
service_name: str,
redis_class: Type[Redis] = Redis,
connection_pool_class: Type[ConnectionPool] = SentinelConnectionPool,
**kwargs
) -> Redis: ...
def slave_for(
self,
service_name: str,
redis_class: Type[Redis] = Redis,
connection_pool_class: Type[ConnectionPool] = SentinelConnectionPool,
**kwargs
) -> Redis: ...
def execute_command(
self,
*args,
**kwargs
) -> Any: ...

Commands for managing and monitoring Sentinel instances and Redis services.
def sentinel_masters(self) -> List[Dict[str, Any]]: ...
def sentinel_master(self, service_name: str) -> Dict[str, Any]: ...
def sentinel_slaves(self, service_name: str) -> List[Dict[str, Any]]: ...
def sentinel_sentinels(self, service_name: str) -> List[Dict[str, Any]]: ...
def sentinel_get_master_addr_by_name(self, service_name: str) -> Optional[Tuple[str, int]]: ...
def sentinel_reset(self, pattern: str) -> int: ...
def sentinel_failover(self, service_name: str) -> bool: ...
def sentinel_ckquorum(self, service_name: str) -> bool: ...
def sentinel_flushconfig(self) -> bool: ...
def sentinel_monitor(
self,
service_name: str,
ip: str,
port: int,
quorum: int
) -> bool: ...
def sentinel_remove(self, service_name: str) -> bool: ...
def sentinel_set(self, service_name: str, option: str, value: Any) -> bool: ...

Specialized connection pool and connection classes for Sentinel-managed Redis instances.
class SentinelConnectionPool(ConnectionPool):
def __init__(
self,
service_name: str,
sentinel_manager: Sentinel,
**kwargs
): ...
def get_master_address(self) -> Tuple[str, int]: ...
def rotate_slaves(self) -> None: ...
class SentinelManagedConnection(Connection):
def __init__(
self,
**kwargs
): ...
class SentinelManagedSSLConnection(SSLConnection):
def __init__(
self,
**kwargs
): ...

import redis
from redis.sentinel import Sentinel
# Configure Sentinel connections
sentinels = [
('localhost', 26379),
('localhost', 26380),
('localhost', 26381)
]
# Create Sentinel instance
sentinel = Sentinel(sentinels, socket_timeout=0.1)
# Discover master and slaves
master_address = sentinel.discover_master('mymaster')
slave_addresses = sentinel.discover_slaves('mymaster')
print(f"Master: {master_address}")
print(f"Slaves: {slave_addresses}")

from redis.sentinel import Sentinel
# Setup Sentinel
sentinels = [('localhost', 26379), ('localhost', 26380), ('localhost', 26381)]
sentinel = Sentinel(sentinels, socket_timeout=0.1)
# Get Redis master client
master = sentinel.master_for('mymaster', socket_timeout=0.1)
# Get Redis slave client for read operations
slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
# Write to master
master.set('key', 'value')
# Read from slave
value = slave.get('key')
print(f"Value from slave: {value}")

from redis.sentinel import Sentinel
# Sentinel with password for Redis instances
sentinels = [('localhost', 26379), ('localhost', 26380), ('localhost', 26381)]
sentinel = Sentinel(
sentinels,
socket_timeout=0.1,
password='redis_password', # Password for Redis instances
sentinel_kwargs={
'password': 'sentinel_password' # Password for Sentinel instances
}
)
# Connect to authenticated Redis
master = sentinel.master_for('mymaster', password='redis_password')
slave = sentinel.slave_for('mymaster', password='redis_password')

from redis.sentinel import Sentinel
from redis import Redis
# Custom Sentinel configuration
sentinels = [('localhost', 26379), ('localhost', 26380), ('localhost', 26381)]
sentinel = Sentinel(
sentinels,
min_other_sentinels=1, # Require at least 1 other sentinel
socket_timeout=0.1,
socket_connect_timeout=0.1,
socket_keepalive=True,
retry_on_timeout=True,
decode_responses=True
)
# Get Redis clients with custom settings
master = sentinel.master_for(
'mymaster',
redis_class=Redis,
socket_timeout=0.1,
socket_connect_timeout=0.1,
socket_keepalive=True,
retry_on_timeout=True,
max_connections=20,
health_check_interval=30
)
slave = sentinel.slave_for(
'mymaster',
redis_class=Redis,
socket_timeout=0.1,
socket_connect_timeout=0.1,
socket_keepalive=True,
retry_on_timeout=True,
max_connections=20,
health_check_interval=30
)

from redis.sentinel import Sentinel
sentinels = [('localhost', 26379), ('localhost', 26380), ('localhost', 26381)]
sentinel = Sentinel(sentinels)
# Get all masters managed by Sentinel
masters = sentinel.sentinel_masters()
for master in masters:
    print(f"Master: {master['name']} at {master['ip']}:{master['port']}")
    print(f"Status: {master['flags']}")
    print(f"Slaves: {master['num-slaves']}")
    print(f"Sentinels: {master['num-other-sentinels']}")

# Get specific master info
master_info = sentinel.sentinel_master('mymaster')
print(f"Master info: {master_info}")

# Get slaves for a master
slaves = sentinel.sentinel_slaves('mymaster')
for slave in slaves:
    print(f"Slave: {slave['ip']}:{slave['port']} - {slave['flags']}")

# Get other sentinels monitoring this master
sentinels_info = sentinel.sentinel_sentinels('mymaster')
for sent in sentinels_info:
    print(f"Sentinel: {sent['ip']}:{sent['port']} - {sent['flags']}")

import time
from redis.sentinel import Sentinel
from redis.exceptions import ConnectionError, ResponseError
sentinels = [('localhost', 26379), ('localhost', 26380), ('localhost', 26381)]
sentinel = Sentinel(sentinels, socket_timeout=0.1)
# Get master with automatic failover
master = sentinel.master_for('mymaster', socket_timeout=0.1)
def robust_operation():
    """Perform Redis operations with failover handling.

    Writes ``test_key`` and reads it back through the module-level
    ``master`` client. On ``ConnectionError`` the operation is retried, up
    to three attempts in total; between attempts the function sleeps for
    two seconds and asks ``sentinel`` for a fresh master client.

    Raises:
        ConnectionError: re-raised after the last attempt has failed.
    """
    # ``master`` is rebound in the except branch below. Without this
    # declaration the assignment would make it a local name and the very
    # first ``master.set`` call would raise UnboundLocalError.
    global master

    max_retries = 3
    retry_count = 0
    while retry_count < max_retries:
        try:
            # Try to perform operation
            master.set('test_key', f'value_{int(time.time())}')
            value = master.get('test_key')
            print(f"Operation successful: {value}")
            return
        except ConnectionError as e:
            retry_count += 1
            print(f"Connection error (attempt {retry_count}): {e}")
            if retry_count < max_retries:
                print("Waiting for failover...")
                time.sleep(2)  # Wait for Sentinel failover
                # Get new master connection after failover
                master = sentinel.master_for('mymaster', socket_timeout=0.1)
            else:
                print("Max retries exceeded")
                raise
# Demonstrate robust operation
robust_operation()

from redis.sentinel import Sentinel
sentinels = [('localhost', 26379)]
sentinel = Sentinel(sentinels)
# Add a new master to monitor
sentinel.sentinel_monitor('newmaster', '127.0.0.1', 6380, 2)
# Configure master settings
sentinel.sentinel_set('newmaster', 'down-after-milliseconds', 5000)
sentinel.sentinel_set('newmaster', 'failover-timeout', 60000)
sentinel.sentinel_set('newmaster', 'parallel-syncs', 1)
# Check quorum for failover
can_failover = sentinel.sentinel_ckquorum('newmaster')
print(f"Can perform failover: {can_failover}")
# Force manual failover (only attempted when the quorum check succeeded)
if can_failover:
    result = sentinel.sentinel_failover('newmaster')
    print(f"Failover initiated: {result}")
# Remove master from monitoring
# sentinel.sentinel_remove('newmaster')
# Reset sentinel state
reset_count = sentinel.sentinel_reset('*')
print(f"Reset {reset_count} masters")
# Save configuration
sentinel.sentinel_flushconfig()

import time
from redis.sentinel import Sentinel
sentinels = [('localhost', 26379), ('localhost', 26380), ('localhost', 26381)]
sentinel = Sentinel(sentinels)
# Get master for publishing
master = sentinel.master_for('mymaster')
# Get slave for subscribing (optional - can use master)
slave = sentinel.slave_for('mymaster')
# Publisher function
def publish_messages():
    """Publish ten numbered messages to the 'news' channel, one per second.

    Uses the module-level ``master`` client.
    """
    for i in range(10):
        master.publish('news', f'Breaking news {i}')
        time.sleep(1)
# Subscriber function
def subscribe_messages():
    """Subscribe to the 'news' channel and print each received message.

    Uses the module-level ``master`` client and keeps iterating over
    ``pubsub.listen()`` for as long as it yields entries.
    """
    pubsub = master.pubsub()  # Use master for pub/sub reliability
    pubsub.subscribe('news')
    for message in pubsub.listen():
        # Print only entries whose type is 'message'; anything else is skipped.
        if message['type'] == 'message':
            print(f"Received: {message['data']}")
# Use threading or asyncio to run both
import threading
publisher_thread = threading.Thread(target=publish_messages)
# The subscriber loops on pubsub.listen() and never returns on its own, so it
# runs as a daemon thread; a regular thread would keep the process alive
# forever after the publisher has finished.
subscriber_thread = threading.Thread(target=subscribe_messages, daemon=True)
subscriber_thread.start()
time.sleep(0.5)  # Small delay to ensure subscription
publisher_thread.start()
publisher_thread.join()

from redis.sentinel import Sentinel
from redis.sentinel import MasterNotFoundError, SlaveNotFoundError
from redis.exceptions import (
    ConnectionError,
    ResponseError
)
def setup_sentinel_client():
    """Create a Sentinel client and a verified client for the 'mymaster' service.

    Discovers the master through three local Sentinels, builds a master
    client and pings it to confirm the connection works.

    Returns:
        A ``(sentinel, master)`` tuple.

    Raises:
        MasterNotFoundError: the master could not be discovered.
        ConnectionError: a connection could not be established.
        ResponseError: an error response was returned.

    Each of these is reported on stdout and then re-raised unchanged.
    """
    sentinels = [('localhost', 26379), ('localhost', 26380), ('localhost', 26381)]
    try:
        sentinel = Sentinel(sentinels, socket_timeout=1.0)
        # Test master discovery
        master_addr = sentinel.discover_master('mymaster')
        print(f"Master found at: {master_addr}")
        # Get master client
        master = sentinel.master_for('mymaster', socket_timeout=1.0)
        # Test connection
        master.ping()
        print("Successfully connected to Redis master via Sentinel")
        return sentinel, master
    # MasterNotFoundError is handled first: redis-py derives it from
    # ConnectionError, so the broader handler below would otherwise catch it.
    except MasterNotFoundError as e:
        print(f"Master not found: {e}")
        raise
    except ConnectionError as e:
        print(f"Cannot connect to Sentinel: {e}")
        raise
    except ResponseError as e:
        print(f"Sentinel response error: {e}")
        raise
# Use the setup function
try:
    sentinel, master = setup_sentinel_client()
    # Perform operations
    master.set('health_check', 'ok')
    status = master.get('health_check')
    print(f"Health check: {status}")
except Exception as e:
    print(f"Failed to setup Sentinel client: {e}")

Install with Tessl CLI
npx tessl i tessl/pypi-redis