Redis fixtures and fixture factories for Pytest.
95
Pending
Does it follow best practices?
Impact
95%
1.11x
Average score across 10 eval scenarios
Pending
The risk profile of this skill
Redis fixtures and fixture factories for Pytest. This plugin provides three main fixtures for testing code that relies on Redis databases: redisdb (a Redis client fixture that constructs a client and cleans the database after tests), redis_proc (a session-scoped fixture that starts/stops Redis instances), and redis_nooproc (for connecting to already running Redis servers).
pip install pytest-redis

For creating custom fixtures:
from pytest_redis import factories

Individual factory imports:
from pytest_redis.factories import redis_proc, redis_noproc, redisdb

Import built-in fixtures (automatically available, no explicit import needed):
# These are automatically registered as pytest fixtures when pytest-redis is installed
# Available fixtures: redis_proc, redis_nooproc, redisdb

Import internal components (for advanced usage):
from pytest_redis.config import get_config, RedisConfigType
from pytest_redis.executor import RedisExecutor, NoopRedis
from pytest_redis.exception import RedisUnsupported, RedisMisconfigured, UnixSocketTooLong

Note on naming: the factory function is spelled redis_noproc(), while the built-in fixture is spelled redis_nooproc (with a double "o"). By convention, fixtures created with the redis_noproc() factory are given names ending in _nooproc.
from typing import Callable, Generator, List, Optional, Set, Tuple, TypedDict, Union
from pathlib import Path
from _pytest.fixtures import FixtureRequest
from _pytest.tmpdir import TempPathFactory
from packaging.version import Version
import redis
class RedisConfigType(TypedDict):
"""Pytest redis config definition type."""
host: str
port: Optional[int]
username: str
password: str
exec: str
timeout: int
loglevel: str
db_count: int
save: str
compression: bool
rdbchecksum: bool
syslog: bool
decode: bool
datadir: str
modules: List[str]

The plugin automatically provides three fixtures when installed:
def test_redis(redisdb):
"""Test using the built-in Redis client fixture."""
# The redisdb fixture provides a Redis client and cleans up after the test
redisdb.set('test_key', 'test_value')
# Your code that uses Redis
my_component = MyRedisBasedComponent()
my_component.do_something()
# Verify results
assert redisdb.get('result_key') == b'expected_value'
# Database is automatically cleaned after this test

from pytest_redis import factories
# Create custom Redis process and client fixtures
redis_my_proc = factories.redis_proc(port=None) # Random port
redis_my = factories.redisdb('redis_my_proc')
def test_custom_redis(redis_my):
"""Test using custom Redis fixtures."""
redis_my.set('custom_key', 'custom_value')
assert redis_my.get('custom_key') == b'custom_value'

def test_existing_redis(redis_nooproc):
"""Test connecting to an already running Redis server."""
# Connects to Redis server at configured host/port (default: 127.0.0.1:6379)
# Does not start/stop Redis process - expects Redis to already be running
redis_nooproc.set('key', 'value')
assert redis_nooproc.get('key') == b'value'

Three fixtures are automatically available when pytest-redis is installed:
# Automatically registered fixtures (no import needed)
def redis_proc(request):
"""
Session-scoped fixture that starts/stops Redis server process.
Returns:
RedisExecutor: Redis server process executor
"""
def redis_nooproc(request):
"""
Session-scoped fixture for connecting to existing Redis server.
Returns:
NoopRedis: Connection to existing Redis server
"""
def redisdb(request):
"""
Function-scoped Redis client fixture with automatic cleanup.
Returns:
redis.Redis: Redis client instance
"""

Create custom fixtures with specific configurations:
def redis_proc(
executable: Optional[str] = None,
timeout: Optional[int] = None,
host: Optional[str] = None,
port: Union[
None,
str,
int,
Tuple[int, int],
Set[int],
List[str],
List[int],
List[Tuple[int, int]],
List[Set[int]],
List[Union[Set[int], Tuple[int, int]]],
List[Union[str, int, Tuple[int, int], Set[int]]],
] = -1,
username: Optional[str] = None,
password: Optional[str] = None,
db_count: Optional[int] = None,
save: Optional[str] = None,
compression: Optional[bool] = None,
checksum: Optional[bool] = None,
syslog: Optional[bool] = None,
loglevel: Optional[str] = None,
datadir: Optional[str] = None,
modules: Optional[List[str]] = None,
) -> Callable[[FixtureRequest, TempPathFactory], Generator[RedisExecutor, None, None]]:
"""
Factory for creating Redis process fixtures.
Parameters:
- executable: Path to redis-server executable
- timeout: Client connection timeout in seconds
- host: Hostname for Redis server (default: 127.0.0.1)
- port: Port configuration - exact port, random port (None), port ranges, or sets
- username: Authentication username
- password: Authentication password
- db_count: Number of Redis databases (default: 8)
- save: Redis persistence configuration (seconds keys format)
- compression: Enable dump file compression
- checksum: Add checksum to RDB files
- syslog: Enable system logger
- loglevel: Log verbosity level (notice, warning, verbose, debug)
- datadir: Directory for Redis data files
- modules: List of Redis extension module paths to load
Returns:
Callable: Function that creates Redis process fixture
"""
def redis_noproc(
host: Optional[str] = None,
port: Optional[int] = None,
username: Optional[str] = None,
password: Optional[str] = None,
startup_timeout: int = 15,
) -> Callable[[FixtureRequest], Generator[NoopRedis, None, None]]:
"""
Factory for creating no-process Redis fixtures (connects to existing server).
Parameters:
- host: Redis server hostname
- port: Redis server port
- username: Authentication username
- password: Authentication password
- startup_timeout: Connection timeout in seconds
Returns:
Callable: Function that creates no-process Redis fixture
"""
def redisdb(
process_fixture_name: str,
dbnum: int = 0,
decode: Optional[bool] = None,
) -> Callable[[FixtureRequest], Generator[redis.Redis, None, None]]:
"""
Factory for creating Redis client fixtures.
Parameters:
- process_fixture_name: Name of process fixture to use for connection
- dbnum: Redis database number to use (default: 0)
- decode: Whether to decode Redis responses to strings
Returns:
Callable: Function that creates Redis client fixture
"""

pytest-redis can be configured via command line options or pytest.ini settings:
def pytest_addoption(parser: Parser) -> None:
"""
Register pytest command line options and ini settings.
Command line options:
--redis-exec: Redis server executable path
--redis-host: Redis server hostname
--redis-port: Redis server port
--redis-username: Authentication username
--redis-password: Authentication password
--redis-timeout: Client connection timeout
--redis-loglevel: Redis log verbosity level
--redis-db-count: Number of Redis databases
--redis-save: Redis persistence configuration
--redis-compression: Enable dump file compression
--redis-rdbchecksum: Add checksum to RDB files
--redis-syslog: Enable system logger
--redis-client-decode: Decode Redis responses (note: different from ini name)
--redis-datadir: Redis data directory
--redis-modules: Redis extension modules (comma-separated)
Corresponding ini settings:
redis_exec, redis_host, redis_port, redis_username, redis_password,
redis_timeout, redis_loglevel, redis_db_count, redis_save,
redis_compression, redis_rdbchecksum, redis_syslog, redis_decode,
redis_datadir, redis_modules
"""

Internal classes that manage Redis processes:
class RedisExecutor:
"""
Redis server process executor.
Attributes:
host: Redis server hostname
port: Redis server port
username: Authentication username
password: Authentication password
unixsocket: Unix socket path for Redis server
executable: Path to redis-server executable
Inherits from mirakuru.TCPExecutor for process management.
"""
MIN_SUPPORTED_VERSION: Version = parse("2.6")
def __init__(
self,
executable: str,
databases: int,
redis_timeout: int,
loglevel: str,
host: str,
port: int,
username: Optional[str] = None,
password: Optional[str] = None,
startup_timeout: int = 60,
save: str = "",
daemonize: str = "no",
rdbcompression: bool = True,
rdbchecksum: bool = False,
syslog_enabled: bool = False,
appendonly: str = "no",
datadir: Optional[Path] = None,
modules: Optional[List[str]] = None,
) -> None:
"""Initialize RedisExecutor."""
def start(self) -> "RedisExecutor":
"""Check supported version before starting."""
@property
def version(self) -> Version:
"""Return redis version."""
class NoopRedis:
"""
No-operation Redis executor for existing servers.
Attributes:
host: Redis server hostname
port: Redis server port
username: Authentication username
password: Authentication password
unixsocket: Unix socket path (typically None for noop)
timeout: Connection timeout
Connects to existing Redis instance without managing process lifecycle.
"""
def __init__(
self,
host: str,
port: int,
username: Optional[str] = None,
password: Optional[str] = None,
unixsocket: Optional[str] = None,
startup_timeout: int = 15,
) -> None:
"""Initialize NoopRedis."""
def start(self) -> "NoopRedis":
"""Start is a NOOP - waits for Redis to be available."""
def redis_available(self) -> bool:
"""Return True if connecting to Redis is possible."""

def get_config(request: FixtureRequest) -> RedisConfigType:
"""
Extract Redis configuration from pytest options and ini settings.
Parameters:
request: pytest FixtureRequest object
Returns:
RedisConfigType: Dictionary containing Redis configuration
"""

Custom exceptions for Redis-specific issues:
class RedisUnsupported(Exception):
"""Raised when Redis version < 2.6 is detected."""
class RedisMisconfigured(Exception):
"""Raised when redis_exec points to non-existing file."""
class UnixSocketTooLong(Exception):
"""Raised when Unix socket path exceeds maximum length."""

[tool:pytest]
redis_host = 127.0.0.1
redis_port = 6379
redis_timeout = 30
redis_loglevel = notice
redis_db_count = 16
redis_decode = false

from pytest_redis import factories
# Custom Redis with specific port and authentication
redis_proc2 = factories.redis_proc(port=6381)
redis_proc3 = factories.redis_proc(port=6385, password="secretpassword")
# Create client fixtures for the custom processes
redisdb2 = factories.redisdb('redis_proc2')
redisdb3 = factories.redisdb('redis_proc3')
# No-process fixtures connecting to existing Redis instances
redis_nooproc2 = factories.redis_noproc(port=6381, startup_timeout=1)
redis_nooproc3 = factories.redis_noproc(port=6385, password="secretpassword")
redisdb2_noop = factories.redisdb('redis_nooproc2')
redisdb3_noop = factories.redisdb('redis_nooproc3')
# Redis with extension modules and custom data directory
module_redis_proc = factories.redis_proc(
modules=["/path/to/redis-module.so"],
datadir="/tmp/redis-test-data"
)
module_redis = factories.redisdb('module_redis_proc')

from pytest_redis import factories
# Primary Redis for main data
redis_main_proc = factories.redis_proc(port=6379)
redis_main = factories.redisdb('redis_main_proc')
# Secondary Redis for caching
redis_cache_proc = factories.redis_proc(port=6380)
redis_cache = factories.redisdb('redis_cache_proc', dbnum=1)
def test_multi_redis(redis_main, redis_cache):
"""Test using multiple Redis instances."""
redis_main.set('data_key', 'main_data')
redis_cache.set('cache_key', 'cached_data')
assert redis_main.get('data_key') == b'main_data'
assert redis_cache.get('cache_key') == b'cached_data'
# Each Redis instance maintains separate data
assert redis_main.get('cache_key') is None
assert redis_cache.get('data_key') is None

def test_database_isolation(redisdb):
"""Each test gets a clean Redis database."""
# Database is empty at start of each test
assert redisdb.dbsize() == 0
# Make changes
redisdb.set('test_key', 'test_value')
redisdb.lpush('test_list', 'item1', 'item2')
# Changes are visible within the test
assert redisdb.get('test_key') == b'test_value'
assert redisdb.llen('test_list') == 2
# Database is automatically cleaned after this test