tessl/maven-org-apache-spark--spark-streaming-flume-2-11

Apache Spark Streaming integration with Apache Flume for collecting, aggregating, and moving large amounts of log data

Spark Streaming Flume Integration

The Apache Spark Streaming integration with Apache Flume ingests Flume events into Spark Streaming applications in real time. The library offers two approaches: a push-based one, in which Flume pushes events to a Spark receiver, and a pull-based one, in which Spark pulls events from a custom Flume sink. The pull-based approach gives stronger reliability and fault-tolerance guarantees.

Package Information

  • Package Name: spark-streaming-flume_2.11
  • Package Type: maven
  • Language: Scala (with Java API support)
  • Installation: Add to Maven dependencies:
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume_2.11</artifactId>
      <version>2.2.3</version>
    </dependency>
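
For sbt builds, the same coordinates can be declared as sketched below. This assumes an sbt project; the artifact ID is written out with its _2.11 suffix so the line does not depend on the project's scalaVersion setting.

```scala
// build.sbt (assumed sbt project; same coordinates as the Maven snippet above)
libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.11" % "2.2.3"
```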

Core Imports

import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.storage.StorageLevel

For Java API:

import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import org.apache.spark.storage.StorageLevel;

Basic Usage

Push-based Approach (Flume as Client)

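import org.apache.spark.SparkConf
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.storage.StorageLevel

// The application name is a placeholder; the master URL is supplied by spark-submit
val conf = new SparkConf().setAppName("FlumeEventCount")
val ssc = new StreamingContext(conf, Seconds(10))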

// Create stream that receives data from Flume agent
val flumeStream = FlumeUtils.createStream(
  ssc,
  "localhost", // hostname where Spark receiver will listen
  9999,        // port where Spark receiver will listen
  StorageLevel.MEMORY_AND_DISK_SER_2
)

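// Decode each event body as UTF-8 text and print a sample of every batch
// (printing the raw byte arrays would only show object references)
flumeStream.map(e => new String(e.event.getBody.array(), "UTF-8")).print()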
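
On the Flume side, the push-based approach needs an Avro sink that points at the host and port where the Spark receiver listens. A minimal sketch of the agent configuration follows. The names agent, avroSink and memoryChannel are placeholders, and the host and port are assumed to match the createStream call above.

```properties
# Flume agent configuration (agent, avroSink and memoryChannel are placeholder names)
agent.sinks = avroSink
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.channel = memoryChannel
# Must match the hostname and port passed to FlumeUtils.createStream
agent.sinks.avroSink.hostname = localhost
agent.sinks.avroSink.port = 9999
```

The hostname has to be a machine on which a Spark worker can run the receiver, since the receiver binds to that address.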

Pull-based Approach (Spark as Client)

import java.net.InetSocketAddress

// Reuses the ssc from the push-based example above
// Create a polling stream that pulls data from the SparkSink
val pollingStream = FlumeUtils.createPollingStream(
  ssc,
  Seq(new InetSocketAddress("flume-host", 9988)), // SparkSink addresses
  StorageLevel.MEMORY_AND_DISK_SER_2
)

// Decode each event body as UTF-8 text and print a sample of every batch
pollingStream.map(e => new String(e.event.getBody.array(), "UTF-8")).print()
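
For the pull-based approach, the Flume agent runs the SparkSink and Spark connects to it. A minimal sketch of the agent configuration follows. The names agent, spark and memoryChannel are placeholders, and the host and port are assumed to match the address passed to createPollingStream above.

```properties
# Flume agent configuration (agent, spark and memoryChannel are placeholder names)
agent.sinks = spark
agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
# Address on which the sink listens for connections from Spark
agent.sinks.spark.hostname = flume-host
agent.sinks.spark.port = 9988
agent.sinks.spark.channel = memoryChannel
```

The sink class lives in a separate artifact (spark-streaming-flume-sink_2.11), which has to be on the Flume agent's classpath together with the Scala library and Commons Lang 3 JARs.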

Architecture

The Spark Streaming Flume integration provides two distinct data ingestion patterns:

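  1. Push-based (FlumeInputDStream): Flume agents push data through an Avro sink to a Spark Streaming receiver that acts as an Avro agent. Simple to set up, but with weaker reliability guarantees than the pull-based approach.

  2. Pull-based (FlumePollingInputDStream): Flume pushes data into a custom SparkSink deployed on the Flume agent, where it stays buffered until Spark Streaming pulls it. Each pull runs in a transaction that succeeds only after Spark has received and replicated the data, which gives stronger reliability and fault tolerance.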

Capabilities

Stream Creation - Push-based

Creates input streams where Flume acts as client pushing data to Spark receivers.

// Scala API with default storage level
def createStream(
  ssc: StreamingContext,
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[SparkFlumeEvent]

// Scala API with optional decompression of compressed Flume input
def createStream(
  ssc: StreamingContext,
  hostname: String,
  port: Int,
  storageLevel: StorageLevel,
  enableDecompression: Boolean
): ReceiverInputDStream[SparkFlumeEvent]

// Java API with default storage level
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
  JavaStreamingContext jssc,
  String hostname,
  int port
)

// Java API with custom storage level
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
  JavaStreamingContext jssc,
  String hostname,
  int port,
  StorageLevel storageLevel
)

// Java API with optional decompression of compressed Flume input
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
  JavaStreamingContext jssc,
  String hostname,
  int port,
  StorageLevel storageLevel,
  boolean enableDecompression
)

Stream Creation - Pull-based

Creates input streams that poll SparkSink for data with better reliability guarantees.

// Scala API with single address
def createPollingStream(
  ssc: StreamingContext,
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[SparkFlumeEvent]

// Scala API with multiple addresses
def createPollingStream(
  ssc: StreamingContext,
  addresses: Seq[InetSocketAddress],
  storageLevel: StorageLevel
): ReceiverInputDStream[SparkFlumeEvent]

// Scala API with full configuration
def createPollingStream(
  ssc: StreamingContext,
  addresses: Seq[InetSocketAddress],
  storageLevel: StorageLevel,
  maxBatchSize: Int,
  parallelism: Int
): ReceiverInputDStream[SparkFlumeEvent]

// Java API with single address
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
  JavaStreamingContext jssc,
  String hostname,
  int port
)

// Java API with custom storage level
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
  JavaStreamingContext jssc,
  String hostname,
  int port,
  StorageLevel storageLevel
)

// Java API with multiple addresses
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
  JavaStreamingContext jssc,
  InetSocketAddress[] addresses,
  StorageLevel storageLevel
)

// Java API with full configuration
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
  JavaStreamingContext jssc,
  InetSocketAddress[] addresses,
  StorageLevel storageLevel,
  int maxBatchSize,
  int parallelism
)

Event Processing

Each stream element is a SparkFlumeEvent, a serializable wrapper around Flume's AvroFlumeEvent that exposes the event body and headers.

class SparkFlumeEvent() extends Externalizable {
  var event: AvroFlumeEvent
  def readExternal(in: ObjectInput): Unit
  def writeExternal(out: ObjectOutput): Unit
}

object SparkFlumeEvent {
  def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent
}

Usage Examples

import scala.collection.JavaConverters._  // provides asScala for Java maps

// Extract the event body as a UTF-8 string
flumeStream.map(sparkEvent => {
  val body = sparkEvent.event.getBody.array()
  new String(body, "UTF-8")
})

// Extract event headers (Avro stores keys and values as CharSequence)
flumeStream.map(sparkEvent => {
  val headers = sparkEvent.event.getHeaders
  headers.asScala.map { case (k, v) => (k.toString, v.toString) }.toMap
})

// Process both headers and body
flumeStream.map(sparkEvent => {
  val event = sparkEvent.event
  val bodyString = new String(event.getBody.array(), "UTF-8")
  val headerMap = event.getHeaders.asScala.map { case (k, v) => (k.toString, v.toString) }.toMap
  (bodyString, headerMap)
})
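
The event body is a java.nio.ByteBuffer, and array() returns the whole backing array, which can be larger than the readable region and is unavailable for read-only or direct buffers. A more defensive way to decode the body is sketched below. The helper decodeBody is hypothetical and not part of the library.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical helper: decode only the readable region of the buffer
// (from position to limit) without relying on a backing array.
def decodeBody(body: ByteBuffer): String = {
  val view = body.duplicate()                    // own position/limit; the caller's buffer is left untouched
  val bytes = new Array[Byte](view.remaining())
  view.get(bytes)                                // copy the readable bytes
  new String(bytes, StandardCharsets.UTF_8)
}
```

With a stream in scope, it could be used as flumeStream.map(e => decodeBody(e.event.getBody)).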

Types

Core Types

import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.JavaStreamingContext

// Flume-specific types
import org.apache.flume.source.avro.AvroFlumeEvent
import java.net.InetSocketAddress
import java.io.{Externalizable, ObjectInput, ObjectOutput}

Configuration Patterns

Storage Levels

Choose appropriate storage levels based on your reliability and performance requirements:

import org.apache.spark.storage.StorageLevel

// Default - serialized, replicated, disk + memory
StorageLevel.MEMORY_AND_DISK_SER_2

// Memory only, replicated
StorageLevel.MEMORY_ONLY_2

// Disk only, replicated  
StorageLevel.DISK_ONLY_2

// Memory and disk, not serialized, replicated
StorageLevel.MEMORY_AND_DISK_2

Default Configuration Constants

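The pull-based streaming API falls back to the following values when maxBatchSize and parallelism are not given:

// Defaults defined in FlumeUtils (private members; shown for reference)
DEFAULT_POLLING_BATCH_SIZE = 1000      // maximum events pulled from the sink per request
DEFAULT_POLLING_PARALLELISM = 5        // concurrent requests sent to the sink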

Pull-based Configuration

// High throughput configuration
FlumeUtils.createPollingStream(
  ssc,
  addresses,
  StorageLevel.MEMORY_AND_DISK_SER_2,
  maxBatchSize = 2000,    // Larger batches
  parallelism = 10        // More concurrent connections
)

// Conservative configuration
FlumeUtils.createPollingStream(
  ssc,
  addresses, 
  StorageLevel.MEMORY_AND_DISK_SER_2,
  maxBatchSize = 500,     // Smaller batches
  parallelism = 2         // Fewer connections
)

Error Handling

The two stream types handle failures differently:

  • Push-based: Events can be lost when the network or the receiver fails, unless the Flume agent buffers them in a reliable channel (for example, a file channel)
  • Pull-based: Provides transaction support; failed batches are rolled back and can be retried

// Add error handling for stream processing.
// processEvent and logError stand for application-defined helpers; they are not part of this library.
flumeStream.foreachRDD { rdd =>
  try {
    // collect() brings the whole batch to the driver; for large batches, prefer rdd.foreachPartition
    rdd.collect().foreach { sparkEvent =>
      processEvent(sparkEvent)
    }
  } catch {
    case e: Exception =>
      logError("Failed to process Flume events", e)
      // Handle error appropriately
  }
}
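
A single try around the whole batch stops at the first bad event. If one malformed event should not abort the rest, failures can be isolated per event. The following is a minimal sketch of that idea using only the standard library; processAll is a hypothetical helper, not part of Spark or Flume.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper: apply `handle` to every event and return the events
// that failed, paired with their errors. Try catches only non-fatal
// exceptions, so errors such as OutOfMemoryError still propagate.
def processAll[A](events: Iterator[A])(handle: A => Unit): List[(A, Throwable)] =
  events.flatMap { event =>
    Try(handle(event)) match {
      case Success(_)     => None
      case Failure(error) => Some((event, error))
    }
  }.toList
```

Called from rdd.foreachPartition, a helper like this keeps the work on the executors, and the returned list can be logged or routed to a dead-letter store of the application's choosing.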

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-streaming-flume-2-11
Describes
mavenpkg:maven/org.apache.spark/spark-streaming-flume_2.11@2.2.x