Spark Streaming Flume Integration

Apache Spark Streaming integration with Apache Flume for real-time data ingestion. The library offers both a push-based and a pull-based approach for consuming Flume event streams in Spark Streaming applications, supporting reliable, fault-tolerant processing pipelines.

Package Information

  • Package Name: spark-streaming-flume_2.11
  • Package Type: maven
  • Language: Scala (with Java API support)
  • Installation: Add to Maven dependencies (an sbt equivalent is shown after this list):
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume_2.11</artifactId>
      <version>2.2.3</version>
    </dependency>
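
For sbt builds, an equivalent dependency would be the line below, assuming scalaVersion is set to 2.11.x so that %% resolves to the spark-streaming-flume_2.11 artifact:

libraryDependencies += "org.apache.spark" %% "spark-streaming-flume" % "2.2.3"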

Core Imports

import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.storage.StorageLevel

For Java API:

import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import org.apache.spark.storage.StorageLevel;

Basic Usage

Push-based Approach (Flume as Client)

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf().setAppName("FlumePushExample") // app name is arbitrary
val ssc = new StreamingContext(conf, Seconds(10))

// Create a stream that receives data pushed from a Flume agent
val flumeStream = FlumeUtils.createStream(
  ssc,
  "localhost", // hostname where the Spark receiver will listen
  9999,        // port where the Spark receiver will listen
  StorageLevel.MEMORY_AND_DISK_SER_2
)

// Process the events (the event body is an Avro ByteBuffer)
flumeStream.map(e => new String(e.event.getBody.array(), "UTF-8")).print()
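
On the Flume side, the push-based approach needs an Avro sink pointing at the host and port where the Spark receiver listens. A minimal sketch of the relevant Flume agent properties, assuming an agent named agent1 with an existing channel named memoryChannel (both names are placeholders):

# Avro sink that pushes events to the Spark receiver (host/port from the example above)
agent1.sinks.avroSink.type = avro
agent1.sinks.avroSink.channel = memoryChannel
agent1.sinks.avroSink.hostname = localhost
agent1.sinks.avroSink.port = 9999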

Pull-based Approach (Spark as Client)

import java.net.InetSocketAddress

// Using the same ssc from above example
// Create polling stream that pulls data from SparkSink
val pollingStream = FlumeUtils.createPollingStream(
  ssc,
  Seq(new InetSocketAddress("flume-host", 9988)), // SparkSink addresses
  StorageLevel.MEMORY_AND_DISK_SER_2
)

// Process the events
pollingStream.map(e => new String(e.event.getBody.array(), "UTF-8")).print()
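
The pull-based approach requires the custom SparkSink to be available on the Flume agent (the spark-streaming-flume-sink artifact and its dependencies on the Flume classpath). A minimal sketch of the corresponding agent properties, again with placeholder agent and channel names:

# Custom SparkSink that buffers events until Spark Streaming polls them
agent1.sinks.sparkSink.type = org.apache.spark.streaming.flume.sink.SparkSink
agent1.sinks.sparkSink.channel = memoryChannel
agent1.sinks.sparkSink.hostname = flume-host
agent1.sinks.sparkSink.port = 9988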

Architecture

The Spark Streaming Flume integration provides two distinct data ingestion patterns:

  1. Push-based (FlumeInputDStream): A Flume Avro sink pushes data to a Spark Streaming receiver that acts as an Avro endpoint. Simple to set up, but less reliable.

  2. Pull-based (FlumePollingInputDStream): Spark Streaming polls a custom SparkSink deployed on the Flume agents. More reliable, with transaction support and better fault tolerance.

Capabilities

Stream Creation - Push-based

Creates input streams in which Flume acts as the client, pushing data to Spark receivers.

// Scala API with default storage level
def createStream(
  ssc: StreamingContext,
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[SparkFlumeEvent]

// Scala API with compression support
def createStream(
  ssc: StreamingContext,
  hostname: String,
  port: Int,
  storageLevel: StorageLevel,
  enableDecompression: Boolean
): ReceiverInputDStream[SparkFlumeEvent]

// Java API with default storage level
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
  JavaStreamingContext jssc,
  String hostname,
  int port
)

// Java API with custom storage level
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
  JavaStreamingContext jssc,
  String hostname,
  int port,
  StorageLevel storageLevel
)

// Java API with compression support
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
  JavaStreamingContext jssc,
  String hostname,
  int port,
  StorageLevel storageLevel,
  boolean enableDecompression
)
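
As a sketch, the compression-enabled Scala overload can be used when the Flume Avro sink sends compressed batches (for example with compression-type set to deflate on the sink); ssc, the hostname, and the port below are placeholders reused from the Basic Usage example:

val compressedStream = FlumeUtils.createStream(
  ssc,
  "localhost",
  9999,
  StorageLevel.MEMORY_AND_DISK_SER_2,
  enableDecompression = true // decompress events received from the Avro sink
)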

Stream Creation - Pull-based

Creates input streams that poll a SparkSink for data, with stronger reliability guarantees.

// Scala API with single address
def createPollingStream(
  ssc: StreamingContext,
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[SparkFlumeEvent]

// Scala API with multiple addresses
def createPollingStream(
  ssc: StreamingContext,
  addresses: Seq[InetSocketAddress],
  storageLevel: StorageLevel
): ReceiverInputDStream[SparkFlumeEvent]

// Scala API with full configuration
def createPollingStream(
  ssc: StreamingContext,
  addresses: Seq[InetSocketAddress],
  storageLevel: StorageLevel,
  maxBatchSize: Int,
  parallelism: Int
): ReceiverInputDStream[SparkFlumeEvent]

// Java API with single address
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
  JavaStreamingContext jssc,
  String hostname,
  int port
)

// Java API with custom storage level
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
  JavaStreamingContext jssc,
  String hostname,
  int port,
  StorageLevel storageLevel
)

// Java API with multiple addresses
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
  JavaStreamingContext jssc,
  InetSocketAddress[] addresses,
  StorageLevel storageLevel
)

// Java API with full configuration
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
  JavaStreamingContext jssc,
  InetSocketAddress[] addresses,
  StorageLevel storageLevel,
  int maxBatchSize,
  int parallelism
)
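
As a sketch, the simplest Scala overload only needs the SparkSink's hostname and port and falls back to the default storage level, batch size, and parallelism; ssc and the address below are placeholders reused from the Basic Usage example:

val singleHostStream = FlumeUtils.createPollingStream(
  ssc,
  "flume-host", // host running the SparkSink
  9988          // SparkSink port
)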

Event Processing

Process SparkFlumeEvent objects received from Flume agents.

class SparkFlumeEvent() extends Externalizable {
  var event: AvroFlumeEvent
  def readExternal(in: ObjectInput): Unit
  def writeExternal(out: ObjectOutput): Unit
}

object SparkFlumeEvent {
  def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent
}

Usage Examples

import scala.collection.JavaConverters._

// Extract the event body and decode it as a UTF-8 string
flumeStream.map(sparkEvent => {
  val body = sparkEvent.event.getBody.array()
  new String(body, "UTF-8")
})

// Extract event headers
flumeStream.map(sparkEvent => {
  val headers = sparkEvent.event.getHeaders
  headers.asScala.toMap
})

// Process both headers and body
flumeStream.map(sparkEvent => {
  val event = sparkEvent.event
  val bodyString = new String(event.getBody.array(), "UTF-8")
  val headerMap = event.getHeaders.asScala.toMap
  (bodyString, headerMap)
})

Types

Core Types

import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.JavaStreamingContext

// Flume-specific types
import org.apache.flume.source.avro.AvroFlumeEvent
import java.net.InetSocketAddress
import java.io.{Externalizable, ObjectInput, ObjectOutput}

Configuration Patterns

Storage Levels

Choose appropriate storage levels based on your reliability and performance requirements:

import org.apache.spark.storage.StorageLevel

// Default - serialized, replicated, disk + memory
StorageLevel.MEMORY_AND_DISK_SER_2

// Memory only, replicated
StorageLevel.MEMORY_ONLY_2

// Disk only, replicated  
StorageLevel.DISK_ONLY_2

// Memory and disk, not serialized, replicated
StorageLevel.MEMORY_AND_DISK_2
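
A non-default level is simply passed as the storageLevel argument. For example, a sketch that keeps replicated events in memory only, trading durability for speed (ssc, hostname, and port are placeholders from the earlier examples):

val memoryOnlyStream = FlumeUtils.createStream(
  ssc,
  "localhost",
  9999,
  StorageLevel.MEMORY_ONLY_2
)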

Default Configuration Constants

The pull-based streaming API uses the following default values:

// Default constants from FlumeUtils
DEFAULT_POLLING_BATCH_SIZE = 1000      // events per batch
DEFAULT_POLLING_PARALLELISM = 5        // concurrent connections

Pull-based Configuration

// High throughput configuration
FlumeUtils.createPollingStream(
  ssc,
  addresses,
  StorageLevel.MEMORY_AND_DISK_SER_2,
  maxBatchSize = 2000,    // Larger batches
  parallelism = 10        // More concurrent connections
)

// Conservative configuration
FlumeUtils.createPollingStream(
  ssc,
  addresses, 
  StorageLevel.MEMORY_AND_DISK_SER_2,
  maxBatchSize = 500,     // Smaller batches
  parallelism = 2         // Fewer connections
)

Error Handling

Both stream types handle failures at different levels:

  • Push-based: Network failures result in data loss unless Flume is configured with reliable channels
  • Pull-based: Provides transaction support; failed batches are rolled back and can be retried

// Add error handling for stream processing
// (processEvent and logError are application-specific placeholders)
flumeStream.foreachRDD { rdd =>
  try {
    // collect() pulls the whole batch to the driver; suitable only for small batches
    rdd.collect().foreach { sparkEvent =>
      // Process each event
      processEvent(sparkEvent)
    }
  } catch {
    case e: Exception =>
      logError("Failed to process Flume events", e)
      // Handle the error appropriately (skip, retry, or alert)
  }
}
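
Whichever approach is used, the job only starts consuming once the StreamingContext is started. A short sketch of the usual lifecycle, ending with a graceful stop so that already-received Flume data is processed before shutdown:

ssc.start()            // start receiving and processing Flume events
ssc.awaitTermination() // block until the job is stopped or fails

// From a shutdown hook or management thread: stop gracefully
ssc.stop(stopSparkContext = true, stopGracefully = true)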