# Data Loading and DataFrame Operations

Comprehensive data loading capabilities for PySpark DataFrames within Dagster, supporting multiple file formats, database connections, and extensive configuration options through the DataFrame type system.

## Capabilities

### DataFrame Type

The DataFrame type provides automatic loading capabilities for PySpark DataFrames with extensive configuration options for different data sources.

```python { .api }
DataFrame = PythonObjectDagsterType(
    python_type=pyspark.sql.DataFrame,
    name="PySparkDataFrame",
    description="A PySpark data frame.",
    loader=dataframe_loader
)
```

### CSV Data Loading

Load CSV files with comprehensive parsing and schema options.

```python { .api }
@dagster_type_loader(
    config_schema=Selector({
        "csv": Permissive({
            "path": Field(Any, is_required=True,
                description="string, or list of strings, for input path(s), or RDD of Strings storing CSV rows"),
            "schema": Field(Any, is_required=False,
                description="optional pyspark.sql.types.StructType for input schema or DDL-formatted string"),
            "sep": Field(String, is_required=False,
                description="separator for each field and value (default: ',')"),
            "encoding": Field(String, is_required=False,
                description="decodes CSV files by given encoding (default: 'UTF-8')"),
            "quote": Field(String, is_required=False,
                description="single character for escaping quoted values (default: '\"')"),
            "escape": Field(String, is_required=False,
                description="single character for escaping quotes (default: '\\')"),
            "comment": Field(String, is_required=False,
                description="single character for skipping comment lines"),
            "header": Field(Bool, is_required=False,
                description="uses first line as column names (default: false)"),
            "inferSchema": Field(Bool, is_required=False,
                description="infers input schema automatically (requires extra pass, default: false)"),
            "enforceSchema": Field(Bool, is_required=False,
                description="forcibly apply specified/inferred schema (default: true)"),
            "ignoreLeadingWhiteSpace": Field(Bool, is_required=False,
                description="skip leading whitespaces from values (default: false)"),
            "ignoreTrailingWhiteSpace": Field(Bool, is_required=False,
                description="skip trailing whitespaces from values (default: false)"),
            "nullValue": Field(String, is_required=False,
                description="string representation of null value (default: empty string)"),
            "nanValue": Field(String, is_required=False,
                description="string representation of NaN value (default: 'NaN')"),
            "positiveInf": Field(String, is_required=False,
                description="string representation of positive infinity (default: 'Inf')"),
            "negativeInf": Field(String, is_required=False,
                description="string representation of negative infinity (default: 'Inf')"),
            "dateFormat": Field(String, is_required=False,
                description="date format pattern (default: 'yyyy-MM-dd')"),
            "timestampFormat": Field(String, is_required=False,
                description="timestamp format pattern (default: 'yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]')"),
            "maxColumns": Field(Int, is_required=False,
                description="hard limit for number of columns (default: 20480)"),
            "maxCharsPerColumn": Field(Int, is_required=False,
                description="maximum characters per column (default: -1, unlimited)"),
            "mode": Field(String, is_required=False,
                description="mode for corrupt records: PERMISSIVE, DROPMALFORMED, FAILFAST (default: PERMISSIVE)"),
            "columnNameOfCorruptRecord": Field(String, is_required=False,
                description="column name for malformed records in PERMISSIVE mode"),
            "multiLine": Field(Bool, is_required=False,
                description="parse records spanning multiple lines (default: false)"),
            "charToEscapeQuoteEscaping": Field(String, is_required=False,
                description="character for escaping quote escape character"),
            "samplingRatio": Field(Float, is_required=False,
                description="fraction of rows for schema inference (default: 1.0)"),
            "emptyValue": Field(String, is_required=False,
                description="string representation of empty value (default: empty string)"),
            "locale": Field(String, is_required=False,
                description="locale for parsing dates/timestamps (default: 'en-US')"),
            "lineSep": Field(String, is_required=False,
                description="line separator for parsing (covers \\r, \\r\\n, \\n)"),
            "pathGlobFilter": Field(String, is_required=False,
                description="glob pattern to include files matching pattern"),
            "recursiveFileLookup": Field(Bool, is_required=False,
                description="recursively scan directory for files (disables partition discovery)")
        })
    }),
    required_resource_keys={"pyspark"}
)
def dataframe_loader(context, config): ...
```

### JSON Data Loading

Load JSON files with parsing options and schema inference.

```python { .api }
@dagster_type_loader(
    config_schema=Selector({
        "json": Permissive({
            "path": Field(Any, is_required=True,
                description="path to JSON dataset, list of paths, or RDD of JSON objects"),
            "schema": Field(Any, is_required=False,
                description="optional pyspark.sql.types.StructType or DDL-formatted string"),
            "primitivesAsString": Field(Bool, is_required=False,
                description="infer all primitive values as string type (default: false)"),
            "prefersDecimal": Field(Bool, is_required=False,
                description="infer floating-point values as decimal type (default: false)"),
            "allowComments": Field(Bool, is_required=False,
                description="ignore Java/C++ style comments (default: false)"),
            "allowUnquotedFieldNames": Field(Bool, is_required=False,
                description="allow unquoted JSON field names (default: false)"),
            "allowSingleQuotes": Field(Bool, is_required=False,
                description="allow single quotes in addition to double quotes (default: true)"),
            "allowNumericLeadingZero": Field(Bool, is_required=False,
                description="allow leading zeros in numbers (default: false)"),
            "allowBackslashEscapingAnyCharacter": Field(Bool, is_required=False,
                description="allow backslash quoting of any character (default: false)"),
            "mode": Field(String, is_required=False,
                description="mode for corrupt records (default: PERMISSIVE)"),
            "columnNameOfCorruptRecord": Field(String, is_required=False,
                description="column name for malformed records"),
            "dateFormat": Field(String, is_required=False,
                description="date format pattern (default: 'yyyy-MM-dd')"),
            "timestampFormat": Field(String, is_required=False,
                description="timestamp format pattern"),
            "multiLine": Field(Bool, is_required=False,
                description="parse one record spanning multiple lines per file (default: false)"),
            "allowUnquotedControlChars": Field(Bool, is_required=False,
                description="allow JSON strings with unquoted control characters"),
            "encoding": Field(String, is_required=False,
                description="encoding for JSON files (auto-detected when multiLine=true)"),
            "lineSep": Field(String, is_required=False,
                description="line separator (covers \\r, \\r\\n, \\n)"),
            "samplingRatio": Field(Float, is_required=False,
                description="fraction of JSON objects for schema inference (default: 1.0)"),
            "dropFieldIfAllNull": Field(Bool, is_required=False,
                description="ignore columns with all null values during schema inference (default: false)"),
            "locale": Field(String, is_required=False,
                description="locale for parsing dates/timestamps (default: 'en-US')"),
            "pathGlobFilter": Field(String, is_required=False,
                description="glob pattern for file inclusion"),
            "recursiveFileLookup": Field(Bool, is_required=False,
                description="recursively scan directory for files")
        })
    }),
    required_resource_keys={"pyspark"}
)
def dataframe_loader(context, config): ...
```

### Parquet Data Loading

Load Parquet files with minimal configuration required.

```python { .api }
@dagster_type_loader(
    config_schema=Selector({
        "parquet": Permissive({
            "path": Field(Any, is_required=True,
                description="string or list of strings for input path(s)")
        })
    }),
    required_resource_keys={"pyspark"}
)
def dataframe_loader(context, config): ...
```
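
For orientation, here is a minimal run-config sketch for the `parquet` selector, following the same shape as the usage examples below; the op name `summarize_parquet` and input name `data` are hypothetical.

```python
# Hypothetical run config for an op declared with ins={"data": In(DataFrame)};
# the Parquet selector only requires the input path.
run_config = {
    "ops": {
        "summarize_parquet": {
            "inputs": {
                "data": {
                    "parquet": {"path": "/path/to/data.parquet"}
                }
            }
        }
    }
}
```

A dict like this can be supplied as `run_config` when launching the job, for example via `execute_in_process`.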

### JDBC Database Loading

Load data from relational databases via JDBC connections.

```python { .api }
@dagster_type_loader(
    config_schema=Selector({
        "jdbc": Permissive({
            "url": Field(String, is_required=True,
                description="JDBC URL of the form 'jdbc:subprotocol:subname'"),
            "table": Field(String, is_required=True,
                description="name of the table"),
            "column": Field(String, is_required=False,
                description="column for partitioning (numeric, date, or timestamp type)"),
            "lowerBound": Field(Int, is_required=False,
                description="minimum value of partitioning column"),
            "upperBound": Field(Int, is_required=False,
                description="maximum value of partitioning column"),
            "numPartitions": Field(Int, is_required=False,
                description="number of partitions"),
            "predicates": Field(list, is_required=False,
                description="list of WHERE clause expressions for partitioning"),
            "properties": Field(Permissive(), is_required=False,
                description="JDBC connection properties dictionary (user, password, etc.)")
        })
    }),
    required_resource_keys={"pyspark"}
)
def dataframe_loader(context, config): ...
```

### ORC Data Loading

Load Apache ORC (Optimized Row Columnar) files.

```python { .api }
@dagster_type_loader(
    config_schema=Selector({
        "orc": Permissive({
            "path": Field(Any, is_required=True,
                description="string or list of strings for input path(s)")
        })
    }),
    required_resource_keys={"pyspark"}
)
def dataframe_loader(context, config): ...
```
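
The `orc` selector takes the same shape as the Parquet case; a hypothetical run-config fragment (op name `summarize_orc` assumed):

```python
# Hypothetical run config fragment for the "orc" selector; mirrors the Parquet example.
run_config = {
    "ops": {
        "summarize_orc": {
            "inputs": {"data": {"orc": {"path": "/path/to/data.orc"}}}
        }
    }
}
```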

### Table Data Loading

Load data from Spark catalog tables.

```python { .api }
@dagster_type_loader(
    config_schema=Selector({
        "table": Permissive({
            "tableName": Field(String, is_required=True,
                description="name of the table")
        })
    }),
    required_resource_keys={"pyspark"}
)
def dataframe_loader(context, config): ...
```
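
A hedged sketch of run config for the `table` selector; it assumes the named table is already registered in the catalog of the Spark session provided by the `pyspark` resource, and the op name `read_catalog_table` is illustrative only.

```python
# Hypothetical run config for the "table" selector; "warehouse.events" must
# already exist in the Spark session's catalog.
run_config = {
    "ops": {
        "read_catalog_table": {
            "inputs": {"data": {"table": {"tableName": "warehouse.events"}}}
        }
    }
}
```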

### Text File Loading

Load plain text files with optional line processing.

```python { .api }
@dagster_type_loader(
    config_schema=Selector({
        "text": Permissive({
            "path": Field(Any, is_required=True,
                description="string or list of strings for input path(s)"),
            "wholetext": Field(Bool, is_required=False,
                description="read each file as a single row (default: false)"),
            "lineSep": Field(String, is_required=False,
                description="line separator (covers \\r, \\r\\n, \\n)"),
            "pathGlobFilter": Field(String, is_required=False,
                description="glob pattern for file inclusion"),
            "recursiveFileLookup": Field(Bool, is_required=False,
                description="recursively scan directory for files")
        })
    }),
    required_resource_keys={"pyspark"}
)
def dataframe_loader(context, config): ...
```
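
As a sketch (the op name `scan_logs` and input name `raw_lines` are assumptions), the `text` selector loads each line as a row with a single string column named `value` unless `wholetext` is enabled:

```python
# Hypothetical run config for the "text" selector; yields one row per line with
# a single "value" column because wholetext is left at its default (false).
run_config = {
    "ops": {
        "scan_logs": {
            "inputs": {
                "raw_lines": {
                    "text": {
                        "path": "/path/to/logs/",
                        "pathGlobFilter": "*.log"
                    }
                }
            }
        }
    }
}
```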

### Generic Data Loading

Load data using generic Spark DataFrameReader options.

```python { .api }
@dagster_type_loader(
    config_schema=Selector({
        "other": Permissive()
    }),
    required_resource_keys={"pyspark"}
)
def dataframe_loader(context, config): ...
```
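
Because the `other` branch is a bare `Permissive()`, the accepted keys depend on how `dataframe_loader` forwards them to Spark's generic `DataFrameReader`. The fragment below is only a sketch under that assumption (the `avro` format, path, and op name are illustrative); verify the supported keys against the loader's source.

```python
# Sketch only: assumes the permissive "other" options are handed to a generic
# DataFrameReader load; confirm the supported keys in dataframe_loader itself.
run_config = {
    "ops": {
        "load_generic": {
            "inputs": {
                "data": {
                    "other": {
                        "path": "/path/to/data",
                        "format": "avro"
                    }
                }
            }
        }
    }
}
```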

## Usage Examples

### Loading CSV Data

```python
from dagster import op, job, In
from dagster_pyspark import DataFrame, PySparkResource

@op(ins={"data": In(DataFrame)})
def process_csv_data(data):
    data.show()
    return data.count()

@job(
    resource_defs={"pyspark": PySparkResource(spark_config={})}
)
def csv_processing_job():
    process_csv_data()

# Configuration for CSV input:
# {
#   "ops": {
#     "process_csv_data": {
#       "inputs": {
#         "data": {
#           "csv": {
#             "path": "/path/to/data.csv",
#             "header": true,
#             "inferSchema": true,
#             "sep": ",",
#             "encoding": "UTF-8"
#           }
#         }
#       }
#     }
#   }
# }
```

### Loading from Database

```python
from dagster import op, job, In
from dagster_pyspark import DataFrame, PySparkResource

@op(ins={"sales_data": In(DataFrame)})
def analyze_sales(sales_data):
    return sales_data.groupBy("region").sum("revenue").collect()

@job(
    resource_defs={"pyspark": PySparkResource(spark_config={})}
)
def sales_analysis_job():
    analyze_sales()

# Configuration for JDBC input:
# {
#   "ops": {
#     "analyze_sales": {
#       "inputs": {
#         "sales_data": {
#           "jdbc": {
#             "url": "jdbc:postgresql://localhost:5432/sales_db",
#             "table": "sales_transactions",
#             "properties": {
#               "user": "analyst",
#               "password": "secure_password"
#             },
#             "numPartitions": 4
#           }
#         }
#       }
#     }
#   }
# }
```

### Loading JSON with Schema Inference

```python
from dagster import op, job, In
from dagster_pyspark import DataFrame, PySparkResource

@op(ins={"events": In(DataFrame)})
def process_events(events):
    return events.filter(events.event_type == "purchase").count()

@job(
    resource_defs={"pyspark": PySparkResource(spark_config={})}
)
def event_processing_job():
    process_events()

# Configuration for JSON input:
# {
#   "ops": {
#     "process_events": {
#       "inputs": {
#         "events": {
#           "json": {
#             "path": "/path/to/events/*.json",
#             "multiLine": true,
#             "allowComments": true,
#             "timestampFormat": "yyyy-MM-dd HH:mm:ss"
#           }
#         }
#       }
#     }
#   }
# }
```