Redis fixtures and fixture factories for Pytest.
npx @tessl/cli install tessl/pypi-pytest-redis@3.1.0

Redis fixtures and fixture factories for Pytest. This plugin provides three main fixtures for testing code that relies on Redis databases: redisdb (a Redis client fixture that constructs a client and cleans the database after tests), redis_proc (a session-scoped fixture that starts/stops Redis instances), and redis_nooproc (for connecting to already running Redis servers).
pip install pytest-redis

For creating custom fixtures:
from pytest_redis import factories

Individual factory imports:
from pytest_redis.factories import redis_proc, redis_noproc, redisdb

Import built-in fixtures (automatically available, no explicit import needed):
# These are automatically registered as pytest fixtures when pytest-redis is installed
# Available fixtures: redis_proc, redis_nooproc, redisdb

Import internal components (for advanced usage):
from pytest_redis.config import get_config, RedisConfigType
from pytest_redis.executor import RedisExecutor, NoopRedis
from pytest_redis.exception import RedisUnsupported, RedisMisconfigured, UnixSocketTooLong

Note on factory function naming: redis_noproc() (one "o") is the factory function. The fixtures created with it use the _nooproc spelling (two "o"s) in their names, and the built-in fixture is named redis_nooproc.
from typing import Callable, Generator, List, Optional, Set, Tuple, TypedDict, Union
from pathlib import Path
from _pytest.fixtures import FixtureRequest
from _pytest.tmpdir import TempPathFactory
from packaging.version import Version
import redis
class RedisConfigType(TypedDict):
"""Pytest redis config definition type."""
host: str
port: Optional[int]
username: str
password: str
exec: str
timeout: int
loglevel: str
db_count: int
save: str
compression: bool
rdbchecksum: bool
syslog: bool
decode: bool
datadir: str
modules: List[str]

The plugin automatically provides three fixtures when installed:
def test_redis(redisdb):
"""Test using the built-in Redis client fixture."""
# The redisdb fixture provides a Redis client and cleans up after the test
redisdb.set('test_key', 'test_value')
# Your code that uses Redis
my_component = MyRedisBasedComponent()
my_component.do_something()
# Verify results
assert redisdb.get('result_key') == b'expected_value'
# Database is automatically cleaned after this test

from pytest_redis import factories
# Create custom Redis process and client fixtures
redis_my_proc = factories.redis_proc(port=None) # Random port
redis_my = factories.redisdb('redis_my_proc')
def test_custom_redis(redis_my):
"""Test using custom Redis fixtures."""
redis_my.set('custom_key', 'custom_value')
assert redis_my.get('custom_key') == b'custom_value'

def test_existing_redis(redis_nooproc):
"""Test connecting to an already running Redis server."""
# Connects to Redis server at configured host/port (default: 127.0.0.1:6379)
# Does not start/stop Redis process - expects Redis to already be running
redis_nooproc.set('key', 'value')
assert redis_nooproc.get('key') == b'value'

Three fixtures are automatically available when pytest-redis is installed:
# Automatically registered fixtures (no import needed)
def redis_proc(request):
"""
Session-scoped fixture that starts/stops Redis server process.
Returns:
RedisExecutor: Redis server process executor
"""
def redis_nooproc(request):
"""
Session-scoped fixture for connecting to existing Redis server.
Returns:
NoopRedis: Connection to existing Redis server
"""
def redisdb(request):
"""
Function-scoped Redis client fixture with automatic cleanup.
Returns:
redis.Redis: Redis client instance
"""

Create custom fixtures with specific configurations:
def redis_proc(
executable: Optional[str] = None,
timeout: Optional[int] = None,
host: Optional[str] = None,
port: Union[
None,
str,
int,
Tuple[int, int],
Set[int],
List[str],
List[int],
List[Tuple[int, int]],
List[Set[int]],
List[Union[Set[int], Tuple[int, int]]],
List[Union[str, int, Tuple[int, int], Set[int]]],
] = -1,
username: Optional[str] = None,
password: Optional[str] = None,
db_count: Optional[int] = None,
save: Optional[str] = None,
compression: Optional[bool] = None,
checksum: Optional[bool] = None,
syslog: Optional[bool] = None,
loglevel: Optional[str] = None,
datadir: Optional[str] = None,
modules: Optional[List[str]] = None,
) -> Callable[[FixtureRequest, TempPathFactory], Generator[RedisExecutor, None, None]]:
"""
Factory for creating Redis process fixtures.
Parameters:
- executable: Path to redis-server executable
- timeout: Client connection timeout in seconds
- host: Hostname for Redis server (default: 127.0.0.1)
- port: Port configuration - exact port, random port (None), port ranges, or sets
- username: Authentication username
- password: Authentication password
- db_count: Number of Redis databases (default: 8)
- save: Redis persistence configuration (seconds keys format)
- compression: Enable dump file compression
- checksum: Add checksum to RDB files
- syslog: Enable system logger
- loglevel: Log verbosity level (notice, warning, verbose, debug)
- datadir: Directory for Redis data files
- modules: List of Redis extension module paths to load
Returns:
Callable: Function that creates Redis process fixture
"""
def redis_noproc(
host: Optional[str] = None,
port: Optional[int] = None,
username: Optional[str] = None,
password: Optional[str] = None,
startup_timeout: int = 15,
) -> Callable[[FixtureRequest], Generator[NoopRedis, None, None]]:
"""
Factory for creating no-process Redis fixtures (connects to existing server).
Parameters:
- host: Redis server hostname
- port: Redis server port
- username: Authentication username
- password: Authentication password
- startup_timeout: Connection timeout in seconds
Returns:
Callable: Function that creates no-process Redis fixture
"""
def redisdb(
process_fixture_name: str,
dbnum: int = 0,
decode: Optional[bool] = None,
) -> Callable[[FixtureRequest], Generator[redis.Redis, None, None]]:
"""
Factory for creating Redis client fixtures.
Parameters:
- process_fixture_name: Name of process fixture to use for connection
- dbnum: Redis database number to use (default: 0)
- decode: Whether to decode Redis responses to strings
Returns:
Callable: Function that creates Redis client fixture
"""

pytest-redis can be configured via command line options or pytest.ini settings:
def pytest_addoption(parser: Parser) -> None:
"""
Register pytest command line options and ini settings.
Command line options:
--redis-exec: Redis server executable path
--redis-host: Redis server hostname
--redis-port: Redis server port
--redis-username: Authentication username
--redis-password: Authentication password
--redis-timeout: Client connection timeout
--redis-loglevel: Redis log verbosity level
--redis-db-count: Number of Redis databases
--redis-save: Redis persistence configuration
--redis-compression: Enable dump file compression
--redis-rdbchecksum: Add checksum to RDB files
--redis-syslog: Enable system logger
--redis-client-decode: Decode Redis responses (note: different from ini name)
--redis-datadir: Redis data directory
--redis-modules: Redis extension modules (comma-separated)
Corresponding ini settings:
redis_exec, redis_host, redis_port, redis_username, redis_password,
redis_timeout, redis_loglevel, redis_db_count, redis_save,
redis_compression, redis_rdbchecksum, redis_syslog, redis_decode,
redis_datadir, redis_modules
"""

Internal classes that manage Redis processes:
class RedisExecutor:
"""
Redis server process executor.
Attributes:
host: Redis server hostname
port: Redis server port
username: Authentication username
password: Authentication password
unixsocket: Unix socket path for Redis server
executable: Path to redis-server executable
Inherits from mirakuru.TCPExecutor for process management.
"""
MIN_SUPPORTED_VERSION: Version = parse("2.6")
def __init__(
self,
executable: str,
databases: int,
redis_timeout: int,
loglevel: str,
host: str,
port: int,
username: Optional[str] = None,
password: Optional[str] = None,
startup_timeout: int = 60,
save: str = "",
daemonize: str = "no",
rdbcompression: bool = True,
rdbchecksum: bool = False,
syslog_enabled: bool = False,
appendonly: str = "no",
datadir: Optional[Path] = None,
modules: Optional[List[str]] = None,
) -> None:
"""Initialize RedisExecutor."""
def start(self) -> "RedisExecutor":
"""Check supported version before starting."""
@property
def version(self) -> Version:
"""Return redis version."""
class NoopRedis:
"""
No-operation Redis executor for existing servers.
Attributes:
host: Redis server hostname
port: Redis server port
username: Authentication username
password: Authentication password
unixsocket: Unix socket path (typically None for noop)
timeout: Connection timeout
Connects to existing Redis instance without managing process lifecycle.
"""
def __init__(
self,
host: str,
port: int,
username: Optional[str] = None,
password: Optional[str] = None,
unixsocket: Optional[str] = None,
startup_timeout: int = 15,
) -> None:
"""Initialize NoopRedis."""
def start(self) -> "NoopRedis":
"""Start is a NOOP - waits for Redis to be available."""
def redis_available(self) -> bool:
"""Return True if connecting to Redis is possible."""

def get_config(request: FixtureRequest) -> RedisConfigType:
"""
Extract Redis configuration from pytest options and ini settings.
Parameters:
request: pytest FixtureRequest object
Returns:
RedisConfigType: Dictionary containing Redis configuration
"""

Custom exceptions for Redis-specific issues:
class RedisUnsupported(Exception):
"""Raised when Redis version < 2.6 is detected."""
class RedisMisconfigured(Exception):
"""Raised when redis_exec points to non-existing file."""
class UnixSocketTooLong(Exception):
"""Raised when Unix socket path exceeds maximum length."""

[tool:pytest]
redis_host = 127.0.0.1
redis_port = 6379
redis_timeout = 30
redis_loglevel = notice
redis_db_count = 16
redis_decode = false

from pytest_redis import factories
# Custom Redis with specific port and authentication
redis_proc2 = factories.redis_proc(port=6381)
redis_proc3 = factories.redis_proc(port=6385, password="secretpassword")
# Create client fixtures for the custom processes
redisdb2 = factories.redisdb('redis_proc2')
redisdb3 = factories.redisdb('redis_proc3')
# No-process fixtures connecting to existing Redis instances
redis_nooproc2 = factories.redis_noproc(port=6381, startup_timeout=1)
redis_nooproc3 = factories.redis_noproc(port=6385, password="secretpassword")
redisdb2_noop = factories.redisdb('redis_nooproc2')
redisdb3_noop = factories.redisdb('redis_nooproc3')
# Redis with extension modules and custom data directory
module_redis_proc = factories.redis_proc(
modules=["/path/to/redis-module.so"],
datadir="/tmp/redis-test-data"
)
module_redis = factories.redisdb('module_redis_proc')

from pytest_redis import factories
# Primary Redis for main data
redis_main_proc = factories.redis_proc(port=6379)
redis_main = factories.redisdb('redis_main_proc')
# Secondary Redis for caching
redis_cache_proc = factories.redis_proc(port=6380)
redis_cache = factories.redisdb('redis_cache_proc', dbnum=1)
def test_multi_redis(redis_main, redis_cache):
"""Test using multiple Redis instances."""
redis_main.set('data_key', 'main_data')
redis_cache.set('cache_key', 'cached_data')
assert redis_main.get('data_key') == b'main_data'
assert redis_cache.get('cache_key') == b'cached_data'
# Each Redis instance maintains separate data
assert redis_main.get('cache_key') is None
assert redis_cache.get('data_key') is None

def test_database_isolation(redisdb):
"""Each test gets a clean Redis database."""
# Database is empty at start of each test
assert redisdb.dbsize() == 0
# Make changes
redisdb.set('test_key', 'test_value')
redisdb.lpush('test_list', 'item1', 'item2')
# Changes are visible within the test
assert redisdb.get('test_key') == b'test_value'
assert redisdb.llen('test_list') == 2
# Database is automatically cleaned after this test