A custom Apache Flume sink for Spark Streaming that enables reliable, pull-based data ingestion from Flume into Spark Streaming applications. The sink implements a transactional protocol over Avro RPC, using transaction management and acknowledgment handling to provide fault tolerance and data durability.

Maven dependency:

```xml
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume-sink_2.10</artifactId>
<version>1.6.3</version>
</dependency>
```

The sink is deployed as a Flume sink component, so no direct imports in application code are typically needed. However, for advanced integration or testing:

```scala
import org.apache.spark.streaming.flume.sink.SparkSink
import org.apache.spark.streaming.flume.sink.SparkAvroCallbackHandler
import org.apache.spark.streaming.flume.sink.SparkSinkUtils
import org.apache.spark.streaming.flume.sink.SparkFlumeProtocol
// Note: SparkSinkEvent and EventBatch are generated from Avro IDL and imported from the protocol
```

Configure the sink in your Flume agent configuration:

```properties
# Configure sink
agent.sinks = sparkSink
agent.sinks.sparkSink.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.sparkSink.hostname = 0.0.0.0
agent.sinks.sparkSink.port = 9999
agent.sinks.sparkSink.channel = memoryChannel
agent.sinks.sparkSink.threads = 10
agent.sinks.sparkSink.timeout = 60
agent.sinks.sparkSink.backoffInterval = 200
```

Connect to the sink from Spark Streaming:

```scala
import org.apache.spark.streaming.flume._
val flumeStream = FlumeUtils.createPollingStream(
  streamingContext,
  "hostname",
  9999
)
```

The Spark Streaming Flume Sink operates using a pull-based architecture: Flume delivers events into a channel, the sink exposes that channel through an Avro RPC server, and the Spark Streaming receiver polls the sink for batches, acknowledging each batch (ack) to commit the underlying channel transaction or rejecting it (nack) to roll the transaction back.
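To make the flow concrete, here is a minimal sketch that expands the snippet above into a complete driver program; the application name, host, port, batch interval, and the assumption that event bodies are UTF-8 text are all illustrative, not part of the sink's API.

```scala
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

// Illustrative driver: pull events from the SparkSink and print per-batch counts.
val sparkConf = new SparkConf().setAppName("FlumePollingSketch")
val streamingContext = new StreamingContext(sparkConf, Seconds(2))

// Poll the sink running inside the Flume agent (host/port must match the agent config).
val flumeStream = FlumeUtils.createPollingStream(streamingContext, "flume-host", 9999)

// Each record is a SparkFlumeEvent wrapping an Avro Flume event; decode the body as
// UTF-8 text (an assumption about the upstream source, not a requirement of the sink).
val lines = flumeStream.map(e => new String(e.event.getBody.array(), StandardCharsets.UTF_8))
lines.count().print()

streamingContext.start()
streamingContext.awaitTermination()
```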
The SparkSink class is the sink implementation itself; it is configured with Flume context parameters:

```scala { .api }
class SparkSink extends AbstractSink with Logging with Configurable {
  /**
   * Configure the sink with Flume context parameters
   * @param ctx Flume context containing configuration properties
   */
  def configure(ctx: Context): Unit

  /**
   * Start the sink and initialize Avro RPC server
   */
  override def start(): Unit

  /**
   * Stop the sink and shutdown resources
   */
  override def stop(): Unit

  /**
   * Main processing method (blocks until shutdown)
   * @return Processing status
   */
  override def process(): Status

  /**
   * Get the port the server is listening on
   * @return Port number (package-private method for testing)
   */
  private[flume] def getPort(): Int

  /**
   * Testing utility to count down when batches are received
   * @param latch CountDownLatch to count down on batch receipt
   */
  private[flume] def countdownWhenBatchReceived(latch: CountDownLatch): Unit
}
```

Protocol definition for communication between Spark Streaming and the Flume sink (defined in sparkflume.avdl):

```avro { .api }
@namespace("org.apache.spark.streaming.flume.sink")
protocol SparkFlumeProtocol {
  /**
   * Request a batch of events from the sink
   * @param n Maximum number of events to return
   * @return EventBatch containing events and sequence number
   */
  EventBatch getEventBatch(int n);

  /**
   * Acknowledge successful processing of an event batch
   * @param sequenceNumber Sequence number of the batch to acknowledge
   */
  void ack(string sequenceNumber);

  /**
   * Signal failed processing of an event batch
   * @param sequenceNumber Sequence number of the batch that failed
   */
  void nack(string sequenceNumber);
}
```

The protocol is implemented by the SparkAvroCallbackHandler class:

```scala { .api }
private[flume] class SparkAvroCallbackHandler(
    val threads: Int,
    val channel: Channel,
    val transactionTimeout: Int,
    val backOffInterval: Int
  ) extends SparkFlumeProtocol with Logging {

  /**
   * Returns a batch of events to Spark over Avro RPC
   * @param n Maximum number of events to return in a batch
   * @return EventBatch instance with sequence number and events
   */
  override def getEventBatch(n: Int): EventBatch

  /**
   * Called by Spark to indicate successful commit of a batch
   * @param sequenceNumber The sequence number of the successful batch
   */
  override def ack(sequenceNumber: CharSequence): Void

  /**
   * Called by Spark to indicate failed commit of a batch
   * @param sequenceNumber The sequence number of the failed batch
   */
  override def nack(sequenceNumber: CharSequence): Void

  /**
   * Shutdown the handler and executor threads
   */
  def shutdown(): Unit
}
```
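To make the protocol concrete, here is a rough sketch of the client side of the exchange, along the lines of what Spark's polling receiver does internally; the host, port, and batch size are placeholders, and error handling is heavily simplified.

```scala
import java.net.InetSocketAddress
import java.nio.charset.StandardCharsets

import scala.collection.JavaConverters._

import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
import org.apache.spark.streaming.flume.sink.{EventBatch, SparkFlumeProtocol, SparkSinkUtils}

// Connect to the sink's Avro RPC server (placeholder host/port).
val transceiver = new NettyTransceiver(new InetSocketAddress("flume-host", 9999))
val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)

// Ask for up to 1000 events; the sink opens a channel transaction and holds it
// open until it receives an ack or nack (or the transaction times out).
val batch: EventBatch = client.getEventBatch(1000)

if (SparkSinkUtils.isErrorBatch(batch)) {
  // Non-empty errorMsg means no transaction is pending for this request.
  println(s"Sink returned an error batch: ${batch.getErrorMsg}")
} else {
  try {
    // Process the events, then ack so the sink commits the channel transaction.
    batch.getEvents.asScala.foreach { event =>
      println(new String(event.getBody.array(), StandardCharsets.UTF_8))
    }
    client.ack(batch.getSequenceNumber)
  } catch {
    case _: Exception =>
      // A nack rolls the transaction back so the events can be redelivered.
      client.nack(batch.getSequenceNumber)
  }
}

transceiver.close()
```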
### Event Data Structures
Core data structures for event transmission (defined in Avro IDL and generated as Java classes by the Avro compiler).
```avro { .api }
// Avro record definitions
record SparkSinkEvent {
  map<string> headers;
  bytes body;
}

record EventBatch {
  string errorMsg = "";          // Empty indicates success, non-empty indicates error
  string sequenceNumber;         // Unique identifier for transaction tracking
  array<SparkSinkEvent> events;
}
```

The Avro compiler generates these classes from the IDL definition. While the classes are used internally by the sink, they are not typically accessed directly by client code, since the sink is configured through Flume agent properties rather than programmatic instantiation.
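For illustration only, a sketch of constructing and inspecting the generated classes, for example in a unit test; the all-args constructors shown follow Avro's usual code-generation pattern and should be treated as an assumption rather than a documented API.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.Collections

import scala.collection.JavaConverters._

import org.apache.spark.streaming.flume.sink.{EventBatch, SparkSinkEvent, SparkSinkUtils}

// A single event: a headers map plus an opaque binary body (assumed all-args constructor).
val headers = Map[CharSequence, CharSequence]("source" -> "test").asJava
val body = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))
val event = new SparkSinkEvent(headers, body)

// A successful batch: empty errorMsg, a sequence number for ack/nack, and the events.
val batch = new EventBatch("", "seq-42", Collections.singletonList(event))
assert(!SparkSinkUtils.isErrorBatch(batch))

// An error batch carries a non-empty message and no usable events.
val errorBatch = new EventBatch("channel is empty", "", Collections.emptyList[SparkSinkEvent]())
assert(SparkSinkUtils.isErrorBatch(errorBatch))
```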
Available configuration parameters for the SparkSink:

```scala { .api }
private[flume] object SparkSinkConfig {
  // Thread pool configuration
  val THREADS = "threads"
  val DEFAULT_THREADS = 10

  // Transaction timeout configuration
  val CONF_TRANSACTION_TIMEOUT = "timeout"
  val DEFAULT_TRANSACTION_TIMEOUT = 60 // seconds

  // Network binding configuration
  val CONF_HOSTNAME = "hostname"
  val DEFAULT_HOSTNAME = "0.0.0.0"
  val CONF_PORT = "port" // No default - mandatory

  // Backoff configuration
  val CONF_BACKOFF_INTERVAL = "backoffInterval"
  val DEFAULT_BACKOFF_INTERVAL = 200 // milliseconds
}
```

Internal transaction management is handled by the TransactionProcessor class:

```scala { .api }
private class TransactionProcessor(
    val channel: Channel,
    val seqNum: String,
    var maxBatchSize: Int,
    val transactionTimeout: Int,
    val backOffInterval: Int,
    val parent: SparkAvroCallbackHandler
  ) extends Callable[Void] with Logging {

  /**
   * Get an event batch from the channel (blocks until available)
   * @return EventBatch instance with events or error message
   */
  def getEventBatch: EventBatch

  /**
   * Called when ACK or NACK received from Spark
   * @param success true for ACK, false for NACK
   */
  def batchProcessed(success: Boolean): Unit

  /**
   * Shutdown the transaction processor
   */
  def shutdown(): Unit

  /**
   * Main execution method (implements Callable)
   */
  override def call(): Void
}
```
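For intuition, the channel interaction that a transaction processor wraps follows Flume's standard transaction pattern. The sketch below illustrates only that pattern; the real class additionally handles backoff, empty batches, timeouts, and coordination with the Avro callback handler, and the helper names here are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.flume.{Channel, Event, Transaction}

// Hypothetical helper: take up to maxBatchSize events from the channel inside one
// transaction, which stays open until an ack or nack decides its fate.
def takeBatch(channel: Channel, maxBatchSize: Int): (Transaction, Seq[Event]) = {
  val tx = channel.getTransaction
  tx.begin()
  val events = ArrayBuffer.empty[Event]
  var more = true
  while (more && events.size < maxBatchSize) {
    val event = channel.take()          // null when the channel is currently empty
    if (event == null) more = false else events += event
  }
  (tx, events)
}

// Hypothetical helper: ack commits the transaction, nack/timeout rolls it back.
def finishBatch(tx: Transaction, success: Boolean): Unit = {
  if (success) tx.commit() else tx.rollback()
  tx.close()
}
```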
### Utility Functions
Helper functions for working with event batches.
```scala { .api }
object SparkSinkUtils {
  /**
   * Determine if an event batch represents an error condition
   * Checks if errorMsg is non-empty using !batch.getErrorMsg.toString.equals("")
   * @param batch The EventBatch to check
   * @return true if the batch contains an error, false otherwise
   */
  def isErrorBatch(batch: EventBatch): Boolean
}
```

Additional utility classes used internally:

```scala { .api }
/**
 * Thread factory for creating daemon threads with specific naming
 */
private[sink] class SparkSinkThreadFactory(nameFormat: String) extends ThreadFactory {
  /**
   * Create a new daemon thread with formatted name
   * @param r Runnable to execute in the thread
   * @return New daemon Thread instance
   */
  override def newThread(r: Runnable): Thread
}

/**
 * Logging trait providing SLF4J-based logging functionality
 */
private[sink] trait Logging {
  protected def logInfo(msg: => String): Unit
  protected def logDebug(msg: => String): Unit
  protected def logWarning(msg: => String): Unit
  protected def logError(msg: => String): Unit
  protected def logTrace(msg: => String): Unit

  // Overloaded versions that accept a Throwable
  protected def logInfo(msg: => String, throwable: Throwable): Unit
  protected def logDebug(msg: => String, throwable: Throwable): Unit
  protected def logWarning(msg: => String, throwable: Throwable): Unit
  protected def logError(msg: => String, throwable: Throwable): Unit
  protected def logTrace(msg: => String, throwable: Throwable): Unit

  protected def isTraceEnabled(): Boolean
}
```

The sink provides error handling at several levels: a batch that cannot be assembled is returned with a non-empty errorMsg so the receiver can detect the failure; a nack from Spark rolls back the corresponding channel transaction so the events remain in the channel; and batches that are never acknowledged are rolled back once the configured transaction timeout expires.
Common error scenarios include an empty or temporarily unavailable channel (the sink backs off for backoffInterval milliseconds before retrying), failures of the Avro RPC connection between the receiver and the sink, and acknowledgments that never arrive and therefore hit the transaction timeout.

The sink ensures thread safety by running each in-flight batch on its own TransactionProcessor, dispatched from a thread pool sized by the threads parameter, and by tracking pending batches by their sequence numbers.
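As a sketch of that threading model, the following shows a daemon-thread factory equivalent in behaviour to SparkSinkThreadFactory (which is package-private) feeding a fixed-size pool; the class name, pool size, and thread-name format below are illustrative assumptions.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for SparkSinkThreadFactory: daemon threads with formatted names.
class DaemonThreadFactory(nameFormat: String) extends ThreadFactory {
  private val counter = new AtomicLong(0)

  override def newThread(r: Runnable): Thread = {
    val thread = new Thread(r, nameFormat.format(counter.incrementAndGet()))
    thread.setDaemon(true)   // daemon threads do not block JVM shutdown
    thread
  }
}

// One fixed-size pool handles all in-flight transaction processors.
val transactionExecutor =
  Executors.newFixedThreadPool(10, new DaemonThreadFactory("Spark Sink Processor Thread - %d"))
```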