# Spark Streaming Flume Sink

Apache Spark Streaming custom Flume sink that enables reliable, pull-based data ingestion from Apache Flume into Spark Streaming applications. The sink implements a transactional protocol using Avro RPC to ensure fault-tolerance and data durability through proper transaction management and acknowledgment handling.

## Package Information

- **Package Name**: spark-streaming-flume-sink_2.10
- **Package Type**: Maven
- **Language**: Scala
- **Installation**: Add the dependency to your Maven pom.xml:
```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-flume-sink_2.10</artifactId>
  <version>1.6.3</version>
</dependency>
```
## Core Imports

The sink is deployed as a Flume sink component, so application code typically needs no direct imports. For advanced integration or testing:

```scala { .api }
import org.apache.spark.streaming.flume.sink.SparkSink
import org.apache.spark.streaming.flume.sink.SparkAvroCallbackHandler
import org.apache.spark.streaming.flume.sink.SparkSinkUtils
import org.apache.spark.streaming.flume.sink.SparkFlumeProtocol
// Note: SparkSinkEvent and EventBatch are generated from Avro IDL and imported from the protocol
```
## Basic Usage

### Flume Configuration

Configure the sink in your Flume agent configuration:

```properties
# Configure sink
agent.sinks = sparkSink
agent.sinks.sparkSink.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.sparkSink.hostname = 0.0.0.0
agent.sinks.sparkSink.port = 9999
agent.sinks.sparkSink.channel = memoryChannel
agent.sinks.sparkSink.threads = 10
agent.sinks.sparkSink.timeout = 60
agent.sinks.sparkSink.backoffInterval = 200
```
### Spark Streaming Integration

Connect to the sink from Spark Streaming:

```scala
import org.apache.spark.streaming.flume._

// Hostname and port must match the bind address configured for the sink
val flumeStream = FlumeUtils.createPollingStream(
  streamingContext,
  "hostname",
  9999
)
```
## Architecture

The Spark Streaming Flume Sink operates using a pull-based architecture:

- **SparkSink**: Main Flume sink that manages the Avro RPC server lifecycle
- **Avro RPC Protocol**: Defines communication between Spark and Flume using SparkFlumeProtocol
- **Transaction Management**: Ensures data durability through transactional event batching
- **Thread Pool**: Manages concurrent transaction processors for handling multiple Spark requests
- **Timeout Handling**: Automatic rollback for transactions that don't receive acknowledgments
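To make the pull-based lifecycle concrete, here is a toy model of the getEventBatch/ack/nack flow in plain Scala. It is an illustration only, not the actual SparkSink implementation; the real sink holds events in an open Flume channel transaction rather than an in-memory queue.

```scala
import java.util.UUID
import scala.collection.concurrent.TrieMap
import scala.collection.mutable

// Toy model of the pull-based transaction protocol (illustration only).
class ToySink(channel: mutable.Queue[String]) {
  // Pending batches keyed by sequence number, awaiting ack or nack
  private val pending = TrieMap.empty[String, Seq[String]]

  // Spark pulls up to n events; they are held under a sequence number
  // until the batch is acknowledged or rolled back
  def getEventBatch(n: Int): (String, Seq[String]) = {
    val events = (1 to math.min(n, channel.size)).map(_ => channel.dequeue())
    val seq = UUID.randomUUID().toString
    pending.put(seq, events)
    (seq, events)
  }

  // Ack: Spark stored the batch durably, so the transaction commits
  def ack(seq: String): Unit = pending.remove(seq)

  // Nack: processing failed, so the events roll back onto the channel
  def nack(seq: String): Unit =
    pending.remove(seq).foreach(events => events.foreach(e => channel.enqueue(e)))
}
```

A nacked batch becomes visible again on the next pull, which is what gives the protocol its at-least-once delivery guarantee.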
## Capabilities

### Sink Configuration

Configure the SparkSink with Flume context parameters.
```scala { .api }
class SparkSink extends AbstractSink with Logging with Configurable {
  /**
   * Configure the sink with Flume context parameters
   * @param ctx Flume context containing configuration properties
   */
  def configure(ctx: Context): Unit

  /**
   * Start the sink and initialize the Avro RPC server
   */
  override def start(): Unit

  /**
   * Stop the sink and shut down resources
   */
  override def stop(): Unit

  /**
   * Main processing method (blocks until shutdown)
   * @return Processing status
   */
  override def process(): Status

  /**
   * Get the port the server is listening on
   * @return Port number (package-private method for testing)
   */
  private[flume] def getPort(): Int

  /**
   * Testing utility to count down when batches are received
   * @param latch CountDownLatch to count down on batch receipt
   */
  private[flume] def countdownWhenBatchReceived(latch: CountDownLatch): Unit
}
```
### Avro RPC Protocol

Protocol definition for communication between Spark Streaming and the Flume sink (defined in sparkflume.avdl).

```avro { .api }
@namespace("org.apache.spark.streaming.flume.sink")
protocol SparkFlumeProtocol {
  /**
   * Request a batch of events from the sink
   * @param n Maximum number of events to return
   * @return EventBatch containing events and sequence number
   */
  EventBatch getEventBatch(int n);

  /**
   * Acknowledge successful processing of an event batch
   * @param sequenceNumber Sequence number of the batch to acknowledge
   */
  void ack(string sequenceNumber);

  /**
   * Signal failed processing of an event batch
   * @param sequenceNumber Sequence number of the batch that failed
   */
  void nack(string sequenceNumber);
}
```
### Protocol Implementation

The protocol is implemented by the SparkAvroCallbackHandler class:

```scala { .api }
private[flume] class SparkAvroCallbackHandler(
  val threads: Int,
  val channel: Channel,
  val transactionTimeout: Int,
  val backOffInterval: Int
) extends SparkFlumeProtocol with Logging {

  /**
   * Returns a batch of events to Spark over Avro RPC
   * @param n Maximum number of events to return in a batch
   * @return EventBatch instance with sequence number and events
   */
  override def getEventBatch(n: Int): EventBatch

  /**
   * Called by Spark to indicate successful commit of a batch
   * @param sequenceNumber The sequence number of the successful batch
   */
  override def ack(sequenceNumber: CharSequence): Void

  /**
   * Called by Spark to indicate failed commit of a batch
   * @param sequenceNumber The sequence number of the failed batch
   */
  override def nack(sequenceNumber: CharSequence): Void

  /**
   * Shutdown the handler and executor threads
   */
  def shutdown(): Unit
}
```
### Event Data Structures

Core data structures for event transmission (defined in Avro IDL and generated as Scala classes).

```avro { .api }
// Avro record definitions
record SparkSinkEvent {
  map<string> headers;
  bytes body;
}

record EventBatch {
  string errorMsg = "";     // Empty indicates success, non-empty indicates error
  string sequenceNumber;    // Unique identifier for transaction tracking
  array<SparkSinkEvent> events;
}
```

The Avro compiler generates these classes from the IDL definition. While the classes are used internally by the sink, they are not typically accessed directly by client code, since the sink is configured through Flume agent properties rather than programmatic instantiation.
### Configuration Parameters

Available configuration parameters for the SparkSink.

```scala { .api }
private[flume] object SparkSinkConfig {
  // Thread pool configuration
  val THREADS = "threads"
  val DEFAULT_THREADS = 10

  // Transaction timeout configuration
  val CONF_TRANSACTION_TIMEOUT = "timeout"
  val DEFAULT_TRANSACTION_TIMEOUT = 60 // seconds

  // Network binding configuration
  val CONF_HOSTNAME = "hostname"
  val DEFAULT_HOSTNAME = "0.0.0.0"
  val CONF_PORT = "port" // No default; mandatory

  // Backoff configuration
  val CONF_BACKOFF_INTERVAL = "backoffInterval"
  val DEFAULT_BACKOFF_INTERVAL = 200 // milliseconds
}
```
### Transaction Processing

Internal transaction management handled by the TransactionProcessor class:

```scala { .api }
private class TransactionProcessor(
  val channel: Channel,
  val seqNum: String,
  var maxBatchSize: Int,
  val transactionTimeout: Int,
  val backOffInterval: Int,
  val parent: SparkAvroCallbackHandler
) extends Callable[Void] with Logging {

  /**
   * Get an event batch from the channel (blocks until available)
   * @return EventBatch instance with events or error message
   */
  def getEventBatch: EventBatch

  /**
   * Called when ACK or NACK received from Spark
   * @param success true for ACK, false for NACK
   */
  def batchProcessed(success: Boolean): Unit

  /**
   * Shutdown the transaction processor
   */
  def shutdown(): Unit

  /**
   * Main execution method (implements Callable)
   */
  override def call(): Void
}
```
### Utility Functions

Helper functions for working with event batches.

```scala { .api }
object SparkSinkUtils {
  /**
   * Determine if an event batch represents an error condition
   * Checks whether errorMsg is non-empty: !batch.getErrorMsg.toString.equals("")
   * @param batch The EventBatch to check
   * @return true if the batch contains an error, false otherwise
   */
  def isErrorBatch(batch: EventBatch): Boolean
}
```
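To illustrate the error-batch convention, the following sketch uses a toy stand-in for the Avro-generated EventBatch class (ToyEventBatch is hypothetical, not the real generated type):

```scala
// Toy stand-in for the Avro-generated EventBatch (illustration only)
case class ToyEventBatch(errorMsg: String, sequenceNumber: String, events: Seq[Array[Byte]])

// Mirrors the isErrorBatch check: an empty errorMsg marks a successful batch
def isErrorBatch(batch: ToyEventBatch): Boolean = !batch.errorMsg.isEmpty

val ok    = ToyEventBatch("", "seq-1", Seq("payload".getBytes("UTF-8")))
val error = ToyEventBatch("Error while processing transaction", "", Seq.empty)
```

Consumers check this flag before committing a batch, so a sink that could not open a channel transaction can signal failure in-band instead of raising an RPC error.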
### Supporting Classes

Additional utility classes used internally:

```scala { .api }
/**
 * Thread factory for creating daemon threads with specific naming
 */
private[sink] class SparkSinkThreadFactory(nameFormat: String) extends ThreadFactory {
  /**
   * Create a new daemon thread with formatted name
   * @param r Runnable to execute in the thread
   * @return New daemon Thread instance
   */
  override def newThread(r: Runnable): Thread
}

/**
 * Logging trait providing SLF4J-based logging functionality
 */
private[sink] trait Logging {
  protected def logInfo(msg: => String): Unit
  protected def logDebug(msg: => String): Unit
  protected def logWarning(msg: => String): Unit
  protected def logError(msg: => String): Unit
  protected def logTrace(msg: => String): Unit

  // Overloaded versions that accept Throwable
  protected def logInfo(msg: => String, throwable: Throwable): Unit
  protected def logDebug(msg: => String, throwable: Throwable): Unit
  protected def logWarning(msg: => String, throwable: Throwable): Unit
  protected def logError(msg: => String, throwable: Throwable): Unit
  protected def logTrace(msg: => String, throwable: Throwable): Unit

  protected def isTraceEnabled(): Boolean
}
```
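A minimal sketch of what a naming daemon-thread factory like SparkSinkThreadFactory looks like (this is an illustration written against the JDK ThreadFactory interface, not the actual source; the name format is an assumption):

```scala
import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.AtomicLong

// Sketch of a naming daemon-thread factory (illustration only)
class NamingThreadFactory(nameFormat: String) extends ThreadFactory {
  private val counter = new AtomicLong(0)

  override def newThread(r: Runnable): Thread = {
    // Substitute the thread's sequence number into the name format
    val t = new Thread(r, nameFormat.format(counter.incrementAndGet()))
    t.setDaemon(true) // daemon threads do not block JVM shutdown
    t
  }
}
```

Daemon threads matter here because the sink's worker pool should never keep a Flume agent's JVM alive after the agent itself has stopped.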
## Configuration Options

### Required Parameters

- **port**: Port number for the Avro RPC server (no default)

### Optional Parameters

- **hostname**: Hostname to bind to (default: "0.0.0.0")
- **threads**: Number of processing threads (default: 10)
- **timeout**: Transaction timeout in seconds (default: 60)
- **backoffInterval**: Backoff interval in milliseconds when no events available (default: 200)
## Error Handling

The sink provides comprehensive error handling:

- **Transaction Timeouts**: Automatic rollback if Spark doesn't acknowledge within the timeout period
- **NACK Handling**: Transaction rollback when Spark signals processing failure
- **Error Batches**: Invalid batches indicated by a non-empty errorMsg field
- **Resource Cleanup**: Proper shutdown of threads and network resources
- **Channel Errors**: Graceful handling of Flume channel communication failures

Common error scenarios:

- **No Events Available**: Returns an error batch when the channel has no events after multiple attempts
- **Channel Transaction Failure**: Error batch when unable to create a Flume transaction
- **Network Connectivity**: Server startup failures logged with appropriate error messages
- **Thread Pool Exhaustion**: Blocks until threads become available rather than failing
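The timeout-driven rollback can be sketched with a JDK scheduled executor. This is an illustration of the pattern only, with an assumed 100 ms timeout; the real sink schedules one such task per transaction, using the configured timeout, and cancels it when an ack or nack arrives first.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Sketch of timeout-driven rollback scheduling (illustration only)
val scheduler = Executors.newSingleThreadScheduledExecutor()
val rolledBack = new CountDownLatch(1)

// Schedule the rollback to fire when the transaction times out
val timeoutTask = scheduler.schedule(new Runnable {
  def run(): Unit = rolledBack.countDown() // stand-in for a channel rollback
}, 100, TimeUnit.MILLISECONDS)

// An ack arriving before the deadline would cancel the pending rollback:
//   timeoutTask.cancel(false)

rolledBack.await() // no ack arrives here, so the rollback fires
scheduler.shutdown()
```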
## Thread Safety

The sink ensures thread safety through:

- **Thread-Local Transactions**: Flume transactions handled in dedicated threads
- **Synchronized Access**: Protected access to the shared transaction processor map
- **Atomic Counters**: Thread-safe sequence number generation
- **Proper Shutdown**: Coordinated shutdown of thread pools and network resources