Spark Streaming integration with Apache Flume for real-time data ingestion from Flume agents
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-flume_2-10@1.6.0

Spark Streaming Flume provides seamless integration between Apache Spark Streaming and Apache Flume for real-time data ingestion. It offers two complementary patterns: push-based receivers, where Flume agents push data directly to Spark, and pull-based polling, where Spark actively pulls data from Flume sinks for enhanced reliability.
Maven coordinate: org.apache.spark:spark-streaming-flume_2.10:1.6.3

Scala:
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.flume.SparkFlumeEvent
import org.apache.spark.storage.StorageLevel
import java.net.InetSocketAddress
import scala.collection.JavaConverters._
// For direct event manipulation (from Apache Flume dependency)
import org.apache.flume.source.avro.AvroFlumeEvent

Java:
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import org.apache.spark.storage.StorageLevel;
import java.net.InetSocketAddress;
// For direct event manipulation (from Apache Flume dependency)
import org.apache.flume.source.avro.AvroFlumeEvent;

Push-based example (Scala) - a Flume agent with an Avro sink pushes events to the Spark receiver:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.storage.StorageLevel
val sparkConf = new SparkConf().setAppName("FlumePushExample") // app name is illustrative
val ssc = new StreamingContext(sparkConf, Seconds(5))
// Create Flume stream - Flume pushes data to this receiver
val flumeStream = FlumeUtils.createStream(
ssc,
"localhost", // hostname where receiver listens
9999, // port where receiver listens
StorageLevel.MEMORY_AND_DISK_SER_2
)
// Process the stream
flumeStream.map(sparkFlumeEvent => {
val event = sparkFlumeEvent.event
new String(event.getBody.array())
}).print()
ssc.start()
ssc.awaitTermination()
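Where a single receiver becomes a bottleneck, the push-based approach can be scaled out by running several receivers and unioning their streams. A minimal sketch, reusing ssc from the example above; the ports are illustrative and each would be targeted by its own Flume Avro sink:

// Each createStream call starts one receiver; union merges them into a single DStream
val ports = Seq(9997, 9998, 9999)
val flumeStreams = ports.map(port => FlumeUtils.createStream(ssc, "localhost", port))
val merged = ssc.union(flumeStreams)
merged.map(e => new String(e.event.getBody.array())).count().print()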
Pull-based polling example (Scala) - Spark pulls events from a Spark Sink running in the Flume agent:

import java.net.InetSocketAddress
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.flume.FlumeUtils
val sparkConf = new SparkConf().setAppName("FlumePollingExample") // app name is illustrative
val ssc = new StreamingContext(sparkConf, Seconds(5))
// Create polling stream - Spark pulls data from Flume sink
val pollingStream = FlumeUtils.createPollingStream(
ssc,
Seq(new InetSocketAddress("flume-host", 9090)), // Flume sink addresses
StorageLevel.MEMORY_AND_DISK_SER_2,
1000, // maxBatchSize
5 // parallelism
)
// Process the stream
pollingStream.map(sparkFlumeEvent => {
val headers = sparkFlumeEvent.event.getHeaders
val body = new String(sparkFlumeEvent.event.getBody.array())
s"Headers: $headers, Body: $body"
}).print()
ssc.start()
ssc.awaitTermination()
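In practice the polled events are usually written out rather than printed. A minimal sketch, reusing pollingStream from the example above; the HDFS output path is illustrative:

// Persist each non-empty micro-batch of event bodies as text files
pollingStream
  .map(e => new String(e.event.getBody.array()))
  .foreachRDD { (rdd, time) =>
    if (!rdd.isEmpty()) {
      rdd.saveAsTextFile(s"hdfs:///data/flume-events/batch-${time.milliseconds}")
    }
  }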
The Spark Streaming Flume integration is built around several key components.

FlumeUtils.createStream creates input streams where Flume agents push data directly to Spark Streaming receivers using the Avro RPC protocol.
object FlumeUtils {
/**
* Create a push-based input stream from a Flume source with default storage level
* @param ssc StreamingContext object
* @param hostname Hostname where the receiver will listen
* @param port Port where the receiver will listen
* @param storageLevel Storage level for received objects (default: MEMORY_AND_DISK_SER_2)
* @return ReceiverInputDStream of SparkFlumeEvent objects
*/
def createStream(
ssc: StreamingContext,
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[SparkFlumeEvent]
/**
* Create a push-based input stream with compression support
* @param ssc StreamingContext object
* @param hostname Hostname where the receiver will listen
* @param port Port where the receiver will listen
* @param storageLevel Storage level for received objects
* @param enableDecompression Enable Netty decompression for incoming data
* @return ReceiverInputDStream of SparkFlumeEvent objects
*/
def createStream(
ssc: StreamingContext,
hostname: String,
port: Int,
storageLevel: StorageLevel,
enableDecompression: Boolean
): ReceiverInputDStream[SparkFlumeEvent]
}
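A minimal Scala sketch of the decompression overload, reusing ssc from the examples above; it assumes the corresponding Flume Avro sink compresses its output, and hostname and port are illustrative:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils

// Receiver decompresses the Avro/Netty stream sent by a compressing Flume sink
val compressedStream = FlumeUtils.createStream(
  ssc,
  "localhost",
  9999,
  StorageLevel.MEMORY_AND_DISK_SER_2,
  enableDecompression = true
)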
Java API:

/**
* Create a push-based input stream from a Flume source (Java API)
* @param jssc JavaStreamingContext object
* @param hostname Hostname where the receiver will listen
* @param port Port where the receiver will listen
* @return JavaReceiverInputDStream of SparkFlumeEvent objects
*/
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
JavaStreamingContext jssc,
String hostname,
int port
);
/**
* Create a push-based input stream with custom storage level (Java API)
* @param jssc JavaStreamingContext object
* @param hostname Hostname where the receiver will listen
* @param port Port where the receiver will listen
* @param storageLevel Storage level for received objects
* @return JavaReceiverInputDStream of SparkFlumeEvent objects
*/
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
JavaStreamingContext jssc,
String hostname,
int port,
StorageLevel storageLevel
);
/**
* Create a push-based input stream with compression support (Java API)
* @param jssc JavaStreamingContext object
* @param hostname Hostname where the receiver will listen
* @param port Port where the receiver will listen
* @param storageLevel Storage level for received objects
* @param enableDecompression Enable Netty decompression for incoming data
* @return JavaReceiverInputDStream of SparkFlumeEvent objects
*/
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
JavaStreamingContext jssc,
String hostname,
int port,
StorageLevel storageLevel,
boolean enableDecompression
);
FlumeUtils.createPollingStream creates input streams that actively poll Flume sinks (the Spark Sink) for data, providing better reliability through transaction support and acknowledgments.

/**
* Create a pull-based polling stream with default batch size and parallelism
* @param ssc StreamingContext object
* @param hostname Address of the host running the Spark Sink
* @param port Port where the Spark Sink is listening
* @param storageLevel Storage level for received objects (default: MEMORY_AND_DISK_SER_2)
* @return ReceiverInputDStream of SparkFlumeEvent objects
*/
def createPollingStream(
ssc: StreamingContext,
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[SparkFlumeEvent]
/**
* Create a pull-based polling stream with multiple sink addresses
* @param ssc StreamingContext object
* @param addresses List of InetSocketAddress representing Spark Sink hosts
* @param storageLevel Storage level for received objects
* @return ReceiverInputDStream of SparkFlumeEvent objects
*/
def createPollingStream(
ssc: StreamingContext,
addresses: Seq[InetSocketAddress],
storageLevel: StorageLevel
): ReceiverInputDStream[SparkFlumeEvent]
/**
* Create a pull-based polling stream with full configuration
* @param ssc StreamingContext object
* @param addresses List of InetSocketAddress representing Spark Sink hosts
* @param storageLevel Storage level for received objects
* @param maxBatchSize Maximum number of events per RPC call (default: 1000)
* @param parallelism Number of concurrent requests to the sink (default: 5)
* @return ReceiverInputDStream of SparkFlumeEvent objects
*/
def createPollingStream(
ssc: StreamingContext,
addresses: Seq[InetSocketAddress],
storageLevel: StorageLevel,
maxBatchSize: Int,
parallelism: Int
): ReceiverInputDStream[SparkFlumeEvent]
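A minimal sketch of the single-sink overload, which uses the default storage level, batch size, and parallelism; host and port are illustrative and ssc is reused from the examples above:

// Poll one Spark Sink at flume-host:9090 with default settings
val singleSinkStream = FlumeUtils.createPollingStream(ssc, "flume-host", 9090)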
Java API:

/**
* Create a pull-based polling stream (Java API)
* @param jssc JavaStreamingContext object
* @param hostname Address of the host running the Spark Sink
* @param port Port where the Spark Sink is listening
* @return JavaReceiverInputDStream of SparkFlumeEvent objects
*/
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
JavaStreamingContext jssc,
String hostname,
int port
);
/**
* Create a pull-based polling stream with custom storage level (Java API)
* @param jssc JavaStreamingContext object
* @param hostname Address of the host running the Spark Sink
* @param port Port where the Spark Sink is listening
* @param storageLevel Storage level for received objects
* @return JavaReceiverInputDStream of SparkFlumeEvent objects
*/
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
JavaStreamingContext jssc,
String hostname,
int port,
StorageLevel storageLevel
);
/**
* Create a pull-based polling stream with multiple sinks (Java API)
* @param jssc JavaStreamingContext object
* @param addresses Array of InetSocketAddress representing Spark Sink hosts
* @param storageLevel Storage level for received objects
* @return JavaReceiverInputDStream of SparkFlumeEvent objects
*/
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
JavaStreamingContext jssc,
InetSocketAddress[] addresses,
StorageLevel storageLevel
);
/**
* Create a pull-based polling stream with full configuration (Java API)
* @param jssc JavaStreamingContext object
* @param addresses Array of InetSocketAddress representing Spark Sink hosts
* @param storageLevel Storage level for received objects
* @param maxBatchSize Maximum number of events per RPC call
* @param parallelism Number of concurrent requests to the sink
* @return JavaReceiverInputDStream of SparkFlumeEvent objects
*/
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
JavaStreamingContext jssc,
InetSocketAddress[] addresses,
StorageLevel storageLevel,
int maxBatchSize,
int parallelism
);

SparkFlumeEvent is a serializable wrapper for Flume events that can be processed in Spark transformations.
/**
* Serializable wrapper for AvroFlumeEvent with custom serialization
*/
class SparkFlumeEvent extends Externalizable {
/** The wrapped Flume event containing headers and body (mutable) */
var event: AvroFlumeEvent = new AvroFlumeEvent()
/** Deserialize from ObjectInput */
def readExternal(in: ObjectInput): Unit
/** Serialize to ObjectOutput */
def writeExternal(out: ObjectOutput): Unit
}
object SparkFlumeEvent {
/**
* Create SparkFlumeEvent from AvroFlumeEvent
* @param in AvroFlumeEvent to wrap
* @return SparkFlumeEvent instance
*/
def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent
}
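A hedged sketch of building an event by hand (for example, in a unit test) and wrapping it, using only the setters documented in the AvroFlumeEvent section below; the header and body values are illustrative:

import java.nio.ByteBuffer
import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.spark.streaming.flume.SparkFlumeEvent
import scala.collection.JavaConverters._

// Build a raw Flume event and wrap it in the serializable SparkFlumeEvent
val avroEvent = new AvroFlumeEvent()
avroEvent.setHeaders(Map[CharSequence, CharSequence]("source" -> "unit-test").asJava)
avroEvent.setBody(ByteBuffer.wrap("hello flume".getBytes("UTF-8")))
val sparkEvent = SparkFlumeEvent.fromAvroFlumeEvent(avroEvent)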
AvroFlumeEvent is Flume's standard event structure containing headers and body data. This class is provided by Apache Flume and imported from org.apache.flume.source.avro.AvroFlumeEvent.

/**
* Flume event structure (from Apache Flume library)
* Import: org.apache.flume.source.avro.AvroFlumeEvent
*/
class AvroFlumeEvent {
/** Get event headers as a Map */
def getHeaders(): java.util.Map[CharSequence, CharSequence]
/** Set event headers */
def setHeaders(headers: java.util.Map[CharSequence, CharSequence]): Unit
/** Get event body as ByteBuffer */
def getBody(): java.nio.ByteBuffer
/** Set event body */
def setBody(body: java.nio.ByteBuffer): Unit
}

Common patterns for working with event data inside a map or foreachRDD transformation:

// Extract body as string
val bodyText = new String(sparkFlumeEvent.event.getBody.array())
// Extract specific header
val timestamp = sparkFlumeEvent.event.getHeaders.get("timestamp")
// Process headers and body together
val headers = sparkFlumeEvent.event.getHeaders.asScala.toMap
val body = new String(sparkFlumeEvent.event.getBody.array())
val processedData = (headers, body)

The two integration patterns provide different reliability guarantees:
Push-based streams: at-least-once delivery. If the receiver fails or an RPC times out, the Flume agent retries sending events, which can produce duplicates; events buffered on a failed receiver can be lost unless the receiver write-ahead log is enabled.
Pull-based streams: stronger reliability through Flume transactions. A batch is acknowledged (ACK) only after Spark Streaming has received and stored it; failed or unacknowledged (NACK) batches remain in the Flume channel and are redelivered, so data is not lost, though duplicates are still possible on retry.
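For the push-based receiver, fault tolerance can be tightened by enabling checkpointing and the receiver write-ahead log. A minimal sketch; the app name, checkpoint path, hostname, and port are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

val conf = new SparkConf()
  .setAppName("FlumeReliablePush")                              // illustrative app name
  .set("spark.streaming.receiver.writeAheadLog.enable", "true") // log received data to the checkpoint directory

val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("hdfs:///checkpoints/flume-app")                 // illustrative path; holds metadata and the WAL

// With the WAL enabled, received events can be recovered after a receiver or driver restart
val flumeStream = FlumeUtils.createStream(ssc, "localhost", 9999)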
Common error scenarios: the push-based receiver failing to bind its hostname/port (port already in use, or a hostname that does not resolve on the executor running the receiver); the polling receiver unable to connect because the Spark Sink is not running or not reachable at the configured address; and duplicate events after a receiver restart, which downstream processing should tolerate.
Key configuration parameters for optimal performance:
Storage Levels:
MEMORY_ONLY: Fastest access, risk of data loss
MEMORY_AND_DISK_SER_2: Balanced performance and fault tolerance (default)
MEMORY_AND_DISK_SER: Alternative serialized storage with single replication
DISK_ONLY: Maximum fault tolerance, slower access

Polling Configuration:
maxBatchSize: Larger batches reduce RPC overhead but increase memory usage (default: 1000)
parallelism: Higher parallelism increases throughput but uses more resources (default: 5)
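A hedged tuning sketch showing how these knobs map onto createPollingStream, reusing ssc from the examples above; the host, port, and specific values are illustrative:

import java.net.InetSocketAddress
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils

// Trade memory for throughput: bigger batches, more concurrent requests to the sink
val tunedStream = FlumeUtils.createPollingStream(
  ssc,
  Seq(new InetSocketAddress("flume-host", 9090)),  // Spark Sink address (illustrative)
  StorageLevel.MEMORY_AND_DISK_SER_2,              // replicated, serialized storage (default)
  5000,                                            // maxBatchSize: fewer RPCs, more memory per batch
  10                                               // parallelism: more concurrent pulls from the sink
)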