Package providing PySpark components for the Dagster framework
Comprehensive data loading for PySpark DataFrames within Dagster, supporting multiple file formats and JDBC database connections through the DataFrame type system.
The DataFrame type loads PySpark DataFrames automatically from op input config, with configuration options for each supported data source.
DataFrame = PythonObjectDagsterType(
python_type=pyspark.sql.DataFrame,
name="PySparkDataFrame",
description="A PySpark data frame.",
loader=dataframe_loader
)
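Conceptually, the loader receives the single source key selected in run config and forwards its options to the matching spark.read call on the pyspark resource. A simplified, hypothetical sketch of that dispatch (not the package's actual implementation; it assumes the pyspark resource exposes a spark_session):

def dataframe_loader(context, config):
    # config contains exactly one key chosen via the Selector, e.g. {"csv": {...}}
    source, options = next(iter(config.items()))
    spark = context.resources.pyspark.spark_session
    if source == "csv":
        return spark.read.csv(**options)
    if source == "parquet":
        return spark.read.parquet(options["path"])
    if source == "jdbc":
        return spark.read.jdbc(**options)
    ...  # remaining sources follow the same pattern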

Load CSV files with comprehensive parsing and schema options.
@dagster_type_loader(
config_schema=Selector({
"csv": Permissive({
"path": Field(Any, is_required=True,
description="string, or list of strings, for input path(s), or RDD of Strings storing CSV rows"),
"schema": Field(Any, is_required=False,
description="optional pyspark.sql.types.StructType for input schema or DDL-formatted string"),
"sep": Field(String, is_required=False,
description="separator for each field and value (default: ',')"),
"encoding": Field(String, is_required=False,
description="decodes CSV files by given encoding (default: 'UTF-8')"),
"quote": Field(String, is_required=False,
description="single character for escaping quoted values (default: '\"')"),
"escape": Field(String, is_required=False,
description="single character for escaping quotes (default: '\\')"),
"comment": Field(String, is_required=False,
description="single character for skipping comment lines"),
"header": Field(Bool, is_required=False,
description="uses first line as column names (default: false)"),
"inferSchema": Field(Bool, is_required=False,
description="infers input schema automatically (requires extra pass, default: false)"),
"enforceSchema": Field(Bool, is_required=False,
description="forcibly apply specified/inferred schema (default: true)"),
"ignoreLeadingWhiteSpace": Field(Bool, is_required=False,
description="skip leading whitespaces from values (default: false)"),
"ignoreTrailingWhiteSpace": Field(Bool, is_required=False,
description="skip trailing whitespaces from values (default: false)"),
"nullValue": Field(String, is_required=False,
description="string representation of null value (default: empty string)"),
"nanValue": Field(String, is_required=False,
description="string representation of NaN value (default: 'NaN')"),
"positiveInf": Field(String, is_required=False,
description="string representation of positive infinity (default: 'Inf')"),
"negativeInf": Field(String, is_required=False,
description="string representation of negative infinity (default: 'Inf')"),
"dateFormat": Field(String, is_required=False,
description="date format pattern (default: 'yyyy-MM-dd')"),
"timestampFormat": Field(String, is_required=False,
description="timestamp format pattern (default: 'yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]')"),
"maxColumns": Field(Int, is_required=False,
description="hard limit for number of columns (default: 20480)"),
"maxCharsPerColumn": Field(Int, is_required=False,
description="maximum characters per column (default: -1 unlimited)"),
"mode": Field(String, is_required=False,
description="mode for corrupt records: PERMISSIVE, DROPMALFORMED, FAILFAST (default: PERMISSIVE)"),
"columnNameOfCorruptRecord": Field(String, is_required=False,
description="column name for malformed records in PERMISSIVE mode"),
"multiLine": Field(Bool, is_required=False,
description="parse records spanning multiple lines (default: false)"),
"charToEscapeQuoteEscaping": Field(String, is_required=False,
description="character for escaping quote escape character"),
"samplingRatio": Field(Float, is_required=False,
description="fraction of rows for schema inference (default: 1.0)"),
"emptyValue": Field(String, is_required=False,
description="string representation of empty value (default: empty string)"),
"locale": Field(String, is_required=False,
description="locale for parsing dates/timestamps (default: 'en-US')"),
"lineSep": Field(String, is_required=False,
description="line separator for parsing (covers \\r, \\r\\n, \\n)"),
"pathGlobFilter": Field(String, is_required=False,
description="glob pattern to include files matching pattern"),
"recursiveFileLookup": Field(Bool, is_required=False,
description="recursively scan directory for files (disables partition discovery)")
})
}),
required_resource_keys={"pyspark"}
)
def dataframe_loader(context, config): ...
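Rather than relying on inferSchema, the schema option also accepts a DDL-formatted string. A hypothetical CSV input fragment (the path, separator, and column names are made up for illustration):

csv_input_config = {
    "csv": {
        "path": "/path/to/users.csv",
        "header": True,
        "sep": "|",
        "schema": "id INT, name STRING, signup_date DATE",
        "dateFormat": "yyyy-MM-dd",
    }
}

A fragment like this is supplied under the op input key in run config, as the usage examples further down show.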

Load JSON files with parsing options and schema inference.
@dagster_type_loader(
config_schema=Selector({
"json": Permissive({
"path": Field(Any, is_required=True,
description="path to JSON dataset, list of paths, or RDD of JSON objects"),
"schema": Field(Any, is_required=False,
description="optional pyspark.sql.types.StructType or DDL-formatted string"),
"primitivesAsString": Field(Bool, is_required=False,
description="infer all primitive values as string type (default: false)"),
"prefersDecimal": Field(Bool, is_required=False,
description="infer floating-point values as decimal type (default: false)"),
"allowComments": Field(Bool, is_required=False,
description="ignore Java/C++ style comments (default: false)"),
"allowUnquotedFieldNames": Field(String, is_required=False,
description="allow unquoted JSON field names (default: false)"),
"allowSingleQuotes": Field(Bool, is_required=False,
description="allow single quotes in addition to double quotes (default: true)"),
"allowNumericLeadingZero": Field(Bool, is_required=False,
description="allow leading zeros in numbers (default: false)"),
"allowBackslashEscapingAnyCharacter": Field(Bool, is_required=False,
description="allow backslash quoting of any character (default: false)"),
"mode": Field(String, is_required=False,
description="mode for corrupt records (default: PERMISSIVE)"),
"columnNameOfCorruptRecord": Field(String, is_required=False,
description="column name for malformed records"),
"dateFormat": Field(String, is_required=False,
description="date format pattern (default: 'yyyy-MM-dd')"),
"timestampFormat": Field(String, is_required=False,
description="timestamp format pattern"),
"multiLine": Field(Bool, is_required=False,
description="parse one record spanning multiple lines per file (default: false)"),
"allowUnquotedControlChars": Field(Bool, is_required=False,
description="allow JSON strings with unquoted control characters"),
"encoding": Field(String, is_required=False,
description="encoding for JSON files (auto-detected when multiLine=true)"),
"lineSep": Field(String, is_required=False,
description="line separator (covers \\r, \\r\\n, \\n)"),
"samplingRatio": Field(Float, is_required=False,
description="fraction of JSON objects for schema inference (default: 1.0)"),
"dropFieldIfAllNull": Field(Bool, is_required=False,
description="ignore columns with all null values during schema inference (default: false)"),
"locale": Field(String, is_required=False,
description="locale for parsing dates/timestamps (default: 'en-US')"),
"pathGlobFilter": Field(String, is_required=False,
description="glob pattern for file inclusion"),
"recursiveFileLookup": Field(Bool, is_required=False,
description="recursively scan directory for files")
})
}),
required_resource_keys={"pyspark"}
)
def dataframe_loader(context, config): ...

Load Parquet files with minimal configuration required.
@dagster_type_loader(
config_schema=Selector({
"parquet": Permissive({
"path": Field(Any, is_required=True,
description="string or list of strings for input path(s)")
})
}),
required_resource_keys={"pyspark"}
)
def dataframe_loader(context, config): ...
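A Parquet input needs only a path. A minimal, hypothetical run-config fragment (the op name process_data and input name data are placeholders):

run_config = {
    "ops": {
        "process_data": {
            "inputs": {
                "data": {"parquet": {"path": "/path/to/data.parquet"}}
            }
        }
    }
}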

Load data from relational databases via JDBC connections.
@dagster_type_loader(
config_schema=Selector({
"jdbc": Permissive({
"url": Field(String, is_required=True,
description="JDBC URL of the form 'jdbc:subprotocol:subname'"),
"table": Field(String, is_required=True,
description="name of the table"),
"column": Field(String, is_required=False,
description="column for partitioning (numeric, date, or timestamp type)"),
"lowerBound": Field(Int, is_required=False,
description="minimum value of partitioning column"),
"upperBound": Field(Int, is_required=False,
description="maximum value of partitioning column"),
"numPartitions": Field(Int, is_required=False,
description="number of partitions"),
"predicates": Field(list, is_required=False,
description="list of WHERE clause expressions for partitioning"),
"properties": Field(Permissive(), is_required=False,
description="JDBC connection properties dictionary (user, password, etc.)")
})
}),
required_resource_keys={"pyspark"}
)
def dataframe_loader(context, config): ...
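To partition a JDBC read by a numeric column, Spark expects column, lowerBound, upperBound, and numPartitions to be supplied together; predicates offers an alternative way to define explicit partitions. A hypothetical partitioned-read fragment (connection details and the partitioning column are placeholders):

jdbc_input_config = {
    "jdbc": {
        "url": "jdbc:postgresql://localhost:5432/sales_db",
        "table": "sales_transactions",
        "column": "id",
        "lowerBound": 1,
        "upperBound": 1000000,
        "numPartitions": 8,
        "properties": {"user": "analyst", "password": "secure_password"},
    }
}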

Load Apache ORC (Optimized Row Columnar) files.
@dagster_type_loader(
config_schema=Selector({
"orc": Permissive({
"path": Field(Any, is_required=True,
description="string or list of strings for input path(s)")
})
}),
required_resource_keys={"pyspark"}
)
def dataframe_loader(context, config): ...

Load data from Spark catalog tables.
@dagster_type_loader(
config_schema=Selector({
"table": Permissive({
"tableName": Field(String, is_required=True,
description="name of the table")
})
}),
required_resource_keys={"pyspark"}
)
def dataframe_loader(context, config): ...
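Loading from the Spark catalog needs only the table name. A hypothetical fragment (the table name is a placeholder):

table_input_config = {"table": {"tableName": "warehouse.daily_sales"}}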

Load plain text files with optional line processing.
@dagster_type_loader(
config_schema=Selector({
"text": Permissive({
"path": Field(Any, is_required=True,
description="string or list of strings for input path(s)"),
"wholetext": Field(Bool, is_required=False,
description="read each file as a single row (default: false)"),
"lineSep": Field(String, is_required=False,
description="line separator (covers \\r, \\r\\n, \\n)"),
"pathGlobFilter": Field(String, is_required=False,
description="glob pattern for file inclusion"),
"recursiveFileLookup": Field(Bool, is_required=False,
description="recursively scan directory for files")
})
}),
required_resource_keys={"pyspark"}
)
def dataframe_loader(context, config): ...
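A text input reads one row per line by default, or one row per file when wholetext is set. A hypothetical fragment (the path and glob pattern are placeholders):

text_input_config = {
    "text": {
        "path": "/path/to/logs/",
        "pathGlobFilter": "*.log",
        "recursiveFileLookup": True,
        "wholetext": False,
    }
}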

Load data using generic Spark DataFrameReader options.
@dagster_type_loader(
config_schema=Selector({
"other": Permissive()
}),
required_resource_keys={"pyspark"}
)
def dataframe_loader(context, config): ...

Example: loading CSV data into an op input.
from dagster import op, job, In
from dagster_pyspark import DataFrame, PySparkResource
@op(ins={"data": In(DataFrame)})
def process_csv_data(data):
data.show()
return data.count()
@job(
resource_defs={"pyspark": PySparkResource(spark_config={})}
)
def csv_processing_job():
process_csv_data()
# Configuration for CSV input:
# {
#   "ops": {
#     "process_csv_data": {
#       "inputs": {
#         "data": {
#           "csv": {
#             "path": "/path/to/data.csv",
#             "header": true,
#             "inferSchema": true,
#             "sep": ",",
#             "encoding": "UTF-8"
#           }
#         }
#       }
#     }
#   }
# }

Example: analyzing sales data loaded over JDBC.
from dagster import op, job, In
from dagster_pyspark import DataFrame, PySparkResource
@op(ins={"sales_data": In(DataFrame)})
def analyze_sales(sales_data):
return sales_data.groupBy("region").sum("revenue").collect()
@job(
resource_defs={"pyspark": PySparkResource(spark_config={})}
)
def sales_analysis_job():
analyze_sales()
# Configuration for JDBC input:
# {
#   "ops": {
#     "analyze_sales": {
#       "inputs": {
#         "sales_data": {
#           "jdbc": {
#             "url": "jdbc:postgresql://localhost:5432/sales_db",
#             "table": "sales_transactions",
#             "properties": {
#               "user": "analyst",
#               "password": "secure_password"
#             },
#             "numPartitions": 4
#           }
#         }
#       }
#     }
#   }
# }

Example: processing JSON event data.
from dagster import op, job, In
from dagster_pyspark import DataFrame, PySparkResource
@op(ins={"events": In(DataFrame)})
def process_events(events):
return events.filter(events.event_type == "purchase").count()
@job(
resource_defs={"pyspark": PySparkResource(spark_config={})}
)
def event_processing_job():
process_events()
# Configuration for JSON input:
# {
#   "ops": {
#     "process_events": {
#       "inputs": {
#         "events": {
#           "json": {
#             "path": "/path/to/events/*.json",
#             "multiLine": true,
#             "allowComments": true,
#             "timestampFormat": "yyyy-MM-dd HH:mm:ss"
#           }
#         }
#       }
#     }
#   }
# }

Install with Tessl CLI
npx tessl i tessl/pypi-dagster-pyspark