class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
// Format specification
def format(source: String): DataStreamReader
// Schema definition
def schema(schema: StructType): DataStreamReader
def schema(schemaString: String): DataStreamReader
// Options configuration
def option(key: String, value: String): DataStreamReader
def option(key: String, value: Boolean): DataStreamReader
def option(key: String, value: Long): DataStreamReader
def option(key: String, value: Double): DataStreamReader
def options(options: scala.collection.Map[String, String]): DataStreamReader
def options(options: java.util.Map[String, String]): DataStreamReader
// Load operations
def load(): DataFrame
def load(path: String): DataFrame
// Format-specific loaders
def json(path: String): DataFrame
def csv(path: String): DataFrame
def parquet(path: String): DataFrame
def orc(path: String): DataFrame
def text(path: String): DataFrame
def textFile(path: String): Dataset[String]
  // Note: Kafka, socket, and rate sources have no dedicated loader methods.
  // They are selected with format("kafka"), format("socket"), or format("rate"),
  // configured with option(...) (e.g. host/port/includeTimestamp for socket,
  // rowsPerSecond/rampUpTime/numPartitions for rate), and read with load()
}

class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
// Format specification
def format(source: String): DataStreamWriter[T]
// Output mode configuration
def outputMode(outputMode: OutputMode): DataStreamWriter[T]
def outputMode(outputMode: String): DataStreamWriter[T]
// Trigger configuration
def trigger(trigger: Trigger): DataStreamWriter[T]
// Options configuration
def option(key: String, value: String): DataStreamWriter[T]
def option(key: String, value: Boolean): DataStreamWriter[T]
def option(key: String, value: Long): DataStreamWriter[T]
def option(key: String, value: Double): DataStreamWriter[T]
def options(options: scala.collection.Map[String, String]): DataStreamWriter[T]
def options(options: java.util.Map[String, String]): DataStreamWriter[T]
  // Partitioning
  def partitionBy(colNames: String*): DataStreamWriter[T]
  // Query naming (checkpointing is configured via option("checkpointLocation", ...))
  def queryName(queryName: String): DataStreamWriter[T]
// Start operations
def start(): StreamingQuery
def start(path: String): StreamingQuery
  // Note: there are no format-specific writer methods. File sinks (json, csv, parquet,
  // orc, text) as well as the console, memory, and kafka sinks are selected with
  // format(...) and launched with start()
  // Custom sinks (these return the writer; call start() to launch the query)
  def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
  def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]
  def foreachBatch(function: VoidFunction2[Dataset[T], java.lang.Long]): DataStreamWriter[T]
}

trait StreamingQuery {
// Query identification
def id: UUID
def runId: UUID
def name: String
  // Query control (queries are started via DataStreamWriter.start(), not from this trait)
  def stop(): Unit
  def processAllAvailable(): Unit
// Query state
def isActive: Boolean
def awaitTermination(): Unit
def awaitTermination(timeoutMs: Long): Boolean
def exception: Option[StreamingQueryException]
// Progress monitoring
def lastProgress: StreamingQueryProgress
def recentProgress: Array[StreamingQueryProgress]
def status: StreamingQueryStatus
// Explain plans
def explain(): Unit
def explain(extended: Boolean): Unit
}

abstract class StreamingQueryManager {
// Active queries management
def active: Array[StreamingQuery]
def get(id: UUID): StreamingQuery
  def get(id: String): StreamingQuery // takes a query id string, not the query name
// Termination handling
def awaitAnyTermination(): Unit
def awaitAnyTermination(timeoutMs: Long): Boolean
def resetTerminated(): Unit
// Listeners
def addListener(listener: StreamingQueryListener): Unit
def removeListener(listener: StreamingQueryListener): Unit
}

// Base trigger trait
sealed trait Trigger
// Processing time trigger
case class ProcessingTimeTrigger(interval: Long) extends Trigger
object Trigger {
// Factory methods
def ProcessingTime(interval: String): Trigger
def ProcessingTime(interval: Long, unit: TimeUnit): Trigger
def ProcessingTime(interval: Duration): Trigger
def Once(): Trigger
def Continuous(interval: String): Trigger
def Continuous(interval: Long, unit: TimeUnit): Trigger
def Continuous(interval: Duration): Trigger
def AvailableNow(): Trigger
}
// Specific trigger implementations (internal; application code uses the factory methods above)
case object OnceTrigger extends Trigger
case class ContinuousTrigger(interval: Long) extends Trigger
case object AvailableNowTrigger extends Trigger

sealed trait OutputMode
object OutputMode {
case object Append extends OutputMode
case object Complete extends OutputMode
case object Update extends OutputMode
def apply(outputMode: String): OutputMode
}

// Group state for stateful operations
abstract class GroupState[S] extends Serializable {
// State access
def exists: Boolean
def get: S
def getOption: Option[S]
def update(newState: S): Unit
def remove(): Unit
// Timeout management
def setTimeoutDuration(durationMs: Long): Unit
def setTimeoutDuration(duration: String): Unit
def setTimeoutTimestamp(timestampMs: Long): Unit
def setTimeoutTimestamp(timestamp: Date): Unit
def getCurrentWatermarkMs(): Long
def getCurrentProcessingTimeMs(): Long
def hasTimedOut: Boolean
}
// State timeout configuration
sealed trait GroupStateTimeout
case object NoTimeout extends GroupStateTimeout
case object ProcessingTimeTimeout extends GroupStateTimeout
case object EventTimeTimeout extends GroupStateTimeout

// Query progress information
class StreamingQueryProgress private[sql] (
val id: UUID,
val runId: UUID,
val name: String,
val timestamp: String,
val batchId: Long,
val batchDuration: Long,
val durationMs: Map[String, Long],
val eventTime: Map[String, String],
val stateOperators: Array[StateOperatorProgress],
val sources: Array[SourceProgress],
val sink: SinkProgress,
val observedMetrics: Map[String, Row]) extends Serializable {
def inputRowsPerSecond: Double
def processedRowsPerSecond: Double
def prettyJson: String
def json: String
}
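
// A brief usage sketch (not part of the original listing): reading the latest progress of a
// running query; `query` stands for any active StreamingQuery.
val progress = query.lastProgress
if (progress != null) {
  println(progress.prettyJson)   // full progress report as JSON
  println(progress.durationMs)   // per-phase timings for the last batch
  progress.sources.foreach(s => println(s"${s.description}: ${s.numInputRows} rows"))
  progress.stateOperators.foreach(op => println(s"state rows: ${op.numRowsTotal}"))
}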
// Query status information
class StreamingQueryStatus private[sql] (
val message: String,
val isDataAvailable: Boolean,
val isTriggerActive: Boolean) extends Serializable
// State operator progress
class StateOperatorProgress private[sql] (
val operatorName: String,
val numRowsTotal: Long,
val numRowsUpdated: Long,
val memoryUsedBytes: Long,
val customMetrics: Map[String, Long] = Map.empty) extends Serializable
// Source progress
class SourceProgress private[sql] (
val description: String,
val startOffset: String,
val endOffset: String,
val numInputRows: Long,
val inputRowsPerSecond: Double,
val processedRowsPerSecond: Double,
val metrics: Map[String, String] = Map.empty) extends Serializable
// Sink progress
class SinkProgress private[sql] (
val description: String,
val numOutputRows: Long,
  val metrics: Map[String, String] = Map.empty) extends Serializable

// ForeachWriter for custom output
abstract class ForeachWriter[T] extends Serializable {
// Lifecycle methods
def open(partitionId: Long, epochId: Long): Boolean
def process(value: T): Unit
def close(errorOrNull: Throwable): Unit
}
// Internal sink backing foreachBatch (applications pass a function to
// DataStreamWriter.foreachBatch rather than extending this class)
abstract class ForeachBatchSink[T] extends Serializable {
def process(batchDF: Dataset[T], batchId: Long): Unit
}

// Watermark functions (in Dataset class)
class Dataset[T] {
def withWatermark(eventTime: String, delayThreshold: String): Dataset[T]
}
// Window functions (in org.apache.spark.sql.functions)
object functions {
def window(timeColumn: Column, windowDuration: String): Column
def window(timeColumn: Column, windowDuration: String, slideDuration: String): Column
def window(timeColumn: Column, windowDuration: String, slideDuration: String, startTime: String): Column
def session_window(timeColumn: Column, gapDuration: String): Column
}

import org.apache.spark.sql.{SparkSession, Dataset, DataFrame}
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val spark = SparkSession.builder()
.appName("Streaming Example")
.config("spark.sql.streaming.checkpointLocation", "/path/to/checkpoint")
  .getOrCreate()
// Required for the $"col" syntax and the Dataset encoders used below
import spark.implicits._
// Define schema for streaming data
val userActivitySchema = StructType(Array(
StructField("user_id", StringType, nullable = false),
StructField("action", StringType, nullable = false),
StructField("timestamp", TimestampType, nullable = false),
StructField("session_id", StringType, nullable = false),
StructField("page_url", StringType, nullable = true),
StructField("duration", IntegerType, nullable = true)
))
// Read from JSON files
val inputStream = spark.readStream
.format("json")
.schema(userActivitySchema)
.option("maxFilesPerTrigger", "1")
.load("/path/to/streaming/data")
// Basic transformations
val processedStream = inputStream
.filter($"action" =!= "heartbeat")
.withColumn("hour", hour($"timestamp"))
.withColumn("date", to_date($"timestamp"))// CSV streaming
val csvStream = spark.readStream
.format("csv")
.option("header", "true")
.option("inferSchema", "false")
.schema(userActivitySchema)
.load("/path/to/csv/files")
// Parquet streaming
val parquetStream = spark.readStream
.format("parquet")
.schema(userActivitySchema)
.load("/path/to/parquet/files")
// Text file streaming
val textStream = spark.readStream
.format("text")
.load("/path/to/text/files")
.select(
regexp_extract($"value", """(\w+)\s+(.+)""", 1).as("level"),
regexp_extract($"value", """(\w+)\s+(.+)""", 2).as("message"),
current_timestamp().as("processed_time")
  )
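
// Socket and rate sources (selected via format(), as noted in the API summary above).
// A minimal sketch for local experimentation; the host, port, and rate settings here
// are placeholder assumptions, not part of the original example.
val socketStream = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .option("includeTimestamp", "true")
  .load()

val rateStream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "100")
  .option("rampUpTime", "10s")
  .option("numPartitions", "4")
  .load()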
// Read from Kafka
val kafkaStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "user-activity,page-views")
.option("startingOffsets", "latest") // or "earliest"
.option("failOnDataLoss", "false")
.load()
// Parse Kafka messages
val parsedKafkaStream = kafkaStream.select(
$"topic",
$"partition",
$"offset",
$"timestamp",
$"key".cast(StringType).as("message_key"),
from_json($"value".cast(StringType), userActivitySchema).as("data")
).select($"topic", $"partition", $"offset", $"timestamp", $"message_key", $"data.*")
// Write to Kafka (the Kafka sink expects a "value" column, so serialize each row to JSON)
val kafkaOutput = processedStream
  .select(to_json(struct(col("*"))).as("value"))
  .writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "processed-activity")
.option("checkpointLocation", "/path/to/kafka/checkpoint")
.trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

// Simple aggregations
val userCounts = inputStream
.groupBy($"user_id")
.count()
// Time-based windowing
val windowedCounts = inputStream
.withWatermark("timestamp", "10 minutes")
.groupBy(
window($"timestamp", "5 minutes", "1 minute"),
$"action"
)
.agg(
count("*").as("action_count"),
    approx_count_distinct("user_id").as("unique_users"), // exact distinct counts are unsupported on streams
avg("duration").as("avg_duration")
)
// Session windows
val sessionAnalysis = inputStream
.withWatermark("timestamp", "30 minutes")
.groupBy(
$"user_id",
session_window($"timestamp", "10 minutes")
)
.agg(
count("*").as("actions_in_session"),
sum("duration").as("total_session_time"),
collect_list("action").as("session_actions")
)
// Complex aggregations: a second aggregation over windowed results
// (chained streaming aggregations require Spark 3.4+)
val multiWindowAnalysis = inputStream
.withWatermark("timestamp", "1 hour")
.groupBy(
$"user_id",
window($"timestamp", "1 hour"),
$"action"
)
.agg(
count("*").as("hourly_count"),
first("timestamp").as("first_action_time"),
last("timestamp").as("last_action_time")
)
.groupBy($"user_id", $"window")
.agg(
sum("hourly_count").as("total_actions"),
    map_from_entries(collect_list(struct($"action", $"hourly_count"))).as("action_breakdown")
  )

case class UserSession(
userId: String,
startTime: Long,
lastActivity: Long,
actionCount: Int,
totalDuration: Int
)
// Stateful session tracking
val sessionTracking = inputStream
.groupByKey(_.getString(0)) // Group by user_id
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout) {
(userId: String, values: Iterator[Row], state: GroupState[UserSession]) =>
val events = values.toSeq
val now = System.currentTimeMillis()
// Get or create session state
val session = if (state.exists) {
state.get
} else {
UserSession(userId, now, now, 0, 0)
}
// Update session with new events
val updatedSession = events.foldLeft(session) { (s, event) =>
val eventTime = event.getAs[java.sql.Timestamp]("timestamp").getTime
val duration = Option(event.getAs[Integer]("duration")).map(_.toInt).getOrElse(0)
s.copy(
lastActivity = math.max(s.lastActivity, eventTime),
actionCount = s.actionCount + 1,
totalDuration = s.totalDuration + duration
)
}
    // Check whether this call was triggered by the inactivity timeout
    if (state.hasTimedOut) {
      // No new events within the timeout: emit the session and clear the state
      state.remove()
      ("session_expired", updatedSession)
    } else {
      state.update(updatedSession)
      // Reset the 30-minute inactivity timeout whenever new events arrive
      state.setTimeoutDuration("30 minutes")
      ("session_active", updatedSession)
    }
}
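
// clickSchema and impressionSchema are referenced below but are not defined in this excerpt.
// Plausible definitions, with field names inferred from the join condition (treat them as
// assumptions), would look like this:
val impressionSchema = StructType(Array(
  StructField("impression_id", StringType, nullable = false),
  StructField("impression_timestamp", TimestampType, nullable = false),
  StructField("campaign_id", StringType, nullable = true)
))
val clickSchema = StructType(Array(
  StructField("click_impression_id", StringType, nullable = false),
  StructField("click_timestamp", TimestampType, nullable = false)
))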
// Define two streams
val clickStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "clicks")
.load()
.select(
from_json($"value".cast("string"), clickSchema).as("data")
)
.select($"data.*")
  .withWatermark("click_timestamp", "2 hours") // watermark on the event-time column used in the join
val impressionStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "impressions")
.load()
.select(
from_json($"value".cast("string"), impressionSchema).as("data")
)
.select($"data.*")
  .withWatermark("impression_timestamp", "3 hours")
// Join streams with time constraints
val joinedStream = impressionStream.join(
clickStream,
expr("""
impression_id = click_impression_id AND
click_timestamp >= impression_timestamp AND
click_timestamp <= impression_timestamp + interval 1 hour
"""),
joinType = "leftOuter"
)
// Aggregate joined data (an aggregation downstream of a stream-stream join chains
// stateful operators, which requires Spark 3.4+)
val conversionAnalysis = joinedStream
.withWatermark("impression_timestamp", "1 hour")
.groupBy(
window($"impression_timestamp", "10 minutes"),
$"campaign_id"
)
.agg(
count("impression_id").as("impressions"),
count("click_impression_id").as("clicks"),
(count("click_impression_id") * 100.0 / count("impression_id")).as("ctr")
  )

// Append mode - only new rows
val appendQuery = processedStream.writeStream
.outputMode(OutputMode.Append)
.format("parquet")
.option("path", "/path/to/output")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(Trigger.ProcessingTime("1 minute"))
.start()
// Complete mode - entire result table
val completeQuery = userCounts.writeStream
.outputMode(OutputMode.Complete)
.format("memory")
.queryName("user_counts_table")
.trigger(Trigger.ProcessingTime("30 seconds"))
.start()
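
// The memory sink registers the streaming result as an in-memory table named after the
// query, so it can be inspected with Spark SQL while the query runs (a quick sketch,
// intended for testing and debugging only):
spark.sql("SELECT * FROM user_counts_table").show(20, truncate = false)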
// Update mode - only changed rows
val updateQuery = windowedCounts.writeStream
.outputMode(OutputMode.Update)
.format("console")
.option("truncate", "false")
.trigger(Trigger.ProcessingTime("2 minutes"))
.start()
// Once trigger - single micro-batch (deprecated since Spark 3.4 in favor of AvailableNow)
val onceQuery = processedStream.writeStream
.outputMode(OutputMode.Append)
.format("json")
.option("path", "/path/to/batch/output")
.trigger(Trigger.Once)
.start()
// Available now trigger - process all available data
val availableNowQuery = processedStream.writeStream
.outputMode(OutputMode.Append)
.format("delta")
.option("path", "/path/to/delta/table")
.trigger(Trigger.AvailableNow)
.start()
// Continuous processing (experimental; only Kafka and rate sources and map-like
// operations such as select/filter are supported, hence the Kafka-backed stream)
val continuousQuery = parsedKafkaStream
  .filter($"action" === "purchase")
.writeStream
.outputMode(OutputMode.Append)
.format("console")
.trigger(Trigger.Continuous("1 second"))
  .start()

// Custom ForeachWriter
class DatabaseWriter extends ForeachWriter[Row] {
var connection: java.sql.Connection = _
var statement: java.sql.PreparedStatement = _
override def open(partitionId: Long, epochId: Long): Boolean = {
// Initialize database connection
connection = java.sql.DriverManager.getConnection(
"jdbc:postgresql://localhost/mydb", "user", "password"
)
statement = connection.prepareStatement(
"INSERT INTO user_activity (user_id, action, timestamp) VALUES (?, ?, ?)"
)
true
}
override def process(value: Row): Unit = {
statement.setString(1, value.getString(0))
statement.setString(2, value.getString(1))
statement.setTimestamp(3, value.getTimestamp(2))
statement.executeUpdate()
}
override def close(errorOrNull: Throwable): Unit = {
if (statement != null) statement.close()
if (connection != null) connection.close()
}
}
// Use custom writer
val customSinkQuery = processedStream.writeStream
.foreach(new DatabaseWriter())
.option("checkpointLocation", "/path/to/custom/checkpoint")
.trigger(Trigger.ProcessingTime("10 seconds"))
.start()
// ForeachBatch for batch processing
val foreachBatchQuery = processedStream.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Cache first so the count and the two writes below reuse a single computation
    batchDF.cache()
    println(s"Processing batch $batchId with ${batchDF.count()} records")
// Write to multiple sinks
batchDF.write
.mode("append")
.parquet(s"/path/to/archive/batch_$batchId")
batchDF.write
.format("jdbc")
.option("url", "jdbc:postgresql://localhost/mydb")
.option("dbtable", "processed_activity")
.mode("append")
.save()
batchDF.unpersist()
}
.option("checkpointLocation", "/path/to/batch/checkpoint")
  .start()

// Start multiple queries
val queries = Array(appendQuery, completeQuery, updateQuery)
// Monitor query status
queries.foreach { query =>
println(s"Query ${query.name} - Active: ${query.isActive}")
if (query.lastProgress != null) {
println(s"Batch ${query.lastProgress.batchId} processed ${query.lastProgress.inputRowsPerSecond} rows/sec")
}
}
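
// Inspecting failures (illustrative addition, not in the original): query.exception exposes a
// terminal error without blocking; awaitTermination/awaitAnyTermination rethrow it as a
// StreamingQueryException.
queries.foreach { query =>
  query.exception.foreach(e => println(s"Query ${query.name} failed: ${e.getMessage}"))
}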
// Wait for termination
spark.streams.awaitAnyTermination()
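
// Alternative polling pattern (an assumption, not shown in the original): wait with a timeout
// so other work can run between checks, and clear terminated queries so the next call waits
// on the remaining ones.
while (spark.streams.active.nonEmpty) {
  if (spark.streams.awaitAnyTermination(10000)) { // true if a query terminated within 10s
    spark.streams.resetTerminated()
  }
}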
// Query manager operations
val activeQueries = spark.streams.active
println(s"Number of active queries: ${activeQueries.length}")
// Get a specific query by name (StreamingQueryManager.get expects a query id, so look it up by name)
val specificQuery = spark.streams.active.find(_.name == "user_counts_table")
// Add streaming query listener
import org.apache.spark.sql.streaming.StreamingQueryListener._
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
println(s"Query started: ${queryStarted.name}")
}
override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
println(s"Query terminated: ${queryTerminated.id}")
}
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
val progress = queryProgress.progress
println(s"Query ${progress.name}: processed ${progress.inputRowsPerSecond} rows/sec")
}
})
// Graceful shutdown
sys.addShutdownHook {
println("Stopping streaming queries...")
spark.streams.active.foreach(_.stop())
spark.stop()
}
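
// Restart behavior (illustrative sketch, assuming the same code and paths as above):
// calling start() again with the same checkpointLocation resumes the query from the
// offsets and state recorded in the checkpoint rather than reprocessing everything.
val restartedQuery = processedStream.writeStream
  .outputMode(OutputMode.Append)
  .format("parquet")
  .option("path", "/path/to/output")
  .option("checkpointLocation", "/path/to/checkpoint")
  .start()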