# Data Loading and DataFrame Operations

Comprehensive data loading capabilities for PySpark DataFrames within Dagster, supporting multiple file formats, database connections, and extensive configuration options through the DataFrame type system.

## Capabilities

### DataFrame Type

The DataFrame type provides automatic loading capabilities for PySpark DataFrames with extensive configuration options for different data sources.

```python { .api }
DataFrame = PythonObjectDagsterType(
    python_type=pyspark.sql.DataFrame,
    name="PySparkDataFrame",
    description="A PySpark data frame.",
    loader=dataframe_loader,
)
```

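Because the loader is attached to the Dagster type itself, DataFrame can annotate op inputs (which can then be populated directly from run config) as well as op outputs (which receive a runtime type check). A minimal sketch; the `dedupe_records` op is illustrative and not part of the library:

```python
from dagster import In, Out, op
from dagster_pyspark import DataFrame

# Illustrative op: the input is supplied by dataframe_loader from run config
# (see the source-specific configs below), and the output value is
# type-checked as a pyspark.sql.DataFrame at runtime.
@op(ins={"raw": In(DataFrame)}, out=Out(DataFrame))
def dedupe_records(raw):
    return raw.dropDuplicates()
```
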
### CSV Data Loading

Load CSV files with comprehensive parsing and schema options.

```python { .api }
@dagster_type_loader(
    config_schema=Selector({
        "csv": Permissive({
            "path": Field(Any, is_required=True,
                description="string, or list of strings, for input path(s), or RDD of Strings storing CSV rows"),
            "schema": Field(Any, is_required=False,
                description="optional pyspark.sql.types.StructType for input schema or DDL-formatted string"),
            "sep": Field(String, is_required=False,
                description="separator for each field and value (default: ',')"),
            "encoding": Field(String, is_required=False,
                description="decodes CSV files by given encoding (default: 'UTF-8')"),
            "quote": Field(String, is_required=False,
                description="single character for escaping quoted values (default: '\"')"),
            "escape": Field(String, is_required=False,
                description="single character for escaping quotes (default: '\\')"),
            "comment": Field(String, is_required=False,
                description="single character for skipping comment lines"),
            "header": Field(Bool, is_required=False,
                description="uses first line as column names (default: false)"),
            "inferSchema": Field(Bool, is_required=False,
                description="infers input schema automatically (requires extra pass, default: false)"),
            "enforceSchema": Field(Bool, is_required=False,
                description="forcibly apply specified/inferred schema (default: true)"),
            "ignoreLeadingWhiteSpace": Field(Bool, is_required=False,
                description="skip leading whitespaces from values (default: false)"),
            "ignoreTrailingWhiteSpace": Field(Bool, is_required=False,
                description="skip trailing whitespaces from values (default: false)"),
            "nullValue": Field(String, is_required=False,
                description="string representation of null value (default: empty string)"),
            "nanValue": Field(String, is_required=False,
                description="string representation of NaN value (default: 'NaN')"),
            "positiveInf": Field(String, is_required=False,
                description="string representation of positive infinity (default: 'Inf')"),
            "negativeInf": Field(String, is_required=False,
                description="string representation of negative infinity (default: '-Inf')"),
            "dateFormat": Field(String, is_required=False,
                description="date format pattern (default: 'yyyy-MM-dd')"),
            "timestampFormat": Field(String, is_required=False,
                description="timestamp format pattern (default: 'yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]')"),
            "maxColumns": Field(Int, is_required=False,
                description="hard limit for number of columns (default: 20480)"),
            "maxCharsPerColumn": Field(Int, is_required=False,
                description="maximum characters per column (default: -1, unlimited)"),
            "mode": Field(String, is_required=False,
                description="mode for corrupt records: PERMISSIVE, DROPMALFORMED, FAILFAST (default: PERMISSIVE)"),
            "columnNameOfCorruptRecord": Field(String, is_required=False,
                description="column name for malformed records in PERMISSIVE mode"),
            "multiLine": Field(Bool, is_required=False,
                description="parse records spanning multiple lines (default: false)"),
            "charToEscapeQuoteEscaping": Field(String, is_required=False,
                description="character for escaping the quote escape character"),
            "samplingRatio": Field(Float, is_required=False,
                description="fraction of rows for schema inference (default: 1.0)"),
            "emptyValue": Field(String, is_required=False,
                description="string representation of empty value (default: empty string)"),
            "locale": Field(String, is_required=False,
                description="locale for parsing dates/timestamps (default: 'en-US')"),
            "lineSep": Field(String, is_required=False,
                description="line separator for parsing (covers \\r, \\r\\n, \\n)"),
            "pathGlobFilter": Field(String, is_required=False,
                description="glob pattern to include files matching pattern"),
            "recursiveFileLookup": Field(Bool, is_required=False,
                description="recursively scan directory for files (disables partition discovery)")
        })
    }),
    required_resource_keys={"pyspark"}
)
def dataframe_loader(context, config): ...
```

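As a point of reference, here is a hypothetical run config that exercises a few of these fields, supplying a DDL-formatted schema string instead of relying on schema inference. The op name, input name, path, and column names are placeholders:

```python
# Hypothetical run config for an op input named "data" loaded via the "csv"
# selector; the op name, path, and DDL column names are placeholders.
run_config = {
    "ops": {
        "process_csv_data": {
            "inputs": {
                "data": {
                    "csv": {
                        "path": "/path/to/data.csv",
                        "sep": "|",
                        "header": True,
                        # DDL-formatted string in place of inferSchema
                        "schema": "id INT, name STRING, amount DOUBLE",
                        "mode": "DROPMALFORMED",
                    }
                }
            }
        }
    }
}
```
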
### JSON Data Loading

Load JSON files with parsing options and schema inference.

```python { .api }
@dagster_type_loader(
    config_schema=Selector({
        "json": Permissive({
            "path": Field(Any, is_required=True,
                description="path to JSON dataset, list of paths, or RDD of JSON objects"),
            "schema": Field(Any, is_required=False,
                description="optional pyspark.sql.types.StructType or DDL-formatted string"),
            "primitivesAsString": Field(Bool, is_required=False,
                description="infer all primitive values as string type (default: false)"),
            "prefersDecimal": Field(Bool, is_required=False,
                description="infer floating-point values as decimal type (default: false)"),
            "allowComments": Field(Bool, is_required=False,
                description="ignore Java/C++ style comments (default: false)"),
            "allowUnquotedFieldNames": Field(Bool, is_required=False,
                description="allow unquoted JSON field names (default: false)"),
            "allowSingleQuotes": Field(Bool, is_required=False,
                description="allow single quotes in addition to double quotes (default: true)"),
            "allowNumericLeadingZero": Field(Bool, is_required=False,
                description="allow leading zeros in numbers (default: false)"),
            "allowBackslashEscapingAnyCharacter": Field(Bool, is_required=False,
                description="allow backslash quoting of any character (default: false)"),
            "mode": Field(String, is_required=False,
                description="mode for corrupt records (default: PERMISSIVE)"),
            "columnNameOfCorruptRecord": Field(String, is_required=False,
                description="column name for malformed records"),
            "dateFormat": Field(String, is_required=False,
                description="date format pattern (default: 'yyyy-MM-dd')"),
            "timestampFormat": Field(String, is_required=False,
                description="timestamp format pattern"),
            "multiLine": Field(Bool, is_required=False,
                description="parse one record spanning multiple lines per file (default: false)"),
            "allowUnquotedControlChars": Field(Bool, is_required=False,
                description="allow JSON strings with unquoted control characters"),
            "encoding": Field(String, is_required=False,
                description="encoding for JSON files (auto-detected when multiLine=true)"),
            "lineSep": Field(String, is_required=False,
                description="line separator (covers \\r, \\r\\n, \\n)"),
            "samplingRatio": Field(Float, is_required=False,
                description="fraction of JSON objects for schema inference (default: 1.0)"),
            "dropFieldIfAllNull": Field(Bool, is_required=False,
                description="ignore columns with all null values during schema inference (default: false)"),
            "locale": Field(String, is_required=False,
                description="locale for parsing dates/timestamps (default: 'en-US')"),
            "pathGlobFilter": Field(String, is_required=False,
                description="glob pattern for file inclusion"),
            "recursiveFileLookup": Field(Bool, is_required=False,
                description="recursively scan directory for files")
        })
    }),
    required_resource_keys={"pyspark"}
)
def dataframe_loader(context, config): ...
```

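A hypothetical run config for newline-delimited JSON that coerces primitive values to strings, drops all-null columns during inference, and samples only a fraction of the records to infer the schema. The op name, input name, and path are placeholders:

```python
# Hypothetical run config for an op input named "events" loaded via the
# "json" selector; the op name and path are placeholders.
run_config = {
    "ops": {
        "process_events": {
            "inputs": {
                "events": {
                    "json": {
                        "path": "/path/to/events.jsonl",
                        "primitivesAsString": True,
                        "dropFieldIfAllNull": True,
                        "samplingRatio": 0.1,
                    }
                }
            }
        }
    }
}
```
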
### Parquet Data Loading

Load Parquet files with minimal configuration required.

```python { .api }
@dagster_type_loader(
    config_schema=Selector({
        "parquet": Permissive({
            "path": Field(Any, is_required=True,
                description="string or list of strings for input path(s)")
        })
    }),
    required_resource_keys={"pyspark"}
)
def dataframe_loader(context, config): ...
```

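Because the path field accepts either a string or a list of strings, a single input can read several Parquet locations at once. A hypothetical run config; the op name, input name, and paths are placeholders:

```python
# Hypothetical run config: "path" may be a single string or a list of paths.
run_config = {
    "ops": {
        "combine_partitions": {
            "inputs": {
                "data": {
                    "parquet": {
                        "path": [
                            "/warehouse/events/date=2024-01-01",
                            "/warehouse/events/date=2024-01-02",
                        ]
                    }
                }
            }
        }
    }
}
```
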
### JDBC Database Loading

Load data from relational databases via JDBC connections.

```python { .api }
@dagster_type_loader(
    config_schema=Selector({
        "jdbc": Permissive({
            "url": Field(String, is_required=True,
                description="JDBC URL of the form 'jdbc:subprotocol:subname'"),
            "table": Field(String, is_required=True,
                description="name of the table"),
            "column": Field(String, is_required=False,
                description="column for partitioning (numeric, date, or timestamp type)"),
            "lowerBound": Field(Int, is_required=False,
                description="minimum value of partitioning column"),
            "upperBound": Field(Int, is_required=False,
                description="maximum value of partitioning column"),
            "numPartitions": Field(Int, is_required=False,
                description="number of partitions"),
            "predicates": Field(list, is_required=False,
                description="list of WHERE clause expressions for partitioning"),
            "properties": Field(Permissive(), is_required=False,
                description="JDBC connection properties dictionary (user, password, etc.)")
        })
    }),
    required_resource_keys={"pyspark"}
)
def dataframe_loader(context, config): ...
```

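When partitioning the read by a column, Spark expects column, lowerBound, upperBound, and numPartitions to be supplied together. A hypothetical run config; the connection details and names are placeholders:

```python
# Hypothetical run config for a partitioned JDBC read: Spark splits the table
# into numPartitions ranges over the partitioning column.
run_config = {
    "ops": {
        "load_orders": {
            "inputs": {
                "orders": {
                    "jdbc": {
                        "url": "jdbc:postgresql://localhost:5432/shop",
                        "table": "orders",
                        "column": "order_id",
                        "lowerBound": 1,
                        "upperBound": 1000000,
                        "numPartitions": 8,
                        "properties": {"user": "reader", "password": "example_password"},
                    }
                }
            }
        }
    }
}
```
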
### ORC Data Loading

Load Apache ORC (Optimized Row Columnar) files.

```python { .api }
@dagster_type_loader(
    config_schema=Selector({
        "orc": Permissive({
            "path": Field(Any, is_required=True,
                description="string or list of strings for input path(s)")
        })
    }),
    required_resource_keys={"pyspark"}
)
def dataframe_loader(context, config): ...
```

### Table Data Loading

Load data from Spark catalog tables.

```python { .api }
@dagster_type_loader(
    config_schema=Selector({
        "table": Permissive({
            "tableName": Field(String, is_required=True,
                description="name of the table")
        })
    }),
    required_resource_keys={"pyspark"}
)
def dataframe_loader(context, config): ...
```

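A hypothetical run config that reads a table registered in the catalog of the SparkSession provided by the pyspark resource, such as a Hive metastore table or a previously registered temporary view. The op, input, and table names are placeholders:

```python
# Hypothetical run config: the named table must be visible in the catalog of
# the SparkSession supplied by the "pyspark" resource.
run_config = {
    "ops": {
        "summarize_customers": {
            "inputs": {
                "customers": {
                    "table": {"tableName": "analytics.customers"}
                }
            }
        }
    }
}
```
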
### Text File Loading

Load plain text files with optional line processing.

```python { .api }
@dagster_type_loader(
    config_schema=Selector({
        "text": Permissive({
            "path": Field(Any, is_required=True,
                description="string or list of strings for input path(s)"),
            "wholetext": Field(Bool, is_required=False,
                description="read each file as a single row (default: false)"),
            "lineSep": Field(String, is_required=False,
                description="line separator (covers \\r, \\r\\n, \\n)"),
            "pathGlobFilter": Field(String, is_required=False,
                description="glob pattern for file inclusion"),
            "recursiveFileLookup": Field(Bool, is_required=False,
                description="recursively scan directory for files")
        })
    }),
    required_resource_keys={"pyspark"}
)
def dataframe_loader(context, config): ...
```

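A hypothetical run config that reads each matching file as a single row, which is convenient for document-level processing; spark.read.text returns a DataFrame with a single string column named value. The op, input, and path names are placeholders:

```python
# Hypothetical run config: with wholetext=True each file becomes one row in
# the "value" column; pathGlobFilter narrows which files are read.
run_config = {
    "ops": {
        "parse_reports": {
            "inputs": {
                "reports": {
                    "text": {
                        "path": "/data/reports",
                        "wholetext": True,
                        "pathGlobFilter": "*.txt",
                    }
                }
            }
        }
    }
}
```
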
### Generic Data Loading

Load data using generic Spark DataFrameReader options.

```python { .api }
@dagster_type_loader(
    config_schema=Selector({
        "other": Permissive()
    }),
    required_resource_keys={"pyspark"}
)
def dataframe_loader(context, config): ...
```

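Because the config schema is a Selector, exactly one source key is present in the resolved config, so a loader of this shape can dispatch on that key against the SparkSession exposed by the pyspark resource. The sketch below illustrates the pattern only; the library's actual dataframe_loader may differ in how it validates options, handles list-valued paths, and covers the remaining source types:

```python
from dagster import Permissive, Selector, dagster_type_loader

# Illustrative sketch only, not the library's implementation. Sources
# "json", "orc", "text", and "other" would be handled analogously.
@dagster_type_loader(
    config_schema=Selector({
        "csv": Permissive(),
        "parquet": Permissive(),
        "jdbc": Permissive(),
        "table": Permissive(),
    }),
    required_resource_keys={"pyspark"},
)
def example_loader(context, config):
    spark = context.resources.pyspark.spark_session
    source, options = next(iter(config.items()))  # Selector yields one key
    options = dict(options)
    if source == "csv":
        return spark.read.csv(options.pop("path"), **options)
    if source == "parquet":
        return spark.read.parquet(options.pop("path"))
    if source == "jdbc":
        return spark.read.jdbc(options.pop("url"), options.pop("table"), **options)
    if source == "table":
        return spark.read.table(options["tableName"])
    raise ValueError(f"Unsupported source: {source}")
```
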
## Usage Examples

### Loading CSV with Schema Inference

```python
from dagster import op, job, In
from dagster_pyspark import DataFrame, PySparkResource

@op(ins={"data": In(DataFrame)})
def process_csv_data(data):
    data.show()
    return data.count()

@job(
    resource_defs={"pyspark": PySparkResource(spark_config={})}
)
def csv_processing_job():
    process_csv_data()

# Configuration for CSV input:
# {
#     "ops": {
#         "process_csv_data": {
#             "inputs": {
#                 "data": {
#                     "csv": {
#                         "path": "/path/to/data.csv",
#                         "header": true,
#                         "inferSchema": true,
#                         "sep": ",",
#                         "encoding": "UTF-8"
#                     }
#                 }
#             }
#         }
#     }
# }
```

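The commented run config above can be passed as a Python dictionary when launching the job in process; the path is a placeholder:

```python
# Launch the job with the CSV input supplied through run config.
result = csv_processing_job.execute_in_process(
    run_config={
        "ops": {
            "process_csv_data": {
                "inputs": {
                    "data": {
                        "csv": {
                            "path": "/path/to/data.csv",
                            "header": True,
                            "inferSchema": True,
                        }
                    }
                }
            }
        }
    }
)
assert result.success
```
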
### Loading from Database

```python
from dagster import op, job, In
from dagster_pyspark import DataFrame, PySparkResource

@op(ins={"sales_data": In(DataFrame)})
def analyze_sales(sales_data):
    return sales_data.groupBy("region").sum("revenue").collect()

@job(
    resource_defs={"pyspark": PySparkResource(spark_config={})}
)
def sales_analysis_job():
    analyze_sales()

# Configuration for JDBC input:
# {
#     "ops": {
#         "analyze_sales": {
#             "inputs": {
#                 "sales_data": {
#                     "jdbc": {
#                         "url": "jdbc:postgresql://localhost:5432/sales_db",
#                         "table": "sales_transactions",
#                         "properties": {
#                             "user": "analyst",
#                             "password": "secure_password"
#                         },
#                         "numPartitions": 4
#                     }
#                 }
#             }
#         }
#     }
# }
```

### Loading JSON with Schema Inference

```python
from dagster import op, job, In
from dagster_pyspark import DataFrame, PySparkResource

@op(ins={"events": In(DataFrame)})
def process_events(events):
    return events.filter(events.event_type == "purchase").count()

@job(
    resource_defs={"pyspark": PySparkResource(spark_config={})}
)
def event_processing_job():
    process_events()

# Configuration for JSON input:
# {
#     "ops": {
#         "process_events": {
#             "inputs": {
#                 "events": {
#                     "json": {
#                         "path": "/path/to/events/*.json",
#                         "multiLine": true,
#                         "allowComments": true,
#                         "timestampFormat": "yyyy-MM-dd HH:mm:ss"
#                     }
#                 }
#             }
#         }
#     }
# }
```