Apache Spark Streaming integration with Apache Flume for collecting, aggregating, and moving large amounts of log data
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-flume-2-11@2.2.0

Apache Spark Streaming integration with Apache Flume provides real-time data ingestion for Spark applications. The library offers both push-based and pull-based approaches for connecting Flume data streams to Spark Streaming, supporting reliable, fault-tolerant data processing pipelines.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.11</artifactId>
<version>2.2.3</version>
</dependency>
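If you build with sbt instead of Maven, the equivalent dependency would be the following (assuming your project's scalaVersion is 2.11.x, matching the _2.11 artifact suffix):

libraryDependencies += "org.apache.spark" %% "spark-streaming-flume" % "2.2.3"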
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.storage.StorageLevel

For the Java API:
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import org.apache.spark.storage.StorageLevel;

Basic push-based usage:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.storage.StorageLevel
val conf = new SparkConf().setAppName("FlumeStreamExample") // illustrative app name; master is typically supplied via spark-submit
val ssc = new StreamingContext(conf, Seconds(10))
// Create stream that receives data from Flume agent
val flumeStream = FlumeUtils.createStream(
ssc,
"localhost", // hostname where Spark receiver will listen
9999, // port where Spark receiver will listen
StorageLevel.MEMORY_AND_DISK_SER_2
)
// Process the events
flumeStream.map(e => new String(e.event.getBody.array(), "UTF-8")).print()
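As a quick sanity check, the stream can also be summarized with standard DStream operations; for example, counting the events received in each batch (a sketch reusing the flumeStream defined above):

// Count how many Flume events arrived in each batch interval
flumeStream.count().map(cnt => s"Received $cnt Flume events in this batch").print()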
Pull-based usage:

import java.net.InetSocketAddress

// Using the same ssc as in the example above
// Create a polling stream that pulls data from a SparkSink
val pollingStream = FlumeUtils.createPollingStream(
ssc,
Seq(new InetSocketAddress("flume-host", 9988)), // SparkSink addresses
StorageLevel.MEMORY_AND_DISK_SER_2
)
// Process the events
pollingStream.map(e => new String(e.event.getBody.array(), "UTF-8")).print()

The Spark Streaming Flume integration provides two distinct data ingestion patterns:
Push-based (FlumeInputDStream): Flume agents push data to Spark Streaming receivers configured as Avro agents. Simple setup but less reliable.
Pull-based (FlumePollingInputDStream): Spark Streaming polls custom SparkSink deployed on Flume agents. More reliable with transaction support and better fault tolerance.
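For reference, a sketch of the matching Flume agent sink configuration (agent, channel, and host names are placeholders; the pull-based variant additionally requires the spark-streaming-flume-sink artifact and its dependencies on the Flume agent's classpath):

# Push-based: Flume's Avro sink pushes events to the host/port where the Spark receiver listens
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.channel = memoryChannel
agent.sinks.avroSink.hostname = <spark-receiver-host>
agent.sinks.avroSink.port = 9999

# Pull-based: the custom SparkSink buffers events until Spark Streaming polls them
agent.sinks.sparkSink.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.sparkSink.channel = memoryChannel
agent.sinks.sparkSink.hostname = <flume-agent-host>
agent.sinks.sparkSink.port = 9988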
FlumeUtils.createStream creates input streams where Flume acts as a client, pushing data to Spark receivers.
// Scala API with default storage level
def createStream(
ssc: StreamingContext,
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[SparkFlumeEvent]
// Scala API with compression support
def createStream(
ssc: StreamingContext,
hostname: String,
port: Int,
storageLevel: StorageLevel,
enableDecompression: Boolean
): ReceiverInputDStream[SparkFlumeEvent]
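A sketch of the compression-enabled variant, assuming the upstream Flume Avro sink is configured to compress events with deflate so the receiver has something to decompress (host, port, and imports reused from the earlier example):

// Receive events that the Flume Avro sink sends compressed
val compressedStream = FlumeUtils.createStream(
  ssc,
  "localhost",
  9999,
  StorageLevel.MEMORY_AND_DISK_SER_2,
  enableDecompression = true
)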
// Java API with default storage level
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
JavaStreamingContext jssc,
String hostname,
int port
)
// Java API with custom storage level
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
JavaStreamingContext jssc,
String hostname,
int port,
StorageLevel storageLevel
)
// Java API with compression support
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
JavaStreamingContext jssc,
String hostname,
int port,
StorageLevel storageLevel,
boolean enableDecompression
)

FlumeUtils.createPollingStream creates input streams that poll a SparkSink for data, with stronger reliability guarantees.
// Scala API with single address
def createPollingStream(
ssc: StreamingContext,
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[SparkFlumeEvent]
// Scala API with multiple addresses
def createPollingStream(
ssc: StreamingContext,
addresses: Seq[InetSocketAddress],
storageLevel: StorageLevel
): ReceiverInputDStream[SparkFlumeEvent]
// Scala API with full configuration
def createPollingStream(
ssc: StreamingContext,
addresses: Seq[InetSocketAddress],
storageLevel: StorageLevel,
maxBatchSize: Int,
parallelism: Int
): ReceiverInputDStream[SparkFlumeEvent]

// Java API with single address
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
JavaStreamingContext jssc,
String hostname,
int port
)
// Java API with custom storage level
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
JavaStreamingContext jssc,
String hostname,
int port,
StorageLevel storageLevel
)
// Java API with multiple addresses
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
JavaStreamingContext jssc,
InetSocketAddress[] addresses,
StorageLevel storageLevel
)
// Java API with full configuration
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
JavaStreamingContext jssc,
InetSocketAddress[] addresses,
StorageLevel storageLevel,
int maxBatchSize,
int parallelism
)

Process SparkFlumeEvent objects received from Flume agents:
class SparkFlumeEvent() extends Externalizable {
var event: AvroFlumeEvent
def readExternal(in: ObjectInput): Unit
def writeExternal(out: ObjectOutput): Unit
}
object SparkFlumeEvent {
def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent
}

import scala.collection.JavaConverters._

// Extract the event body and decode it as a UTF-8 string
flumeStream.map(sparkEvent => {
val body = sparkEvent.event.getBody.array()
new String(body, "UTF-8")
})
// Extract event headers (Avro returns CharSequence keys/values, so convert them to Strings)
flumeStream.map(sparkEvent => {
  sparkEvent.event.getHeaders.asScala.map { case (k, v) => k.toString -> v.toString }.toMap
})

// Process both headers and body
flumeStream.map(sparkEvent => {
  val event = sparkEvent.event
  val bodyString = new String(event.getBody.array(), "UTF-8")
  val headerMap = event.getHeaders.asScala.map { case (k, v) => k.toString -> v.toString }.toMap
  (bodyString, headerMap)
})
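Because flumeStream is an ordinary DStream, the extracted values compose with the usual transformations; for example, a per-batch word count over event bodies (a sketch reusing flumeStream from above):

// Split each event body into words and count occurrences within each batch
val wordCounts = flumeStream
  .flatMap(e => new String(e.event.getBody.array(), "UTF-8").split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
wordCounts.print()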
Core types used by these APIs:

import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.JavaStreamingContext
// Flume-specific types
import org.apache.flume.source.avro.AvroFlumeEvent
import java.net.InetSocketAddress
import java.io.{Externalizable, ObjectInput, ObjectOutput}

Choose appropriate storage levels based on your reliability and performance requirements:
import org.apache.spark.storage.StorageLevel
// Default - serialized, replicated, disk + memory
StorageLevel.MEMORY_AND_DISK_SER_2
// Memory only, replicated
StorageLevel.MEMORY_ONLY_2
// Disk only, replicated
StorageLevel.DISK_ONLY_2
// Memory and disk, not serialized, replicated
StorageLevel.MEMORY_AND_DISK_2

The pull-based streaming API uses the following default values:
// Default constants from FlumeUtils
DEFAULT_POLLING_BATCH_SIZE = 1000 // events per batch
DEFAULT_POLLING_PARALLELISM = 5 // concurrent connections

// High-throughput configuration
FlumeUtils.createPollingStream(
ssc,
addresses,
StorageLevel.MEMORY_AND_DISK_SER_2,
maxBatchSize = 2000, // Larger batches
parallelism = 10 // More concurrent connections
)
// Conservative configuration
FlumeUtils.createPollingStream(
ssc,
addresses,
StorageLevel.MEMORY_AND_DISK_SER_2,
maxBatchSize = 500, // Smaller batches
parallelism = 2 // Fewer connections
)

Both stream types handle failures, but at different levels: the push-based receiver can lose events that were buffered but not yet stored when a failure occurs, while the pull-based receiver uses Flume transactions, so events are not removed from the Flume channel until Spark has stored them. Application-level processing errors still need explicit handling:
// Add error handling for stream processing
flumeStream.foreachRDD { rdd =>
  try {
    // Note: collect() brings the entire batch to the driver; suitable only for small batches
    rdd.collect().foreach { sparkEvent =>
      // Process a single event (processEvent is an application-specific handler)
      processEvent(sparkEvent)
    }
  } catch {
    case e: Exception =>
      // Replace with your logging framework; logError is only available via Spark's internal Logging trait
      System.err.println(s"Failed to process Flume events: ${e.getMessage}")
  }
}
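None of the snippets above actually start the streaming job. A minimal driver sketch, assuming a locally writable checkpoint directory (the path is illustrative); checkpointing lets Spark recover streaming metadata after a driver restart:

// Enable checkpointing so streaming metadata can be recovered after failures
ssc.checkpoint("/tmp/flume-streaming-checkpoint")

// Start receiving data and block until the job is stopped
ssc.start()
ssc.awaitTermination()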