# Apache Spark Streaming Flume Sink

A specialized Flume sink implementation that provides an Avro RPC server for Apache Spark streaming applications to poll data from Flume agents using a reliable, transaction-based approach.

## Package Information

- **Package Name**: spark-streaming-flume-sink_2.11
- **Package Type**: maven
- **Language**: Scala
- **Version**: 2.4.8

### Installation

**Maven:**
```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-flume-sink_2.11</artifactId>
  <version>2.4.8</version>
</dependency>
```

**SBT:**
```scala
libraryDependencies += "org.apache.spark" % "spark-streaming-flume-sink_2.11" % "2.4.8"
```

## Core Imports

```scala
// Main sink class
import org.apache.spark.streaming.flume.sink.SparkSink

// Flume configuration
import org.apache.flume.Context
import org.apache.flume.conf.Configurable
import org.apache.flume.sink.AbstractSink
import org.apache.flume.Sink.Status

// Avro protocol (generated from sparkflume.avdl)
import org.apache.spark.streaming.flume.sink.SparkFlumeProtocol
import org.apache.spark.streaming.flume.sink.SparkSinkEvent
import org.apache.spark.streaming.flume.sink.EventBatch

// For Avro RPC client usage
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor

// Java concurrency for testing/monitoring
import java.util.concurrent.CountDownLatch
```

## Basic Usage

The SparkSink is deployed as a Flume sink component and configured through Flume's configuration system. It runs an Avro RPC server that Spark streaming applications connect to for polling events.

### Flume Configuration Example

```properties
# Flume agent configuration
agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark-sink.hostname = 0.0.0.0
agent.sinks.spark-sink.port = 9999
agent.sinks.spark-sink.threads = 5
agent.sinks.spark-sink.timeout = 30
agent.sinks.spark-sink.backoffInterval = 200
agent.sinks.spark-sink.channel = memory-channel
```

### Spark Streaming Integration

```scala
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sparkConf, Seconds(1))
val flumeStream = FlumeUtils.createPollingStream(ssc, "hostname", 9999)
```

## Architecture

The SparkSink implements a polling-based architecture instead of the traditional push-based Flume sink pattern:

1. **Avro RPC Server**: SparkSink creates an Avro server that listens for connections from Spark
2. **Transaction Management**: Each batch request creates a Flume channel transaction
3. **Thread Pool**: A configurable thread pool processes concurrent batch requests
4. **Acknowledgment Protocol**: Spark acknowledges successful or failed batch processing
5. **Timeout Handling**: Automatic transaction rollback on timeout or failure
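
The lifecycle above can be sketched with a self-contained mock: a plain Scala queue stands in for the Flume channel, and `getEventBatch`/`ack`/`nack` mirror the protocol's semantics. All names here are illustrative, not the real SparkSink internals.

```scala
// Illustrative model of the poll/ack/nack lifecycle (not SparkSink's code).
import scala.collection.mutable

final case class Batch(sequenceNumber: String, events: Seq[String])

class MockPollingSink {
  private val channel = mutable.Queue[String]()            // stand-in for the Flume channel
  private val pending = mutable.Map[String, Seq[String]]() // open "transactions"
  private var seq = 0L

  def put(event: String): Unit = channel.enqueue(event)

  // A batch request opens a transaction: events leave the channel but are
  // held as pending until the client acks or nacks.
  def getEventBatch(n: Int): Batch = {
    val taken = (1 to math.min(n, channel.size)).map(_ => channel.dequeue())
    seq += 1
    val id = seq.toString
    pending(id) = taken
    Batch(id, taken)
  }

  // ack commits: the pending events are discarded for good.
  def ack(id: String): Unit = pending.remove(id)

  // nack (or a timeout) rolls back: the events return to the channel for retry.
  def nack(id: String): Unit =
    pending.remove(id).foreach(events => events.foreach(e => channel.enqueue(e)))
}
```

A `nack` followed by a fresh `getEventBatch` returns the same events, which is how the real sink preserves at-least-once delivery.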

## Configuration

### SparkSink Configuration Parameters

```scala { .api }
object SparkSinkConfig {
  val THREADS: String = "threads"
  val DEFAULT_THREADS: Int = 10

  val CONF_TRANSACTION_TIMEOUT: String = "timeout"
  val DEFAULT_TRANSACTION_TIMEOUT: Int = 60

  val CONF_HOSTNAME: String = "hostname"
  val DEFAULT_HOSTNAME: String = "0.0.0.0"

  val CONF_PORT: String = "port"
  // No default - must be specified

  val CONF_BACKOFF_INTERVAL: String = "backoffInterval"
  val DEFAULT_BACKOFF_INTERVAL: Int = 200
}
```

### Configuration Parameters

- **hostname** (optional): IP address or hostname to bind the server to. Default: "0.0.0.0"
- **port** (required): Port number for the Avro RPC server. No default value.
- **threads** (optional): Number of threads for processing batch requests. Default: 10
- **timeout** (optional): Transaction timeout in seconds. Default: 60 seconds
- **backoffInterval** (optional): Sleep interval in milliseconds when no events are available. Default: 200ms
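
A minimal sketch of how these defaults might be resolved, with a plain `Map` standing in for Flume's `Context`; the names `SinkSettings` and `resolve` are hypothetical, not part of the package.

```scala
// Hypothetical parameter resolution using the documented defaults.
final case class SinkSettings(hostname: String, port: Int, threads: Int,
                              timeoutSecs: Int, backoffMs: Int)

def resolve(ctx: Map[String, String]): SinkSettings = {
  // port is the only required parameter: fail fast when it is missing.
  val port = ctx.get("port").map(_.toInt)
    .getOrElse(throw new IllegalArgumentException("port must be specified"))
  SinkSettings(
    hostname    = ctx.getOrElse("hostname", "0.0.0.0"),
    port        = port,
    threads     = ctx.get("threads").map(_.toInt).getOrElse(10),
    timeoutSecs = ctx.get("timeout").map(_.toInt).getOrElse(60),
    backoffMs   = ctx.get("backoffInterval").map(_.toInt).getOrElse(200))
}
```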

## Core API Components

### SparkSink Class

```scala { .api }
class SparkSink extends AbstractSink with Logging with Configurable {
  def start(): Unit
  def stop(): Unit
  def configure(ctx: Context): Unit
  def process(): Status

  // Package-private methods for testing
  private[flume] def getPort(): Int
  private[flume] def countdownWhenBatchReceived(latch: CountDownLatch): Unit
}
```

The main Flume sink implementation that:
- Extends Flume's `AbstractSink` for integration with the Flume framework
- Implements `Configurable` for parameter configuration
- Provides `Logging` capabilities for operational monitoring

**Key Methods:**
- `start()`: Initializes and starts the Avro RPC server
- `stop()`: Shuts down the server and releases all resources
- `configure(ctx: Context)`: Configures the sink from Flume context parameters
- `process()`: Blocks the Flume sink-runner thread indefinitely, since events are delivered through the RPC server rather than this method (required by the Flume sink interface)
- `getPort()`: Returns the actual port the server is listening on

### Avro Protocol Interface

```java { .api }
// Generated from sparkflume.avdl - Avro protocol interface
public interface SparkFlumeProtocol {
  EventBatch getEventBatch(int n);
  void ack(CharSequence sequenceNumber);
  void nack(CharSequence sequenceNumber);
}
```

The Avro RPC protocol that Spark clients use to interact with the sink:
- `getEventBatch(n)`: Requests up to n events from the Flume channel
- `ack(sequenceNumber)`: Acknowledges successful processing of a batch
- `nack(sequenceNumber)`: Signals failed processing, triggering rollback
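
From the client's side, the contract is: poll a batch, process it, then `ack` on success or `nack` on failure. A local trait stands in here for the Avro-generated `SparkFlumeProtocol` (a real client would obtain one via `SpecificRequestor.getClient`); `pollOnce` and the tuple-shaped batch are illustrative simplifications.

```scala
// Stand-in for the Avro-generated protocol; a batch is (sequenceNumber, events).
trait Protocol {
  def getEventBatch(n: Int): (String, Seq[String])
  def ack(seq: String): Unit
  def nack(seq: String): Unit
}

// One polling round: ack commits the sink-side transaction, nack rolls it back
// so the events stay in the Flume channel for retry.
def pollOnce(client: Protocol, maxEvents: Int)(process: Seq[String] => Unit): Boolean = {
  val (seq, events) = client.getEventBatch(maxEvents)
  try {
    process(events)
    client.ack(seq)
    true
  } catch {
    case _: Exception =>
      client.nack(seq)
      false
  }
}
```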

### Data Types

#### SparkSinkEvent
```java { .api }
// Generated from sparkflume.avdl - Avro record
public class SparkSinkEvent {
  // Constructors
  public SparkSinkEvent();
  public SparkSinkEvent(java.util.Map<CharSequence, CharSequence> headers, java.nio.ByteBuffer body);

  // Headers accessors
  public java.util.Map<CharSequence, CharSequence> getHeaders();
  public void setHeaders(java.util.Map<CharSequence, CharSequence> headers);

  // Body accessors
  public java.nio.ByteBuffer getBody();
  public void setBody(java.nio.ByteBuffer body);
}
```

Represents a single Flume event with:
- Constructor takes `headers` (event metadata) and `body` (event payload)
- `getHeaders()`: Returns event metadata as key-value pairs
- `getBody()`: Returns event payload as binary data

#### EventBatch
```java { .api }
// Generated from sparkflume.avdl - Avro record
public class EventBatch {
  // Constructors
  public EventBatch();
  public EventBatch(CharSequence errorMsg, CharSequence sequenceNumber, java.util.List<SparkSinkEvent> events);

  // Error message accessors
  public CharSequence getErrorMsg(); // Empty string indicates success
  public void setErrorMsg(CharSequence errorMsg);

  // Sequence number accessors
  public CharSequence getSequenceNumber(); // Unique transaction identifier
  public void setSequenceNumber(CharSequence sequenceNumber);

  // Events list accessors
  public java.util.List<SparkSinkEvent> getEvents();
  public void setEvents(java.util.List<SparkSinkEvent> events);
}
```

Container for a batch of events returned to Spark:
- Constructor takes `errorMsg`, `sequenceNumber`, and an `events` list
- `getErrorMsg()`: Returns the error message if batch creation failed, or an empty string on success
- `getSequenceNumber()`: Returns a unique identifier for transaction tracking and acknowledgment
- `getEvents()`: Returns the list of `SparkSinkEvent` objects in the batch
- `setErrorMsg(errorMsg)`: Sets the error message for this batch
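
Since the generated accessors expose the payload as a `java.nio.ByteBuffer`, consumers typically copy it out before use. A small helper (hypothetical name) that decodes a body to a UTF-8 string without disturbing the buffer's position:

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Decode an event body; duplicate() shares the bytes but has its own
// position, so the original buffer can still be read by other consumers.
def bodyToString(body: ByteBuffer): String = {
  val dup = body.duplicate()
  val bytes = new Array[Byte](dup.remaining())
  dup.get(bytes)
  new String(bytes, StandardCharsets.UTF_8)
}
```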

## Error Handling

### Transaction States
- **Success**: Spark calls `ack(sequenceNumber)` → transaction commits
- **Failure**: Spark calls `nack(sequenceNumber)` → transaction rolls back
- **Timeout**: No response within timeout period → automatic rollback

### Error Conditions
- **No Events Available**: Returns `EventBatch` with empty events list
- **Channel Errors**: Returns `EventBatch` with non-empty errorMsg
- **Connection Issues**: Avro RPC exceptions propagated to client
- **Configuration Errors**: `ConfigurationException` thrown during setup

### Retry Behavior
- Failed transactions (nack/timeout) leave events in the Flume channel for retry
- Configurable backoff interval prevents excessive polling when no events are available
- Thread pool sizing controls concurrent request handling capacity

## Threading Model

The SparkSink uses a multi-threaded architecture:

1. **Main Thread**: Handles Flume framework lifecycle (start/stop/configure)
2. **Avro Server Threads**: Netty-based server handles RPC connections
3. **Transaction Processor Threads**: Configurable pool processes batch requests
4. **Channel Transaction Threads**: Each Flume transaction must execute in its originating thread

### Thread Safety
- Transaction processors are isolated per request
- Sequence number generation uses atomic counters
- Shutdown coordination prevents resource leaks
- Flume channel transactions are thread-local by design

## Integration Patterns

### With Spark Streaming
```scala
// Spark side - polling from multiple sinks
val flumeStreams = (1 to numSinks).map { i =>
  FlumeUtils.createPollingStream(ssc, hostnames(i), ports(i))
}
val unionStream = ssc.union(flumeStreams)
```

### High Availability Setup
```properties
# Multiple sink configuration for failover
agent.sinks.spark-sink1.hostname = host1
agent.sinks.spark-sink1.port = 9999

agent.sinks.spark-sink2.hostname = host2
agent.sinks.spark-sink2.port = 9999

agent.sinkgroups.spark-group.sinks = spark-sink1 spark-sink2
agent.sinkgroups.spark-group.processor.type = failover
```

### Performance Tuning
- **Thread Pool Size**: Match to expected concurrent Spark receivers
- **Transaction Timeout**: Balance between reliability and throughput
- **Batch Size**: Configured on the Spark side, affects memory usage
- **Backoff Interval**: Reduce CPU usage when channels are empty

## Utility Components

### SparkSinkUtils

```scala { .api }
object SparkSinkUtils {
  def isErrorBatch(batch: EventBatch): Boolean
}
```

Utility methods for working with event batches:
- `isErrorBatch(batch)`: Returns true if the batch represents an error condition (non-empty error message)
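
The convention it encodes is simply that a non-empty `errorMsg` marks an error batch. A minimal re-implementation against a local stand-in for `EventBatch`:

```scala
// Local stand-in for the Avro-generated EventBatch (illustrative only).
final case class FakeBatch(errorMsg: CharSequence)

// Mirrors the documented check: a non-empty errorMsg means an error batch.
def isErrorBatch(batch: FakeBatch): Boolean = batch.errorMsg.length > 0
```

On the Spark side, a batch's events are only worth deserializing when this check is false.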

## Monitoring and Observability

The sink provides extensive logging through SLF4J:
- Connection establishment and teardown
- Transaction lifecycle events
- Error conditions and timeouts
- Performance metrics (batch sizes, processing times)
- Configuration parameter validation

Log levels can be configured to control verbosity:
- INFO: Operational events, configuration
- DEBUG: Detailed transaction flow
- WARN: Recoverable errors, timeouts
- ERROR: Unrecoverable failures