Package for PySpark Dagster framework components
```
npx @tessl/cli install tessl/pypi-dagster-pyspark@0.27.0
```

Dagster-PySpark integrates Apache PySpark with Dagster's data orchestration framework. It provides resources for configuring and managing Spark sessions within Dagster ops and jobs, supporting both the modern resource-based approach and legacy configurations.
```
pip install dagster-pyspark
```

```python
from dagster_pyspark import (
    PySparkResource,
    LazyPySparkResource,
    pyspark_resource,
    lazy_pyspark_resource,
    DataFrame,
)
from dagster import InitResourceContext
```

```python
from dagster import op, job
from dagster_pyspark import PySparkResource


@op
def my_spark_op(pyspark: PySparkResource):
    spark_session = pyspark.spark_session
    df = spark_session.read.json("path/to/data.json")
    df.show()
    return df


@job(
    resource_defs={
        "pyspark": PySparkResource(
            spark_config={
                "spark.executor.memory": "2g",
                "spark.executor.cores": "2",
            }
        )
    }
)
def my_spark_job():
    my_spark_op()
```

```python
from dagster import op, job
from dagster_pyspark import pyspark_resource


@op(required_resource_keys={"pyspark"})
def my_spark_op(context):
    spark_session = context.resources.pyspark.spark_session
    df = spark_session.read.json("path/to/data.json")
    df.show()
    return df


my_pyspark_resource = pyspark_resource.configured({
    "spark_conf": {"spark.executor.memory": "2g"}
})


@job(resource_defs={"pyspark": my_pyspark_resource})
def my_spark_job():
    my_spark_op()
```

Dagster-PySpark follows a resource-based architecture:
- PySparkResource and LazyPySparkResource providing direct Spark session access
- pyspark_resource and lazy_pyspark_resource for backwards compatibility
- DataFrame type with comprehensive loading capabilities for multiple data formats

The lazy variants avoid Spark session creation overhead until the session is actually accessed, improving performance for jobs that may not always need Spark resources (see the sketch below).
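A minimal sketch of the lazy variant (the op name, resource key, and data path here are illustrative): no SparkSession is created when the resource is initialized, only when spark_session is first accessed inside the op.

```python
from dagster import job, op
from dagster_pyspark import LazyPySparkResource


@op
def count_records(lazy_pyspark: LazyPySparkResource) -> int:
    # The SparkSession is built here, on first access of .spark_session,
    # not when the resource is constructed.
    df = lazy_pyspark.spark_session.read.json("path/to/data.json")
    return df.count()


@job(
    resource_defs={
        "lazy_pyspark": LazyPySparkResource(
            spark_config={"spark.executor.memory": "1g"}
        )
    }
)
def lazy_spark_job():
    count_records()
```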
Modern ConfigurableResource classes for managing PySpark sessions with flexible configuration and lifecycle management.
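As an illustrative sketch of this approach (the asset name and file path are assumptions, and it presumes a Definitions-based Dagster project), the same resource can also be bound to assets:

```python
from dagster import Definitions, asset
from dagster_pyspark import PySparkResource


@asset
def line_count(pyspark: PySparkResource) -> int:
    # Read a text file with the configured SparkSession and count its lines.
    lines = pyspark.spark_session.read.text("path/to/words.txt")
    return lines.count()


defs = Definitions(
    assets=[line_count],
    resources={
        "pyspark": PySparkResource(spark_config={"spark.executor.memory": "2g"})
    },
)
```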
```python
class PySparkResource(ConfigurableResource):
    spark_config: dict[str, Any]

    def setup_for_execution(self, context: InitResourceContext) -> None: ...

    @property
    def spark_session(self) -> Any: ...

    @property
    def spark_context(self) -> Any: ...


class LazyPySparkResource(ConfigurableResource):
    spark_config: dict[str, Any]

    @property
    def spark_session(self) -> Any: ...

    @property
    def spark_context(self) -> Any: ...
```

Legacy resource factory functions for backwards compatibility with existing Dagster codebases.
```python
from dagster import resource
from dagster_spark.configs_spark import spark_config


@resource({"spark_conf": spark_config()})
def pyspark_resource(init_context) -> PySparkResource: ...


@resource({"spark_conf": spark_config()})
def lazy_pyspark_resource(init_context: InitResourceContext) -> LazyPySparkResource: ...
```

Comprehensive data loading capabilities with support for multiple file formats, database connections, and extensive configuration options.
```python
DataFrame = PythonObjectDagsterType(
    python_type=pyspark.sql.DataFrame,
    name="PySparkDataFrame",
    description="A PySpark data frame.",
    loader=dataframe_loader,
)
```

Data Loading and DataFrame Operations
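A brief sketch (the op name and file path are illustrative) of using the DataFrame type to annotate an op output so Dagster type-checks the returned PySpark data frame:

```python
from dagster import Out, op
from dagster_pyspark import DataFrame, PySparkResource


@op(out=Out(DataFrame))
def load_events(pyspark: PySparkResource):
    # The returned object is validated against the PySparkDataFrame dagster type.
    return pyspark.spark_session.read.parquet("path/to/events.parquet")
```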
```python
class PySparkResource(ConfigurableResource):
    """Resource providing access to a PySpark Session for executing PySpark code within Dagster."""

    spark_config: dict[str, Any]

    def setup_for_execution(self, context: InitResourceContext) -> None: ...


class LazyPySparkResource(ConfigurableResource):
    """Lazily-created PySpark resource that avoids session creation until accessed."""

    spark_config: dict[str, Any]
```
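For intuition, a rough hand-rolled equivalent of what the spark_config dict amounts to (an assumption based on the standard SparkSession builder API, not the library's actual implementation):

```python
from pyspark.sql import SparkSession

spark_config = {
    "spark.executor.memory": "2g",
    "spark.executor.cores": "2",
}

# Apply each key/value pair as a Spark configuration option before creating the session.
builder = SparkSession.builder.appName("dagster-pyspark")
for key, value in spark_config.items():
    builder = builder.config(key, value)

spark = builder.getOrCreate()
```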