tessl/maven-org-apache-spark--spark-streaming-flume_2-10

Spark Streaming integration with Apache Flume for real-time data ingestion from Flume agents

Workspace: tessl
Visibility: Public
Describes: `mavenpkg:maven/org.apache.spark/spark-streaming-flume_2.10@1.6.x`

To install, run:

```
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-flume_2-10@1.6.0
```

# Spark Streaming Flume

Spark Streaming Flume provides integration between Apache Spark Streaming and Apache Flume for real-time data ingestion. It offers two complementary patterns: push-based receivers, where Flume agents push data directly to Spark, and pull-based polling, where Spark actively pulls data from Flume sinks for enhanced reliability.

## Package Information

- **Package Name**: spark-streaming-flume_2.10
- **Package Type**: Maven
- **Language**: Scala with Java interop
- **Installation**: `org.apache.spark:spark-streaming-flume_2.10:1.6.3`
- **Dependencies**: Requires Apache Flume libraries for event processing
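The Maven coordinate above can be declared in a build file. A minimal sbt sketch (Maven users would add the equivalent `<dependency>` to `pom.xml`; the `provided` scope for Spark itself is a common convention, not a requirement):

```scala
// build.sbt (sketch) - %% appends the project's Scala binary version,
// so this resolves to spark-streaming-flume_2.10:1.6.3 on Scala 2.10
libraryDependencies += "org.apache.spark" %% "spark-streaming-flume" % "1.6.3"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.3" % "provided"
```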

## Core Imports

Scala:

```scala
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.flume.SparkFlumeEvent
import org.apache.spark.storage.StorageLevel
import java.net.InetSocketAddress
import scala.collection.JavaConverters._

// For direct event manipulation (from the Apache Flume dependency)
import org.apache.flume.source.avro.AvroFlumeEvent
```

Java:

```java
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import org.apache.spark.storage.StorageLevel;
import java.net.InetSocketAddress;

// For direct event manipulation (from the Apache Flume dependency)
import org.apache.flume.source.avro.AvroFlumeEvent;
```

## Basic Usage

### Push-based Stream (Receiver Pattern)

```scala
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.storage.StorageLevel

val ssc = new StreamingContext(sparkConf, Seconds(5))

// Create Flume stream - Flume pushes data to this receiver
val flumeStream = FlumeUtils.createStream(
  ssc,
  "localhost", // hostname where the receiver listens
  9999,        // port where the receiver listens
  StorageLevel.MEMORY_AND_DISK_SER_2
)

// Process the stream
flumeStream.map(sparkFlumeEvent => {
  val event = sparkFlumeEvent.event
  new String(event.getBody.array())
}).print()

ssc.start()
ssc.awaitTermination()
```

### Pull-based Stream (Polling Pattern)

```scala
import java.net.InetSocketAddress
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.storage.StorageLevel

val ssc = new StreamingContext(sparkConf, Seconds(5))

// Create polling stream - Spark pulls data from the Flume sink
val pollingStream = FlumeUtils.createPollingStream(
  ssc,
  Seq(new InetSocketAddress("flume-host", 9090)), // Flume sink addresses
  StorageLevel.MEMORY_AND_DISK_SER_2,
  1000, // maxBatchSize
  5     // parallelism
)

// Process the stream
pollingStream.map(sparkFlumeEvent => {
  val headers = sparkFlumeEvent.event.getHeaders
  val body = new String(sparkFlumeEvent.event.getBody.array())
  s"Headers: $headers, Body: $body"
}).print()

ssc.start()
ssc.awaitTermination()
```

## Architecture

The Spark Streaming Flume integration is built around several key components:

- **FlumeUtils**: Main factory class providing stream creation methods for both patterns
- **SparkFlumeEvent**: Serializable wrapper for Flume events that can be processed in Spark
- **Push Pattern**: Flume agents use Avro RPC to push events to Spark receivers
- **Pull Pattern**: Spark receivers poll Flume sinks using a custom Avro protocol with transaction support
- **Storage Integration**: Configurable storage levels for fault tolerance and performance tuning

## Capabilities

### Push-based Stream Creation

Creates input streams where Flume agents push data directly to Spark Streaming receivers using the Avro RPC protocol.

```scala { .api }
object FlumeUtils {
  /**
   * Create a push-based input stream from a Flume source with default storage level
   * @param ssc StreamingContext object
   * @param hostname Hostname where the receiver will listen
   * @param port Port where the receiver will listen
   * @param storageLevel Storage level for received objects (default: MEMORY_AND_DISK_SER_2)
   * @return ReceiverInputDStream of SparkFlumeEvent objects
   */
  def createStream(
    ssc: StreamingContext,
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[SparkFlumeEvent]

  /**
   * Create a push-based input stream with compression support
   * @param ssc StreamingContext object
   * @param hostname Hostname where the receiver will listen
   * @param port Port where the receiver will listen
   * @param storageLevel Storage level for received objects
   * @param enableDecompression Enable Netty decompression for incoming data
   * @return ReceiverInputDStream of SparkFlumeEvent objects
   */
  def createStream(
    ssc: StreamingContext,
    hostname: String,
    port: Int,
    storageLevel: StorageLevel,
    enableDecompression: Boolean
  ): ReceiverInputDStream[SparkFlumeEvent]
}
```

**Java API:**

```java { .api }
/**
 * Create a push-based input stream from a Flume source (Java API)
 * @param jssc JavaStreamingContext object
 * @param hostname Hostname where the receiver will listen
 * @param port Port where the receiver will listen
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
    JavaStreamingContext jssc,
    String hostname,
    int port
);

/**
 * Create a push-based input stream with custom storage level (Java API)
 * @param jssc JavaStreamingContext object
 * @param hostname Hostname where the receiver will listen
 * @param port Port where the receiver will listen
 * @param storageLevel Storage level for received objects
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
    JavaStreamingContext jssc,
    String hostname,
    int port,
    StorageLevel storageLevel
);

/**
 * Create a push-based input stream with compression support (Java API)
 * @param jssc JavaStreamingContext object
 * @param hostname Hostname where the receiver will listen
 * @param port Port where the receiver will listen
 * @param storageLevel Storage level for received objects
 * @param enableDecompression Enable Netty decompression for incoming data
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createStream(
    JavaStreamingContext jssc,
    String hostname,
    int port,
    StorageLevel storageLevel,
    boolean enableDecompression
);
```

### Pull-based Stream Creation

Creates input streams that actively poll Flume sinks for data, providing better reliability through transaction support and acknowledgments.

```scala { .api }
/**
 * Create a pull-based polling stream with default batch size and parallelism
 * @param ssc StreamingContext object
 * @param hostname Address of the host running the Spark Sink
 * @param port Port where the Spark Sink is listening
 * @param storageLevel Storage level for received objects (default: MEMORY_AND_DISK_SER_2)
 * @return ReceiverInputDStream of SparkFlumeEvent objects
 */
def createPollingStream(
  ssc: StreamingContext,
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[SparkFlumeEvent]

/**
 * Create a pull-based polling stream with multiple sink addresses
 * @param ssc StreamingContext object
 * @param addresses List of InetSocketAddress representing Spark Sink hosts
 * @param storageLevel Storage level for received objects
 * @return ReceiverInputDStream of SparkFlumeEvent objects
 */
def createPollingStream(
  ssc: StreamingContext,
  addresses: Seq[InetSocketAddress],
  storageLevel: StorageLevel
): ReceiverInputDStream[SparkFlumeEvent]

/**
 * Create a pull-based polling stream with full configuration
 * @param ssc StreamingContext object
 * @param addresses List of InetSocketAddress representing Spark Sink hosts
 * @param storageLevel Storage level for received objects
 * @param maxBatchSize Maximum number of events per RPC call (default: 1000)
 * @param parallelism Number of concurrent requests to the sink (default: 5)
 * @return ReceiverInputDStream of SparkFlumeEvent objects
 */
def createPollingStream(
  ssc: StreamingContext,
  addresses: Seq[InetSocketAddress],
  storageLevel: StorageLevel,
  maxBatchSize: Int,
  parallelism: Int
): ReceiverInputDStream[SparkFlumeEvent]
```

**Java API:**

```java { .api }
/**
 * Create a pull-based polling stream (Java API)
 * @param jssc JavaStreamingContext object
 * @param hostname Address of the host running the Spark Sink
 * @param port Port where the Spark Sink is listening
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
    JavaStreamingContext jssc,
    String hostname,
    int port
);

/**
 * Create a pull-based polling stream with custom storage level (Java API)
 * @param jssc JavaStreamingContext object
 * @param hostname Address of the host running the Spark Sink
 * @param port Port where the Spark Sink is listening
 * @param storageLevel Storage level for received objects
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
    JavaStreamingContext jssc,
    String hostname,
    int port,
    StorageLevel storageLevel
);

/**
 * Create a pull-based polling stream with multiple sinks (Java API)
 * @param jssc JavaStreamingContext object
 * @param addresses Array of InetSocketAddress representing Spark Sink hosts
 * @param storageLevel Storage level for received objects
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
    JavaStreamingContext jssc,
    InetSocketAddress[] addresses,
    StorageLevel storageLevel
);

/**
 * Create a pull-based polling stream with full configuration (Java API)
 * @param jssc JavaStreamingContext object
 * @param addresses Array of InetSocketAddress representing Spark Sink hosts
 * @param storageLevel Storage level for received objects
 * @param maxBatchSize Maximum number of events per RPC call
 * @param parallelism Number of concurrent requests to the sink
 * @return JavaReceiverInputDStream of SparkFlumeEvent objects
 */
public static JavaReceiverInputDStream<SparkFlumeEvent> createPollingStream(
    JavaStreamingContext jssc,
    InetSocketAddress[] addresses,
    StorageLevel storageLevel,
    int maxBatchSize,
    int parallelism
);
```

## Types

### SparkFlumeEvent

Serializable wrapper for Flume events that can be processed in Spark transformations.

```scala { .api }
/**
 * Serializable wrapper for AvroFlumeEvent with custom serialization
 */
class SparkFlumeEvent extends Externalizable {
  /** The wrapped Flume event containing headers and body (mutable) */
  var event: AvroFlumeEvent = new AvroFlumeEvent()

  /** Deserialize from ObjectInput */
  def readExternal(in: ObjectInput): Unit

  /** Serialize to ObjectOutput */
  def writeExternal(out: ObjectOutput): Unit
}

object SparkFlumeEvent {
  /**
   * Create a SparkFlumeEvent from an AvroFlumeEvent
   * @param in AvroFlumeEvent to wrap
   * @return SparkFlumeEvent instance
   */
  def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent
}
```
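Since the real classes need the Spark and Flume jars on the classpath, the mechanism is easiest to see with a stdlib-only toy. `ToyEvent` below is hypothetical (not the real `SparkFlumeEvent`); it only illustrates the `Externalizable` round-trip that `SparkFlumeEvent` performs for its headers and body:

```scala
import java.io._

// Hypothetical stand-in for SparkFlumeEvent: hand-written serialization
// of a headers map and a byte-array body via Externalizable.
class ToyEvent extends Externalizable {
  var headers: java.util.Map[String, String] = new java.util.HashMap()
  var body: Array[Byte] = Array.emptyByteArray

  // Write body length + bytes, then header count + key/value pairs
  def writeExternal(out: ObjectOutput): Unit = {
    out.writeInt(body.length)
    out.write(body)
    out.writeInt(headers.size)
    val it = headers.entrySet().iterator()
    while (it.hasNext) {
      val e = it.next()
      out.writeUTF(e.getKey)
      out.writeUTF(e.getValue)
    }
  }

  // Read everything back in the same order
  def readExternal(in: ObjectInput): Unit = {
    body = new Array[Byte](in.readInt())
    in.readFully(body)
    val n = in.readInt()
    headers = new java.util.HashMap()
    for (_ <- 0 until n) headers.put(in.readUTF(), in.readUTF())
  }
}

// Round-trip through standard Java serialization
val ev = new ToyEvent
ev.headers.put("source", "agent-1")
ev.body = "hello".getBytes("UTF-8")
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(ev)
oos.close()
val back = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
  .readObject().asInstanceOf[ToyEvent]
```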

### AvroFlumeEvent

Flume's standard event structure containing headers and body data. This class is provided by Apache Flume and imported from `org.apache.flume.source.avro.AvroFlumeEvent`.

```scala { .api }
/**
 * Flume event structure (from the Apache Flume library)
 * Import: org.apache.flume.source.avro.AvroFlumeEvent
 */
class AvroFlumeEvent {
  /** Get event headers as a Map */
  def getHeaders(): java.util.Map[CharSequence, CharSequence]

  /** Set event headers */
  def setHeaders(headers: java.util.Map[CharSequence, CharSequence]): Unit

  /** Get event body as a ByteBuffer */
  def getBody(): java.nio.ByteBuffer

  /** Set event body */
  def setBody(body: java.nio.ByteBuffer): Unit
}
```
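Note that the header map is typed `java.util.Map[CharSequence, CharSequence]` rather than `Map[String, String]`, so keys and values coming off the wire are not guaranteed to be `String` instances; converting them with `toString` before comparisons is a common precaution. A stdlib-only sketch of the shapes involved (the header values here are made up):

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import scala.collection.JavaConverters._

// The two fields an AvroFlumeEvent carries, mimicked with stdlib types
val headers = new java.util.HashMap[CharSequence, CharSequence]()
headers.put("host", "agent-1")
headers.put("timestamp", "1700000000")
val body: ByteBuffer = ByteBuffer.wrap("hello flume".getBytes(StandardCharsets.UTF_8))

// Normalize CharSequence keys and values to plain Strings before lookups
val normalized: Map[String, String] =
  headers.asScala.map { case (k, v) => k.toString -> v.toString }.toMap
```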

### Common Usage Patterns

```scala
// Extract the body as a string
val bodyText = new String(sparkFlumeEvent.event.getBody.array())

// Extract a specific header
val timestamp = sparkFlumeEvent.event.getHeaders.get("timestamp")

// Process headers and body together
val event = sparkFlumeEvent.event
val processedData = (event.getHeaders.asScala.toMap, new String(event.getBody.array()))
```
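The `getBody.array()` idiom assumes the `ByteBuffer` is array-backed and that its readable content spans the whole backing array. A small copying helper avoids both assumptions (`bodyToString` is a hypothetical name, not part of this package):

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Copy out exactly the readable bytes, without assuming the buffer is
// array-backed or that its content starts at offset 0 of the array.
def bodyToString(body: ByteBuffer): String = {
  val dup = body.duplicate() // leave the caller's position untouched
  val bytes = new Array[Byte](dup.remaining())
  dup.get(bytes)
  new String(bytes, StandardCharsets.UTF_8)
}

val buf = ByteBuffer.wrap("hello flume".getBytes(StandardCharsets.UTF_8))
```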

## Error Handling

The two integration patterns provide different reliability guarantees:

**Push-based streams**: At-least-once delivery. If the receiver fails, Flume retries sending events, which can introduce duplicates.

**Pull-based streams**: Stronger reliability through transaction support with ACK/NACK responses; Flume commits a batch only after Spark acknowledges it, and failed batches are automatically retried.

Common error scenarios:

- Network connectivity issues between Flume and Spark
- Receiver memory pressure causing event loss
- Flume agent failures during event transmission
- Serialization errors for malformed events
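Because at-least-once delivery can replay events, a receiving application sometimes deduplicates on an identifier it assigns at the source. The sketch below assumes a hypothetical application-assigned "eventId" header (Flume does not add one by itself) and is only safe within a single process:

```scala
import scala.collection.mutable

// Hypothetical dedup filter keyed on an application-assigned header
val seen = mutable.Set[String]()

def isNew(headers: java.util.Map[CharSequence, CharSequence]): Boolean =
  Option(headers.get("eventId")).map(_.toString) match {
    case Some(id) => seen.add(id) // Set.add returns false for a duplicate
    case None     => true         // no id: pass the event through
  }

val h = new java.util.HashMap[CharSequence, CharSequence]()
h.put("eventId", "evt-42")
```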

## Performance Tuning

Key configuration parameters for optimal performance:

**Storage Levels**:
- `MEMORY_ONLY`: Fastest access, risk of data loss
- `MEMORY_AND_DISK_SER_2`: Balanced performance and fault tolerance (default)
- `MEMORY_AND_DISK_SER`: Serialized storage with a single copy (no replication)
- `DISK_ONLY`: Maximum fault tolerance, slower access

**Polling Configuration**:
- `maxBatchSize`: Larger batches reduce RPC overhead but increase memory usage (default: 1000)
- `parallelism`: Higher parallelism increases throughput but uses more resources (default: 5)
- Multiple sink addresses: Distribute load across multiple Flume sinks for scalability

- Multiple sink addresses: Distribute load across multiple Flume sinks for scalability