Apache Spark Streaming Flume Sink

A specialized Flume sink implementation that provides an Avro RPC server for Apache Spark streaming applications to poll data from Flume agents using a reliable, transaction-based approach.

Package Information

  • Package Name: spark-streaming-flume-sink_2.11
  • Package Type: maven
  • Language: Scala
  • Version: 2.4.8

Installation

Maven:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume-sink_2.11</artifactId>
    <version>2.4.8</version>
</dependency>

SBT:

libraryDependencies += "org.apache.spark" % "spark-streaming-flume-sink_2.11" % "2.4.8"

Core Imports

// Main sink class
import org.apache.spark.streaming.flume.sink.SparkSink

// Flume configuration
import org.apache.flume.Context
import org.apache.flume.conf.Configurable
import org.apache.flume.sink.AbstractSink
import org.apache.flume.Sink.Status

// Avro protocol (generated from sparkflume.avdl)
import org.apache.spark.streaming.flume.sink.SparkFlumeProtocol
import org.apache.spark.streaming.flume.sink.SparkSinkEvent
import org.apache.spark.streaming.flume.sink.EventBatch

// For Avro RPC client usage
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor

// Java concurrency for testing/monitoring
import java.util.concurrent.CountDownLatch

Basic Usage

The SparkSink is deployed as a Flume sink component and configured through Flume's configuration system. It creates an Avro RPC server that Spark streaming applications connect to for polling events.

Flume Configuration Example

# Flume agent configuration
agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark-sink.hostname = 0.0.0.0
agent.sinks.spark-sink.port = 9999
agent.sinks.spark-sink.threads = 5
agent.sinks.spark-sink.timeout = 30
agent.sinks.spark-sink.backoffInterval = 200
agent.sinks.spark-sink.channel = memory-channel

Spark Streaming Integration

// FlumeUtils comes from the companion client artifact, spark-streaming-flume_2.11
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

val sparkConf = new SparkConf().setAppName("FlumePollingExample")
val ssc = new StreamingContext(sparkConf, Seconds(1))
val flumeStream = FlumeUtils.createPollingStream(ssc, "hostname", 9999)

Architecture

The SparkSink implements a polling-based architecture instead of the traditional push-based Flume sink pattern:

  1. Avro RPC Server: SparkSink creates an Avro server that listens for connections from Spark
  2. Transaction Management: Each batch request creates a Flume channel transaction
  3. Thread Pool: Configurable thread pool processes concurrent batch requests
  4. Acknowledgment Protocol: Spark acknowledges successful/failed batch processing
  5. Timeout Handling: Automatic transaction rollback on timeout or failure

Configuration

SparkSink Configuration Parameters

object SparkSinkConfig {
  val THREADS: String = "threads"
  val DEFAULT_THREADS: Int = 10

  val CONF_TRANSACTION_TIMEOUT: String = "timeout"
  val DEFAULT_TRANSACTION_TIMEOUT: Int = 60

  val CONF_HOSTNAME: String = "hostname"
  val DEFAULT_HOSTNAME: String = "0.0.0.0"

  val CONF_PORT: String = "port"
  // No default - must be specified

  val CONF_BACKOFF_INTERVAL: String = "backoffInterval"
  val DEFAULT_BACKOFF_INTERVAL: Int = 200
}

Configuration Parameters

  • hostname (optional): IP address or hostname to bind the server to. Default: "0.0.0.0"
  • port (required): Port number for the Avro RPC server. No default value.
  • threads (optional): Number of threads for processing batch requests. Default: 10
  • timeout (optional): Transaction timeout in seconds. Default: 60 seconds
  • backoffInterval (optional): Sleep interval in milliseconds when no events are available. Default: 200ms

Core API Components

SparkSink Class

class SparkSink extends AbstractSink with Logging with Configurable {
  def start(): Unit
  def stop(): Unit
  def configure(ctx: Context): Unit
  def process(): Status
  
  // Package-private methods for testing
  private[flume] def getPort(): Int
  private[flume] def countdownWhenBatchReceived(latch: CountDownLatch): Unit
}

The main Flume sink implementation that:

  • Extends Flume's AbstractSink for integration with Flume framework
  • Implements Configurable for parameter configuration
  • Provides Logging capabilities for operational monitoring

Key Methods:

  • start(): Initializes and starts the Avro RPC server
  • stop(): Shuts down the server and releases all resources
  • configure(ctx: Context): Configures the sink from Flume context parameters
  • process(): Blocks the Flume framework thread (required by Flume sink interface)
  • getPort(): Returns the actual port the server is listening on
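
For testing or embedding outside a normal Flume agent, the sink can also be wired up programmatically. The following is a minimal sketch, not the standard deployment path: it assumes Flume's MemoryChannel and Configurables helpers from flume-ng-core, and passes port 0 on the assumption that the underlying server then binds an OS-assigned free port.

import org.apache.flume.Context
import org.apache.flume.channel.MemoryChannel
import org.apache.flume.conf.Configurables
import org.apache.spark.streaming.flume.sink.SparkSink

import scala.collection.JavaConverters._

// Channel that buffers events for the sink to drain
val channel = new MemoryChannel()
Configurables.configure(channel, new Context())

// Configure and start the sink; port 0 assumed to mean "pick any free port"
val sink = new SparkSink()
sink.configure(new Context(Map("hostname" -> "localhost", "port" -> "0").asJava))
sink.setChannel(channel)
sink.start()

// sink.getPort() would report the bound port, but it is package-private
// (visible only to org.apache.spark.streaming.flume code, e.g. tests)

sink.stop()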

Avro Protocol Interface

// Generated from sparkflume.avdl - Avro protocol interface
public interface SparkFlumeProtocol {
  EventBatch getEventBatch(int n);
  Void ack(CharSequence sequenceNumber);
  Void nack(CharSequence sequenceNumber);
}

The Avro RPC protocol that Spark clients use to interact with the sink:

  • getEventBatch(n): Requests up to n events from the Flume channel
  • ack(sequenceNumber): Acknowledges successful processing of a batch
  • nack(sequenceNumber): Signals failed processing, triggering rollback
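
A minimal client-side sketch of this protocol, using the Avro IPC classes from the Core Imports section. The host name and batch size are placeholders and error handling is elided; in practice, Spark's FlumeUtils.createPollingStream performs this exchange for you.

import java.net.InetSocketAddress
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
import org.apache.spark.streaming.flume.sink.{EventBatch, SparkFlumeProtocol}

val transceiver = new NettyTransceiver(new InetSocketAddress("sink-host", 9999))
val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol], transceiver)

// Request up to 100 events; the sink opens a channel transaction for the batch
val batch: EventBatch = client.getEventBatch(100)

if (batch.getErrorMsg.toString.isEmpty) {
  // process batch.getEvents here ...
  client.ack(batch.getSequenceNumber)   // commit the transaction on the sink
} else {
  // the sink failed to assemble a batch; nothing to acknowledge
}

transceiver.close()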

Data Types

SparkSinkEvent

// Generated from sparkflume.avdl - Avro record
public class SparkSinkEvent {
  // Constructors
  public SparkSinkEvent();
  public SparkSinkEvent(java.util.Map<CharSequence, CharSequence> headers, java.nio.ByteBuffer body);
  
  // Headers accessors
  public java.util.Map<CharSequence, CharSequence> getHeaders();
  public void setHeaders(java.util.Map<CharSequence, CharSequence> headers);
  
  // Body accessors
  public java.nio.ByteBuffer getBody();
  public void setBody(java.nio.ByteBuffer body);
}

Represents a single Flume event with:

  • Constructor takes headers (event metadata) and body (event payload)
  • getHeaders(): Returns event metadata as key-value pairs
  • getBody(): Returns event payload as binary data
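
A short sketch of unpacking a single event, assuming the body holds UTF-8 text:

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import org.apache.spark.streaming.flume.sink.SparkSinkEvent

import scala.collection.JavaConverters._

def decode(event: SparkSinkEvent): (Map[String, String], String) = {
  // Copy the payload out of the ByteBuffer without disturbing its position
  val buf: ByteBuffer = event.getBody.duplicate()
  val bytes = new Array[Byte](buf.remaining())
  buf.get(bytes)
  val text = new String(bytes, StandardCharsets.UTF_8)

  // Avro exposes headers as CharSequence (Utf8) pairs; convert to plain Strings
  val headers = event.getHeaders.asScala.map { case (k, v) => k.toString -> v.toString }.toMap
  (headers, text)
}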

EventBatch

// Generated from sparkflume.avdl - Avro record
public class EventBatch {
  // Constructors
  public EventBatch();
  public EventBatch(CharSequence errorMsg, CharSequence sequenceNumber, java.util.List<SparkSinkEvent> events);
  
  // Error message accessors
  public CharSequence getErrorMsg();  // Empty string indicates success
  public void setErrorMsg(CharSequence errorMsg);
  
  // Sequence number accessors  
  public CharSequence getSequenceNumber();  // Unique transaction identifier
  public void setSequenceNumber(CharSequence sequenceNumber);
  
  // Events list accessors
  public java.util.List<SparkSinkEvent> getEvents();
  public void setEvents(java.util.List<SparkSinkEvent> events);
}

Container for a batch of events returned to Spark:

  • Constructor takes errorMsg, sequenceNumber, and events list
  • getErrorMsg(): Returns error message if batch creation failed, empty string for success
  • getSequenceNumber(): Returns unique identifier for transaction tracking and acknowledgment
  • getEvents(): Returns list of SparkSinkEvent objects in the batch
  • setErrorMsg(errorMsg): Sets the error message for this batch

Error Handling

Transaction States

  • Success: Spark calls ack(sequenceNumber) → transaction commits
  • Failure: Spark calls nack(sequenceNumber) → transaction rolls back
  • Timeout: No response within timeout period → automatic rollback
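
Continuing the client sketch from the protocol interface section above, the ack/nack decision typically wraps the processing step; processEvents is a hypothetical application function:

try {
  processEvents(batch.getEvents)         // hypothetical processing step
  client.ack(batch.getSequenceNumber)    // success: the sink commits
} catch {
  case e: Exception =>
    // failure: the sink rolls back and the events stay in the channel
    client.nack(batch.getSequenceNumber)
}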

Error Conditions

  • No Events Available: Returns EventBatch with empty events list
  • Channel Errors: Returns EventBatch with non-empty errorMsg
  • Connection Issues: Avro RPC exceptions propagated to client
  • Configuration Errors: ConfigurationException thrown during setup

Retry Behavior

  • Failed transactions (nack/timeout) leave events in Flume channel for retry
  • Configurable backoff interval prevents excessive polling when no events available
  • Thread pool sizing controls concurrent request handling capacity

Threading Model

The SparkSink uses a multi-threaded architecture:

  1. Main Thread: Handles Flume framework lifecycle (start/stop/configure)
  2. Avro Server Threads: Netty-based server handles RPC connections
  3. Transaction Processor Threads: Configurable pool processes batch requests
  4. Channel Transaction Threads: Each Flume transaction must execute in its originating thread

Thread Safety

  • Transaction processors are isolated per request
  • Sequence number generation uses atomic counters
  • Shutdown coordination prevents resource leaks
  • Flume channel transactions are thread-local by design

Integration Patterns

With Spark Streaming

// Spark side - polling from multiple sinks
val flumeStreams = (0 until numSinks).map { i =>
  FlumeUtils.createPollingStream(ssc, hostnames(i), ports(i))
}
val unionStream = ssc.union(flumeStreams)

High Availability Setup

# Multiple sink configuration for failover
agent.sinks.spark-sink1.hostname = host1
agent.sinks.spark-sink1.port = 9999

agent.sinks.spark-sink2.hostname = host2
agent.sinks.spark-sink2.port = 9999

agent.sinkgroups = spark-group
agent.sinkgroups.spark-group.sinks = spark-sink1 spark-sink2
agent.sinkgroups.spark-group.processor.type = failover
agent.sinkgroups.spark-group.processor.priority.spark-sink1 = 10
agent.sinkgroups.spark-group.processor.priority.spark-sink2 = 5

Performance Tuning

  • Thread Pool Size: Match to expected concurrent Spark receivers
  • Transaction Timeout: Balance between reliability and throughput
  • Batch Size: Configured on Spark side, affects memory usage
  • Backoff Interval: Reduce CPU usage when channels are empty

Utility Components

SparkSinkUtils

object SparkSinkUtils {
  def isErrorBatch(batch: EventBatch): Boolean
}

Utility methods for working with event batches:

  • isErrorBatch(batch): Returns true if the batch represents an error condition (non-empty error message)
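
A possible usage pattern, assuming the batch came from the client sketch shown earlier. Note that in the Spark sources this object may be package-private to org.apache.spark.streaming.flume; if it is not visible from your code, checking getErrorMsg directly is equivalent:

import org.apache.spark.streaming.flume.sink.SparkSinkUtils

val batch = client.getEventBatch(100)
if (SparkSinkUtils.isErrorBatch(batch)) {
  // e.g. log batch.getErrorMsg and retry after the backoff interval
} else {
  // safe to process batch.getEvents and then ack
}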

Monitoring and Observability

The sink provides extensive logging through SLF4J:

  • Connection establishment and teardown
  • Transaction lifecycle events
  • Error conditions and timeouts
  • Performance metrics (batch sizes, processing times)
  • Configuration parameter validation

Log levels can be configured to control verbosity:

  • INFO: Operational events, configuration
  • DEBUG: Detailed transaction flow
  • WARN: Recoverable errors, timeouts
  • ERROR: Unrecoverable failures
