Apache Spark Streaming integration with Apache Flume for collecting, aggregating, and moving large amounts of log data
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-flume_2.11@2.2.00
# Spark Streaming Flume Integration

Apache Spark Streaming integration with Apache Flume enables real-time ingestion of Flume event data into Spark Streaming applications. The library offers both push-based and pull-based approaches for consuming Flume data streams, supporting reliable, fault-tolerant processing pipelines.

## Package Information

- **Package Name**: spark-streaming-flume_2.11
- **Package Type**: maven
- **Language**: Scala (with Java API support)
- **Installation**: Add to Maven dependencies:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-flume_2.11</artifactId>
  <version>2.2.3</version>
</dependency>
```

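If you build with sbt instead of Maven, the same coordinates translate directly; `%%` appends the Scala suffix (`_2.11` on a Scala 2.11 build), and the version shown matches the Maven snippet above:

```scala
// sbt equivalent of the Maven dependency above
libraryDependencies += "org.apache.spark" %% "spark-streaming-flume" % "2.2.3"
```
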
## Core Imports
```scala
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.storage.StorageLevel
```

For the Java API:

```java
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import org.apache.spark.storage.StorageLevel;
```

## Basic Usage

### Push-based Approach (Flume as Client)

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.storage.StorageLevel

// Minimal SparkConf; the app name is illustrative
val conf = new SparkConf().setAppName("FlumePushExample")
val ssc = new StreamingContext(conf, Seconds(10))

// Create a stream that receives data pushed by a Flume Avro sink
val flumeStream = FlumeUtils.createStream(
  ssc,
  "localhost", // hostname where the Spark receiver will listen
  9999,        // port where the Spark receiver will listen
  StorageLevel.MEMORY_AND_DISK_SER_2
)

// Decode each event body as UTF-8 and print it
flumeStream.map(e => new String(e.event.getBody.array(), "UTF-8")).print()
```

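The push-based approach requires a matching Avro sink in the Flume agent, pointed at the host and port the Spark receiver listens on. A minimal sketch of that agent configuration, with illustrative agent, sink, and channel names:

```properties
# Hypothetical Flume agent config: push events to the Spark receiver above
agent.sinks.sparkReceiver.type = avro
agent.sinks.sparkReceiver.hostname = localhost
agent.sinks.sparkReceiver.port = 9999
agent.sinks.sparkReceiver.channel = memoryChannel
```
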
### Pull-based Approach (Spark as Client)

```scala
import java.net.InetSocketAddress

// Reuses the ssc from the push-based example above
// Create a polling stream that pulls data from a SparkSink
val pollingStream = FlumeUtils.createPollingStream(
  ssc,
  Seq(new InetSocketAddress("flume-host", 9988)), // SparkSink addresses
  StorageLevel.MEMORY_AND_DISK_SER_2
)

// Decode each event body as UTF-8 and print it
pollingStream.map(e => new String(e.event.getBody.array(), "UTF-8")).print()
```

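The pull-based approach requires the custom sink on the Flume side: place the spark-streaming-flume-sink artifact (and the Scala library it depends on) on the Flume agent's classpath, then configure the sink. A minimal sketch, with illustrative agent, sink, and channel names:

```properties
# Hypothetical Flume agent config: expose a SparkSink for Spark Streaming to poll
agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark.hostname = flume-host
agent.sinks.spark.port = 9988
agent.sinks.spark.channel = memoryChannel
```
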
## Architecture

The Spark Streaming Flume integration provides two distinct data ingestion patterns:

1. **Push-based (FlumeInputDStream)**: Flume agents use an Avro sink to push data to a Spark Streaming receiver that listens as an Avro endpoint. Simple to set up, but less reliable: events in flight can be lost if the receiver fails.

2. **Pull-based (FlumePollingInputDStream)**: Spark Streaming polls a custom SparkSink deployed on the Flume agents. More reliable, with transaction support and better fault tolerance.

## Capabilities

### Stream Creation - Push-based

Creates input streams where Flume acts as a client, pushing data to Spark receivers.

```scala { .api }
// Scala API with default storage level
def createStream(
  ssc: StreamingContext,
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[SparkFlumeEvent]

// Scala API with compression support
def createStream(
  ssc: StreamingContext,
  hostname: String,
  port: Int,
  storageLevel: StorageLevel,
  enableDecompression: Boolean
): ReceiverInputDStream[SparkFlumeEvent]
```

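For example, the decompression overload is invoked the same way as the default one; this sketch (reusing the `ssc` and address from Basic Usage) enables decoding of events that a Flume Avro sink sent compressed:

```scala
// Sketch: decompress events compressed by the Flume Avro sink
val compressedStream = FlumeUtils.createStream(
  ssc,
  "localhost",
  9999,
  StorageLevel.MEMORY_AND_DISK_SER_2,
  enableDecompression = true
)
```
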
```java { .api }
// Java API with default storage level
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
  JavaStreamingContext jssc,
  String hostname,
  int port
)

// Java API with custom storage level
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
  JavaStreamingContext jssc,
  String hostname,
  int port,
  StorageLevel storageLevel
)

// Java API with compression support
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
  JavaStreamingContext jssc,
  String hostname,
  int port,
  StorageLevel storageLevel,
  boolean enableDecompression
)
```

### Stream Creation - Pull-based

Creates input streams that poll a SparkSink for data, with stronger reliability guarantees.

```scala { .api }
// Scala API with single address
def createPollingStream(
  ssc: StreamingContext,
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[SparkFlumeEvent]

// Scala API with multiple addresses
def createPollingStream(
  ssc: StreamingContext,
  addresses: Seq[InetSocketAddress],
  storageLevel: StorageLevel
): ReceiverInputDStream[SparkFlumeEvent]

// Scala API with full configuration
def createPollingStream(
  ssc: StreamingContext,
  addresses: Seq[InetSocketAddress],
  storageLevel: StorageLevel,
  maxBatchSize: Int,
  parallelism: Int
): ReceiverInputDStream[SparkFlumeEvent]
```

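For instance, a single polling stream can pull from several SparkSinks at once; the hostnames here are illustrative:

```scala
import java.net.InetSocketAddress

// One stream polling two SparkSink endpoints
val sinkAddresses = Seq(
  new InetSocketAddress("flume-host-1", 9988),
  new InetSocketAddress("flume-host-2", 9988)
)
val multiSinkStream = FlumeUtils.createPollingStream(
  ssc,
  sinkAddresses,
  StorageLevel.MEMORY_AND_DISK_SER_2
)
```
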
```java { .api }
// Java API with single address
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
  JavaStreamingContext jssc,
  String hostname,
  int port
)

// Java API with custom storage level
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
  JavaStreamingContext jssc,
  String hostname,
  int port,
  StorageLevel storageLevel
)

// Java API with multiple addresses
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
  JavaStreamingContext jssc,
  InetSocketAddress[] addresses,
  StorageLevel storageLevel
)

// Java API with full configuration
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
  JavaStreamingContext jssc,
  InetSocketAddress[] addresses,
  StorageLevel storageLevel,
  int maxBatchSize,
  int parallelism
)
```

### Event Processing

Process SparkFlumeEvent objects received from Flume agents.

```scala { .api }
class SparkFlumeEvent() extends Externalizable {
  var event: AvroFlumeEvent
  def readExternal(in: ObjectInput): Unit
  def writeExternal(out: ObjectOutput): Unit
}

object SparkFlumeEvent {
  def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent
}
```

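Note that `event.getBody` returns a `java.nio.ByteBuffer`. The examples below call `getBody.array()` for brevity, which assumes an array-backed buffer whose readable bytes start at offset zero; a defensive decoder (a sketch, not part of this API) respects the buffer's position and limit:

```scala
import java.nio.charset.StandardCharsets
import org.apache.spark.streaming.flume.SparkFlumeEvent

// Copy exactly the readable bytes out of the body buffer before decoding
def bodyAsString(sparkEvent: SparkFlumeEvent): String = {
  val buf = sparkEvent.event.getBody.duplicate() // leave the original position untouched
  val bytes = new Array[Byte](buf.remaining())
  buf.get(bytes)
  new String(bytes, StandardCharsets.UTF_8)
}
```
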
#### Usage Examples

```scala
import scala.collection.JavaConverters._ // for .asScala on the Java headers map

// Extract event body as a UTF-8 string
flumeStream.map { sparkEvent =>
  val body = sparkEvent.event.getBody.array()
  new String(body, "UTF-8")
}

// Extract event headers as a Scala Map
flumeStream.map { sparkEvent =>
  val headers = sparkEvent.event.getHeaders
  headers.asScala.toMap
}

// Process both headers and body
flumeStream.map { sparkEvent =>
  val event = sparkEvent.event
  val bodyString = new String(event.getBody.array(), "UTF-8")
  val headerMap = event.getHeaders.asScala.toMap
  (bodyString, headerMap)
}
```

## Types

### Core Types

```scala { .api }
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.JavaStreamingContext

// Flume-specific and supporting types
import org.apache.flume.source.avro.AvroFlumeEvent
import java.net.InetSocketAddress
import java.io.{Externalizable, ObjectInput, ObjectOutput}
```

## Configuration Patterns

### Storage Levels

Choose appropriate storage levels based on your reliability and performance requirements:

```scala
import org.apache.spark.storage.StorageLevel

// Default - serialized, replicated, disk + memory
StorageLevel.MEMORY_AND_DISK_SER_2

// Memory only, replicated
StorageLevel.MEMORY_ONLY_2

// Disk only, replicated
StorageLevel.DISK_ONLY_2

// Memory and disk, not serialized, replicated
StorageLevel.MEMORY_AND_DISK_2
```

### Default Configuration Constants

The pull-based streaming API uses the following default values:

```scala
// Default constants from FlumeUtils
val DEFAULT_POLLING_BATCH_SIZE = 1000 // events per batch
val DEFAULT_POLLING_PARALLELISM = 5   // concurrent connections
```

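These defaults take effect whenever one of the shorter overloads is used:

```scala
// Polls with maxBatchSize = 1000 and parallelism = 5 (the defaults above)
val defaultStream = FlumeUtils.createPollingStream(ssc, "flume-host", 9988)
```
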
### Pull-based Configuration

```scala
// High throughput configuration
FlumeUtils.createPollingStream(
  ssc,
  addresses,
  StorageLevel.MEMORY_AND_DISK_SER_2,
  maxBatchSize = 2000, // larger batches
  parallelism = 10     // more concurrent connections
)

// Conservative configuration
FlumeUtils.createPollingStream(
  ssc,
  addresses,
  StorageLevel.MEMORY_AND_DISK_SER_2,
  maxBatchSize = 500, // smaller batches
  parallelism = 2     // fewer connections
)
```

## Error Handling

The two stream types handle failures at different levels:

- **Push-based**: Network failures result in data loss unless Flume is configured with reliable channels.
- **Pull-based**: Provides transaction support; failed batches are rolled back and can be retried.

```scala
// Add error handling around stream processing
flumeStream.foreachRDD { rdd =>
  try {
    // collect() brings the whole batch to the driver; suitable only for small batches
    rdd.collect().foreach { sparkEvent =>
      // Process the event (processEvent is an application-defined handler)
      processEvent(sparkEvent)
    }
  } catch {
    case e: Exception =>
      // logError assumes a logging helper, e.g. Spark's Logging trait
      logError("Failed to process Flume events", e)
      // Handle the error appropriately
  }
}
```

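Finally, none of the streams above process any data until the streaming context is started; a typical driver ends with:

```scala
ssc.start()            // begin receiving from Flume and processing batches
ssc.awaitTermination() // block until the job is stopped or fails
```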