Spark Streaming integration with Apache Flume for real-time data ingestion from Flume agents
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-flume_2-10@1.6.00
# Spark Streaming Flume

Spark Streaming Flume provides seamless integration between Apache Spark Streaming and Apache Flume for real-time data ingestion. It offers two complementary patterns: push-based receivers, where Flume agents push data directly to Spark, and pull-based polling, where Spark actively pulls data from Flume sinks for stronger reliability.

## Package Information

- **Package Name**: spark-streaming-flume_2.10
- **Package Type**: Maven
- **Language**: Scala with Java interop
- **Installation**: `org.apache.spark:spark-streaming-flume_2.10:1.6.3`
- **Dependencies**: Requires the Apache Flume libraries for event processing

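For reference, the Maven coordinates above translate to the following dependency declaration (versions as listed in Package Information):

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-flume_2.10</artifactId>
  <version>1.6.3</version>
</dependency>
```
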
## Core Imports

Scala:

```scala
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.flume.SparkFlumeEvent
import org.apache.spark.storage.StorageLevel
import java.net.InetSocketAddress
import scala.collection.JavaConverters._

// For direct event manipulation (from the Apache Flume dependency)
import org.apache.flume.source.avro.AvroFlumeEvent
```

Java:

```java
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import org.apache.spark.storage.StorageLevel;
import java.net.InetSocketAddress;

// For direct event manipulation (from the Apache Flume dependency)
import org.apache.flume.source.avro.AvroFlumeEvent;
```

## Basic Usage

### Push-based Stream (Receiver Pattern)

```scala
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.storage.StorageLevel

val ssc = new StreamingContext(sparkConf, Seconds(5))

// Create a Flume stream - Flume pushes data to this receiver
val flumeStream = FlumeUtils.createStream(
  ssc,
  "localhost",                        // hostname where the receiver listens
  9999,                               // port where the receiver listens
  StorageLevel.MEMORY_AND_DISK_SER_2
)

// Process the stream
flumeStream.map(sparkFlumeEvent => {
  val event = sparkFlumeEvent.event
  new String(event.getBody.array())
}).print()

ssc.start()
ssc.awaitTermination()
```

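On the Flume side, the push pattern is wired up with an Avro sink aimed at the host and port the Spark receiver listens on. A minimal agent fragment, assuming an agent named `agent` and an existing channel named `memoryChannel` (both placeholders):

```properties
agent.sinks = avroSink
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.channel = memoryChannel
# Must match the hostname and port passed to FlumeUtils.createStream
agent.sinks.avroSink.hostname = localhost
agent.sinks.avroSink.port = 9999
```

Because Flume pushes, the receiver must be running before the agent starts delivering, or events will back up in the channel.
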
### Pull-based Stream (Polling Pattern)

```scala
import java.net.InetSocketAddress
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.storage.StorageLevel

val ssc = new StreamingContext(sparkConf, Seconds(5))

// Create a polling stream - Spark pulls data from the Flume sink
val pollingStream = FlumeUtils.createPollingStream(
  ssc,
  Seq(new InetSocketAddress("flume-host", 9090)), // Flume sink addresses
  StorageLevel.MEMORY_AND_DISK_SER_2,
  1000, // maxBatchSize
  5     // parallelism
)

// Process the stream
pollingStream.map(sparkFlumeEvent => {
  val headers = sparkFlumeEvent.event.getHeaders
  val body = new String(sparkFlumeEvent.event.getBody.array())
  s"Headers: $headers, Body: $body"
}).print()

ssc.start()
ssc.awaitTermination()
```

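The polling pattern requires the custom Spark sink running inside the Flume agent, so the `spark-streaming-flume-sink_2.10` artifact (and the Scala library it depends on) must be on the Flume agent's classpath. A minimal sink fragment, with agent and channel names as placeholders:

```properties
agent.sinks = spark
agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark.channel = memoryChannel
# Must match the InetSocketAddress passed to FlumeUtils.createPollingStream
agent.sinks.spark.hostname = flume-host
agent.sinks.spark.port = 9090
```
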
## Architecture

The Spark Streaming Flume integration is built around several key components:

- **FlumeUtils**: Main factory class providing stream-creation methods for both patterns
- **SparkFlumeEvent**: Serializable wrapper for Flume events that can be processed in Spark
- **Push Pattern**: Flume agents use Avro RPC to push events to Spark receivers
- **Pull Pattern**: Spark receivers poll Flume sinks using a custom Avro protocol with transaction support
- **Storage Integration**: Configurable storage levels for fault tolerance and performance tuning

## Capabilities

### Push-based Stream Creation

Creates input streams where Flume agents push data directly to Spark Streaming receivers using the Avro RPC protocol.

```scala { .api }
object FlumeUtils {
  /**
   * Create a push-based input stream from a Flume source with default storage level
   * @param ssc StreamingContext object
   * @param hostname Hostname where the receiver will listen
   * @param port Port where the receiver will listen
   * @param storageLevel Storage level for received objects (default: MEMORY_AND_DISK_SER_2)
   * @return ReceiverInputDStream of SparkFlumeEvent objects
   */
  def createStream(
      ssc: StreamingContext,
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[SparkFlumeEvent]

  /**
   * Create a push-based input stream with compression support
   * @param ssc StreamingContext object
   * @param hostname Hostname where the receiver will listen
   * @param port Port where the receiver will listen
   * @param storageLevel Storage level for received objects
   * @param enableDecompression Enable Netty decompression for incoming data
   * @return ReceiverInputDStream of SparkFlumeEvent objects
   */
  def createStream(
      ssc: StreamingContext,
      hostname: String,
      port: Int,
      storageLevel: StorageLevel,
      enableDecompression: Boolean
    ): ReceiverInputDStream[SparkFlumeEvent]
}
```

**Java API:**

```java { .api }
/**
 * Create a push-based input stream from a Flume source (Java API)
 * @param jssc JavaStreamingContext object
 * @param hostname Hostname where the receiver will listen
 * @param port Port where the receiver will listen
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
    JavaStreamingContext jssc,
    String hostname,
    int port
);

/**
 * Create a push-based input stream with custom storage level (Java API)
 * @param jssc JavaStreamingContext object
 * @param hostname Hostname where the receiver will listen
 * @param port Port where the receiver will listen
 * @param storageLevel Storage level for received objects
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
    JavaStreamingContext jssc,
    String hostname,
    int port,
    StorageLevel storageLevel
);

/**
 * Create a push-based input stream with compression support (Java API)
 * @param jssc JavaStreamingContext object
 * @param hostname Hostname where the receiver will listen
 * @param port Port where the receiver will listen
 * @param storageLevel Storage level for received objects
 * @param enableDecompression Enable Netty decompression for incoming data
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
    JavaStreamingContext jssc,
    String hostname,
    int port,
    StorageLevel storageLevel,
    boolean enableDecompression
);
```

### Pull-based Stream Creation

Creates input streams that actively poll Flume sinks for data, providing better reliability through transaction support and acknowledgments.

```scala { .api }
/**
 * Create a pull-based polling stream with default batch size and parallelism
 * @param ssc StreamingContext object
 * @param hostname Address of the host running the Spark Sink
 * @param port Port where the Spark Sink is listening
 * @param storageLevel Storage level for received objects (default: MEMORY_AND_DISK_SER_2)
 * @return ReceiverInputDStream of SparkFlumeEvent objects
 */
def createPollingStream(
    ssc: StreamingContext,
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[SparkFlumeEvent]

/**
 * Create a pull-based polling stream with multiple sink addresses
 * @param ssc StreamingContext object
 * @param addresses List of InetSocketAddress representing Spark Sink hosts
 * @param storageLevel Storage level for received objects
 * @return ReceiverInputDStream of SparkFlumeEvent objects
 */
def createPollingStream(
    ssc: StreamingContext,
    addresses: Seq[InetSocketAddress],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[SparkFlumeEvent]

/**
 * Create a pull-based polling stream with full configuration
 * @param ssc StreamingContext object
 * @param addresses List of InetSocketAddress representing Spark Sink hosts
 * @param storageLevel Storage level for received objects
 * @param maxBatchSize Maximum number of events per RPC call (default: 1000)
 * @param parallelism Number of concurrent requests to the sink (default: 5)
 * @return ReceiverInputDStream of SparkFlumeEvent objects
 */
def createPollingStream(
    ssc: StreamingContext,
    addresses: Seq[InetSocketAddress],
    storageLevel: StorageLevel,
    maxBatchSize: Int,
    parallelism: Int
  ): ReceiverInputDStream[SparkFlumeEvent]
```

**Java API:**

```java { .api }
/**
 * Create a pull-based polling stream (Java API)
 * @param jssc JavaStreamingContext object
 * @param hostname Address of the host running the Spark Sink
 * @param port Port where the Spark Sink is listening
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
    JavaStreamingContext jssc,
    String hostname,
    int port
);

/**
 * Create a pull-based polling stream with custom storage level (Java API)
 * @param jssc JavaStreamingContext object
 * @param hostname Address of the host running the Spark Sink
 * @param port Port where the Spark Sink is listening
 * @param storageLevel Storage level for received objects
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
    JavaStreamingContext jssc,
    String hostname,
    int port,
    StorageLevel storageLevel
);

/**
 * Create a pull-based polling stream with multiple sinks (Java API)
 * @param jssc JavaStreamingContext object
 * @param addresses Array of InetSocketAddress representing Spark Sink hosts
 * @param storageLevel Storage level for received objects
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
    JavaStreamingContext jssc,
    InetSocketAddress[] addresses,
    StorageLevel storageLevel
);

/**
 * Create a pull-based polling stream with full configuration (Java API)
 * @param jssc JavaStreamingContext object
 * @param addresses Array of InetSocketAddress representing Spark Sink hosts
 * @param storageLevel Storage level for received objects
 * @param maxBatchSize Maximum number of events per RPC call
 * @param parallelism Number of concurrent requests to the sink
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
    JavaStreamingContext jssc,
    InetSocketAddress[] addresses,
    StorageLevel storageLevel,
    int maxBatchSize,
    int parallelism
);
```

## Types

### SparkFlumeEvent

Serializable wrapper for Flume events that can be processed in Spark transformations.

```scala { .api }
/**
 * Serializable wrapper for AvroFlumeEvent with custom serialization
 */
class SparkFlumeEvent extends Externalizable {
  /** The wrapped Flume event containing headers and body (mutable) */
  var event: AvroFlumeEvent = new AvroFlumeEvent()

  /** Deserialize from ObjectInput */
  def readExternal(in: ObjectInput): Unit

  /** Serialize to ObjectOutput */
  def writeExternal(out: ObjectOutput): Unit
}

object SparkFlumeEvent {
  /**
   * Create a SparkFlumeEvent from an AvroFlumeEvent
   * @param in AvroFlumeEvent to wrap
   * @return SparkFlumeEvent instance
   */
  def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent
}
```

### AvroFlumeEvent

Flume's standard event structure containing headers and body data. This class is provided by Apache Flume and imported from `org.apache.flume.source.avro.AvroFlumeEvent`.

```scala { .api }
/**
 * Flume event structure (from the Apache Flume library)
 * Import: org.apache.flume.source.avro.AvroFlumeEvent
 */
class AvroFlumeEvent {
  /** Get event headers as a Map */
  def getHeaders(): java.util.Map[CharSequence, CharSequence]

  /** Set event headers */
  def setHeaders(headers: java.util.Map[CharSequence, CharSequence]): Unit

  /** Get event body as a ByteBuffer */
  def getBody(): java.nio.ByteBuffer

  /** Set event body */
  def setBody(body: java.nio.ByteBuffer): Unit
}
```

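One practical wrinkle with the `CharSequence`-keyed header map: Avro often materializes keys as `org.apache.avro.util.Utf8`, whose `equals` does not match `String`, so a direct `headers.get("timestamp")` lookup can silently return null. A hedged, self-contained sketch of normalizing headers to a `String`-keyed map (the `HeaderNormalizer` name is ours, and a `StringBuilder` stands in for `Utf8` to keep the demo dependency-free):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderNormalizer {
    /**
     * Copy a CharSequence-keyed header map into a String-keyed map so that
     * ordinary String lookups work regardless of the concrete key type.
     */
    public static Map<String, String> normalize(Map<CharSequence, CharSequence> headers) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<CharSequence, CharSequence> e : headers.entrySet()) {
            out.put(e.getKey().toString(), e.getValue().toString());
        }
        return out;
    }

    public static void main(String[] args) {
        Map<CharSequence, CharSequence> headers = new HashMap<>();
        // Non-String CharSequence key, as Avro deserialization may produce
        headers.put(new StringBuilder("timestamp"), "1700000000");
        System.out.println(headers.get("timestamp"));            // lookup by String misses
        System.out.println(normalize(headers).get("timestamp")); // normalized lookup works
    }
}
```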
### Common Usage Patterns

```scala
// Extract the body as a string
val bodyText = new String(sparkFlumeEvent.event.getBody.array())

// Extract a specific header
val timestamp = sparkFlumeEvent.event.getHeaders.get("timestamp")

// Process headers and body together (uses scala.collection.JavaConverters._)
val event = sparkFlumeEvent.event
val headers = event.getHeaders.asScala.toMap
val body = new String(event.getBody.array())
val processedData = (headers, body)
```

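The `getBody.array()` idiom above covers the common case of an array-backed buffer, but `ByteBuffer.array()` throws for read-only or direct buffers and ignores the buffer's position and limit. A defensive decoding helper (a sketch; the `EventBodyDecoder` class name is ours, not part of the library):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class EventBodyDecoder {
    /**
     * Decode an event body without calling array(): respects the buffer's
     * position/limit and works for read-only or direct buffers too.
     */
    public static String decodeBody(ByteBuffer body) {
        ByteBuffer copy = body.duplicate(); // leave the caller's position untouched
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // array() would throw ReadOnlyBufferException on this buffer
        ByteBuffer buf = ByteBuffer
            .wrap("hello flume".getBytes(StandardCharsets.UTF_8))
            .asReadOnlyBuffer();
        System.out.println(decodeBody(buf));
    }
}
```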
## Error Handling

The two integration patterns provide different reliability guarantees:

**Push-based streams**: At-least-once delivery. If the receiver fails, Flume retries sending events, which can produce duplicates.

**Pull-based streams**: Stronger guarantees through transaction support with ACK/NACK responses: events are removed from the Flume channel only after Spark has received and stored a batch, and failed batches are automatically retried by Flume. Duplicates remain possible if an acknowledgment is lost, so downstream processing should be idempotent.

Common error scenarios:
- Network connectivity issues between Flume and Spark
- Receiver memory pressure causing event loss
- Flume agent failures during event transmission
- Serialization errors for malformed events

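Because both patterns can redeliver events, downstream logic should tolerate duplicates. A minimal in-memory deduplication sketch, assuming each event carries a unique id (everything here is illustrative; a production job would key off a Flume header and use durable state rather than a local set):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DedupSketch {
    private final Set<String> seen = new HashSet<>();

    /** Returns true the first time an event id is seen, false for replays. */
    public boolean firstDelivery(String eventId) {
        return seen.add(eventId);
    }

    public static void main(String[] args) {
        DedupSketch dedup = new DedupSketch();
        // Simulated at-least-once delivery: "e2" arrives twice after a retry
        List<String> deliveries = Arrays.asList("e1", "e2", "e2", "e3");
        long processed = deliveries.stream().filter(dedup::firstDelivery).count();
        System.out.println(processed); // only distinct events are processed
    }
}
```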
## Performance Tuning

Key configuration parameters for optimal performance:

**Storage Levels**:
- `MEMORY_ONLY`: Fastest access, risk of data loss
- `MEMORY_AND_DISK_SER_2`: Balanced performance and fault tolerance (default)
- `MEMORY_AND_DISK_SER`: Serialized storage with a single replica
- `DISK_ONLY`: Maximum fault tolerance, slower access

**Polling Configuration**:
- `maxBatchSize`: Larger batches reduce RPC overhead but increase memory usage (default: 1000)
- `parallelism`: Higher parallelism increases throughput but uses more resources (default: 5)
- Multiple sink addresses: Distribute load across multiple Flume sinks for scalability