tessl/maven-org-apache-spark--spark-streaming-flume-sink-2-10

Apache Spark Streaming custom Flume sink for reliable data ingestion

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.spark/spark-streaming-flume-sink_2.10@1.6.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-flume-sink-2-10@1.6.0

Spark Streaming Flume Sink

Apache Spark Streaming custom Flume sink that enables reliable, pull-based data ingestion from Apache Flume into Spark Streaming applications. The sink serves event batches over Avro RPC inside Flume transactions: a transaction is committed only after Spark acknowledges the batch, and it is rolled back on a NACK or a timeout, so events are not lost if Spark fails to store them.

Package Information

  • Package Name: spark-streaming-flume-sink_2.10
  • Package Type: Maven
  • Language: Scala
  • Installation: Add dependency to your Maven pom.xml:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume-sink_2.10</artifactId>
    <version>1.6.3</version>
</dependency>

Core Imports

The sink is deployed as a Flume sink component, so application code typically needs no direct imports. For advanced integration or testing:

import org.apache.spark.streaming.flume.sink.SparkSink
// private[flume]: accessible only from code under org.apache.spark.streaming.flume
import org.apache.spark.streaming.flume.sink.SparkAvroCallbackHandler
import org.apache.spark.streaming.flume.sink.SparkSinkUtils
import org.apache.spark.streaming.flume.sink.SparkFlumeProtocol
// SparkSinkEvent and EventBatch are generated from the Avro IDL into the same package
import org.apache.spark.streaming.flume.sink.{EventBatch, SparkSinkEvent}

Basic Usage

Flume Configuration

Configure the sink in your Flume agent configuration:

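# Declare the channel the sink reads from (a memory channel is shown as an example)
agent.channels = memoryChannel
agent.channels.memoryChannel.type = memory

# Configure sink
agent.sinks = sparkSink
agent.sinks.sparkSink.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.sparkSink.hostname = 0.0.0.0
agent.sinks.sparkSink.port = 9999
agent.sinks.sparkSink.channel = memoryChannel
agent.sinks.sparkSink.threads = 10
# Transaction timeout in seconds
agent.sinks.sparkSink.timeout = 60
# Backoff in milliseconds when the channel is empty
agent.sinks.sparkSink.backoffInterval = 200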

Spark Streaming Integration

Connect to the sink from Spark Streaming:

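import org.apache.spark.streaming.flume._

// streamingContext is an existing StreamingContext
val flumeStream = FlumeUtils.createPollingStream(
  streamingContext,
  "hostname", // host where the Flume agent runs the SparkSink
  9999        // must match agent.sinks.sparkSink.port
)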

Architecture

The Spark Streaming Flume Sink uses a pull-based architecture: Spark requests batches from the sink instead of Flume pushing events to Spark. Its main parts are:

  • SparkSink: Main Flume sink that manages the Avro RPC server lifecycle
  • Avro RPC Protocol: Defines communication between Spark and Flume using SparkFlumeProtocol
  • Transaction Management: Ensures data durability through transactional event batching
  • Thread Pool: Manages concurrent transaction processors for handling multiple Spark requests
  • Timeout Handling: Automatic rollback for transactions that don't receive acknowledgments

Capabilities

Sink Configuration

Configure the SparkSink with Flume context parameters.

class SparkSink extends AbstractSink with Logging with Configurable {
  /**
   * Configure the sink with Flume context parameters
   * @param ctx Flume context containing configuration properties
   */
  def configure(ctx: Context): Unit
  
  /**
   * Start the sink and initialize Avro RPC server
   */
  override def start(): Unit
  
  /**
   * Stop the sink and shutdown resources
   */
  override def stop(): Unit
  
  /**
   * Main processing method (blocks until shutdown)
   * @return Processing status
   */
  override def process(): Status
  
  /**
   * Get the port the server is listening on
   * @return Port number (package-private method for testing)
   */
  private[flume] def getPort(): Int
  
  /**
   * Testing utility to count down when batches are received
   * @param latch CountDownLatch to count down on batch receipt
   */
  private[flume] def countdownWhenBatchReceived(latch: CountDownLatch): Unit
}

Avro RPC Protocol

Protocol definition for communication between Spark Streaming and the Flume sink (defined in sparkflume.avdl).

@namespace("org.apache.spark.streaming.flume.sink")
protocol SparkFlumeProtocol {
  /**
   * Request a batch of events from the sink
   * @param n Maximum number of events to return
   * @return EventBatch containing events and sequence number
   */
  EventBatch getEventBatch(int n);
  
  /**
   * Acknowledge successful processing of an event batch
   * @param sequenceNumber Sequence number of the batch to acknowledge
   */
  void ack(string sequenceNumber);
  
  /**
   * Signal failed processing of an event batch  
   * @param sequenceNumber Sequence number of the batch that failed
   */
  void nack(string sequenceNumber);
}
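
To make the three calls concrete, here is a minimal in-memory sketch of the exchange. It does not use Avro or Flume: `InMemorySink` and `Batch` are hypothetical stand-ins introduced for illustration, and a plain `Vector` plays the role of the Flume channel. The real sink wraps each batch in a Flume transaction; this model only mirrors the observable behavior (events are redelivered after a nack and gone after an ack).

```scala
// Hypothetical stand-in for EventBatch: a sequence number plus event bodies.
final case class Batch(sequenceNumber: String, events: Vector[String])

// Simplified model of the sink side. A Vector plays the role of the Flume channel.
final class InMemorySink(initial: Seq[String]) {
  private var channel: Vector[String] = initial.toVector
  private var inFlight: Map[String, Vector[String]] = Map.empty
  private var counter = 0L

  // Hand out up to n events and remember them until the caller acks or nacks.
  def getEventBatch(n: Int): Batch = synchronized {
    val (taken, rest) = channel.splitAt(n)
    channel = rest
    counter += 1
    val seq = s"seq-$counter"
    inFlight += (seq -> taken)
    Batch(seq, taken)
  }

  // ack: the caller stored the batch, so forget it for good.
  def ack(sequenceNumber: String): Unit = synchronized {
    inFlight -= sequenceNumber
  }

  // nack: put the events back at the head of the channel for redelivery.
  def nack(sequenceNumber: String): Unit = synchronized {
    inFlight.get(sequenceNumber).foreach(events => channel = events ++ channel)
    inFlight -= sequenceNumber
  }

  // Number of events still waiting in the channel.
  def pending: Int = synchronized(channel.size)
}
```

A caller that fails to store a batch sends `nack` and sees the same events again on its next `getEventBatch` call, which is the property that makes the pull-based approach reliable.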

Protocol Implementation

The protocol is implemented by the SparkAvroCallbackHandler class:

private[flume] class SparkAvroCallbackHandler(
  val threads: Int,
  val channel: Channel,
  val transactionTimeout: Int,
  val backOffInterval: Int
) extends SparkFlumeProtocol with Logging {
  
  /**
   * Returns a batch of events to Spark over Avro RPC
   * @param n Maximum number of events to return in a batch
   * @return EventBatch instance with sequence number and events
   */
  override def getEventBatch(n: Int): EventBatch
  
  /**
   * Called by Spark to indicate successful commit of a batch
   * @param sequenceNumber The sequence number of the successful batch
   */
  override def ack(sequenceNumber: CharSequence): Void
  
  /**
   * Called by Spark to indicate failed commit of a batch
   * @param sequenceNumber The sequence number of the failed batch
   */
  override def nack(sequenceNumber: CharSequence): Void
  
  /**
   * Shutdown the handler and executor threads
   */
  def shutdown(): Unit
}

Event Data Structures

Core data structures for event transmission (defined in Avro IDL; the Avro compiler generates them as Java classes that are used from Scala).

// Avro record definitions
// Avro record definitions
record SparkSinkEvent {
  map<string> headers;
  bytes body;
}

record EventBatch {
  string errorMsg = "";  // Empty indicates success, non-empty indicates error
  string sequenceNumber; // Unique identifier for transaction tracking
  array<SparkSinkEvent> events;
}

The Avro compiler generates these classes from the IDL definition. While the classes are used internally by the sink, they are not typically accessed directly by client code since the sink is configured through Flume agent properties rather than programmatic instantiation.
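
For testing or debugging it can help to see how an event body is read. Avro's `bytes` type maps to `java.nio.ByteBuffer` in generated Java classes, so the sketch below uses a hypothetical mirror type, `SinkEventSketch`, with the same two fields. `SinkEventCodec` and its methods are illustrative helpers, not part of the package, and the UTF-8 encoding is an assumption about the payload.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical mirror of SparkSinkEvent: string headers plus a binary body.
final case class SinkEventSketch(headers: Map[String, String], body: ByteBuffer)

object SinkEventCodec {
  // Build an event from text, assuming UTF-8 encoded bodies.
  def fromText(text: String, headers: Map[String, String] = Map.empty): SinkEventSketch =
    SinkEventSketch(headers, ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8)))

  // Decode a duplicate so the original buffer's position is left untouched
  // and the body can be read more than once.
  def bodyAsText(event: SinkEventSketch): String =
    StandardCharsets.UTF_8.decode(event.body.duplicate()).toString
}
```

Decoding from `duplicate()` matters because reading a `ByteBuffer` advances its position; without it, a second read of the same event would return an empty string.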

Configuration Parameters

Available configuration parameters for the SparkSink.

private[flume] object SparkSinkConfig {
  // Thread pool configuration
  val THREADS = "threads"
  val DEFAULT_THREADS = 10
  
  // Transaction timeout configuration  
  val CONF_TRANSACTION_TIMEOUT = "timeout"
  val DEFAULT_TRANSACTION_TIMEOUT = 60  // seconds
  
  // Network binding configuration
  val CONF_HOSTNAME = "hostname"
  val DEFAULT_HOSTNAME = "0.0.0.0"
  val CONF_PORT = "port"  // No default - mandatory
  
  // Backoff configuration
  val CONF_BACKOFF_INTERVAL = "backoffInterval"
  val DEFAULT_BACKOFF_INTERVAL = 200  // milliseconds
}

Transaction Processing

Internal transaction management handled by the TransactionProcessor class:

private class TransactionProcessor(
  val channel: Channel,
  val seqNum: String,
  var maxBatchSize: Int,
  val transactionTimeout: Int,
  val backOffInterval: Int,
  val parent: SparkAvroCallbackHandler
) extends Callable[Void] with Logging {
  
  /**
   * Get an event batch from the channel (blocks until available)
   * @return EventBatch instance with events or error message
   */
  def getEventBatch: EventBatch
  
  /**
   * Called when ACK or NACK received from Spark
   * @param success true for ACK, false for NACK
   */
  def batchProcessed(success: Boolean): Unit
  
  /**
   * Shutdown the transaction processor
   */
  def shutdown(): Unit
  
  /**
   * Main execution method (implements Callable)
   */
  override def call(): Void
}
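
The interaction between batch size and the backoff interval can be sketched as a small polling loop. This is an illustration under assumptions, not the class's actual code: `BackoffPolling`, the `take` function standing in for a channel read, and the explicit `attemptsLeft` cap are all hypothetical.

```scala
import scala.annotation.tailrec

object BackoffPolling {
  // Drain up to maxBatchSize events that are available right now. If none are,
  // sleep for the backoff interval and retry until the attempts run out.
  // `take` is a hypothetical stand-in for reading one event from a channel.
  @tailrec
  def poll[A](
      take: () => Option[A],
      maxBatchSize: Int,
      attemptsLeft: Int,
      backOffIntervalMillis: Long): Either[String, Vector[A]] = {
    val events = Iterator
      .continually(take())
      .takeWhile(_.isDefined)   // stop as soon as the channel is empty
      .take(maxBatchSize)       // never exceed the requested batch size
      .collect { case Some(e) => e }
      .toVector

    if (events.nonEmpty) Right(events)
    else if (attemptsLeft <= 1) Left("no events available after retries")
    else {
      Thread.sleep(backOffIntervalMillis)
      poll(take, maxBatchSize, attemptsLeft - 1, backOffIntervalMillis)
    }
  }
}
```

The `Left` result corresponds to the error-batch case: the caller gets an explicit "nothing to deliver" answer instead of blocking indefinitely.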

Utility Functions

Helper functions for working with event batches.

object SparkSinkUtils {
  /**
   * Determine if an event batch represents an error condition
   * Checks if errorMsg is non-empty using !batch.getErrorMsg.toString.equals("")
   * @param batch The EventBatch to check
   * @return true if the batch contains an error, false otherwise
   */
  def isErrorBatch(batch: EventBatch): Boolean
}
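
The check described in the comment above is small enough to reproduce in isolation. The sketch below uses a hypothetical `BatchSketch` in place of the generated `EventBatch`; its fields are typed `CharSequence` because Avro-generated string fields are not guaranteed to be `java.lang.String`, which is why the comparison goes through `toString`. `BatchCheck.handle` is an illustrative caller, not part of the package.

```scala
// Hypothetical mirror of EventBatch, reduced to the fields used here.
final case class BatchSketch(
    errorMsg: CharSequence,
    sequenceNumber: CharSequence,
    events: Seq[String])

object BatchCheck {
  // A non-empty errorMsg marks an error batch. toString is required because
  // a CharSequence implementation need not equal a String with the same text.
  def isErrorBatch(batch: BatchSketch): Boolean =
    !batch.errorMsg.toString.equals("")

  // Illustrative caller: surface the error, or report how many events arrived.
  def handle(batch: BatchSketch): Either[String, Int] =
    if (isErrorBatch(batch)) Left(batch.errorMsg.toString)
    else Right(batch.events.size)
}
```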

Supporting Classes

Additional utility classes used internally:

/**
 * Thread factory for creating daemon threads with specific naming
 */
private[sink] class SparkSinkThreadFactory(nameFormat: String) extends ThreadFactory {
  /**
   * Create a new daemon thread with formatted name
   * @param r Runnable to execute in the thread
   * @return New daemon Thread instance
   */
  override def newThread(r: Runnable): Thread
}

/**
 * Logging trait providing SLF4J-based logging functionality
 */
private[sink] trait Logging {
  protected def logInfo(msg: => String): Unit
  protected def logDebug(msg: => String): Unit
  protected def logWarning(msg: => String): Unit
  protected def logError(msg: => String): Unit
  protected def logTrace(msg: => String): Unit
  
  // Overloaded versions that accept Throwable
  protected def logInfo(msg: => String, throwable: Throwable): Unit
  protected def logDebug(msg: => String, throwable: Throwable): Unit
  protected def logWarning(msg: => String, throwable: Throwable): Unit
  protected def logError(msg: => String, throwable: Throwable): Unit
  protected def logTrace(msg: => String, throwable: Throwable): Unit
  
  protected def isTraceEnabled(): Boolean
}
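
As an illustration of what a daemon thread factory with formatted names looks like, here is a minimal sketch built only on `java.util.concurrent`. `DaemonThreadFactory` is a hypothetical re-implementation for explanation, not the package's `SparkSinkThreadFactory`, and the name format in the demo is made up.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicLong

// Hypothetical re-implementation for illustration only.
final class DaemonThreadFactory(nameFormat: String) extends ThreadFactory {
  private val counter = new AtomicLong(0)
  private val delegate = Executors.defaultThreadFactory()

  override def newThread(r: Runnable): Thread = {
    val t = delegate.newThread(r)
    t.setDaemon(true) // daemon threads do not keep the JVM alive on shutdown
    t.setName(nameFormat.format(counter.getAndIncrement()))
    t
  }
}

object DaemonThreadFactoryDemo {
  // Create one thread and report its name and daemon flag.
  def threadInfo(): (String, Boolean) = {
    val factory = new DaemonThreadFactory("Sketch Processor Thread - %d")
    val t = factory.newThread(new Runnable { def run(): Unit = () })
    (t.getName, t.isDaemon)
  }
}
```

Daemon threads suit a sink because a stuck worker should not prevent the Flume agent's JVM from exiting, and numbered names make thread dumps easier to read.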

Configuration Options

Required Parameters

  • port: Port number for the Avro RPC server (no default)

Optional Parameters

  • hostname: Hostname to bind to (default: "0.0.0.0")
  • threads: Number of processing threads (default: 10)
  • timeout: Transaction timeout in seconds (default: 60)
  • backoffInterval: Backoff interval in milliseconds when no events available (default: 200)
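
The required and optional parameters above can be summarized as a small resolver that applies the documented defaults and rejects a missing port. This is a sketch for explanation: `SinkSettings` and `SinkSettingsResolver` are hypothetical names, and the real sink reads its values from a Flume `Context` rather than a `Map`.

```scala
import scala.util.Try

// Hypothetical holder for the resolved settings.
final case class SinkSettings(
    hostname: String,
    port: Int,
    threads: Int,
    timeoutSeconds: Int,
    backoffIntervalMillis: Int)

object SinkSettingsResolver {
  // Resolve settings from raw properties, applying the documented defaults.
  def fromProperties(props: Map[String, String]): Either[String, SinkSettings] = {
    def int(key: String, default: Int): Int =
      props.get(key).map(_.trim.toInt).getOrElse(default)

    props.get("port") match {
      case None => Left("port is mandatory and has no default")
      case Some(rawPort) =>
        // Any malformed number ends up as a Left instead of an exception.
        Try(
          SinkSettings(
            hostname = props.getOrElse("hostname", "0.0.0.0"),
            port = rawPort.trim.toInt,
            threads = int("threads", 10),
            timeoutSeconds = int("timeout", 60),
            backoffIntervalMillis = int("backoffInterval", 200)
          )
        ).toEither.left.map(e => s"invalid numeric value: ${e.getMessage}")
    }
  }
}
```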

Error Handling

The sink handles the following failure modes:

  • Transaction Timeouts: Automatic rollback if Spark doesn't acknowledge within timeout period
  • NACK Handling: Transaction rollback when Spark signals processing failure
  • Error Batches: Invalid batches indicated by non-empty errorMsg field
  • Resource Cleanup: Proper shutdown of threads and network resources
  • Channel Errors: Graceful handling of Flume channel communication failures

Common error scenarios:

  • No Events Available: Returns error batch when channel has no events after multiple attempts
  • Channel Transaction Failure: Error batch when unable to create Flume transaction
  • Network Connectivity: Server startup failures logged with appropriate error messages
  • Thread Pool Exhaustion: Blocks until threads become available rather than failing
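
The timeout and NACK rules reduce to one decision: commit only if an ACK arrives in time, otherwise roll back. One plausible way to express that is a latch with a timed wait, sketched below. `PendingBatch` and the `Outcome` values are hypothetical names, and the sketch uses milliseconds for convenience while the sink's `timeout` property is in seconds.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

sealed trait Outcome
case object Committed extends Outcome
case object RolledBack extends Outcome

// Hypothetical model of one in-flight batch waiting for Spark's verdict.
final class PendingBatch(timeoutMillis: Long) {
  private val verdictArrived = new CountDownLatch(1)
  @volatile private var acked = false

  // Called from the RPC side when an ACK (true) or NACK (false) arrives.
  def batchProcessed(success: Boolean): Unit = {
    acked = success
    verdictArrived.countDown()
  }

  // Called on the transaction's own thread: wait for the verdict, then decide.
  // A timeout and a NACK both lead to a rollback.
  def awaitOutcome(): Outcome = {
    val arrived = verdictArrived.await(timeoutMillis, TimeUnit.MILLISECONDS)
    if (arrived && acked) Committed else RolledBack
  }
}
```

Treating silence the same as a NACK is the conservative choice: events may be delivered more than once, but they are not dropped.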

Thread Safety

The sink ensures thread safety through:

  • Thread-Local Transactions: Each Flume transaction is created and then committed or rolled back on its own dedicated thread
  • Synchronized Access: Protected access to shared transaction processor map
  • Atomic Counters: Thread-safe sequence number generation
  • Proper Shutdown: Coordinated shutdown of thread pools and network resources
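
Two of these techniques, atomic sequence numbers and synchronized access to a shared map, can be sketched together. `ProcessorRegistry` is a hypothetical class written for this explanation, with the type parameter `P` standing in for a transaction processor; the sequence-number format is also an assumption.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable

// Hypothetical registry keyed by sequence number.
final class ProcessorRegistry[P](seqBase: String) {
  private val counter = new AtomicLong(0)
  private val processors = mutable.HashMap.empty[String, P]

  // Combine a per-sink base with an atomically incremented counter so that
  // concurrent callers never receive the same sequence number.
  def nextSequenceNumber(): String = seqBase + counter.incrementAndGet()

  // All map access goes through the same lock.
  def register(seq: String, p: P): Unit = processors.synchronized {
    processors.put(seq, p)
    ()
  }
  def remove(seq: String): Option[P] = processors.synchronized(processors.remove(seq))
  def size: Int = processors.synchronized(processors.size)
}

object ProcessorRegistryDemo {
  // Register entries from several threads at once and return the final count.
  def registerConcurrently(threads: Int, perThread: Int): Int = {
    val registry = new ProcessorRegistry[Int]("sketch-")
    val workers = (0 until threads).map { id =>
      new Thread(new Runnable {
        def run(): Unit = (0 until perThread).foreach { _ =>
          registry.register(registry.nextSequenceNumber(), id)
        }
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    registry.size
  }
}
```

If sequence numbers collided or the map were updated without the lock, the final count would fall short of `threads * perThread`, so the demo doubles as a quick sanity check.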