pytest plugin to run tests with support for PySpark.
npx @tessl/cli install tessl/pypi-pytest-spark@0.8.0

A pytest plugin that integrates Apache Spark (PySpark) with the pytest testing framework. It provides the session-scoped fixtures spark_context and spark_session, which are created once and shared by all tests in a session, supports flexible Spark configuration through pytest.ini (including loading external libraries via spark.jars.packages), and works with both traditional Spark deployments and the newer Spark Connect architecture.
pip install pytest-spark

The plugin automatically registers its fixtures when installed:

import pytest

def test_my_case(spark_context):
    # spark_context fixture available automatically
    pass

def test_spark_session_dataframe(spark_session):
    # spark_session fixture available automatically
    pass

# Example test using spark_context fixture
def test_rdd_operations(spark_context):
    test_rdd = spark_context.parallelize([1, 2, 3, 4])
    result = test_rdd.map(lambda x: x * 2).collect()
    assert result == [2, 4, 6, 8]

# Example test using spark_session fixture (Spark 2.0+)
def test_dataframe_operations(spark_session):
    test_df = spark_session.createDataFrame([[1, 3], [2, 4]], "a: int, b: int")
    result = test_df.select("a").collect()
    assert len(result) == 2

# Specify Spark installation directory
pytest --spark_home=/opt/spark
# Specify Spark Connect server URL
pytest --spark_connect_url=sc://localhost:15002

Spark configuration can also be set in pytest.ini:

[pytest]
spark_home = /opt/spark
spark_connect_url = sc://localhost:15002
spark_options =
    spark.app.name: my-pytest-spark-tests
    spark.executor.instances: 1
    spark.jars.packages: com.databricks:spark-xml_2.12:0.5.0

The spark_context fixture creates a SparkContext instance with reduced logging that persists across the entire test session.
@pytest.fixture(scope='session')
def spark_context(_spark_session):
    """
    Return a SparkContext instance with reduced logging (session scope).

    Note: Not supported with Spark Connect functionality.

    Returns:
        SparkContext: Configured SparkContext instance

    Raises:
        NotImplemented: If used in Spark Connect mode
    """

The spark_session fixture creates a Hive-enabled SparkSession instance with reduced logging that persists across the entire test session.
@pytest.fixture(scope='session')
def spark_session(_spark_session):
    """
    Return a Hive-enabled SparkSession instance with reduced logging (session scope).

    Available from Spark 2.0 onwards.

    Returns:
        SparkSession: Configured SparkSession instance with Hive support

    Raises:
        Exception: If used with Spark versions < 2.0
    """

Integration hooks that pytest automatically calls to configure Spark support:
def pytest_addoption(parser):
    """
    Add command-line and ini options for Spark configuration.

    Args:
        parser: pytest argument parser
    """

def pytest_configure(config):
    """
    Configure Spark based on the pytest configuration.

    Args:
        config: pytest configuration object
    """

def pytest_report_header(config):
    """
    Add the Spark version and configuration to the pytest report header.

    Args:
        config: pytest configuration object

    Returns:
        str: Header lines with Spark information
    """

For remote execution against a Spark Connect server (requires Spark 3.4+ with pyspark[connect] or pyspark-connect):
[pytest]
spark_connect_url = sc://remote-spark-server:15002

Or via an environment variable:

export SPARK_REMOTE=sc://remote-spark-server:15002

The plugin provides optimized defaults for testing environments that minimize resource usage while maintaining functionality:
DEFAULTS = {
    'spark.app.name': 'pytest-spark',
    'spark.default.parallelism': 1,
    'spark.dynamicAllocation.enabled': 'false',
    'spark.executor.cores': 1,
    'spark.executor.instances': 1,
    'spark.io.compression.codec': 'lz4',
    'spark.rdd.compress': 'false',
    'spark.sql.shuffle.partitions': 1,
    'spark.shuffle.compress': 'false',
    'spark.sql.catalogImplementation': 'hive'
}

These defaults can be overridden via spark_options in pytest.ini.
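For example, a test can read the effective values through the session's runtime configuration. This is only a minimal sketch (the test name is illustrative) and assumes the shuffle-partition default above has not been overridden:

def test_shuffle_partitions_default(spark_session):
    # Spark reports configuration values as strings
    assert spark_session.conf.get("spark.sql.shuffle.partitions") == "1"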
Common exceptions and error conditions:

- Requesting the spark_context fixture in Spark Connect mode raises NotImplemented, since SparkContext is not available through Spark Connect.
- Requesting the spark_session fixture with a Spark version older than 2.0 raises an exception, since SparkSession is only available from Spark 2.0 onwards.
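When a suite runs against both classic Spark and Spark Connect, tests that require spark_context can be skipped in Connect mode. The sketch below is not part of the plugin; it only covers the SPARK_REMOTE environment-variable path shown above, and the marker name is hypothetical:

import os

import pytest

# Hypothetical marker: skip spark_context-based tests when SPARK_REMOTE points
# at a Spark Connect server, where the spark_context fixture is not supported.
requires_classic_spark = pytest.mark.skipif(
    os.environ.get("SPARK_REMOTE", "").startswith("sc://"),
    reason="spark_context fixture is not supported with Spark Connect",
)

@requires_classic_spark
def test_rdd_count(spark_context):
    assert spark_context.parallelize(range(3)).count() == 3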