or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog.md · columns-functions.md · data-io.md · dataset-dataframe.md · index.md · session-management.md · streaming.md · types-encoders.md · udfs.md

docs/data-io.md

# Data I/O Operations

Reading and writing data from various sources and formats. Supports batch and streaming data with extensive configuration options, built-in format support, and custom data source integration.

## Capabilities

### DataFrameReader

Interface for reading data from external storage systems into DataFrames.

```scala { .api }
/**
 * Interface for reading data from external storage systems
 */
class DataFrameReader {
  /** Specify data source format */
  def format(source: String): DataFrameReader

  /** Set schema for the data */
  def schema(schema: StructType): DataFrameReader
  def schema(schemaString: String): DataFrameReader

  /** Set options for the data source */
  def option(key: String, value: String): DataFrameReader
  def option(key: String, value: Boolean): DataFrameReader
  def option(key: String, value: Long): DataFrameReader
  def option(key: String, value: Double): DataFrameReader
  def options(options: scala.collection.Map[String, String]): DataFrameReader
  def options(options: java.util.Map[String, String]): DataFrameReader

  /** Load data using generic interface */
  def load(): DataFrame
  def load(path: String): DataFrame
  def load(paths: String*): DataFrame

  /** Built-in format readers */
  def json(path: String): DataFrame
  def json(paths: String*): DataFrame
  def json(jsonRDD: RDD[String]): DataFrame
  def json(jsonDataset: Dataset[String]): DataFrame

  def parquet(paths: String*): DataFrame

  def orc(paths: String*): DataFrame

  def text(paths: String*): DataFrame
  def textFile(paths: String*): Dataset[String]

  def csv(paths: String*): DataFrame

  def table(tableName: String): DataFrame

  /** JDBC data source */
  def jdbc(url: String, table: String, properties: java.util.Properties): DataFrame
  def jdbc(url: String, table: String, predicates: Array[String],
           connectionProperties: java.util.Properties): DataFrame
  def jdbc(url: String, table: String, columnName: String,
           lowerBound: Long, upperBound: Long, numPartitions: Int,
           connectionProperties: java.util.Properties): DataFrame
}
```

**Usage Examples:**

```scala
// JSON with schema inference
val df1 = spark.read
  .option("multiline", "true")
  .json("path/to/file.json")

// CSV with custom options
val df2 = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("delimiter", ",")
  .csv("path/to/file.csv")

// Parquet (schema preserved)
val df3 = spark.read.parquet("path/to/*.parquet")

// With explicit schema
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("salary", DoubleType, nullable = true)
))

val df4 = spark.read
  .schema(schema)
  .option("header", "true")
  .csv("employees.csv")

// JDBC connection
val df5 = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")
  .option("dbtable", "employees")
  .option("user", "username")
  .option("password", "password")
  .load()

// Custom data source
val df6 = spark.read
  .format("org.apache.spark.sql.cassandra")
  .option("keyspace", "mykeyspace")
  .option("table", "mytable")
  .load()
```

### DataFrameWriter

Interface for writing Dataset to external storage systems.

```scala { .api }
/**
 * Interface for writing Dataset to external storage systems
 * @tparam T Type of the Dataset
 */
class DataFrameWriter[T] {
  /** Specify output format */
  def format(source: String): DataFrameWriter[T]

  /** Set save mode */
  def mode(saveMode: SaveMode): DataFrameWriter[T]
  def mode(saveMode: String): DataFrameWriter[T]

  /** Set options for the data source */
  def option(key: String, value: String): DataFrameWriter[T]
  def option(key: String, value: Boolean): DataFrameWriter[T]
  def option(key: String, value: Long): DataFrameWriter[T]
  def option(key: String, value: Double): DataFrameWriter[T]
  def options(options: scala.collection.Map[String, String]): DataFrameWriter[T]
  def options(options: java.util.Map[String, String]): DataFrameWriter[T]

  /** Partition output by columns */
  def partitionBy(colNames: String*): DataFrameWriter[T]

  /** Bucket output by columns */
  def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
  def sortBy(colName: String, colNames: String*): DataFrameWriter[T]

  /** Save using generic interface */
  def save(): Unit
  def save(path: String): Unit

  /** Built-in format writers */
  def json(path: String): Unit
  def parquet(path: String): Unit
  def orc(path: String): Unit
  def text(path: String): Unit
  def csv(path: String): Unit

  /** Save as table */
  def saveAsTable(tableName: String): Unit
  def insertInto(tableName: String): Unit

  /** JDBC output */
  def jdbc(url: String, table: String, connectionProperties: java.util.Properties): Unit
}

/**
 * Save modes for writing data
 */
object SaveMode extends Enumeration {
  type SaveMode = Value
  val Append, Overwrite, ErrorIfExists, Ignore = Value
}
```

**Usage Examples:**

```scala
val df = spark.table("employees")

// Basic save operations
df.write
  .mode(SaveMode.Overwrite)
  .parquet("output/employees.parquet")

df.write
  .mode("append")
  .option("header", "true")
  .csv("output/employees.csv")

// Partitioned output
df.write
  .mode(SaveMode.Overwrite)
  .partitionBy("department", "year")
  .parquet("output/employees_partitioned")

// Bucketed output
df.write
  .mode(SaveMode.Overwrite)
  .bucketBy(10, "employee_id")
  .sortBy("salary")
  .saveAsTable("bucketed_employees")

// JDBC output
df.write
  .mode(SaveMode.Overwrite)
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")
  .option("dbtable", "employees")
  .option("user", "username")
  .option("password", "password")
  .save()

// Custom format with options
df.write
  .mode(SaveMode.Append)
  .format("delta")
  .option("mergeSchema", "true")
  .save("path/to/delta-table")
```

### Common Data Source Options

Configuration options for built-in data sources.

```scala { .api }
// CSV Options
object CsvOptions {
  val DELIMITER = "delimiter" // Field delimiter (default: ",")
  val QUOTE = "quote" // Quote character (default: "\"")
  val ESCAPE = "escape" // Escape character (default: "\")
  val HEADER = "header" // First line is header (default: "false")
  val INFER_SCHEMA = "inferSchema" // Infer schema from data (default: "false")
  val NULL_VALUE = "nullValue" // String representation of null (default: "")
  val DATE_FORMAT = "dateFormat" // Date format (default: "yyyy-MM-dd")
  val TIMESTAMP_FORMAT = "timestampFormat" // Timestamp format
  val MAX_COLUMNS = "maxColumns" // Maximum number of columns
  val MAX_CHARS_PER_COLUMN = "maxCharsPerColumn" // Max chars per column
  val ENCODING = "encoding" // Character encoding (default: "UTF-8")
  val COMMENT = "comment" // Comment character
  val MODE = "mode" // Parse mode: PERMISSIVE, DROPMALFORMED, FAILFAST
}

// JSON Options
object JsonOptions {
  val ALLOW_COMMENTS = "allowComments" // Allow comments (default: "false")
  val ALLOW_UNQUOTED_FIELD_NAMES = "allowUnquotedFieldNames" // Allow unquoted field names
  val ALLOW_SINGLE_QUOTES = "allowSingleQuotes" // Allow single quotes
  val ALLOW_NUMERIC_LEADING_ZEROS = "allowNumericLeadingZeros" // Allow leading zeros
  val ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER = "allowBackslashEscapingAnyCharacter"
  val MULTILINE = "multiline" // Parse multiline JSON (default: "false")
  val DATE_FORMAT = "dateFormat" // Date format
  val TIMESTAMP_FORMAT = "timestampFormat" // Timestamp format
  val PRIMITIVE_AS_STRING = "primitivesAsString" // Parse primitives as strings
}

// Parquet Options
object ParquetOptions {
  val MERGE_SCHEMA = "mergeSchema" // Merge schemas from multiple files
  val COMPRESSION = "compression" // Compression codec: none, snappy, gzip, lzo
  val DICTIONARY_ENCODING = "dictionaryEncoding" // Use dictionary encoding
}

// JDBC Options
object JdbcOptions {
  val DRIVER = "driver" // JDBC driver class name
  val USER = "user" // Username
  val PASSWORD = "password" // Password
  val FETCH_SIZE = "fetchsize" // JDBC fetch size
  val BATCH_SIZE = "batchsize" // JDBC batch size for inserts
  val ISOLATION_LEVEL = "isolationLevel" // Transaction isolation level
  val NUM_PARTITIONS = "numPartitions" // Number of partitions for parallel reads
  val PARTITION_COLUMN = "partitionColumn" // Column for partitioning
  val LOWER_BOUND = "lowerBound" // Lower bound for partitioning
  val UPPER_BOUND = "upperBound" // Upper bound for partitioning
  val QUERY_TIMEOUT = "queryTimeout" // Query timeout in seconds
  val CREATE_TABLE_OPTIONS = "createTableOptions" // Options for CREATE TABLE
  val CREATE_TABLE_COLUMN_TYPES = "createTableColumnTypes" // Column types for CREATE TABLE
  val CUSTOM_SCHEMA = "customSchema" // Custom schema for reading
}
```

### Built-in Data Sources

Support for various data formats and storage systems.

```scala { .api }
// File-based sources
val CSV_SOURCE = "csv"
val JSON_SOURCE = "json"
val PARQUET_SOURCE = "parquet"
val ORC_SOURCE = "orc"
val TEXT_SOURCE = "text"
val AVRO_SOURCE = "avro" // Requires spark-avro package

// Database sources
val JDBC_SOURCE = "jdbc"

// Big data sources
val HIVE_SOURCE = "hive"
val DELTA_SOURCE = "delta" // Requires Delta Lake
val ICEBERG_SOURCE = "iceberg" // Requires Apache Iceberg

// Cloud sources (require appropriate dependencies)
val S3_SOURCE = "s3"
val AZURE_SOURCE = "azure"
val GCS_SOURCE = "gcs"

// Streaming sources
val KAFKA_SOURCE = "kafka"
val SOCKET_SOURCE = "socket"
val RATE_SOURCE = "rate" // For testing
```

### Advanced I/O Patterns

Common patterns for data ingestion and output.

**Multi-format reading:**

```scala
// Read from multiple formats
val jsonDf = spark.read.json("data/*.json")
val csvDf = spark.read.option("header", "true").csv("data/*.csv")
val combined = jsonDf.union(csvDf)

// Schema evolution with Parquet
val df1 = spark.read.parquet("data/year=2021")
val df2 = spark.read.parquet("data/year=2022")
val merged = df1.union(df2)
```

**Optimized writes:**

```scala
// Optimize partition size
df.repartition(200).write
  .mode(SaveMode.Overwrite)
  .parquet("output/data")

// Coalesce for fewer files
df.coalesce(10).write
  .mode(SaveMode.Overwrite)
  .json("output/data")

// Dynamic partitioning
df.write
  .mode(SaveMode.Overwrite)
  .partitionBy("year", "month")
  .option("maxRecordsPerFile", "100000")
  .parquet("output/partitioned_data")
```

**Error handling:**

```scala
// Handle malformed records
val df = spark.read
  .option("mode", "DROPMALFORMED") // or "PERMISSIVE", "FAILFAST"
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json("data/potentially_bad.json")

// Validate data after read
val validDf = df.filter(col("_corrupt_record").isNull)
val corruptDf = df.filter(col("_corrupt_record").isNotNull)
```

**Custom data sources:**

```scala
// Register custom format
spark.sql("CREATE TABLE custom_table USING org.example.CustomDataSource OPTIONS (path 'data/custom')")

// Use programmatically
val df = spark.read
  .format("org.example.CustomDataSource")
  .option("customOption", "value")
  .load("data/custom")
```