tessl/maven-org-apache-spark--spark-streaming-flume_2-10

Spark Streaming integration with Apache Flume for real-time data ingestion from Flume agents

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.spark/spark-streaming-flume_2.10@1.6.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-flume_2-10@1.6.0


Spark Streaming Flume

Spark Streaming Flume provides seamless integration between Apache Spark Streaming and Apache Flume for real-time data ingestion. It offers two complementary patterns: push-based receivers where Flume agents push data directly to Spark, and pull-based polling where Spark actively pulls data from Flume sinks for enhanced reliability.

Package Information

  • Package Name: spark-streaming-flume_2.10
  • Package Type: Maven
  • Language: Scala with Java interop
  • Installation: org.apache.spark:spark-streaming-flume_2.10:1.6.3 (see the build sketch below)
  • Dependencies: Requires Apache Flume libraries for event processing
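
To add the connector to a build, the Maven coordinate above goes in as a regular dependency. A minimal sketch for an sbt build; the provided scope for spark-streaming and the exact patch version are assumptions about how the application is packaged and deployed:

// build.sbt (sketch): Scala 2.10 so that %% resolves to the _2.10 artifacts
scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "1.6.3" % "provided",
  "org.apache.spark" %% "spark-streaming-flume" % "1.6.3"
)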

Core Imports

Scala:

import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.flume.SparkFlumeEvent
import org.apache.spark.storage.StorageLevel
import java.net.InetSocketAddress
import scala.collection.JavaConverters._

// For direct event manipulation (from Apache Flume dependency)
import org.apache.flume.source.avro.AvroFlumeEvent

Java:

import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import org.apache.spark.storage.StorageLevel;
import java.net.InetSocketAddress;

// For direct event manipulation (from Apache Flume dependency)
import org.apache.flume.source.avro.AvroFlumeEvent;

Basic Usage

Push-based Stream (Receiver Pattern)

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

val sparkConf = new SparkConf().setAppName("FlumePushExample")
val ssc = new StreamingContext(sparkConf, Seconds(5))

// Create Flume stream - Flume pushes data to this receiver
val flumeStream = FlumeUtils.createStream(
  ssc,
  "localhost",  // hostname where receiver listens
  9999,         // port where receiver listens
  StorageLevel.MEMORY_AND_DISK_SER_2
)

// Process the stream
flumeStream.map(sparkFlumeEvent => {
  val event = sparkFlumeEvent.event
  new String(event.getBody.array())
}).print()

ssc.start()
ssc.awaitTermination()

Pull-based Stream (Polling Pattern)

import java.net.InetSocketAddress

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

val sparkConf = new SparkConf().setAppName("FlumePollingExample")
val ssc = new StreamingContext(sparkConf, Seconds(5))

// Create polling stream - Spark pulls data from Flume sink
val pollingStream = FlumeUtils.createPollingStream(
  ssc,
  Seq(new InetSocketAddress("flume-host", 9090)), // Flume sink addresses
  StorageLevel.MEMORY_AND_DISK_SER_2,
  1000,  // maxBatchSize
  5      // parallelism
)

// Process the stream
pollingStream.map(sparkFlumeEvent => {
  val headers = sparkFlumeEvent.event.getHeaders
  val body = new String(sparkFlumeEvent.event.getBody.array())
  s"Headers: $headers, Body: $body"
}).print()

ssc.start()
ssc.awaitTermination()

Architecture

The Spark Streaming Flume integration is built around several key components:

  • FlumeUtils: Main factory class providing stream creation methods for both patterns
  • SparkFlumeEvent: Serializable wrapper for Flume events that can be processed in Spark
  • Push Pattern: Flume agents use Avro RPC to push events to Spark receivers
  • Pull Pattern: Spark receivers poll Flume sinks using custom Avro protocol with transaction support
  • Storage Integration: Configurable storage levels for fault tolerance and performance tuning

Capabilities

Push-based Stream Creation

Creates input streams where Flume agents push data directly to Spark Streaming receivers using Avro RPC protocol.

object FlumeUtils {
  /**
   * Create a push-based input stream from a Flume source with default storage level
   * @param ssc StreamingContext object
   * @param hostname Hostname where the receiver will listen
   * @param port Port where the receiver will listen
   * @param storageLevel Storage level for received objects (default: MEMORY_AND_DISK_SER_2)
   * @return ReceiverInputDStream of SparkFlumeEvent objects
   */
  def createStream(
    ssc: StreamingContext,
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[SparkFlumeEvent]

  /**
   * Create a push-based input stream with compression support
   * @param ssc StreamingContext object
   * @param hostname Hostname where the receiver will listen
   * @param port Port where the receiver will listen
   * @param storageLevel Storage level for received objects
   * @param enableDecompression Enable Netty decompression for incoming data
   * @return ReceiverInputDStream of SparkFlumeEvent objects
   */
  def createStream(
    ssc: StreamingContext,
    hostname: String,
    port: Int,
    storageLevel: StorageLevel,
    enableDecompression: Boolean
  ): ReceiverInputDStream[SparkFlumeEvent]
}
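
The decompression overload pairs with a Flume Avro sink that has compression enabled on its side; that sink-side setting is a Flume-agent concern and an assumption here, not part of this API. A short sketch, reusing the ssc from the Basic Usage examples:

// Receiver that decompresses data sent by a compression-enabled Flume Avro sink
val compressedStream = FlumeUtils.createStream(
  ssc,
  "localhost",                          // hostname where the receiver listens
  9999,                                 // port where the receiver listens
  StorageLevel.MEMORY_AND_DISK_SER_2,
  true                                  // enableDecompression
)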

Java API:

/**
 * Create a push-based input stream from a Flume source (Java API)
 * @param jssc JavaStreamingContext object
 * @param hostname Hostname where the receiver will listen
 * @param port Port where the receiver will listen
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
  JavaStreamingContext jssc,
  String hostname,
  int port
);

/**
 * Create a push-based input stream with custom storage level (Java API)
 * @param jssc JavaStreamingContext object
 * @param hostname Hostname where the receiver will listen
 * @param port Port where the receiver will listen
 * @param storageLevel Storage level for received objects
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
  JavaStreamingContext jssc,
  String hostname,
  int port,
  StorageLevel storageLevel
);

/**
 * Create a push-based input stream with compression support (Java API)
 * @param jssc JavaStreamingContext object
 * @param hostname Hostname where the receiver will listen
 * @param port Port where the receiver will listen
 * @param storageLevel Storage level for received objects
 * @param enableDecompression Enable Netty decompression for incoming data
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
  JavaStreamingContext jssc,
  String hostname,
  int port,
  StorageLevel storageLevel,
  boolean enableDecompression
);

Pull-based Stream Creation

Creates input streams that actively poll Flume sinks for data, providing better reliability through transaction support and acknowledgments.

/**
 * Create a pull-based polling stream with default batch size and parallelism
 * @param ssc StreamingContext object
 * @param hostname Address of the host running the Spark Sink
 * @param port Port where the Spark Sink is listening
 * @param storageLevel Storage level for received objects (default: MEMORY_AND_DISK_SER_2)
 * @return ReceiverInputDStream of SparkFlumeEvent objects
 */
def createPollingStream(
  ssc: StreamingContext,
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[SparkFlumeEvent]

/**
 * Create a pull-based polling stream with multiple sink addresses
 * @param ssc StreamingContext object
 * @param addresses List of InetSocketAddress representing Spark Sink hosts
 * @param storageLevel Storage level for received objects
 * @return ReceiverInputDStream of SparkFlumeEvent objects
 */
def createPollingStream(
  ssc: StreamingContext,
  addresses: Seq[InetSocketAddress],
  storageLevel: StorageLevel
): ReceiverInputDStream[SparkFlumeEvent]

/**
 * Create a pull-based polling stream with full configuration
 * @param ssc StreamingContext object
 * @param addresses List of InetSocketAddress representing Spark Sink hosts
 * @param storageLevel Storage level for received objects
 * @param maxBatchSize Maximum number of events per RPC call (default: 1000)
 * @param parallelism Number of concurrent requests to the sink (default: 5)
 * @return ReceiverInputDStream of SparkFlumeEvent objects
 */
def createPollingStream(
  ssc: StreamingContext,
  addresses: Seq[InetSocketAddress],
  storageLevel: StorageLevel,
  maxBatchSize: Int,
  parallelism: Int
): ReceiverInputDStream[SparkFlumeEvent]
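
For a single Spark Sink, the hostname/port overload keeps the call minimal. A brief sketch; the host and port are placeholders and ssc is assumed from the Basic Usage examples:

// Poll one Spark Sink with the default batch size and parallelism
val singleSinkStream = FlumeUtils.createPollingStream(
  ssc,
  "flume-host",
  9090,
  StorageLevel.MEMORY_AND_DISK_SER_2
)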

Java API:

/**
 * Create a pull-based polling stream (Java API)
 * @param jssc JavaStreamingContext object
 * @param hostname Address of the host running the Spark Sink
 * @param port Port where the Spark Sink is listening
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
  JavaStreamingContext jssc,
  String hostname,
  int port
);

/**
 * Create a pull-based polling stream with custom storage level (Java API)
 * @param jssc JavaStreamingContext object
 * @param hostname Address of the host running the Spark Sink
 * @param port Port where the Spark Sink is listening
 * @param storageLevel Storage level for received objects
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
  JavaStreamingContext jssc,
  String hostname,
  int port,
  StorageLevel storageLevel
);

/**
 * Create a pull-based polling stream with multiple sinks (Java API)
 * @param jssc JavaStreamingContext object
 * @param addresses Array of InetSocketAddress representing Spark Sink hosts
 * @param storageLevel Storage level for received objects
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
  JavaStreamingContext jssc,
  InetSocketAddress[] addresses,
  StorageLevel storageLevel
);

/**
 * Create a pull-based polling stream with full configuration (Java API)
 * @param jssc JavaStreamingContext object
 * @param addresses Array of InetSocketAddress representing Spark Sink hosts
 * @param storageLevel Storage level for received objects
 * @param maxBatchSize Maximum number of events per RPC call
 * @param parallelism Number of concurrent requests to the sink
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
  JavaStreamingContext jssc,
  InetSocketAddress[] addresses,
  StorageLevel storageLevel,
  int maxBatchSize,
  int parallelism
);

Types

SparkFlumeEvent

Serializable wrapper for Flume events that can be processed in Spark transformations.

/**
 * Serializable wrapper for AvroFlumeEvent with custom serialization
 */
class SparkFlumeEvent extends Externalizable {
  /** The wrapped Flume event containing headers and body (mutable) */
  var event: AvroFlumeEvent = new AvroFlumeEvent()
  
  /** Deserialize from ObjectInput */
  def readExternal(in: ObjectInput): Unit
  
  /** Serialize to ObjectOutput */
  def writeExternal(out: ObjectOutput): Unit
}

object SparkFlumeEvent {
  /**
   * Create SparkFlumeEvent from AvroFlumeEvent
   * @param in AvroFlumeEvent to wrap
   * @return SparkFlumeEvent instance
   */
  def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent
}

AvroFlumeEvent

Flume's standard event structure containing headers and body data. This class is provided by Apache Flume and imported from org.apache.flume.source.avro.AvroFlumeEvent.

/**
 * Flume event structure (from Apache Flume library)
 * Import: org.apache.flume.source.avro.AvroFlumeEvent
 */
class AvroFlumeEvent {
  /** Get event headers as a Map */
  def getHeaders(): java.util.Map[CharSequence, CharSequence]
  
  /** Set event headers */
  def setHeaders(headers: java.util.Map[CharSequence, CharSequence]): Unit
  
  /** Get event body as ByteBuffer */
  def getBody(): java.nio.ByteBuffer
  
  /** Set event body */
  def setBody(body: java.nio.ByteBuffer): Unit
}
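
Because AvroFlumeEvent is a plain Avro-generated class, events can also be constructed directly, which is convenient for unit-testing transformations without a running Flume agent. A minimal sketch; the header key and body text are made up for illustration:

import java.nio.ByteBuffer
import org.apache.flume.source.avro.AvroFlumeEvent
import scala.collection.JavaConverters._

// Build an event by hand; "source" is an illustrative header key
val testEvent = new AvroFlumeEvent()
testEvent.setHeaders(Map[CharSequence, CharSequence]("source" -> "unit-test").asJava)
testEvent.setBody(ByteBuffer.wrap("hello flume".getBytes("UTF-8")))

// The same accessors used on received events apply here
val text = new String(testEvent.getBody.array(), "UTF-8")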

Common Usage Patterns

// Extract the body as a string
val bodyText = new String(sparkFlumeEvent.event.getBody.array())

// Extract a specific header
val timestamp = sparkFlumeEvent.event.getHeaders.get("timestamp")

// Process headers and body together (uses scala.collection.JavaConverters._)
val event = sparkFlumeEvent.event
val headers = event.getHeaders.asScala.toMap
val body = new String(event.getBody.array())
val processedData = (headers, body)
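
These per-event snippets slot into ordinary DStream transformations. A small sketch that counts events per header value in each batch, assuming the flumeStream from the push-based example and a hypothetical "source" header set by the Flume agent:

// Count events per "source" header in every batch interval
val countsBySource = flumeStream
  .map(e => (String.valueOf(e.event.getHeaders.get("source")), 1L))
  .reduceByKey(_ + _)

countsBySource.print()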

Error Handling

The two integration patterns provide different reliability guarantees:

Push-based streams: Offer at-least-once delivery. If the receiver fails while a batch is in flight, the Flume Avro sink rolls back its transaction and retries, which can produce duplicates; enabling the receiver write-ahead log protects events the receiver has accepted but not yet replicated (see the sketch after the list below).

Pull-based streams: Offer stronger reliability through transaction support with ACK/NACK responses. The Spark sink removes events from its channel only after Spark acknowledges that a batch has been received and replicated, and unacknowledged batches are redelivered by Flume, so delivery is still at-least-once end to end.

Common error scenarios:

  • Network connectivity issues between Flume and Spark
  • Receiver memory pressure causing event loss
  • Flume agent failures during event transmission
  • Serialization errors for malformed events
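
To guard against the receiver-failure scenarios above, Spark Streaming's general fault-tolerance settings apply to Flume receivers as well. A hedged sketch that enables the receiver write-ahead log and checkpointing; the checkpoint path and application name are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("FlumeReliabilityExample")
  // Persist received blocks to a write-ahead log before they are acknowledged
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(5))
// A fault-tolerant checkpoint directory is required for WAL-based recovery
ssc.checkpoint("hdfs:///checkpoints/flume-app")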

Performance Tuning

Key configuration parameters for optimal performance:

Storage Levels:

  • MEMORY_ONLY: Fastest access, risk of data loss
  • MEMORY_AND_DISK_SER_2: Balanced performance and fault tolerance (default)
  • MEMORY_AND_DISK_SER: Alternative serialized storage with single replication
  • DISK_ONLY: Maximum fault tolerance, slower access

Polling Configuration:

  • maxBatchSize: Larger batches reduce RPC overhead but increase memory usage (default: 1000)
  • parallelism: Higher parallelism increases throughput but uses more resources (default: 5)
  • Multiple sink addresses: Distribute load across multiple Flume sinks for scalability (combined with the parameters above in the sketch below)
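
A combined sketch of these polling parameters, spreading the load across two Spark Sinks; host names, ports, and the tuned values are illustrative, and ssc is assumed from the earlier examples:

// Two Spark Sinks, larger batches, and higher request parallelism
val sinks = Seq(
  new InetSocketAddress("flume-host-1", 9090),
  new InetSocketAddress("flume-host-2", 9090)
)

val tunedStream = FlumeUtils.createPollingStream(
  ssc,
  sinks,
  StorageLevel.MEMORY_AND_DISK_SER_2,
  2000,  // maxBatchSize: fewer RPC round trips, more memory per batch
  10     // parallelism: more concurrent requests to the sinks
)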