Spark Streaming integration with Apache Flume for real-time data ingestion from Flume agents
Spark Streaming Flume provides seamless integration between Apache Spark Streaming and Apache Flume for real-time data ingestion. It offers two complementary patterns: push-based receivers where Flume agents push data directly to Spark, and pull-based polling where Spark actively pulls data from Flume sinks for enhanced reliability.
Dependency:
org.apache.spark:spark-streaming-flume_2.10:1.6.3
Scala:
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.flume.SparkFlumeEvent
import org.apache.spark.storage.StorageLevel
import java.net.InetSocketAddress
import scala.collection.JavaConverters._
// For direct event manipulation (from Apache Flume dependency)
import org.apache.flume.source.avro.AvroFlumeEvent
Java:
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import org.apache.spark.storage.StorageLevel;
import java.net.InetSocketAddress;
// For direct event manipulation (from Apache Flume dependency)
import org.apache.flume.source.avro.AvroFlumeEvent;
Push-based approach (Scala):
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.storage.StorageLevel
val sparkConf = new SparkConf().setAppName("FlumePushExample") // app name is illustrative
val ssc = new StreamingContext(sparkConf, Seconds(5))
// Create Flume stream - Flume pushes data to this receiver
val flumeStream = FlumeUtils.createStream(
ssc,
"localhost", // hostname where receiver listens
9999, // port where receiver listens
StorageLevel.MEMORY_AND_DISK_SER_2
)
// Process the stream
flumeStream.map(sparkFlumeEvent => {
val event = sparkFlumeEvent.event
new String(event.getBody.array())
}).print()
ssc.start()
ssc.awaitTermination()
Pull-based approach (Scala):
import java.net.InetSocketAddress
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.storage.StorageLevel
val sparkConf = new SparkConf().setAppName("FlumePollingExample") // app name is illustrative
val ssc = new StreamingContext(sparkConf, Seconds(5))
// Create polling stream - Spark pulls data from Flume sink
val pollingStream = FlumeUtils.createPollingStream(
ssc,
Seq(new InetSocketAddress("flume-host", 9090)), // Flume sink addresses
StorageLevel.MEMORY_AND_DISK_SER_2,
1000, // maxBatchSize
5 // parallelism
)
// Process the stream
pollingStream.map(sparkFlumeEvent => {
val headers = sparkFlumeEvent.event.getHeaders
val body = new String(sparkFlumeEvent.event.getBody.array())
s"Headers: $headers, Body: $body"
}).print()
ssc.start()
ssc.awaitTermination()
The Spark Streaming Flume integration is built around several key components:
FlumeUtils.createStream (push-based):
Creates input streams where Flume agents push data directly to Spark Streaming receivers using Avro RPC protocol.
object FlumeUtils {
/**
* Create a push-based input stream from a Flume source with default storage level
* @param ssc StreamingContext object
* @param hostname Hostname where the receiver will listen
* @param port Port where the receiver will listen
* @param storageLevel Storage level for received objects (default: MEMORY_AND_DISK_SER_2)
* @return ReceiverInputDStream of SparkFlumeEvent objects
*/
def createStream(
ssc: StreamingContext,
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[SparkFlumeEvent]
/**
* Create a push-based input stream with compression support
* @param ssc StreamingContext object
* @param hostname Hostname where the receiver will listen
* @param port Port where the receiver will listen
* @param storageLevel Storage level for received objects
* @param enableDecompression Enable Netty decompression for incoming data
* @return ReceiverInputDStream of SparkFlumeEvent objects
*/
def createStream(
ssc: StreamingContext,
hostname: String,
port: Int,
storageLevel: StorageLevel,
enableDecompression: Boolean
): ReceiverInputDStream[SparkFlumeEvent]
}
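A minimal sketch of the decompression overload, assuming the upstream Flume Avro sink compresses its batches (typically via the sink's compression-type setting); the hostname, port, and variable names here are illustrative, and ssc is an existing StreamingContext as in the push-based example above.
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
// Receiver that decompresses batches sent by a compressing Flume Avro sink.
val compressedStream = FlumeUtils.createStream(
  ssc,
  "spark-receiver-host",              // illustrative hostname where the receiver listens
  4545,                               // illustrative port
  StorageLevel.MEMORY_AND_DISK_SER_2,
  true                                // enableDecompression
)
compressedStream.map(e => new String(e.event.getBody.array())).print()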
Java API:
/**
* Create a push-based input stream from a Flume source (Java API)
* @param jssc JavaStreamingContext object
* @param hostname Hostname where the receiver will listen
* @param port Port where the receiver will listen
* @return JavaReceiverInputDStream of SparkFlumeEvent objects
*/
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
JavaStreamingContext jssc,
String hostname,
int port
);
/**
* Create a push-based input stream with custom storage level (Java API)
* @param jssc JavaStreamingContext object
* @param hostname Hostname where the receiver will listen
* @param port Port where the receiver will listen
* @param storageLevel Storage level for received objects
* @return JavaReceiverInputDStream of SparkFlumeEvent objects
*/
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
JavaStreamingContext jssc,
String hostname,
int port,
StorageLevel storageLevel
);
/**
* Create a push-based input stream with compression support (Java API)
* @param jssc JavaStreamingContext object
* @param hostname Hostname where the receiver will listen
* @param port Port where the receiver will listen
* @param storageLevel Storage level for received objects
* @param enableDecompression Enable Netty decompression for incoming data
* @return JavaReceiverInputDStream of SparkFlumeEvent objects
*/
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
JavaStreamingContext jssc,
String hostname,
int port,
StorageLevel storageLevel,
boolean enableDecompression
);
FlumeUtils.createPollingStream (pull-based):
Creates input streams that actively poll Flume sinks for data, providing better reliability through transaction support and acknowledgments.
/**
* Create a pull-based polling stream with default batch size and parallelism
* @param ssc StreamingContext object
* @param hostname Address of the host running the Spark Sink
* @param port Port where the Spark Sink is listening
* @param storageLevel Storage level for received objects (default: MEMORY_AND_DISK_SER_2)
* @return ReceiverInputDStream of SparkFlumeEvent objects
*/
def createPollingStream(
ssc: StreamingContext,
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[SparkFlumeEvent]
/**
* Create a pull-based polling stream with multiple sink addresses
* @param ssc StreamingContext object
* @param addresses List of InetSocketAddress representing Spark Sink hosts
* @param storageLevel Storage level for received objects
* @return ReceiverInputDStream of SparkFlumeEvent objects
*/
def createPollingStream(
ssc: StreamingContext,
addresses: Seq[InetSocketAddress],
storageLevel: StorageLevel
): ReceiverInputDStream[SparkFlumeEvent]
/**
* Create a pull-based polling stream with full configuration
* @param ssc StreamingContext object
* @param addresses List of InetSocketAddress representing Spark Sink hosts
* @param storageLevel Storage level for received objects
* @param maxBatchSize Maximum number of events per RPC call (default: 1000)
* @param parallelism Number of concurrent requests to the sink (default: 5)
* @return ReceiverInputDStream of SparkFlumeEvent objects
*/
def createPollingStream(
ssc: StreamingContext,
addresses: Seq[InetSocketAddress],
storageLevel: StorageLevel,
maxBatchSize: Int,
parallelism: Int
): ReceiverInputDStream[SparkFlumeEvent]
Java API:
/**
* Create a pull-based polling stream (Java API)
* @param jssc JavaStreamingContext object
* @param hostname Address of the host running the Spark Sink
* @param port Port where the Spark Sink is listening
* @return JavaReceiverInputDStream of SparkFlumeEvent objects
*/
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
JavaStreamingContext jssc,
String hostname,
int port
);
/**
* Create a pull-based polling stream with custom storage level (Java API)
* @param jssc JavaStreamingContext object
* @param hostname Address of the host running the Spark Sink
* @param port Port where the Spark Sink is listening
* @param storageLevel Storage level for received objects
* @return JavaReceiverInputDStream of SparkFlumeEvent objects
*/
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
JavaStreamingContext jssc,
String hostname,
int port,
StorageLevel storageLevel
);
/**
* Create a pull-based polling stream with multiple sinks (Java API)
* @param jssc JavaStreamingContext object
* @param addresses Array of InetSocketAddress representing Spark Sink hosts
* @param storageLevel Storage level for received objects
* @return JavaReceiverInputDStream of SparkFlumeEvent objects
*/
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
JavaStreamingContext jssc,
InetSocketAddress[] addresses,
StorageLevel storageLevel
);
/**
* Create a pull-based polling stream with full configuration (Java API)
* @param jssc JavaStreamingContext object
* @param addresses Array of InetSocketAddress representing Spark Sink hosts
* @param storageLevel Storage level for received objects
* @param maxBatchSize Maximum number of events per RPC call
* @param parallelism Number of concurrent requests to the sink
* @return JavaReceiverInputDStream of SparkFlumeEvent objects
*/
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
JavaStreamingContext jssc,
InetSocketAddress[] addresses,
StorageLevel storageLevel,
int maxBatchSize,
int parallelism
);
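As a sketch of the multiple-address overload above (host names and port are illustrative, and ssc is assumed to be an existing StreamingContext), a single polling stream can pull from several Spark sinks at once:
import java.net.InetSocketAddress
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
// One receiver polling two Flume agents, each running a Spark sink on port 9090.
val sinkAddresses = Seq(
  new InetSocketAddress("flume-host-1", 9090),
  new InetSocketAddress("flume-host-2", 9090)
)
val multiSinkStream = FlumeUtils.createPollingStream(
  ssc,
  sinkAddresses,
  StorageLevel.MEMORY_AND_DISK_SER_2
)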
SparkFlumeEvent:
Serializable wrapper for Flume events that can be processed in Spark transformations.
/**
* Serializable wrapper for AvroFlumeEvent with custom serialization
*/
class SparkFlumeEvent extends Externalizable {
/** The wrapped Flume event containing headers and body (mutable) */
var event: AvroFlumeEvent = new AvroFlumeEvent()
/** Deserialize from ObjectInput */
def readExternal(in: ObjectInput): Unit
/** Serialize to ObjectOutput */
def writeExternal(out: ObjectOutput): Unit
}
object SparkFlumeEvent {
/**
* Create SparkFlumeEvent from AvroFlumeEvent
* @param in AvroFlumeEvent to wrap
* @return SparkFlumeEvent instance
*/
def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent
}
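A small sketch of building a SparkFlumeEvent by hand, for example to feed test data through the same transformations as a live stream; the header and body values are illustrative.
import java.nio.ByteBuffer
import scala.collection.JavaConverters._
import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.spark.streaming.flume.SparkFlumeEvent
// Build a raw Flume event, then wrap it in the serializable Spark wrapper.
val avroEvent = new AvroFlumeEvent()
avroEvent.setHeaders(Map[CharSequence, CharSequence]("source" -> "unit-test").asJava)
avroEvent.setBody(ByteBuffer.wrap("hello flume".getBytes("UTF-8")))
val sparkEvent = SparkFlumeEvent.fromAvroFlumeEvent(avroEvent)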
AvroFlumeEvent:
Flume's standard event structure containing headers and body data. This class is provided by Apache Flume and imported from org.apache.flume.source.avro.AvroFlumeEvent.
/**
* Flume event structure (from Apache Flume library)
* Import: org.apache.flume.source.avro.AvroFlumeEvent
*/
class AvroFlumeEvent {
/** Get event headers as a Map */
def getHeaders(): java.util.Map[CharSequence, CharSequence]
/** Set event headers */
def setHeaders(headers: java.util.Map[CharSequence, CharSequence]): Unit
/** Get event body as ByteBuffer */
def getBody(): java.nio.ByteBuffer
/** Set event body */
def setBody(body: java.nio.ByteBuffer): Unit
}
// Extract body as string
val bodyText = new String(sparkFlumeEvent.event.getBody.array())
// Extract specific header
val timestamp = sparkFlumeEvent.event.getHeaders.get("timestamp")
// Process headers and body together
val event = sparkFlumeEvent.event
val headers = event.getHeaders.asScala.toMap
val body = new String(event.getBody.array())
val processedData = (headers, body)
Both integration patterns provide different reliability guarantees:
Push-based streams: Weaker guarantees. Flume's Avro sink retries failed transactions, which can produce duplicates, and events already buffered in the receiver can be lost if the receiver fails unless the write-ahead log is enabled (see the sketch below).
Pull-based streams: Stronger reliability through transactions. Spark acknowledges a batch only after it has been received and stored, and Flume keeps unacknowledged batches in its channel and retries them, so receiver failures do not lose data. End-to-end exactly-once processing additionally requires idempotent or transactional output operations.
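A hedged sketch of hardening the push-based path, assuming standard Spark Streaming settings (the app name and checkpoint path are illustrative): enabling the receiver write-ahead log plus checkpointing lets received-but-unprocessed events be replayed after a failure, at which point a non-replicated storage level is usually sufficient.
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils
val conf = new SparkConf()
  .setAppName("ReliableFlumeIngest") // illustrative app name
  .set("spark.streaming.receiver.writeAheadLog.enable", "true") // log received blocks before acknowledging
val reliableSsc = new StreamingContext(conf, Seconds(5))
reliableSsc.checkpoint("hdfs:///checkpoints/flume-ingest") // illustrative checkpoint directory
// With the write-ahead log enabled, the data is already persisted once,
// so single-replica serialized storage avoids storing it twice.
val reliableStream = FlumeUtils.createStream(reliableSsc, "localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)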
Common error scenarios:
Key configuration parameters for optimal performance:
Storage Levels:
MEMORY_ONLY: Fastest access, risk of data loss
MEMORY_AND_DISK_SER_2: Balanced performance and fault tolerance (default)
MEMORY_AND_DISK_SER: Alternative serialized storage with single replication
DISK_ONLY: Maximum fault tolerance, slower access
Polling Configuration:
maxBatchSize: Larger batches reduce RPC overhead but increase memory usage (default: 1000)
parallelism: Higher parallelism increases throughput but uses more resources (default: 5)
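As a rough tuning sketch tying these knobs together (the values are illustrative starting points, not recommendations, and ssc is an existing StreamingContext):
import java.net.InetSocketAddress
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
// Throughput-oriented polling: bigger RPC batches and more concurrent requests per sink,
// at the cost of more receiver memory and more load on the Flume agent.
val tunedStream = FlumeUtils.createPollingStream(
  ssc,
  Seq(new InetSocketAddress("flume-host", 9090)),
  StorageLevel.MEMORY_AND_DISK_SER_2,
  5000, // maxBatchSize
  10    // parallelism
)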