tessl/pypi-dagster-pyspark

Package for PySpark Dagster framework components

  • Workspace: tessl
  • Visibility: Public
  • Describes: pkg:pypi/dagster-pyspark@0.27.x

To install, run

npx @tessl/cli install tessl/pypi-dagster-pyspark@0.27.0


Dagster PySpark

Dagster-PySpark is a library that integrates Apache PySpark with Dagster's data orchestration framework. It provides resources for configuring and managing Spark sessions within Dagster ops and jobs, supporting both the modern Pythonic resource approach and legacy resource configurations.

Package Information

  • Package Name: dagster-pyspark
  • Language: Python
  • Installation: pip install dagster-pyspark

Core Imports

from dagster_pyspark import (
    PySparkResource,
    LazyPySparkResource,
    pyspark_resource,
    lazy_pyspark_resource,
    DataFrame
)
from dagster import InitResourceContext

Basic Usage

Modern Resource Approach (Recommended)

from dagster import op, job
from dagster_pyspark import PySparkResource

@op
def my_spark_op(pyspark: PySparkResource):
    spark_session = pyspark.spark_session
    df = spark_session.read.json("path/to/data.json")
    df.show()
    return df

@job(
    resource_defs={
        "pyspark": PySparkResource(
            spark_config={
                "spark.executor.memory": "2g",
                "spark.executor.cores": "2"
            }
        )
    }
)
def my_spark_job():
    my_spark_op()
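
For local testing, the job above can be executed in process with Dagster's execute_in_process(); a minimal sketch continuing the example (the assertion is illustrative):

if __name__ == "__main__":
    # Runs my_spark_job in the current process using the resources bound above.
    result = my_spark_job.execute_in_process()
    assert result.success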

Legacy Resource Approach

from dagster import op, job
from dagster_pyspark import pyspark_resource

@op(required_resource_keys={"pyspark"})
def my_spark_op(context):
    spark_session = context.resources.pyspark.spark_session
    df = spark_session.read.json("path/to/data.json")
    df.show()
    return df

my_pyspark_resource = pyspark_resource.configured({
    "spark_conf": {"spark.executor.memory": "2g"}
})

@job(resource_defs={"pyspark": my_pyspark_resource})
def my_spark_job():
    my_spark_op()

Architecture

Dagster-PySpark follows a resource-based architecture:

  • Resource Classes: Modern PySparkResource and LazyPySparkResource providing direct Spark session access
  • Legacy Functions: pyspark_resource and lazy_pyspark_resource for backwards compatibility
  • Type System: DataFrame type with comprehensive loading capabilities for multiple data formats
  • Session Management: Automatic Spark session lifecycle management within Dagster execution context

The lazy variants avoid Spark session creation overhead until the session is actually accessed, improving performance for jobs that may not always need Spark resources.
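
As an illustration of this lazy behavior, the modern example above can be adapted to use LazyPySparkResource; a minimal sketch (op and job names are hypothetical):

from dagster import op, job
from dagster_pyspark import LazyPySparkResource

@op
def optional_spark_op(pyspark: LazyPySparkResource):
    # No Spark session exists yet at job start; it is created on first access
    # of the spark_session property.
    spark = pyspark.spark_session
    return spark.range(10).count()

@job(
    resource_defs={
        "pyspark": LazyPySparkResource(
            spark_config={"spark.executor.memory": "2g"}
        )
    }
)
def lazy_spark_job():
    optional_spark_op()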

Capabilities

PySpark Resource Management

Modern ConfigurableResource classes for managing PySpark sessions with flexible configuration and lifecycle management.

class PySparkResource(ConfigurableResource):
    spark_config: dict[str, Any]
    
    def setup_for_execution(self, context: InitResourceContext) -> None: ...
    
    @property
    def spark_session(self) -> Any: ...
    
    @property  
    def spark_context(self) -> Any: ...

class LazyPySparkResource(ConfigurableResource):
    spark_config: dict[str, Any]
    
    @property
    def spark_session(self) -> Any: ...
    
    @property
    def spark_context(self) -> Any: ...
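
These resource classes can also be bound at the code-location level with Dagster's Definitions object instead of per job; a sketch assuming an asset-based layout (the asset name and file path are placeholders):

from dagster import asset, Definitions
from dagster_pyspark import PySparkResource

@asset
def event_count(pyspark: PySparkResource) -> int:
    # "path/to/events.json" is a placeholder input path.
    df = pyspark.spark_session.read.json("path/to/events.json")
    return df.count()

defs = Definitions(
    assets=[event_count],
    resources={
        "pyspark": PySparkResource(spark_config={"spark.executor.memory": "2g"})
    },
)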

Legacy Resource Functions

Legacy resource factory functions for backwards compatibility with existing Dagster codebases.

from dagster import resource
from dagster_spark.configs_spark import spark_config

@resource({"spark_conf": spark_config()})
def pyspark_resource(init_context) -> PySparkResource: ...

@resource({"spark_conf": spark_config()})
def lazy_pyspark_resource(init_context: InitResourceContext) -> LazyPySparkResource: ...

DataFrame Type and Data Loading

Comprehensive data loading capabilities with support for multiple file formats, database connections, and extensive configuration options.

DataFrame = PythonObjectDagsterType(
    python_type=pyspark.sql.DataFrame,
    name="PySparkDataFrame",
    description="A PySpark data frame.",
    loader=dataframe_loader
)
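
As a sketch of how the exported DataFrame type can be attached to an op output via Dagster's Out (the op name and input path are illustrative):

from dagster import op, Out
from dagster_pyspark import DataFrame, PySparkResource

@op(out=Out(DataFrame))
def load_events(pyspark: PySparkResource):
    # The returned value is checked against the PySparkDataFrame Dagster type.
    return pyspark.spark_session.read.json("path/to/events.json")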

Data Loading and DataFrame Operations

Types

Core Types

class PySparkResource(ConfigurableResource):
    """Resource providing access to a PySpark Session for executing PySpark code within Dagster."""
    spark_config: dict[str, Any]
    
    def setup_for_execution(self, context: InitResourceContext) -> None: ...

class LazyPySparkResource(ConfigurableResource):  
    """Lazily-created PySpark resource that avoids session creation until accessed."""
    spark_config: dict[str, Any]