# Output Operations

Methods for writing processed stream data to external systems, triggering computation, and performing side-effecting actions on the RDDs of a DStream.
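Output operations are also what force execution: DStream transformations are lazy, and nothing is computed until at least one output operation is registered and the streaming context is started. A minimal sketch, assuming a local two-core StreamingContext with a 10-second batch interval:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("OutputOperationsExample")
val ssc = new StreamingContext(conf, Seconds(10))

val words = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split("\\s+"))

// Without an output operation such as print(), the flatMap above is never executed
words.print()

ssc.start()            // start receiving and processing data
ssc.awaitTermination() // block until the streaming job is stopped
```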
## Basic Output Operations

### Print Operations

Print first 10 elements of each RDD:

```scala { .api }
def print(): Unit
```

Print first `num` elements of each RDD:

```scala { .api }
def print(num: Int): Unit
```

Example print operations:

```scala
val wordCounts = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .reduceByKey(_ + _)

wordCounts.print()   // Print first 10 elements
wordCounts.print(20) // Print first 20 elements
```
### ForEach Operations

Apply function to each RDD:

```scala { .api }
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
```

Apply function to each RDD with time information:

```scala { .api }
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit
```

Example forEach operations:

```scala
val lines = ssc.socketTextStream("localhost", 9999)

// Process each RDD
lines.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    println(s"Processing ${rdd.count()} lines")
    rdd.collect().foreach(println)
  }
}

// Process with time information
lines.foreachRDD { (rdd, time) =>
  println(s"Processing batch at ${time.toString()}")
  rdd.foreach(line => processLine(line, time))
}
```
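Note that the closure passed to `foreachRDD` runs on the driver once per batch, while code inside RDD actions such as `foreach` or `foreachPartition` runs on the executors. A small sketch of that split (the `sendToSink` helper is hypothetical):

```scala
lines.foreachRDD { rdd =>
  // Driver side: runs once per batch, good for cheap bookkeeping and scheduling decisions
  println(s"Scheduling output for ${rdd.getNumPartitions} partitions")

  rdd.foreachPartition { records =>
    // Executor side: anything used here must be serializable or created within this closure
    records.foreach(record => sendToSink(record)) // sendToSink is a hypothetical user-defined writer
  }
}
```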
## File Output Operations

### Text File Output

Save as text files with prefix:

```scala { .api }
def saveAsTextFiles(prefix: String, suffix: String = ""): Unit
```

Example text file output:

```scala
val processedLines = ssc.socketTextStream("localhost", 9999)
  .map(_.toUpperCase)

// Each batch is written to a directory named like output-<batch time in ms>.txt
processedLines.saveAsTextFiles("output", "txt")

// Each batch is written to a directory named like processed-<batch time in ms>
processedLines.saveAsTextFiles("processed")
```
### Object File Output

Save as serialized object files:

```scala { .api }
def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit
```

Example object file output:

```scala
case class LogEntry(timestamp: Long, level: String, message: String)

// parseLogEntry is a user-defined String => LogEntry parser
val logEntries = ssc.socketTextStream("localhost", 9999)
  .map(parseLogEntry)

logEntries.saveAsObjectFiles("logs", "obj")
```
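Object files are written with Java serialization, one directory per batch named `<prefix>-<batch time in ms>.<suffix>`. A batch can be read back into an RDD with `objectFile`; the path below is illustrative:

```scala
// Read one batch directory back as an RDD[LogEntry] (the path is a hypothetical example)
val restored = ssc.sparkContext.objectFile[LogEntry]("logs-1700000000000.obj")
restored.take(5).foreach(println)
```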
### Hadoop File Output

Save using Hadoop OutputFormat:

```scala { .api }
def saveAsHadoopFiles[F <: OutputFormat[K, V]](
  prefix: String,
  suffix: String
)(implicit fm: ClassTag[F]): Unit // On DStream[(K, V)]
```

Save using new Hadoop API:

```scala { .api }
def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
  prefix: String,
  suffix: String
)(implicit fm: ClassTag[F]): Unit // On DStream[(K, V)]
```

Example Hadoop output:

```scala
import org.apache.hadoop.io.{Text, IntWritable}
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

val keyValuePairs = ssc.socketTextStream("localhost", 9999)
  .map(line => (new Text(line), new IntWritable(1)))

// Using old Hadoop API
keyValuePairs.saveAsHadoopFiles[TextOutputFormat[Text, IntWritable]]("hadoop-output", "txt")

// Using new Hadoop API
keyValuePairs.saveAsNewAPIHadoopFiles[NewTextOutputFormat[Text, IntWritable]]("new-hadoop-output", "txt")
```
## Database Output Operations

### JDBC Output

Example JDBC output using foreachRDD:

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}

// parseData is a user-defined parser producing (String, String) pairs
val processedData = ssc.socketTextStream("localhost", 9999)
  .map(parseData)

processedData.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Create connection per partition
    val connection = DriverManager.getConnection("jdbc:mysql://localhost/db", "user", "password")
    val statement = connection.prepareStatement("INSERT INTO table (col1, col2) VALUES (?, ?)")

    try {
      partition.foreach { case (col1, col2) =>
        statement.setString(1, col1)
        statement.setString(2, col2)
        statement.addBatch()
      }

      statement.executeBatch()
    } finally {
      statement.close()
      connection.close() // Always release the connection, even if the write fails
    }
  }
}
```
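Opening a new connection for every partition of every batch can dominate the write cost. A common refinement is a lazily initialized pool shared by all tasks in an executor JVM; the `ConnectionPool` object below is a hypothetical sketch built on Apache Commons DBCP2, not part of Spark:

```scala
import org.apache.commons.dbcp2.BasicDataSource

// Hypothetical singleton: one pool per executor JVM, created on first use
object ConnectionPool {
  lazy val dataSource: BasicDataSource = {
    val ds = new BasicDataSource()
    ds.setUrl("jdbc:mysql://localhost/db")
    ds.setUsername("user")
    ds.setPassword("password")
    ds
  }
  def getConnection: java.sql.Connection = dataSource.getConnection
}

processedData.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val connection = ConnectionPool.getConnection // borrowed from the pool, not newly opened
    val statement = connection.prepareStatement("INSERT INTO table (col1, col2) VALUES (?, ?)")
    try {
      partition.foreach { case (col1, col2) =>
        statement.setString(1, col1)
        statement.setString(2, col2)
        statement.addBatch()
      }
      statement.executeBatch()
    } finally {
      statement.close()
      connection.close() // returns the connection to the pool
    }
  }
}
```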
### NoSQL Database Output

Example MongoDB output (using the MongoDB Spark Connector):

```scala
import com.mongodb.spark.MongoSpark

// parseToDocument is a user-defined parser producing org.bson.Document values
val documents = ssc.socketTextStream("localhost", 9999)
  .map(parseToDocument)

documents.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    MongoSpark.save(rdd)
  }
}
```
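`MongoSpark.save` resolves its destination from the Spark configuration. A minimal sketch of that wiring, assuming the pre-10.x MongoDB Spark Connector and an illustrative URI:

```scala
import org.apache.spark.SparkConf

// Hypothetical URI: database "streaming", collection "events"
val sparkConf = new SparkConf()
  .setAppName("MongoOutputExample")
  .set("spark.mongodb.output.uri", "mongodb://localhost:27017/streaming.events")
```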
## Message Queue Output

### Kafka Output

Example Kafka producer output:

```scala
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import java.util.Properties

val messages = ssc.socketTextStream("localhost", 9999)

messages.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    partition.foreach { message =>
      val record = new ProducerRecord[String, String]("output-topic", message)
      producer.send(record)
    }

    producer.close()
  }
}
```
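Because the code above creates and closes a producer for every partition of every batch, a common alternative is a lazily initialized producer shared per executor JVM. The `KafkaSink` object below is a hypothetical sketch, not part of Spark or the Kafka client:

```scala
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import java.util.Properties

// Hypothetical singleton: the producer is created once per executor JVM and reused across batches
object KafkaSink {
  lazy val producer: KafkaProducer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val p = new KafkaProducer[String, String](props)
    sys.addShutdownHook(p.close()) // flush and close when the executor JVM exits
    p
  }
}

messages.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    partition.foreach { message =>
      KafkaSink.producer.send(new ProducerRecord[String, String]("output-topic", message))
    }
  }
}
```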
## Advanced Output Patterns

### Batch-aware Output

Output with batch time information:

```scala
val timestampedData = ssc.socketTextStream("localhost", 9999)

timestampedData.foreachRDD { (rdd, time) =>
  val batchTime = time.milliseconds
  val outputPath = s"output/batch-$batchTime"

  if (!rdd.isEmpty()) {
    rdd.coalesce(1).saveAsTextFile(outputPath)
  }
}
```
### Conditional Output

Output only when conditions are met:

```scala
val filteredData = ssc.socketTextStream("localhost", 9999)
  .filter(_.nonEmpty)

filteredData.foreachRDD { rdd =>
  val count = rdd.count()

  if (count > 100) {
    println(s"Processing large batch: $count records")
    rdd.saveAsTextFile(s"large-batches/batch-${System.currentTimeMillis()}")
  } else if (count > 0) {
    println(s"Processing small batch: $count records")
    rdd.collect().foreach(println)
  }
}
```
### Multi-destination Output

Output to multiple destinations:

```scala
val processedData = ssc.socketTextStream("localhost", 9999)
  .map(processLine)

processedData.foreachRDD { rdd =>
  // Cache for multiple outputs
  rdd.cache()

  // Output to file system
  if (!rdd.isEmpty()) {
    rdd.saveAsTextFile(s"output/batch-${System.currentTimeMillis()}")
  }

  // Output to database
  rdd.foreachPartition { partition =>
    // Database write logic
    writeToDatabase(partition)
  }

  // Output metrics
  val count = rdd.count()
  println(s"Processed $count records")

  // Unpersist to free memory
  rdd.unpersist()
}
```
## Output Operation Properties

### Exactly-once Semantics

Output operations have at-least-once semantics by default; to approach exactly-once semantics, make writes idempotent or transactional, keyed by the batch time:

```scala
val idempotentOutput = ssc.socketTextStream("localhost", 9999)

idempotentOutput.foreachRDD { (rdd, time) =>
  val batchId = time.milliseconds

  // Check if batch already processed
  if (!isProcessed(batchId)) {
    rdd.foreachPartition { partition =>
      // Atomic write operation
      writeAtomically(partition, batchId)
    }
    markProcessed(batchId)
  }
}

def isProcessed(batchId: Long): Boolean = {
  // Check external system for processing status
  checkProcessingStatus(batchId)
}

def markProcessed(batchId: Long): Unit = {
  // Mark batch as processed in external system
  updateProcessingStatus(batchId)
}
```
### Error Handling in Output

Handle output errors gracefully. Note that a partition is a single-pass iterator, so materialize it before retrying:

```scala
val reliableOutput = ssc.socketTextStream("localhost", 9999)

reliableOutput.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Materialize the iterator so the same records can be replayed on retry
    val records = partition.toSeq
    var retries = 3
    var success = false

    while (retries > 0 && !success) {
      try {
        writeToExternalSystem(records)
        success = true
      } catch {
        case e: Exception =>
          retries -= 1
          if (retries > 0) {
            Thread.sleep(1000) // Wait before retry
          } else {
            // Log error and potentially write to dead letter queue
            logError(e, records)
          }
      }
    }
  }
}
```
### Performance Optimization

Optimize output operations:

```scala
val optimizedOutput = ssc.socketTextStream("localhost", 9999)
  .filter(_.nonEmpty)

optimizedOutput.foreachRDD { rdd =>
  // Coalesce to reduce number of output files
  val coalescedRDD = if (rdd.getNumPartitions > 10) {
    rdd.coalesce(10)
  } else {
    rdd
  }

  // Only process non-empty RDDs
  if (!coalescedRDD.isEmpty()) {
    coalescedRDD.saveAsTextFile(s"output/${System.currentTimeMillis()}")
  }
}
```
360
361
## Output Operation Best Practices
362
363
### Connection Management
364
365
Manage external connections efficiently:
366
```scala
367
val dataStream = ssc.socketTextStream("localhost", 9999)
368
369
dataStream.foreachRDD { rdd =>
370
rdd.foreachPartition { partition =>
371
// Create connection per partition (not per record)
372
val connection = createConnection()
373
374
try {
375
partition.foreach { record =>
376
writeRecord(connection, record)
377
}
378
} finally {
379
connection.close()
380
}
381
}
382
}
383
```
384
385
### Batch Processing
386
387
Process records in batches for better performance:
388
```scala
389
val batchedOutput = ssc.socketTextStream("localhost", 9999)
390
391
batchedOutput.foreachRDD { rdd =>
392
rdd.foreachPartition { partition =>
393
val batchSize = 1000
394
val buffer = scala.collection.mutable.ArrayBuffer[String]()
395
396
partition.foreach { record =>
397
buffer += record
398
399
if (buffer.size >= batchSize) {
400
writeBatch(buffer.toList)
401
buffer.clear()
402
}
403
}
404
405
// Write remaining records
406
if (buffer.nonEmpty) {
407
writeBatch(buffer.toList)
408
}
409
}
410
}
411
```
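Scala's `Iterator.grouped` expresses the same batching more concisely; `writeBatch` is the same user-defined helper as above:

```scala
batchedOutput.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // grouped() yields batches of up to 1000 records, including a final partial batch
    partition.grouped(1000).foreach(batch => writeBatch(batch.toList))
  }
}
```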