tessl/maven-org-apache-spark--spark-streaming-flume-sink_2-11

A Flume sink implementation that provides an Avro RPC server for Spark streaming applications to poll data from Flume agents

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.spark/spark-streaming-flume-sink_2.11@2.4.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-flume-sink_2-11@2.4.0


Apache Spark Streaming Flume Sink

A specialized Flume sink implementation that provides an Avro RPC server for Apache Spark streaming applications to poll data from Flume agents using a reliable, transaction-based approach.

Package Information

  • Package Name: spark-streaming-flume-sink_2.11
  • Package Type: maven
  • Language: Scala
  • Version: 2.4.8

Installation

Maven:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume-sink_2.11</artifactId>
    <version>2.4.8</version>
</dependency>

SBT:

libraryDependencies += "org.apache.spark" % "spark-streaming-flume-sink_2.11" % "2.4.8"

Core Imports

// Main sink class
import org.apache.spark.streaming.flume.sink.SparkSink

// Flume configuration
import org.apache.flume.Context
import org.apache.flume.conf.Configurable
import org.apache.flume.sink.AbstractSink
import org.apache.flume.Sink.Status

// Avro protocol (generated from sparkflume.avdl)
import org.apache.spark.streaming.flume.sink.SparkFlumeProtocol
import org.apache.spark.streaming.flume.sink.SparkSinkEvent
import org.apache.spark.streaming.flume.sink.EventBatch

// For Avro RPC client usage
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor

// Java concurrency for testing/monitoring
import java.util.concurrent.CountDownLatch

Basic Usage

The SparkSink is deployed as a Flume sink component and configured through Flume's configuration system. It creates an Avro RPC server that Spark streaming applications connect to for polling events.

Flume Configuration Example

# Flume agent configuration
agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark-sink.hostname = 0.0.0.0
agent.sinks.spark-sink.port = 9999  
agent.sinks.spark-sink.threads = 5
agent.sinks.spark-sink.timeout = 30
agent.sinks.spark-sink.backoffInterval = 200
agent.sinks.spark-sink.channel = memory-channel

Spark Streaming Integration

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

// Application name and batch interval are illustrative
val sparkConf = new SparkConf().setAppName("FlumePollingExample")
val ssc = new StreamingContext(sparkConf, Seconds(1))
val flumeStream = FlumeUtils.createPollingStream(ssc, "hostname", 9999)

Architecture

The SparkSink implements a polling-based architecture instead of the traditional push-based Flume sink pattern:

  1. Avro RPC Server: SparkSink creates an Avro server that listens for connections from Spark
  2. Transaction Management: Each batch request creates a Flume channel transaction
  3. Thread Pool: Configurable thread pool processes concurrent batch requests
  4. Acknowledgment Protocol: Spark acknowledges successful/failed batch processing
  5. Timeout Handling: Automatic transaction rollback on timeout or failure
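To make the flow concrete, here is a simplified, illustrative sketch of the channel-transaction handling behind a single batch request. The method name and structure are hypothetical; the real sink keeps the transaction open until Spark acks or nacks, tracks a sequence number, and enforces the configured timeout before rolling back.

import org.apache.flume.{Channel, Event}
import scala.collection.mutable.ArrayBuffer

// Illustrative only: roughly what one getEventBatch(n) request involves
def takeBatch(channel: Channel, maxEvents: Int): Seq[Event] = {
  val tx = channel.getTransaction
  tx.begin()
  try {
    val events = ArrayBuffer.empty[Event]
    var more = true
    while (more && events.size < maxEvents) {
      val event = channel.take()            // returns null when the channel is empty
      if (event == null) more = false else events += event
    }
    tx.commit()                             // the real sink defers this until Spark acks
    events
  } catch {
    case e: Throwable =>
      tx.rollback()                         // events stay in the channel for a retry
      throw e
  } finally {
    tx.close()
  }
}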

Configuration

SparkSink Configuration Parameters

object SparkSinkConfig {
  val THREADS: String = "threads"
  val DEFAULT_THREADS: Int = 10

  val CONF_TRANSACTION_TIMEOUT: String = "timeout"
  val DEFAULT_TRANSACTION_TIMEOUT: Int = 60

  val CONF_HOSTNAME: String = "hostname"
  val DEFAULT_HOSTNAME: String = "0.0.0.0"

  val CONF_PORT: String = "port"
  // No default - must be specified

  val CONF_BACKOFF_INTERVAL: String = "backoffInterval"
  val DEFAULT_BACKOFF_INTERVAL: Int = 200
}

Configuration Parameters

  • hostname (optional): IP address or hostname to bind the server to. Default: "0.0.0.0"
  • port (required): Port number for the Avro RPC server. No default value.
  • threads (optional): Number of threads for processing batch requests. Default: 10
  • timeout (optional): Transaction timeout in seconds. Default: 60 seconds
  • backoffInterval (optional): Sleep interval in milliseconds when no events are available. Default: 200ms
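As an illustration of how these parameters are consumed, the sketch below reads them from a Flume Context the way any Configurable sink would; it is not the actual body of SparkSink.configure().

import org.apache.flume.Context
import org.apache.flume.conf.ConfigurationException

// Illustrative sketch of reading the parameters above from the Flume Context
def readSettings(ctx: Context): Unit = {
  val hostname: String = ctx.getString("hostname", "0.0.0.0")
  // "port" has no default, so a missing value is a configuration error
  val port: Int = Option(ctx.getInteger("port"))
    .getOrElse(throw new ConfigurationException("SparkSink requires a port"))
  val threads: Int = ctx.getInteger("threads", 10)
  val timeoutSeconds: Int = ctx.getInteger("timeout", 60)
  val backoffMillis: Int = ctx.getInteger("backoffInterval", 200)
  // ... pass these to the Avro server and transaction processors ...
}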

Core API Components

SparkSink Class

class SparkSink extends AbstractSink with Logging with Configurable {
  def start(): Unit
  def stop(): Unit
  def configure(ctx: Context): Unit
  def process(): Status
  
  // Package-private methods for testing
  private[flume] def getPort(): Int
  private[flume] def countdownWhenBatchReceived(latch: CountDownLatch): Unit
}

The main Flume sink implementation that:

  • Extends Flume's AbstractSink for integration with Flume framework
  • Implements Configurable for parameter configuration
  • Provides Logging capabilities for operational monitoring

Key Methods:

  • start(): Initializes and starts the Avro RPC server
  • stop(): Shuts down the server and releases all resources
  • configure(ctx: Context): Configures the sink from Flume context parameters
  • process(): Blocks the Flume framework thread (required by Flume sink interface)
  • getPort(): Returns the actual port the server is listening on
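For local experiments, the sink can also be wired up programmatically, roughly the way the sink's own test suite does. Note that getPort() is package-private, so code like this would have to live under the org.apache.spark.streaming.flume package to compile; the channel capacity and port choice below are illustrative.

import org.apache.flume.Context
import org.apache.flume.channel.MemoryChannel
import org.apache.flume.conf.Configurables
import org.apache.spark.streaming.flume.sink.SparkSink
import scala.collection.JavaConverters._

val channel = new MemoryChannel()
Configurables.configure(channel, new Context(Map("capacity" -> "1000").asJava))

val sink = new SparkSink()
// port = 0 asks the OS for a free port; getPort() reports the bound port
sink.configure(new Context(Map("hostname" -> "localhost", "port" -> "0").asJava))
sink.setChannel(channel)
sink.start()

val boundPort = sink.getPort()
// ... exercise the sink ...
sink.stop()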

Avro Protocol Interface

// Generated from sparkflume.avdl - Avro protocol interface
public interface SparkFlumeProtocol {
  EventBatch getEventBatch(int n);
  Void ack(CharSequence sequenceNumber);
  Void nack(CharSequence sequenceNumber);
}

The Avro RPC protocol that Spark clients use to interact with the sink:

  • getEventBatch(n): Requests up to n events from the Flume channel
  • ack(sequenceNumber): Acknowledges successful processing of a batch
  • nack(sequenceNumber): Signals failed processing, triggering rollback
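For illustration only, a minimal client built directly on the Avro IPC classes listed under Core Imports might look like the sketch below. In practice Spark's FlumeUtils.createPollingStream wraps this protocol for you, so this is just a view of the wire-level interaction; the host, port, and batch size are placeholders.

import java.net.InetSocketAddress
import scala.collection.JavaConverters._
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
import org.apache.spark.streaming.flume.sink.SparkFlumeProtocol

val transceiver = new NettyTransceiver(new InetSocketAddress("flume-host", 9999))
val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol], transceiver)

val batch = client.getEventBatch(100)          // request up to 100 events
if (batch.getErrorMsg.toString.isEmpty) {      // empty error message = success
  try {
    batch.getEvents.asScala.foreach { event =>
      // process event.getHeaders / event.getBody here
    }
    client.ack(batch.getSequenceNumber)        // commit the Flume transaction
  } catch {
    case _: Exception =>
      client.nack(batch.getSequenceNumber)     // roll back; events stay in the channel
  }
}
transceiver.close()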

Data Types

SparkSinkEvent

// Generated from sparkflume.avdl - Avro record
public class SparkSinkEvent {
  // Constructors
  public SparkSinkEvent();
  public SparkSinkEvent(java.util.Map<CharSequence, CharSequence> headers, java.nio.ByteBuffer body);
  
  // Headers accessors
  public java.util.Map<CharSequence, CharSequence> getHeaders();
  public void setHeaders(java.util.Map<CharSequence, CharSequence> headers);
  
  // Body accessors
  public java.nio.ByteBuffer getBody();
  public void setBody(java.nio.ByteBuffer body);
}

Represents a single Flume event with:

  • Constructor takes headers (event metadata) and body (event payload)
  • getHeaders(): Returns event metadata as key-value pairs
  • getBody(): Returns event payload as binary data
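Because the generated accessors expose Avro types (CharSequence, java.nio.ByteBuffer), consumers usually copy them into plain Scala or Java types before further processing. A small illustrative helper:

import scala.collection.JavaConverters._
import org.apache.spark.streaming.flume.sink.SparkSinkEvent

// Illustrative helper: copy the Avro-backed event into plain Scala types
def toPlain(event: SparkSinkEvent): (Map[String, String], Array[Byte]) = {
  val headers = event.getHeaders.asScala.map { case (k, v) => k.toString -> v.toString }.toMap
  val body = new Array[Byte](event.getBody.remaining())
  event.getBody.duplicate().get(body)  // duplicate() leaves the original buffer position untouched
  (headers, body)
}

// e.g. interpret the payload as UTF-8 text:
//   val (headers, bytes) = toPlain(event)
//   val text = new String(bytes, "UTF-8")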

EventBatch

// Generated from sparkflume.avdl - Avro record
public class EventBatch {
  // Constructors
  public EventBatch();
  public EventBatch(CharSequence errorMsg, CharSequence sequenceNumber, java.util.List<SparkSinkEvent> events);
  
  // Error message accessors
  public CharSequence getErrorMsg();  // Empty string indicates success
  public void setErrorMsg(CharSequence errorMsg);
  
  // Sequence number accessors  
  public CharSequence getSequenceNumber();  // Unique transaction identifier
  public void setSequenceNumber(CharSequence sequenceNumber);
  
  // Events list accessors
  public java.util.List<SparkSinkEvent> getEvents();
  public void setEvents(java.util.List<SparkSinkEvent> events);
}

Container for a batch of events returned to Spark:

  • Constructor takes errorMsg, sequenceNumber, and events list
  • getErrorMsg(): Returns error message if batch creation failed, empty string for success
  • getSequenceNumber(): Returns unique identifier for transaction tracking and acknowledgment
  • getEvents(): Returns list of SparkSinkEvent objects in the batch
  • setErrorMsg(errorMsg): Sets the error message for this batch

Error Handling

Transaction States

  • Success: Spark calls ack(sequenceNumber) → transaction commits
  • Failure: Spark calls nack(sequenceNumber) → transaction rolls back
  • Timeout: No response within timeout period → automatic rollback

Error Conditions

  • No Events Available: Returns EventBatch with empty events list
  • Channel Errors: Returns EventBatch with non-empty errorMsg
  • Connection Issues: Avro RPC exceptions propagated to client
  • Configuration Errors: ConfigurationException thrown during setup

Retry Behavior

  • Failed transactions (nack/timeout) leave events in Flume channel for retry
  • Configurable backoff interval prevents excessive polling when no events available
  • Thread pool sizing controls concurrent request handling capacity

Threading Model

The SparkSink uses a multi-threaded architecture:

  1. Main Thread: Handles Flume framework lifecycle (start/stop/configure)
  2. Avro Server Threads: Netty-based server handles RPC connections
  3. Transaction Processor Threads: Configurable pool processes batch requests
  4. Channel Transaction Threads: Each Flume transaction must execute in its originating thread

Thread Safety

  • Transaction processors are isolated per request
  • Sequence number generation uses atomic counters
  • Shutdown coordination prevents resource leaks
  • Flume channel transactions are thread-local by design

Integration Patterns

With Spark Streaming

// Spark side - polling from multiple sinks
// hostnames and ports hold the address of each configured SparkSink
val flumeStreams = hostnames.zip(ports).map { case (host, port) =>
  FlumeUtils.createPollingStream(ssc, host, port)
}
val unionStream = ssc.union(flumeStreams)

High Availability Setup

# Multiple sink configuration for failover
agent.sinks.spark-sink1.hostname = host1
agent.sinks.spark-sink1.port = 9999

agent.sinks.spark-sink2.hostname = host2  
agent.sinks.spark-sink2.port = 9999

agent.sinkgroups.spark-group.sinks = spark-sink1 spark-sink2
agent.sinkgroups.spark-group.processor.type = failover
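Flume's failover sink processor also accepts per-sink priorities and a maximum backoff penalty; the values below are illustrative.

agent.sinkgroups.spark-group.processor.priority.spark-sink1 = 10
agent.sinkgroups.spark-group.processor.priority.spark-sink2 = 5
agent.sinkgroups.spark-group.processor.maxpenalty = 10000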

Performance Tuning

  • Thread Pool Size: Match to expected concurrent Spark receivers
  • Transaction Timeout: Balance between reliability and throughput
  • Batch Size: Configured on Spark side, affects memory usage
  • Backoff Interval: Reduce CPU usage when channels are empty
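The Spark-side batch size and receiver parallelism mentioned above are passed to the FlumeUtils.createPollingStream overload that takes explicit values. The numbers below are illustrative, and ssc is the StreamingContext from the earlier example.

import java.net.InetSocketAddress
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils

val addresses = Seq(new InetSocketAddress("flume-host", 9999))
val tunedStream = FlumeUtils.createPollingStream(
  ssc,
  addresses,
  StorageLevel.MEMORY_AND_DISK_SER_2,
  1000,  // maxBatchSize: events requested per getEventBatch call
  5      // parallelism: concurrent connections opened to the sink
)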

Utility Components

SparkSinkUtils

object SparkSinkUtils {
  def isErrorBatch(batch: EventBatch): Boolean
}

Utility methods for working with event batches:

  • isErrorBatch(batch): Returns true if the batch represents an error condition (non-empty error message)
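Conceptually the check just tests whether the batch carries a non-empty error message, as in this equivalent sketch:

import org.apache.spark.streaming.flume.sink.EventBatch

// Equivalent check, written out for clarity
def isErrorBatch(batch: EventBatch): Boolean =
  batch.getErrorMsg != null && batch.getErrorMsg.toString.nonEmpty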

Monitoring and Observability

The sink provides extensive logging through SLF4J:

  • Connection establishment and teardown
  • Transaction lifecycle events
  • Error conditions and timeouts
  • Performance metrics (batch sizes, processing times)
  • Configuration parameter validation

Log levels can be configured to control verbosity:

  • INFO: Operational events, configuration
  • DEBUG: Detailed transaction flow
  • WARN: Recoverable errors, timeouts
  • ERROR: Unrecoverable failures
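Verbosity is typically adjusted through the Flume agent's log4j configuration; the logger name below targets the sink's package and the level shown is illustrative.

# conf/log4j.properties on the Flume agent
log4j.logger.org.apache.spark.streaming.flume.sink = DEBUG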