# Data Sources

Spark provides extensive support for reading and writing data from a variety of sources, including local filesystems, HDFS, object stores, and databases. This document covers the data source APIs available in Spark 1.0.0.

## Text Files

### Reading Text Files

**textFile**: Read text files line by line

```scala { .api }
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
```

```scala
// Local filesystem
val localFile = sc.textFile("file:///path/to/local/file.txt")

// HDFS
val hdfsFile = sc.textFile("hdfs://namenode:port/path/to/file.txt")

// Multiple files with wildcards
val multipleFiles = sc.textFile("hdfs://path/to/files/*.txt")

// Specify a minimum number of partitions
val partitionedFile = sc.textFile("hdfs://path/to/large/file.txt", 8)

// Compressed files (detected automatically by extension)
val gzipFile = sc.textFile("hdfs://path/to/file.txt.gz")
```

**wholeTextFiles**: Read entire files as key-value pairs

```scala { .api }
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
```

```scala
// Read a directory of files - returns (filename, file_content) pairs
val filesRDD = sc.wholeTextFiles("hdfs://path/to/directory/")

filesRDD.foreach { case (filename, content) =>
  println(s"File: $filename")
  println(s"Size: ${content.length} characters")
  println(s"Lines: ${content.count(_ == '\n') + 1}")
}

// Process each file separately
val processedFiles = filesRDD.mapValues { content =>
  content.split("\n").count(_.nonEmpty)
}
```

### Saving Text Files

**saveAsTextFile**: Save RDD as text files

```scala { .api }
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
```

```scala
val data = sc.parallelize(Array("line1", "line2", "line3"))

// Save as plain text files (one part file per partition)
data.saveAsTextFile("hdfs://path/to/output")

// Save with compression
import org.apache.hadoop.io.compress.GzipCodec
data.saveAsTextFile("hdfs://path/to/compressed-output", classOf[GzipCodec])

// Other compression codecs
import org.apache.hadoop.io.compress.{BZip2Codec, SnappyCodec}
data.saveAsTextFile("output-bzip2", classOf[BZip2Codec])
data.saveAsTextFile("output-snappy", classOf[SnappyCodec])
```

## Object Files

Object files are Spark's native binary format. Values are written with standard Java serialization, so the stored types must implement `java.io.Serializable` (case classes already do).

### Reading Object Files

**objectFile**: Load an RDD of objects

```scala { .api }
def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]
```

```scala
// Save and load custom objects
case class Person(name: String, age: Int)

val people = sc.parallelize(Array(
  Person("Alice", 25),
  Person("Bob", 30),
  Person("Charlie", 35)
))

// Save as an object file
people.saveAsObjectFile("hdfs://path/to/people")

// Load it back
val loadedPeople: RDD[Person] = sc.objectFile[Person]("hdfs://path/to/people")
```

### Saving Object Files

**saveAsObjectFile**: Save RDD as serialized objects

```scala { .api }
def saveAsObjectFile(path: String): Unit
```

```scala
val complexData = sc.parallelize(Array(
  Map("name" -> "Alice", "scores" -> List(85, 92, 78)),
  Map("name" -> "Bob", "scores" -> List(91, 87, 94))
))

complexData.saveAsObjectFile("hdfs://path/to/complex-data")
```

## Hadoop Files

Spark integrates with Hadoop's input and output formats, which lets it read and write any file type Hadoop supports.

### Sequence Files

**Reading SequenceFiles**:

```scala { .api }
def sequenceFile[K, V](
  path: String,
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int = defaultMinPartitions
): RDD[(K, V)]
```

```scala
import org.apache.hadoop.io.{IntWritable, Text}

val seqFile = sc.sequenceFile[IntWritable, Text](
  "hdfs://path/to/sequencefile",
  classOf[IntWritable],
  classOf[Text]
)

// Convert Writable types to Scala types
val converted = seqFile.map { case (key, value) =>
  (key.get(), value.toString)
}
```

**Saving SequenceFiles**:

```scala { .api }
def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
```

```scala
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.io.compress.GzipCodec

// For RDDs of pairs whose types can be converted to Writable
val pairs = sc.parallelize(Array((1, "apple"), (2, "banana"), (3, "orange")))

// Convert to Writable types
val writablePairs = pairs.map { case (k, v) =>
  (new IntWritable(k), new Text(v))
}

writablePairs.saveAsSequenceFile("hdfs://path/to/output")

// With compression
writablePairs.saveAsSequenceFile("hdfs://path/to/compressed", Some(classOf[GzipCodec]))
```

### Generic Hadoop Files

**hadoopFile**: Read files with a custom InputFormat

```scala { .api }
def hadoopFile[K, V](
  path: String,
  inputFormatClass: Class[_ <: InputFormat[K, V]],
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int = defaultMinPartitions
): RDD[(K, V)]
```

```scala
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

// Read with TextInputFormat: the key is each line's byte offset, the value is the line
val textWithOffsets = sc.hadoopFile[LongWritable, Text](
  "hdfs://path/to/file",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text]
)

// Custom InputFormat skeleton (getSplits and getRecordReader must be implemented)
class CustomInputFormat extends InputFormat[Text, Text] {
  // Implementation here
}

val customFile = sc.hadoopFile[Text, Text](
  "hdfs://path/to/custom/format",
  classOf[CustomInputFormat],
  classOf[Text],
  classOf[Text]
)
```

**newAPIHadoopFile**: Use the new Hadoop API (`org.apache.hadoop.mapreduce`)

```scala { .api }
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
  path: String,
  fClass: Class[F],
  kClass: Class[K],
  vClass: Class[V],
  conf: Configuration = hadoopConfiguration
): RDD[(K, V)]
```

```scala
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

val newAPIFile = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](
  "hdfs://path/to/file",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text]
)
```

### Hadoop Configuration

Access and modify the shared Hadoop configuration:

```scala { .api }
def hadoopConfiguration: Configuration
```

```scala
val hadoopConf = sc.hadoopConfiguration

// Configure S3 access (requires a Hadoop build with the S3A connector)
hadoopConf.set("fs.s3a.access.key", "your-access-key")
hadoopConf.set("fs.s3a.secret.key", "your-secret-key")
hadoopConf.set("fs.s3a.endpoint", "s3.amazonaws.com")

// Configure compression for Hadoop map output
hadoopConf.set("mapreduce.map.output.compress", "true")
hadoopConf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec")

// Now read from S3
val s3Data = sc.textFile("s3a://bucket-name/path/to/file")
```
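
Because `sc.hadoopConfiguration` is shared by every job on the SparkContext, settings applied to it affect all subsequent reads and writes. When only one read needs a special setting, a safer pattern is to pass a private copy of the configuration to `newAPIHadoopFile`. The sketch below assumes this pattern; the path is a placeholder, and `textinputformat.record.delimiter` (a custom record delimiter supported by the new-API `TextInputFormat` in recent Hadoop versions) is used purely as an example setting:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

// Copy the shared configuration so per-job settings don't leak to other jobs
val jobConf = new Configuration(sc.hadoopConfiguration)
jobConf.set("textinputformat.record.delimiter", "\n\n") // e.g. blank-line-delimited records

val records = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](
  "hdfs://path/to/file",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  jobConf
)
```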

## Saving with Hadoop Formats

### Save as Hadoop Files

**saveAsHadoopFile**: Save with the old Hadoop API

```scala { .api }
def saveAsHadoopFile(
  path: String,
  keyClass: Class[_],
  valueClass: Class[_],
  outputFormatClass: Class[_ <: OutputFormat[_, _]],
  codec: Class[_ <: CompressionCodec]
): Unit

// Simplified versions
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String): Unit
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, codec: Class[_ <: CompressionCodec]): Unit
```

```scala
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.{IntWritable, Text}

val pairs = sc.parallelize(Array((1, "apple"), (2, "banana")))
val writablePairs = pairs.map { case (k, v) => (new IntWritable(k), new Text(v)) }

// Save as text with an explicit output format
writablePairs.saveAsHadoopFile(
  "hdfs://path/to/output",
  classOf[IntWritable],
  classOf[Text],
  classOf[TextOutputFormat[IntWritable, Text]]
)
```

**saveAsNewAPIHadoopFile**: Save with the new Hadoop API

```scala { .api }
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
  path: String,
  keyClass: Class[_],
  valueClass: Class[_],
  outputFormatClass: Class[F],
  conf: Configuration = context.hadoopConfiguration
): Unit
```
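
Usage parallels `saveAsHadoopFile`, but with an output format from the new `org.apache.hadoop.mapreduce` package. A sketch (paths are placeholders):

```scala
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.io.{IntWritable, Text}

val pairs = sc.parallelize(Array((1, "apple"), (2, "banana")))
val writablePairs = pairs.map { case (k, v) => (new IntWritable(k), new Text(v)) }

// Save using the new-API text output format
writablePairs.saveAsNewAPIHadoopFile(
  "hdfs://path/to/new-api-output",
  classOf[IntWritable],
  classOf[Text],
  classOf[TextOutputFormat[IntWritable, Text]]
)
```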

**saveAsHadoopDataset**: Save using a JobConf

```scala { .api }
def saveAsHadoopDataset(conf: JobConf): Unit
```

```scala
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}

val jobConf = new JobConf()
jobConf.setOutputFormat(classOf[TextOutputFormat[IntWritable, Text]])
jobConf.setOutputKeyClass(classOf[IntWritable])
jobConf.setOutputValueClass(classOf[Text])
// The output path is set through FileOutputFormat, not on JobConf directly
FileOutputFormat.setOutputPath(jobConf, new Path("hdfs://path/to/output"))

writablePairs.saveAsHadoopDataset(jobConf)
```

## Database Connectivity

### JDBC Data Sources

Spark 1.0.0 has no high-level built-in JDBC data source, but you can read from a database by opening a connection inside a transformation:

```scala
import java.sql.{Connection, DriverManager, ResultSet}

// Run a query on an executor and collect the first column as strings
def readFromDatabase(url: String, query: String): RDD[String] = {
  sc.parallelize(Seq(query)).mapPartitions { queries =>
    val connection = DriverManager.getConnection(url)
    val statement = connection.createStatement()

    queries.flatMap { query =>
      val resultSet = statement.executeQuery(query)
      val results = scala.collection.mutable.ListBuffer[String]()

      while (resultSet.next()) {
        results += resultSet.getString(1) // Take the first column of each row
      }

      resultSet.close()
      statement.close()
      connection.close()
      results
    }
  }
}

val dbData = readFromDatabase("jdbc:mysql://localhost:3306/mydb", "SELECT * FROM users")
```
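
Spark also ships a dedicated `org.apache.spark.rdd.JdbcRDD`, which partitions a bounded query across executors; verify it is present in your Spark build before relying on it. A sketch, assuming a `users` table with a numeric `id` column (the query must contain two `?` placeholders, which Spark fills with each partition's lower and upper bound):

```scala
import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

// Partition the id range 1..1000 into 4 parallel queries
val usersRdd = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb"),
  "SELECT id, name FROM users WHERE id >= ? AND id <= ?",
  1, 1000, 4, // lower bound, upper bound, number of partitions
  (rs: ResultSet) => (rs.getInt(1), rs.getString(2))
)
```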

### Custom Data Sources

Create custom data sources by implementing Hadoop's `InputFormat` interface:

```scala
import org.apache.hadoop.mapred.{InputFormat, InputSplit, JobConf, RecordReader, Reporter}
import org.apache.hadoop.io.{LongWritable, Text}

class CustomInputFormat extends InputFormat[LongWritable, Text] {
  def getSplits(job: JobConf, numSplits: Int): Array[InputSplit] = {
    // Create the input splits (stub)
    Array[InputSplit]()
  }

  def getRecordReader(split: InputSplit, job: JobConf, reporter: Reporter): RecordReader[LongWritable, Text] = {
    // Return a record reader (stub)
    null
  }
}

// Use the custom format
val customData = sc.hadoopFile[LongWritable, Text](
  "path",
  classOf[CustomInputFormat],
  classOf[LongWritable],
  classOf[Text]
)
```

## Cloud Storage

### Amazon S3

The examples below use the `s3a://` scheme, which requires a Hadoop build that includes the S3A connector; on older Hadoop versions, use `s3n://` with the matching `fs.s3n.*` keys.

```scala
// Configure S3 access
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", "ACCESS_KEY")
hadoopConf.set("fs.s3a.secret.key", "SECRET_KEY")

// Read from S3
val s3Data = sc.textFile("s3a://my-bucket/path/to/data.txt")

// Write to S3
s3Data.saveAsTextFile("s3a://my-bucket/output/")
```

### Azure Blob Storage

```scala
// Configure Azure access (requires the hadoop-azure connector)
hadoopConf.set("fs.azure.account.key.mystorageaccount.blob.core.windows.net", "ACCOUNT_KEY")

// Read from Azure
val azureData = sc.textFile("wasb://container@mystorageaccount.blob.core.windows.net/path/to/file")
```

## File Formats and Compression

### Supported Compression Codecs

```scala { .api }
import org.apache.hadoop.io.compress.{
  GzipCodec,    // .gz files
  BZip2Codec,   // .bz2 files
  SnappyCodec,  // .snappy files
  DefaultCodec  // Default compression
}
// LzopCodec (.lzo files) is provided separately by the hadoop-lzo library
```

### Reading Compressed Files

Spark automatically detects compression based on the file extension:

```scala
// Automatically decompressed
val gzipData = sc.textFile("hdfs://path/to/file.txt.gz")
val bzip2Data = sc.textFile("hdfs://path/to/file.txt.bz2")
val snappyData = sc.textFile("hdfs://path/to/file.txt.snappy")

// Mixed compression in a directory - each file is decoded by its own extension
val mixedData = sc.textFile("hdfs://path/to/directory/*")
```

### Writing Compressed Files

```scala
import org.apache.hadoop.io.compress.{BZip2Codec, GzipCodec, SnappyCodec}

val data = sc.parallelize(Array("line1", "line2", "line3"))

// Save with different compression codecs
data.saveAsTextFile("output-gzip", classOf[GzipCodec])
data.saveAsTextFile("output-bzip2", classOf[BZip2Codec])
data.saveAsTextFile("output-snappy", classOf[SnappyCodec])
```

## Performance Considerations

### Partitioning

```scala
// Control the number of partitions when reading
val data = sc.textFile("large-file.txt", minPartitions = 100)

// Repartition after reading if needed
val repartitioned = data.repartition(50)
```

### File Size Optimization

```scala
// For many small files, use wholeTextFiles and then repartition
val smallFiles = sc.wholeTextFiles("hdfs://path/to/small-files/")
  .values          // Keep just the file contents
  .repartition(10) // Consolidate into a manageable number of partitions
```

### Caching Frequently Accessed Data

```scala
val frequentlyUsed = sc.textFile("hdfs://path/to/data")
  .filter(_.contains("important"))
  .cache() // Cache in memory

// Multiple actions reuse the cached data
val count1 = frequentlyUsed.count()
val count2 = frequentlyUsed.filter(_.length > 100).count()
```

## Error Handling and Validation

```scala
// Validate file existence before reading
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
val path = new Path("hdfs://path/to/file")

if (fs.exists(path)) {
  val data = sc.textFile(path.toString)
  // Process data
} else {
  println(s"File not found: $path")
}

// Handle malformed data (processLine is a user-defined parser)
val safeData = sc.textFile("data.txt").mapPartitions { lines =>
  lines.flatMap { line =>
    try {
      Some(processLine(line))
    } catch {
      case e: Exception =>
        // Note: this println goes to the executor logs, not the driver console
        println(s"Error processing line: $line, Error: ${e.getMessage}")
        None
    }
  }
}
```

These data source APIs provide the foundation for reading and writing data in a wide range of formats and storage systems with Apache Spark.