A Flume sink implementation that provides an Avro RPC server for Spark streaming applications to poll data from Flume agents
```bash
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-flume-sink_2-11@2.4.0
```

A specialized Flume sink implementation that provides an Avro RPC server for Apache Spark streaming applications to poll data from Flume agents using a reliable, transaction-based approach.
Maven:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-flume-sink_2.11</artifactId>
  <version>2.4.8</version>
</dependency>
```

SBT:

```scala
libraryDependencies += "org.apache.spark" % "spark-streaming-flume-sink_2.11" % "2.4.8"
```

```scala
// Main sink class
import org.apache.spark.streaming.flume.sink.SparkSink
// Flume configuration
import org.apache.flume.Context
import org.apache.flume.conf.Configurable
import org.apache.flume.sink.AbstractSink
import org.apache.flume.Sink.Status
// Avro protocol (generated from sparkflume.avdl)
import org.apache.spark.streaming.flume.sink.SparkFlumeProtocol
import org.apache.spark.streaming.flume.sink.SparkSinkEvent
import org.apache.spark.streaming.flume.sink.EventBatch
// For Avro RPC client usage
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
// Java concurrency for testing/monitoring
import java.util.concurrent.CountDownLatch
```

The SparkSink is deployed as a Flume sink component and configured through Flume's configuration system. It creates an Avro RPC server that Spark streaming applications connect to for polling events.

```properties
# Flume agent configuration
agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark-sink.hostname = 0.0.0.0
agent.sinks.spark-sink.port = 9999
agent.sinks.spark-sink.threads = 5
agent.sinks.spark-sink.timeout = 30
agent.sinks.spark-sink.backoffInterval = 200
agent.sinks.spark-sink.channel = memory-channel
```

On the Spark side, the streaming application creates a polling stream against the sink:

```scala
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sparkConf, Seconds(1))
val flumeStream = FlumeUtils.createPollingStream(ssc, "hostname", 9999)
```

The SparkSink implements a polling-based architecture instead of the traditional push-based Flume sink pattern. Its configuration keys and defaults are defined in `SparkSinkConfig`:

```scala
object SparkSinkConfig {
val THREADS: String = "threads"
val DEFAULT_THREADS: Int = 10
val CONF_TRANSACTION_TIMEOUT: String = "timeout"
val DEFAULT_TRANSACTION_TIMEOUT: Int = 60
val CONF_HOSTNAME: String = "hostname"
val DEFAULT_HOSTNAME: String = "0.0.0.0"
val CONF_PORT: String = "port"
// No default - must be specified
val CONF_BACKOFF_INTERVAL: String = "backoffInterval"
val DEFAULT_BACKOFF_INTERVAL: Int = 200
}

class SparkSink extends AbstractSink with Logging with Configurable {
def start(): Unit
def stop(): Unit
def configure(ctx: Context): Unit
def process(): Status
// Package-private methods for testing
private[flume] def getPort(): Int
private[flume] def countdownWhenBatchReceived(latch: CountDownLatch): Unit
}
```

The main Flume sink implementation, built on:
- `AbstractSink` for integration with the Flume framework
- `Configurable` for parameter configuration
- `Logging` for operational monitoring

Key Methods:
- `start()`: Initializes and starts the Avro RPC server
- `stop()`: Shuts down the server and releases all resources
- `configure(ctx: Context)`: Configures the sink from Flume context parameters
- `process()`: Blocks the Flume framework thread (required by the Flume sink interface)
- `getPort()`: Returns the actual port the server is listening on
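For illustration, a minimal sketch of wiring the sink up programmatically, as an embedded test might, assuming Flume's `MemoryChannel` and the configuration keys listed above; production deployments normally use the agent configuration shown earlier:

```scala
import scala.collection.JavaConverters._
import org.apache.flume.Context
import org.apache.flume.channel.MemoryChannel
import org.apache.flume.conf.Configurables
import org.apache.spark.streaming.flume.sink.SparkSink

// Channel the sink drains events from
val channel = new MemoryChannel()
Configurables.configure(channel, new Context())
channel.start()

// Configure the sink from a Flume Context, then attach the channel
val sink = new SparkSink()
sink.configure(new Context(Map("hostname" -> "localhost", "port" -> "9999").asJava))
sink.setChannel(channel)

// start() brings up the Avro RPC server; stop() shuts it down and releases resources
sink.start()
// ... a Spark polling receiver can now connect to localhost:9999 ...
sink.stop()
channel.stop()
```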
```java
// Generated from sparkflume.avdl - Avro protocol interface
public interface SparkFlumeProtocol {
EventBatch getEventBatch(int n);
Void ack(CharSequence sequenceNumber);
Void nack(CharSequence sequenceNumber);
}
```

The Avro RPC protocol that Spark clients use to interact with the sink:
- `getEventBatch(n)`: Requests up to n events from the Flume channel
- `ack(sequenceNumber)`: Acknowledges successful processing of a batch
- `nack(sequenceNumber)`: Signals failed processing, triggering rollback
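As an illustration (Spark's `FlumeUtils.createPollingStream` performs these steps internally), a protocol client can be obtained over Avro's Netty transport; the hostname, port, and batch size below are assumed values:

```scala
import java.net.InetSocketAddress
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
import org.apache.spark.streaming.flume.sink.{EventBatch, SparkFlumeProtocol}

// Open an Avro RPC connection to the sink's server
val transceiver = new NettyTransceiver(new InetSocketAddress("sink-host", 9999))
val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol], transceiver)

// Request up to 100 events; the sink opens a Flume channel transaction to fill the batch
val batch: EventBatch = client.getEventBatch(100)
```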
```java
// Generated from sparkflume.avdl - Avro record
public class SparkSinkEvent {
// Constructors
public SparkSinkEvent();
public SparkSinkEvent(java.util.Map<CharSequence, CharSequence> headers, java.nio.ByteBuffer body);
// Headers accessors
public java.util.Map<CharSequence, CharSequence> getHeaders();
public void setHeaders(java.util.Map<CharSequence, CharSequence> headers);
// Body accessors
public java.nio.ByteBuffer getBody();
public void setBody(java.nio.ByteBuffer body);
}
```

Represents a single Flume event with two fields, headers (event metadata) and body (event payload):
- `getHeaders()`: Returns event metadata as key-value pairs
- `getBody()`: Returns the event payload as binary data
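A small consumer-side sketch of reading an event's fields, assuming the body holds UTF-8 text:

```scala
import java.nio.charset.StandardCharsets
import scala.collection.JavaConverters._
import org.apache.spark.streaming.flume.sink.SparkSinkEvent

// Render an event's headers and body for logging or debugging
def describe(event: SparkSinkEvent): String = {
  val headers = event.getHeaders.asScala
    .map { case (k, v) => s"$k=$v" }
    .mkString(", ")
  // duplicate() so the original buffer position is left untouched
  val body = StandardCharsets.UTF_8.decode(event.getBody.duplicate()).toString
  s"headers: [$headers], body: $body"
}
```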
```java
// Generated from sparkflume.avdl - Avro record
public class EventBatch {
// Constructors
public EventBatch();
public EventBatch(CharSequence errorMsg, CharSequence sequenceNumber, java.util.List<SparkSinkEvent> events);
// Error message accessors
public CharSequence getErrorMsg(); // Empty string indicates success
public void setErrorMsg(CharSequence errorMsg);
// Sequence number accessors
public CharSequence getSequenceNumber(); // Unique transaction identifier
public void setSequenceNumber(CharSequence sequenceNumber);
// Events list accessors
public java.util.List<SparkSinkEvent> getEvents();
public void setEvents(java.util.List<SparkSinkEvent> events);
}
```

Container for a batch of events returned to Spark, holding errorMsg, sequenceNumber, and the events list:
- `getErrorMsg()`: Returns the error message if batch creation failed; an empty string indicates success
- `getSequenceNumber()`: Returns the unique identifier used for transaction tracking and acknowledgment
- `getEvents()`: Returns the list of SparkSinkEvent objects in the batch
- `setErrorMsg(errorMsg)`: Sets the error message for this batch

The sequence number ties the batch to a pending Flume channel transaction:

- `ack(sequenceNumber)` → the transaction commits
- `nack(sequenceNumber)` → the transaction rolls back
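A sketch of this acknowledgement contract from the consumer side, assuming `client` is the Avro protocol client from the earlier sketch and `process` is an application-defined handler:

```scala
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import org.apache.spark.streaming.flume.sink.{EventBatch, SparkFlumeProtocol, SparkSinkEvent}

def consumeOnce(client: SparkFlumeProtocol, process: SparkSinkEvent => Unit): Unit = {
  val batch: EventBatch = client.getEventBatch(100)
  // An empty error message means the batch is valid (see SparkSinkUtils below)
  if (batch.getErrorMsg.toString.isEmpty) {
    try {
      batch.getEvents.asScala.foreach(process)
      client.ack(batch.getSequenceNumber)    // the sink commits the channel transaction
    } catch {
      case NonFatal(_) =>
        client.nack(batch.getSequenceNumber) // the sink rolls back; events stay in the channel
    }
  }
}
```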
The SparkSink uses a multi-threaded architecture: the Avro server handles requests on a configurable thread pool (the `threads` setting). Throughput can be scaled further by polling several sinks from Spark:

```scala
// Spark side - polling from multiple sinks
val flumeStreams = (0 until numSinks).map { i =>
FlumeUtils.createPollingStream(ssc, hostnames(i), ports(i))
}
val unionStream = ssc.union(flumeStreams)
```

On the Flume side, multiple sinks can be grouped for failover:

```properties
# Multiple sink configuration for failover
agent.sinks.spark-sink1.hostname = host1
agent.sinks.spark-sink1.port = 9999
agent.sinks.spark-sink2.hostname = host2
agent.sinks.spark-sink2.port = 9999
agent.sinkgroups.spark-group.sinks = spark-sink1 spark-sink2
agent.sinkgroups.spark-group.processor.type = failover
```

```scala
object SparkSinkUtils {
def isErrorBatch(batch: EventBatch): Boolean
}
```

Utility methods for working with event batches:
- `isErrorBatch(batch)`: Returns true if the batch represents an error condition (non-empty error message)
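A minimal usage sketch, equivalent to the inline error-message check shown earlier:

```scala
import org.apache.spark.streaming.flume.sink.{EventBatch, SparkSinkUtils}

// Error batches carry no usable events and should not be acknowledged
def usable(batch: EventBatch): Boolean =
  if (SparkSinkUtils.isErrorBatch(batch)) {
    System.err.println(s"Sink returned an error batch: ${batch.getErrorMsg}")
    false
  } else {
    true
  }
```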
The sink provides extensive logging through SLF4J. Log levels can be configured to control verbosity.
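For example, assuming the agent uses Flume's standard log4j.properties, the sink's loggers can be tuned by package name (the DEBUG level is only an illustration):

```properties
# conf/log4j.properties on the Flume agent
log4j.logger.org.apache.spark.streaming.flume.sink=DEBUG
```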