# Output Operations

Methods for writing processed stream data to external systems, triggering computations, and performing actions on DStreams.

## Basic Output Operations

### Print Operations

Print the first 10 elements of each RDD:

```scala { .api }
def print(): Unit
```

Print the first `num` elements of each RDD:

```scala { .api }
def print(num: Int): Unit
```

Example print operations:

```scala
val wordCounts = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .reduceByKey(_ + _)

wordCounts.print()   // Print the first 10 elements
wordCounts.print(20) // Print the first 20 elements
```

### ForEach Operations

Apply a function to each RDD:

```scala { .api }
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
```

Apply a function to each RDD along with its batch time:

```scala { .api }
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit
```

Example foreachRDD operations:

```scala
val lines = ssc.socketTextStream("localhost", 9999)

// Process each RDD
lines.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    println(s"Processing ${rdd.count()} lines")
    rdd.collect().foreach(println) // collect() brings the batch to the driver; only safe for small batches
  }
}

// Process with time information
lines.foreachRDD { (rdd, time) =>
  println(s"Processing batch at ${time.toString()}")
  rdd.foreach(line => processLine(line, time))
}
```
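
Note that the function passed to `foreachRDD` runs on the driver once per batch, while closures passed to RDD actions such as `rdd.foreach` run on the executors. The sketch below illustrates the common pitfall this distinction causes; `createConnection` and `sendRecord` are hypothetical helpers:

```scala
lines.foreachRDD { rdd =>
  // Runs on the driver, once per batch
  val connection = createConnection() // Anti-pattern: this object would have to be serialized to the executors

  rdd.foreach { record =>
    // Runs on the executors; `connection` is usually not serializable, so this fails or misbehaves
    sendRecord(connection, record)
  }
}
```

Creating the connection inside `foreachPartition`, as in the examples later in this document, avoids the problem.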

## File Output Operations

### Text File Output

Save each batch as text files, named with a prefix and optional suffix:

```scala { .api }
def saveAsTextFiles(prefix: String, suffix: String = null): Unit
```

Example text file output:

```scala
val processedLines = ssc.socketTextStream("localhost", 9999)
  .map(_.toUpperCase)

// Save to files like output-<batch time in ms>.txt
processedLines.saveAsTextFiles("output", "txt")

// Save to files like processed-<batch time in ms>
processedLines.saveAsTextFiles("processed")
```

80

81

### Object File Output

82

83

Save as serialized object files:

84

```scala { .api }

85

def saveAsObjectFiles(prefix: String, suffix: String = null): Unit

86

```

87

88

Example object file output:

89

```scala

90

case class LogEntry(timestamp: Long, level: String, message: String)

91

92

val logEntries = ssc.socketTextStream("localhost", 9999)

93

.map(parseLogEntry)

94

95

logEntries.saveAsObjectFiles("logs", "obj")

96

```
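
The serialized output can later be loaded back into an RDD with `SparkContext.objectFile`. A minimal sketch, assuming the prefix/suffix used above and an illustrative batch timestamp (`saveAsObjectFiles` names each batch's output `<prefix>-<batch time in ms>.<suffix>`):

```scala
// Load one batch's output back as an RDD[LogEntry]
val restored = ssc.sparkContext.objectFile[LogEntry]("logs-1234567890000.obj")
restored.take(5).foreach(println)
```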

### Hadoop File Output

Save using a Hadoop OutputFormat (old `mapred` API):

```scala { .api }
def saveAsHadoopFiles[F <: OutputFormat[K, V]](
  prefix: String,
  suffix: String = null
)(implicit fm: ClassTag[F]): Unit // On DStream[(K, V)]
```

Save using the new Hadoop API (`mapreduce`):

```scala { .api }
def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
  prefix: String,
  suffix: String = null,
  conf: Configuration = ssc.sparkContext.hadoopConfiguration
)(implicit fm: ClassTag[F]): Unit // On DStream[(K, V)]
```

Example Hadoop output:

```scala
import org.apache.hadoop.io.{Text, IntWritable}
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

val keyValuePairs = ssc.socketTextStream("localhost", 9999)
  .map(line => (new Text(line), new IntWritable(1)))

// Using the old Hadoop API
keyValuePairs.saveAsHadoopFiles[TextOutputFormat[Text, IntWritable]]("hadoop-output")

// Using the new Hadoop API
keyValuePairs.saveAsNewAPIHadoopFiles[NewTextOutputFormat[Text, IntWritable]]("new-hadoop-output")
```

## Database Output Operations

### JDBC Output

Example JDBC output using foreachRDD:

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}

val processedData = ssc.socketTextStream("localhost", 9999)
  .map(parseData)

processedData.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Create one connection per partition (not per record)
    val connection = DriverManager.getConnection("jdbc:mysql://localhost/db", "user", "password")
    val statement = connection.prepareStatement("INSERT INTO table (col1, col2) VALUES (?, ?)")

    try {
      partition.foreach { case (col1, col2) =>
        statement.setString(1, col1)
        statement.setString(2, col2)
        statement.addBatch()
      }

      statement.executeBatch()
    } finally {
      statement.close()
      connection.close()
    }
  }
}
```

### NoSQL Database Output

Example MongoDB output using the MongoDB Spark Connector:

```scala
import com.mongodb.spark.MongoSpark

val documents = ssc.socketTextStream("localhost", 9999)
  .map(parseToDocument)

documents.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    MongoSpark.save(rdd)
  }
}
```
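
`MongoSpark.save` picks up its destination from the connector's write configuration. A minimal sketch of that configuration, assuming the MongoDB Spark Connector 2.x property name and an illustrative database/collection:

```scala
import org.apache.spark.SparkConf

// Write configuration read by MongoSpark.save (the URI includes database and collection)
val conf = new SparkConf()
  .setAppName("StreamingToMongo")
  .set("spark.mongodb.output.uri", "mongodb://localhost:27017/streaming.events")
```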

## Message Queue Output

### Kafka Output

Example Kafka producer output:

```scala
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import java.util.Properties

val messages = ssc.socketTextStream("localhost", 9999)

messages.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    partition.foreach { message =>
      val record = new ProducerRecord[String, String]("output-topic", message)
      producer.send(record)
    }

    producer.close()
  }
}
```
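
Creating and closing a producer for every partition of every batch adds noticeable overhead. A common refinement, sketched below under the assumption that a lazily initialized singleton (the hypothetical `ProducerHolder`) is acceptable in your application, keeps one producer per executor JVM and reuses it across batches:

```scala
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import java.util.Properties

// Hypothetical helper: one producer per executor JVM, created on first use and reused afterwards
object ProducerHolder {
  lazy val producer: KafkaProducer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
  }
}

messages.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    partition.foreach { message =>
      // The singleton is initialized once per executor, not once per partition
      ProducerHolder.producer.send(new ProducerRecord[String, String]("output-topic", message))
    }
  }
}
```

The producer stays open for the executor's lifetime; call `producer.flush()` if a batch must be fully delivered before the `foreachRDD` call returns.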

## Advanced Output Patterns

### Batch-aware Output

Output with batch time information:

```scala
val timestampedData = ssc.socketTextStream("localhost", 9999)

timestampedData.foreachRDD { (rdd, time) =>
  val batchTime = time.milliseconds
  val outputPath = s"output/batch-$batchTime"

  if (!rdd.isEmpty()) {
    // coalesce(1) writes a single file per batch, at the cost of funnelling the batch through one task
    rdd.coalesce(1).saveAsTextFile(outputPath)
  }
}
```

### Conditional Output

Output only when conditions are met:

```scala
val filteredData = ssc.socketTextStream("localhost", 9999)
  .filter(_.nonEmpty)

filteredData.foreachRDD { rdd =>
  val count = rdd.count()

  if (count > 100) {
    println(s"Processing large batch: $count records")
    rdd.saveAsTextFile(s"large-batches/batch-${System.currentTimeMillis()}")
  } else if (count > 0) {
    println(s"Processing small batch: $count records")
    rdd.collect().foreach(println)
  }
}
```

### Multi-destination Output

Output to multiple destinations:

```scala
val processedData = ssc.socketTextStream("localhost", 9999)
  .map(processLine)

processedData.foreachRDD { rdd =>
  // Cache for multiple outputs
  rdd.cache()

  // Output to file system
  if (!rdd.isEmpty()) {
    rdd.saveAsTextFile(s"output/batch-${System.currentTimeMillis()}")
  }

  // Output to database
  rdd.foreachPartition { partition =>
    // Database write logic
    writeToDatabase(partition)
  }

  // Output metrics
  val count = rdd.count()
  println(s"Processed $count records")

  // Unpersist to free memory
  rdd.unpersist()
}
```

## Output Operation Properties

### Exactly-once Semantics

Output operations have at-least-once semantics by default; using the batch time as an identifier for idempotent or transactional writes gives effectively exactly-once results:

```scala
val idempotentOutput = ssc.socketTextStream("localhost", 9999)

idempotentOutput.foreachRDD { (rdd, time) =>
  val batchId = time.milliseconds

  // Check if this batch has already been processed
  if (!isProcessed(batchId)) {
    rdd.foreachPartition { partition =>
      // Atomic write operation
      writeAtomically(partition, batchId)
    }
    markProcessed(batchId)
  }
}

def isProcessed(batchId: Long): Boolean = {
  // Check external system for processing status
  checkProcessingStatus(batchId)
}

def markProcessed(batchId: Long): Unit = {
  // Mark batch as processed in external system
  updateProcessingStatus(batchId)
}
```

### Error Handling in Output

Handle output errors gracefully:

```scala
val reliableOutput = ssc.socketTextStream("localhost", 9999)

reliableOutput.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Materialize the partition so it can be retried (an iterator can only be consumed once)
    val records = partition.toList
    var retries = 3
    var success = false

    while (retries > 0 && !success) {
      try {
        writeToExternalSystem(records)
        success = true
      } catch {
        case e: Exception =>
          retries -= 1
          if (retries > 0) {
            Thread.sleep(1000) // Wait before retrying
          } else {
            // Log the error and potentially write to a dead letter queue
            logError(e, records)
          }
      }
    }
  }
}
```

### Performance Optimization

Optimize output operations:

```scala
val optimizedOutput = ssc.socketTextStream("localhost", 9999)
  .filter(_.nonEmpty)

optimizedOutput.foreachRDD { rdd =>
  // Coalesce to reduce the number of output files
  val coalescedRDD = if (rdd.getNumPartitions > 10) {
    rdd.coalesce(10)
  } else {
    rdd
  }

  // Only process non-empty RDDs
  if (!coalescedRDD.isEmpty()) {
    coalescedRDD.saveAsTextFile(s"output/${System.currentTimeMillis()}")
  }
}
```

## Output Operation Best Practices

### Connection Management

Manage external connections efficiently:

```scala
val dataStream = ssc.socketTextStream("localhost", 9999)

dataStream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Create connection per partition (not per record)
    val connection = createConnection()

    try {
      partition.foreach { record =>
        writeRecord(connection, record)
      }
    } finally {
      connection.close()
    }
  }
}
```
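
Opening a fresh connection for every partition of every batch can still be expensive. A further refinement keeps a small per-executor pool so connections are reused across partitions and batches; a minimal sketch, assuming `createConnection()` returns a `java.sql.Connection` and reusing the hypothetical `writeRecord` helper:

```scala
import java.sql.Connection
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical per-executor pool: connections are borrowed and returned rather than closed
object ConnectionPool {
  private val pool = new ConcurrentLinkedQueue[Connection]()

  def borrow(): Connection = Option(pool.poll()).getOrElse(createConnection())
  def give(connection: Connection): Unit = pool.offer(connection)
}

dataStream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val connection = ConnectionPool.borrow()
    try {
      partition.foreach(record => writeRecord(connection, record))
    } finally {
      ConnectionPool.give(connection) // Return the connection for reuse instead of closing it
    }
  }
}
```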

### Batch Processing

Process records in batches for better performance:

```scala
val batchedOutput = ssc.socketTextStream("localhost", 9999)

batchedOutput.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val batchSize = 1000
    val buffer = scala.collection.mutable.ArrayBuffer[String]()

    partition.foreach { record =>
      buffer += record

      if (buffer.size >= batchSize) {
        writeBatch(buffer.toList)
        buffer.clear()
      }
    }

    // Write remaining records
    if (buffer.nonEmpty) {
      writeBatch(buffer.toList)
    }
  }
}
```
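
The same buffering can be expressed more concisely with Scala's `Iterator.grouped`, which lazily yields fixed-size chunks (the final chunk may be smaller); `writeBatch` is the same hypothetical helper as above:

```scala
batchedOutput.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // grouped(1000) yields chunks of up to 1000 records from the partition iterator
    partition.grouped(1000).foreach(chunk => writeBatch(chunk.toList))
  }
}
```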