tessl/maven-org-apache-spark--spark-streaming-flume-2-10

Spark Streaming integration with Apache Flume for real-time data ingestion from Flume agents

Spark Streaming Flume

Spark Streaming Flume integrates Apache Spark Streaming with Apache Flume for real-time data ingestion. It offers two patterns: a push-based receiver, where Flume agents push events directly to Spark over Avro RPC, and a pull-based receiver, where Spark polls a Flume sink and acknowledges each batch, which gives stronger reliability guarantees.

Package Information

  • Package Name: spark-streaming-flume_2.10
  • Package Type: Maven
  • Language: Scala with Java interop
  • Installation: org.apache.spark:spark-streaming-flume_2.10:1.6.3
  • Dependencies: Requires Apache Flume libraries for event processing
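For sbt users, the coordinates above could be declared as in the following sketch. The `spark-streaming_2.10` line and the `provided` scope are assumptions about a typical cluster deployment, not something this package mandates; adjust versions to match your Spark installation.

```scala
// build.sbt (sketch; versions mirror the coordinates listed above)
libraryDependencies ++= Seq(
  // Assumed to be supplied by the cluster at runtime, hence "provided"
  "org.apache.spark" % "spark-streaming_2.10" % "1.6.3" % "provided",
  // The Flume integration itself must be bundled with the application
  "org.apache.spark" % "spark-streaming-flume_2.10" % "1.6.3"
)
```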

Core Imports

Scala:

import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.flume.SparkFlumeEvent
import org.apache.spark.storage.StorageLevel
import java.net.InetSocketAddress
import scala.collection.JavaConverters._

// For direct event manipulation (from Apache Flume dependency)
import org.apache.flume.source.avro.AvroFlumeEvent

Java:

import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import org.apache.spark.storage.StorageLevel;
import java.net.InetSocketAddress;

// For direct event manipulation (from Apache Flume dependency)
import org.apache.flume.source.avro.AvroFlumeEvent;

Basic Usage

Push-based Stream (Receiver Pattern)

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.flume.FlumeUtils

// Example configuration; the app name is a placeholder
val sparkConf = new SparkConf().setAppName("FlumePushExample")
val ssc = new StreamingContext(sparkConf, Seconds(5))

// Create Flume stream - Flume pushes data to this receiver
val flumeStream = FlumeUtils.createStream(
  ssc,
  "localhost",  // hostname where receiver listens
  9999,         // port where receiver listens
  StorageLevel.MEMORY_AND_DISK_SER_2
)

// Process the stream (body decoding assumes UTF-8 text)
flumeStream.map { sparkFlumeEvent =>
  val event = sparkFlumeEvent.event
  new String(event.getBody.array(), StandardCharsets.UTF_8)
}.print()

ssc.start()
ssc.awaitTermination()

Pull-based Stream (Polling Pattern)

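import java.net.InetSocketAddress
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.flume.FlumeUtils

// Example configuration; the app name is a placeholder
val sparkConf = new SparkConf().setAppName("FlumePollingExample")
val ssc = new StreamingContext(sparkConf, Seconds(5))

// Create polling stream - Spark pulls data from the Flume sink
val pollingStream = FlumeUtils.createPollingStream(
  ssc,
  Seq(new InetSocketAddress("flume-host", 9090)), // Flume sink addresses
  StorageLevel.MEMORY_AND_DISK_SER_2,
  1000,  // maxBatchSize
  5      // parallelism
)

// Process the stream (body decoding assumes UTF-8 text)
pollingStream.map { sparkFlumeEvent =>
  val headers = sparkFlumeEvent.event.getHeaders
  val body = new String(sparkFlumeEvent.event.getBody.array(), StandardCharsets.UTF_8)
  s"Headers: $headers, Body: $body"
}.print()

ssc.start()
ssc.awaitTermination()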

Architecture

The Spark Streaming Flume integration is built around several key components:

  • FlumeUtils: Main factory class providing stream creation methods for both patterns
  • SparkFlumeEvent: Serializable wrapper for Flume events that can be processed in Spark
  • Push Pattern: Flume agents use Avro RPC to push events to Spark receivers
  • Pull Pattern: Spark receivers poll Flume sinks using a custom Avro protocol with transaction support
  • Storage Integration: Configurable storage levels for fault tolerance and performance tuning

Capabilities

Push-based Stream Creation

Creates input streams where Flume agents push data directly to Spark Streaming receivers using the Avro RPC protocol.

object FlumeUtils {
  /**
   * Create a push-based input stream from a Flume source with default storage level
   * @param ssc StreamingContext object
   * @param hostname Hostname where the receiver will listen
   * @param port Port where the receiver will listen
   * @param storageLevel Storage level for received objects (default: MEMORY_AND_DISK_SER_2)
   * @return ReceiverInputDStream of SparkFlumeEvent objects
   */
  def createStream(
    ssc: StreamingContext,
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[SparkFlumeEvent]

  /**
   * Create a push-based input stream with compression support
   * @param ssc StreamingContext object
   * @param hostname Hostname where the receiver will listen
   * @param port Port where the receiver will listen
   * @param storageLevel Storage level for received objects
   * @param enableDecompression Enable Netty decompression for incoming data
   * @return ReceiverInputDStream of SparkFlumeEvent objects
   */
  def createStream(
    ssc: StreamingContext,
    hostname: String,
    port: Int,
    storageLevel: StorageLevel,
    enableDecompression: Boolean
  ): ReceiverInputDStream[SparkFlumeEvent]
}

Java API:

/**
 * Create a push-based input stream from a Flume source (Java API)
 * @param jssc JavaStreamingContext object
 * @param hostname Hostname where the receiver will listen
 * @param port Port where the receiver will listen
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
  JavaStreamingContext jssc,
  String hostname,
  int port
);

/**
 * Create a push-based input stream with custom storage level (Java API)
 * @param jssc JavaStreamingContext object
 * @param hostname Hostname where the receiver will listen
 * @param port Port where the receiver will listen
 * @param storageLevel Storage level for received objects
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
  JavaStreamingContext jssc,
  String hostname,
  int port,
  StorageLevel storageLevel
);

/**
 * Create a push-based input stream with compression support (Java API)
 * @param jssc JavaStreamingContext object
 * @param hostname Hostname where the receiver will listen
 * @param port Port where the receiver will listen
 * @param storageLevel Storage level for received objects
 * @param enableDecompression Enable Netty decompression for incoming data
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
  JavaStreamingContext jssc,
  String hostname,
  int port,
  StorageLevel storageLevel,
  boolean enableDecompression
);

Pull-based Stream Creation

Creates input streams that actively poll Flume sinks for data, providing better reliability through transaction support and acknowledgments.

/**
 * Create a pull-based polling stream with default batch size and parallelism
 * @param ssc StreamingContext object
 * @param hostname Address of the host running the Spark Sink
 * @param port Port where the Spark Sink is listening
 * @param storageLevel Storage level for received objects (default: MEMORY_AND_DISK_SER_2)
 * @return ReceiverInputDStream of SparkFlumeEvent objects
 */
def createPollingStream(
  ssc: StreamingContext,
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[SparkFlumeEvent]

/**
 * Create a pull-based polling stream with multiple sink addresses
 * @param ssc StreamingContext object
 * @param addresses List of InetSocketAddress representing Spark Sink hosts
 * @param storageLevel Storage level for received objects
 * @return ReceiverInputDStream of SparkFlumeEvent objects
 */
def createPollingStream(
  ssc: StreamingContext,
  addresses: Seq[InetSocketAddress],
  storageLevel: StorageLevel
): ReceiverInputDStream[SparkFlumeEvent]

/**
 * Create a pull-based polling stream with full configuration
 * @param ssc StreamingContext object
 * @param addresses List of InetSocketAddress representing Spark Sink hosts
 * @param storageLevel Storage level for received objects
 * @param maxBatchSize Maximum number of events per RPC call (default: 1000)
 * @param parallelism Number of concurrent requests to the sink (default: 5)
 * @return ReceiverInputDStream of SparkFlumeEvent objects
 */
def createPollingStream(
  ssc: StreamingContext,
  addresses: Seq[InetSocketAddress],
  storageLevel: StorageLevel,
  maxBatchSize: Int,
  parallelism: Int
): ReceiverInputDStream[SparkFlumeEvent]
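The multi-address overloads take a `Seq[InetSocketAddress]`. When sink locations come from configuration as a single string, a small parser can build that sequence. The sketch below uses only the JDK; `SinkAddresses` and its `parse` method are hypothetical helpers, not part of `FlumeUtils`, and the `host:port,host:port` format is an assumption.

```scala
import java.net.InetSocketAddress

object SinkAddresses {
  /** Parse "host1:port1,host2:port2" into socket addresses (hypothetical helper). */
  def parse(spec: String): List[InetSocketAddress] =
    spec.split(",").map(_.trim).filter(_.nonEmpty).map { hostPort =>
      // Split on the last colon so the port is always the final component
      val idx = hostPort.lastIndexOf(':')
      require(idx > 0 && idx < hostPort.length - 1, s"expected host:port, got '$hostPort'")
      new InetSocketAddress(hostPort.substring(0, idx), hostPort.substring(idx + 1).toInt)
    }.toList
}
```

The result can then be passed as the `addresses` argument, for example `FlumeUtils.createPollingStream(ssc, SinkAddresses.parse("flume-a:9090,flume-b:9090"), StorageLevel.MEMORY_AND_DISK_SER_2)`, where the host names are placeholders.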

Java API:

/**
 * Create a pull-based polling stream (Java API)
 * @param jssc JavaStreamingContext object
 * @param hostname Address of the host running the Spark Sink
 * @param port Port where the Spark Sink is listening
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
  JavaStreamingContext jssc,
  String hostname,
  int port
);

/**
 * Create a pull-based polling stream with custom storage level (Java API)
 * @param jssc JavaStreamingContext object
 * @param hostname Address of the host running the Spark Sink
 * @param port Port where the Spark Sink is listening
 * @param storageLevel Storage level for received objects
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
  JavaStreamingContext jssc,
  String hostname,
  int port,
  StorageLevel storageLevel
);

/**
 * Create a pull-based polling stream with multiple sinks (Java API)
 * @param jssc JavaStreamingContext object
 * @param addresses Array of InetSocketAddress representing Spark Sink hosts
 * @param storageLevel Storage level for received objects
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
  JavaStreamingContext jssc,
  InetSocketAddress[] addresses,
  StorageLevel storageLevel
);

/**
 * Create a pull-based polling stream with full configuration (Java API)
 * @param jssc JavaStreamingContext object
 * @param addresses Array of InetSocketAddress representing Spark Sink hosts
 * @param storageLevel Storage level for received objects
 * @param maxBatchSize Maximum number of events per RPC call
 * @param parallelism Number of concurrent requests to the sink
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
  JavaStreamingContext jssc,
  InetSocketAddress[] addresses,
  StorageLevel storageLevel,
  int maxBatchSize,
  int parallelism
);

Types

SparkFlumeEvent

Serializable wrapper for Flume events that can be processed in Spark transformations.

/**
 * Serializable wrapper for AvroFlumeEvent with custom serialization
 */
class SparkFlumeEvent extends Externalizable {
  /** The wrapped Flume event containing headers and body (mutable) */
  var event: AvroFlumeEvent = new AvroFlumeEvent()
  
  /** Deserialize from ObjectInput */
  def readExternal(in: ObjectInput): Unit
  
  /** Serialize to ObjectOutput */
  def writeExternal(out: ObjectOutput): Unit
}

object SparkFlumeEvent {
  /**
   * Create SparkFlumeEvent from AvroFlumeEvent
   * @param in AvroFlumeEvent to wrap
   * @return SparkFlumeEvent instance
   */
  def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent
}

AvroFlumeEvent

Flume's standard event structure containing headers and body data. This class is provided by Apache Flume and imported from org.apache.flume.source.avro.AvroFlumeEvent.

/**
 * Flume event structure (from Apache Flume library)
 * Import: org.apache.flume.source.avro.AvroFlumeEvent
 */
class AvroFlumeEvent {
  /** Get event headers as a Map */
  def getHeaders(): java.util.Map[CharSequence, CharSequence]
  
  /** Set event headers */
  def setHeaders(headers: java.util.Map[CharSequence, CharSequence]): Unit
  
  /** Get event body as ByteBuffer */
  def getBody(): java.nio.ByteBuffer
  
  /** Set event body */
  def setBody(body: java.nio.ByteBuffer): Unit
}
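Because `getBody` returns a `java.nio.ByteBuffer`, decoding it needs some care: `array()` returns the entire backing array regardless of the buffer's position and limit, and it throws for read-only or direct buffers. The following sketch copies only the readable bytes instead. `FlumeBody` is a hypothetical helper name, and UTF-8 is an assumption about the payload encoding.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object FlumeBody {
  /** Decode the readable bytes of the buffer as UTF-8 without moving its position. */
  def decodeUtf8(body: ByteBuffer): String = {
    val view = body.duplicate()                     // shares content, independent position/limit
    val bytes = new Array[Byte](view.remaining())   // only the bytes between position and limit
    view.get(bytes)
    new String(bytes, StandardCharsets.UTF_8)
  }
}
```

With an event in hand this would be called as `FlumeBody.decodeUtf8(sparkFlumeEvent.event.getBody)`.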

Common Usage Patterns

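import java.nio.charset.StandardCharsets
import scala.collection.JavaConverters._

// Extract body as a string (assumes UTF-8 text in an array-backed buffer)
val bodyText = new String(sparkFlumeEvent.event.getBody.array(), StandardCharsets.UTF_8)

// Extract a specific header; returns null if the key is absent
val timestamp = sparkFlumeEvent.event.getHeaders.get("timestamp")

// Process headers and body together
val processedData = {
  val event = sparkFlumeEvent.event
  val headers = event.getHeaders.asScala.toMap
  val body = new String(event.getBody.array(), StandardCharsets.UTF_8)
  (headers, body)
}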
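Header keys and values are typed as `CharSequence`, so depending on how an event was deserialized they may not be `String` instances, and a lookup with a `String` key can then miss. One way to sidestep this is to normalize the headers into an immutable Scala map once. `FlumeHeaders` below is a hypothetical helper, shown as a sketch rather than part of the library.

```scala
import java.util.{Map => JMap}
import scala.collection.JavaConverters._

object FlumeHeaders {
  /** Copy headers into an immutable Map[String, String]; a null map yields an empty result. */
  def toStringMap(headers: JMap[CharSequence, CharSequence]): Map[String, String] =
    if (headers == null) Map.empty[String, String]
    else headers.asScala.map { case (k, v) => (k.toString, v.toString) }.toMap
}
```

A lookup then becomes `FlumeHeaders.toStringMap(sparkFlumeEvent.event.getHeaders).get("timestamp")`, which returns an `Option[String]` instead of a possibly null `CharSequence`.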

Error Handling

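The two integration patterns provide different reliability guarantees:

Push-based streams: Flume retries events it cannot deliver to the receiver, which can produce duplicates. Events that the receiver has already accepted but not yet stored and replicated can still be lost if the receiver fails.

Pull-based streams: Provide stronger guarantees through transaction support with ACK/NACK responses. Events stay in the Flume sink until Spark has received and replicated them; batches that fail or are not acknowledged are rolled back and delivered again, so downstream processing should still tolerate duplicates.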

Common error scenarios:

  • Network connectivity issues between Flume and Spark
  • Receiver memory pressure causing event loss
  • Flume agent failures during event transmission
  • Serialization errors for malformed events
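For receiver failures in particular, Spark Streaming's write-ahead log is one commonly used mitigation. The configuration fragment below is a sketch: the property name is a standard Spark setting, while the app name and checkpoint path are placeholders you would replace.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Persist received blocks to a write-ahead log before they are processed
val conf = new SparkConf()
  .setAppName("FlumeWithWriteAheadLog") // placeholder name
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(5))

// The write-ahead log is stored under the checkpoint directory (placeholder path)
ssc.checkpoint("hdfs:///tmp/flume-checkpoints")
```

When the log is enabled, a non-replicated storage level such as `StorageLevel.MEMORY_AND_DISK_SER` is usually sufficient for the input stream, since the log already provides durability.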

Performance Tuning

Key configuration parameters for optimal performance:

Storage Levels:

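  • MEMORY_ONLY: Fastest access, but no disk spill and no replication, so received data can be lost
  • MEMORY_AND_DISK_SER_2: Serialized, spills to disk, and replicated on two nodes; balances performance and fault tolerance (default)
  • MEMORY_AND_DISK_SER: Same serialized storage with a single replica
  • DISK_ONLY: Stored only on disk with a single replica; lowest memory use, slowest access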

Polling Configuration:

  • maxBatchSize: Larger batches reduce RPC overhead but increase memory usage (default: 1000)
  • parallelism: Higher parallelism increases throughput but uses more resources (default: 5)
  • Multiple sink addresses: Distribute load across multiple Flume sinks for scalability
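As a rough way to reason about the memory side of these two settings, the number of events a single polling receiver may hold unacknowledged at once is bounded by their product. The sketch below is back-of-the-envelope arithmetic only; `PollingBudget` is a hypothetical helper and `avgEventBytes` is an estimate you would supply from your own data.

```scala
object PollingBudget {
  /** Upper bound on events requested concurrently by one receiver. */
  def maxInFlightEvents(maxBatchSize: Int, parallelism: Int): Long =
    maxBatchSize.toLong * parallelism

  /** Approximate payload bytes in flight, given an assumed average event size. */
  def approxInFlightBytes(maxBatchSize: Int, parallelism: Int, avgEventBytes: Long): Long =
    maxInFlightEvents(maxBatchSize, parallelism) * avgEventBytes
}
```

With the defaults, `PollingBudget.maxInFlightEvents(1000, 5)` gives 5000 events; at an assumed 2 KiB per event that is roughly 10 MB of payload before serialization overhead.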
