
# Spark Streaming Flume Integration

Apache Spark Streaming's integration with Apache Flume provides real-time data ingestion for streaming applications. The library offers both a push-based and a pull-based approach for feeding Flume event streams into Spark Streaming, supporting reliable, fault-tolerant data processing pipelines.

## Package Information

- **Package Name**: spark-streaming-flume_2.11

- **Package Type**: maven
- **Language**: Scala (with Java API support)
- **Installation**: Add to Maven dependencies:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-flume_2.11</artifactId>
  <version>2.2.3</version>
</dependency>
```
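
For sbt-based projects, the same artifact can be added with a one-line dependency (a sketch; `%%` appends the Scala binary version, `_2.11` here, so the artifact name is given without the suffix):

```scala
// build.sbt
libraryDependencies += "org.apache.spark" %% "spark-streaming-flume" % "2.2.3"
```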

## Core Imports

```scala
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.storage.StorageLevel
```

For the Java API (note that Java code uses `JavaStreamingContext` and `Durations` rather than the Scala `StreamingContext` and `Seconds`):

```java
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import org.apache.spark.storage.StorageLevel;
```

## Basic Usage

### Push-based Approach (Flume as Client)

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf().setAppName("FlumePushExample")
val ssc = new StreamingContext(conf, Seconds(10))

// Create stream that receives data pushed from a Flume agent
val flumeStream = FlumeUtils.createStream(
  ssc,
  "localhost",                       // hostname where Spark receiver will listen
  9999,                              // port where Spark receiver will listen
  StorageLevel.MEMORY_AND_DISK_SER_2
)

// Process the events (decode bodies to strings rather than printing raw byte arrays)
flumeStream.map(e => new String(e.event.getBody.array(), "UTF-8")).print()
```

### Pull-based Approach (Spark as Client)

```scala
import java.net.InetSocketAddress

// Using the same ssc from the example above
// Create polling stream that pulls data from a SparkSink
val pollingStream = FlumeUtils.createPollingStream(
  ssc,
  Seq(new InetSocketAddress("flume-host", 9988)), // SparkSink addresses
  StorageLevel.MEMORY_AND_DISK_SER_2
)

// Process the events (decode bodies to strings rather than printing raw byte arrays)
pollingStream.map(e => new String(e.event.getBody.array(), "UTF-8")).print()
```
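
Neither stream processes any data until the streaming context is started; a typical driver program ends with the standard lifecycle calls:

```scala
// Start receiving and processing, then block until stopped or failed
ssc.start()
ssc.awaitTermination()
```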

## Architecture

The Spark Streaming Flume integration provides two distinct data ingestion patterns:

1. **Push-based (FlumeInputDStream)**: Flume agents push data through an Avro sink to a Spark Streaming receiver that acts as an Avro endpoint. Simple to set up, but less reliable.

2. **Pull-based (FlumePollingInputDStream)**: Spark Streaming polls custom SparkSink deployed on Flume agents. More reliable with transaction support and better fault tolerance.
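
On the Flume side, the two patterns map to different sink configurations. A minimal sketch (agent, channel, host, and port names here are illustrative):

```properties
# Push-based: standard Avro sink pointing at the Spark receiver's host/port
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.channel = memoryChannel
agent.sinks.avroSink.hostname = localhost
agent.sinks.avroSink.port = 9999

# Pull-based: custom SparkSink that buffers events for Spark Streaming to poll
agent.sinks.sparkSink.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.sparkSink.channel = memoryChannel
agent.sinks.sparkSink.hostname = flume-host
agent.sinks.sparkSink.port = 9988
```

The pull-based setup additionally requires the `spark-streaming-flume-sink` artifact (and its Scala dependencies) on the Flume agent's classpath.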

## Capabilities

### Stream Creation - Push-based

Creates input streams in which Flume acts as the client, pushing data to Spark receivers.

```scala { .api }
// Scala API with default storage level
def createStream(
  ssc: StreamingContext,
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[SparkFlumeEvent]

// Scala API with compression support
def createStream(
  ssc: StreamingContext,
  hostname: String,
  port: Int,
  storageLevel: StorageLevel,
  enableDecompression: Boolean
): ReceiverInputDStream[SparkFlumeEvent]
```

```java { .api }
// Java API with default storage level
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
  JavaStreamingContext jssc,
  String hostname,
  int port
)

// Java API with custom storage level
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
  JavaStreamingContext jssc,
  String hostname,
  int port,
  StorageLevel storageLevel
)

// Java API with compression support
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
  JavaStreamingContext jssc,
  String hostname,
  int port,
  StorageLevel storageLevel,
  boolean enableDecompression
)
```

### Stream Creation - Pull-based

Creates input streams that poll a SparkSink for data, offering stronger reliability guarantees.

```scala { .api }
// Scala API with single address
def createPollingStream(
  ssc: StreamingContext,
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[SparkFlumeEvent]

// Scala API with multiple addresses
def createPollingStream(
  ssc: StreamingContext,
  addresses: Seq[InetSocketAddress],
  storageLevel: StorageLevel
): ReceiverInputDStream[SparkFlumeEvent]

// Scala API with full configuration
def createPollingStream(
  ssc: StreamingContext,
  addresses: Seq[InetSocketAddress],
  storageLevel: StorageLevel,
  maxBatchSize: Int,
  parallelism: Int
): ReceiverInputDStream[SparkFlumeEvent]
```

```java { .api }
// Java API with single address
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
  JavaStreamingContext jssc,
  String hostname,
  int port
)

// Java API with custom storage level
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
  JavaStreamingContext jssc,
  String hostname,
  int port,
  StorageLevel storageLevel
)

// Java API with multiple addresses
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
  JavaStreamingContext jssc,
  InetSocketAddress[] addresses,
  StorageLevel storageLevel
)

// Java API with full configuration
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
  JavaStreamingContext jssc,
  InetSocketAddress[] addresses,
  StorageLevel storageLevel,
  int maxBatchSize,
  int parallelism
)
```

### Event Processing

Process SparkFlumeEvent objects received from Flume agents.

```scala { .api }
class SparkFlumeEvent() extends Externalizable {
  var event: AvroFlumeEvent
  def readExternal(in: ObjectInput): Unit
  def writeExternal(out: ObjectOutput): Unit
}

object SparkFlumeEvent {
  def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent
}
```

#### Usage Examples

```scala
import scala.collection.JavaConverters._

// Extract event body as a UTF-8 string
flumeStream.map { sparkEvent =>
  val body = sparkEvent.event.getBody.array()
  new String(body, "UTF-8")
}

// Extract event headers
flumeStream.map { sparkEvent =>
  val headers = sparkEvent.event.getHeaders
  headers.asScala.toMap
}

// Process both headers and body
flumeStream.map { sparkEvent =>
  val event = sparkEvent.event
  val bodyString = new String(event.getBody.array(), "UTF-8")
  val headerMap = event.getHeaders.asScala.toMap
  (bodyString, headerMap)
}
```

## Types

### Core Types

```scala { .api }
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.JavaStreamingContext

// Flume-specific types
import org.apache.flume.source.avro.AvroFlumeEvent
import java.net.InetSocketAddress
import java.io.{Externalizable, ObjectInput, ObjectOutput}
```

## Configuration Patterns

### Storage Levels

Choose appropriate storage levels based on your reliability and performance requirements:

```scala
import org.apache.spark.storage.StorageLevel

// Default - serialized, memory + disk, replicated
StorageLevel.MEMORY_AND_DISK_SER_2

// Memory only, replicated
StorageLevel.MEMORY_ONLY_2

// Disk only, replicated
StorageLevel.DISK_ONLY_2

// Memory and disk, not serialized, replicated
StorageLevel.MEMORY_AND_DISK_2
```

### Default Configuration Constants

The pull-based streaming API uses the following default values:

```scala
// Default constants from FlumeUtils
DEFAULT_POLLING_BATCH_SIZE = 1000  // events per batch
DEFAULT_POLLING_PARALLELISM = 5    // concurrent connections
```

### Pull-based Configuration

```scala
// High-throughput configuration
FlumeUtils.createPollingStream(
  ssc,
  addresses,
  StorageLevel.MEMORY_AND_DISK_SER_2,
  maxBatchSize = 2000, // larger batches
  parallelism = 10     // more concurrent connections
)

// Conservative configuration
FlumeUtils.createPollingStream(
  ssc,
  addresses,
  StorageLevel.MEMORY_AND_DISK_SER_2,
  maxBatchSize = 500, // smaller batches
  parallelism = 2     // fewer connections
)
```

310

311

## Error Handling

312

313

Both stream types handle failures at different levels:

314

315

- **Push-based**: Network failures result in data loss unless Flume is configured with reliable channels

316

- **Pull-based**: Provides transaction support; failed batches are rolled back and can be retried

317

318

```scala
// Add error handling for stream processing
flumeStream.foreachRDD { rdd =>
  try {
    // Note: collect() brings all events to the driver; suitable only for small batches
    rdd.collect().foreach { sparkEvent =>
      // Process event (processEvent is application-defined)
      processEvent(sparkEvent)
    }
  } catch {
    case e: Exception =>
      // logError comes from Spark's Logging trait; substitute your own logger as needed
      logError("Failed to process Flume events", e)
      // Handle the error appropriately (e.g., alerting or retry logic)
  }
}
```
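
When stopping an application, a graceful shutdown lets already-received events finish processing instead of being dropped (a sketch using the standard `StreamingContext.stop` parameters):

```scala
// Drain received data before shutting down the streaming and Spark contexts
ssc.stop(stopSparkContext = true, stopGracefully = true)
```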