
tessl/maven-org-apache-spark--spark-streaming-flume-sink_2.10

Apache Spark Streaming custom Flume sink for reliable data ingestion

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: mavenpkg:maven/org.apache.spark/spark-streaming-flume-sink_2.10@1.6.x

To install, run:

```
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-flume-sink_2.10@1.6.0
```

# Spark Streaming Flume Sink

Apache Spark Streaming custom Flume sink that enables reliable, pull-based data ingestion from Apache Flume into Spark Streaming applications. The sink implements a transactional protocol using Avro RPC to ensure fault-tolerance and data durability through proper transaction management and acknowledgment handling.

## Package Information

- **Package Name**: spark-streaming-flume-sink_2.10
- **Package Type**: Maven
- **Language**: Scala
- **Installation**: Add the dependency to your Maven pom.xml:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-flume-sink_2.10</artifactId>
  <version>1.6.3</version>
</dependency>
```

## Core Imports

The sink is deployed as a Flume sink component, so application code typically needs no direct imports. For advanced integration or testing, however:

```scala { .api }
import org.apache.spark.streaming.flume.sink.SparkSink
import org.apache.spark.streaming.flume.sink.SparkAvroCallbackHandler
import org.apache.spark.streaming.flume.sink.SparkSinkUtils
import org.apache.spark.streaming.flume.sink.SparkFlumeProtocol

// Note: SparkSinkEvent and EventBatch are generated from Avro IDL and imported from the protocol
```

## Basic Usage

### Flume Configuration

Configure the sink in your Flume agent configuration:

```properties
# Configure sink
agent.sinks = sparkSink
agent.sinks.sparkSink.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.sparkSink.hostname = 0.0.0.0
agent.sinks.sparkSink.port = 9999
agent.sinks.sparkSink.channel = memoryChannel
agent.sinks.sparkSink.threads = 10
agent.sinks.sparkSink.timeout = 60
agent.sinks.sparkSink.backoffInterval = 200
```

### Spark Streaming Integration

Connect to the sink from Spark Streaming:

```scala
import org.apache.spark.streaming.flume._

val flumeStream = FlumeUtils.createPollingStream(
  streamingContext,
  "hostname",
  9999
)
```
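
Each element of the resulting stream wraps a Flume Avro event whose body arrives as a `java.nio.ByteBuffer`. Decoding that body correctly is the only subtle part, so it is shown here standalone, with no Spark dependency; `decodeBody` is an illustrative helper, not part of the package:

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Copy the buffer's remaining bytes before decoding; reading via remaining()/get()
// avoids picking up bytes outside the event body that array() might expose
def decodeBody(body: ByteBuffer): String = {
  val bytes = new Array[Byte](body.remaining())
  body.duplicate().get(bytes) // duplicate() leaves the original position untouched
  new String(bytes, StandardCharsets.UTF_8)
}

val sample = ByteBuffer.wrap("hello flume".getBytes(StandardCharsets.UTF_8))
assert(decodeBody(sample) == "hello flume")

// With Spark on the classpath, this would be applied as:
// flumeStream.map(e => decodeBody(e.event.getBody)).print()
```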

## Architecture

The Spark Streaming Flume Sink operates using a pull-based architecture:

- **SparkSink**: Main Flume sink that manages the Avro RPC server lifecycle
- **Avro RPC Protocol**: Defines communication between Spark and Flume using SparkFlumeProtocol
- **Transaction Management**: Ensures data durability through transactional event batching
- **Thread Pool**: Manages concurrent transaction processors for handling multiple Spark requests
- **Timeout Handling**: Automatic rollback for transactions that don't receive acknowledgments
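
The pull-based, transactional flow described above can be sketched with a small in-memory model. `InMemoryProtocol` below is a hypothetical stand-in for the real Avro-backed sink, not its implementation: `getEventBatch` hands out a numbered batch, `ack` commits it, and `nack` rolls it back (the real sink would return nacked events to the Flume channel for redelivery):

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable

// Hypothetical in-memory model of the getEventBatch/ack/nack cycle
class InMemoryProtocol(source: Iterator[String]) {
  private val seqCounter = new AtomicLong(0)
  private val pending   = mutable.Map.empty[String, Seq[String]] // seqNum -> uncommitted batch
  private val committed = mutable.Buffer.empty[String]

  // Spark pulls a batch; it stays pending until acked or nacked
  def getEventBatch(n: Int): (String, Seq[String]) = {
    val seq = seqCounter.incrementAndGet().toString
    val events = source.take(n).toSeq
    pending(seq) = events
    (seq, events)
  }

  // ACK: Spark stored the batch durably, so commit it
  def ack(seq: String): Unit = committed ++= pending.remove(seq).getOrElse(Nil)

  // NACK: processing failed; here we simply drop the pending entry
  // (the real sink rolls the Flume transaction back for redelivery)
  def nack(seq: String): Unit = pending.remove(seq)

  def committedEvents: Seq[String] = committed.toList
}

val proto = new InMemoryProtocol(Iterator("e1", "e2", "e3", "e4"))
val (seq1, _) = proto.getEventBatch(2) // pull a batch, as Spark's receiver does
proto.ack(seq1)                        // successful processing: commit
val (seq2, _) = proto.getEventBatch(2)
proto.nack(seq2)                       // failed processing: roll back
// proto.committedEvents == Seq("e1", "e2")
```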

## Capabilities

### Sink Configuration

Configure the SparkSink with Flume context parameters.

```scala { .api }
class SparkSink extends AbstractSink with Logging with Configurable {

  /**
   * Configure the sink with Flume context parameters
   * @param ctx Flume context containing configuration properties
   */
  def configure(ctx: Context): Unit

  /**
   * Start the sink and initialize the Avro RPC server
   */
  override def start(): Unit

  /**
   * Stop the sink and shut down resources
   */
  override def stop(): Unit

  /**
   * Main processing method (blocks until shutdown)
   * @return Processing status
   */
  override def process(): Status

  /**
   * Get the port the server is listening on
   * @return Port number (package-private method for testing)
   */
  private[flume] def getPort(): Int

  /**
   * Testing utility to count down when batches are received
   * @param latch CountDownLatch to count down on batch receipt
   */
  private[flume] def countdownWhenBatchReceived(latch: CountDownLatch): Unit
}
```

### Avro RPC Protocol

Protocol definition for communication between Spark Streaming and the Flume sink (defined in sparkflume.avdl).

```avro { .api }
@namespace("org.apache.spark.streaming.flume.sink")
protocol SparkFlumeProtocol {

  /**
   * Request a batch of events from the sink
   * @param n Maximum number of events to return
   * @return EventBatch containing events and sequence number
   */
  EventBatch getEventBatch(int n);

  /**
   * Acknowledge successful processing of an event batch
   * @param sequenceNumber Sequence number of the batch to acknowledge
   */
  void ack(string sequenceNumber);

  /**
   * Signal failed processing of an event batch
   * @param sequenceNumber Sequence number of the batch that failed
   */
  void nack(string sequenceNumber);
}
```

### Protocol Implementation

The protocol is implemented by the SparkAvroCallbackHandler class:

```scala { .api }
private[flume] class SparkAvroCallbackHandler(
    val threads: Int,
    val channel: Channel,
    val transactionTimeout: Int,
    val backOffInterval: Int
  ) extends SparkFlumeProtocol with Logging {

  /**
   * Returns a batch of events to Spark over Avro RPC
   * @param n Maximum number of events to return in a batch
   * @return EventBatch instance with sequence number and events
   */
  override def getEventBatch(n: Int): EventBatch

  /**
   * Called by Spark to indicate successful commit of a batch
   * @param sequenceNumber The sequence number of the successful batch
   */
  override def ack(sequenceNumber: CharSequence): Void

  /**
   * Called by Spark to indicate failed commit of a batch
   * @param sequenceNumber The sequence number of the failed batch
   */
  override def nack(sequenceNumber: CharSequence): Void

  /**
   * Shutdown the handler and executor threads
   */
  def shutdown(): Unit
}
```

### Event Data Structures

Core data structures for event transmission (defined in Avro IDL and generated as Scala classes).

```avro { .api }
// Avro record definitions
record SparkSinkEvent {
  map<string> headers;
  bytes body;
}

record EventBatch {
  string errorMsg = "";   // Empty indicates success, non-empty indicates error
  string sequenceNumber;  // Unique identifier for transaction tracking
  array<SparkSinkEvent> events;
}
```

The Avro compiler generates these classes from the IDL definition. While the classes are used internally by the sink, they are not typically accessed directly by client code, since the sink is configured through Flume agent properties rather than programmatic instantiation.

### Configuration Parameters

Available configuration parameters for the SparkSink.

```scala { .api }
private[flume] object SparkSinkConfig {
  // Thread pool configuration
  val THREADS = "threads"
  val DEFAULT_THREADS = 10

  // Transaction timeout configuration
  val CONF_TRANSACTION_TIMEOUT = "timeout"
  val DEFAULT_TRANSACTION_TIMEOUT = 60 // seconds

  // Network binding configuration
  val CONF_HOSTNAME = "hostname"
  val DEFAULT_HOSTNAME = "0.0.0.0"
  val CONF_PORT = "port" // No default - mandatory

  // Backoff configuration
  val CONF_BACKOFF_INTERVAL = "backoffInterval"
  val DEFAULT_BACKOFF_INTERVAL = 200 // milliseconds
}
```

### Transaction Processing

Internal transaction management handled by the TransactionProcessor class:

```scala { .api }
private class TransactionProcessor(
    val channel: Channel,
    val seqNum: String,
    var maxBatchSize: Int,
    val transactionTimeout: Int,
    val backOffInterval: Int,
    val parent: SparkAvroCallbackHandler
  ) extends Callable[Void] with Logging {

  /**
   * Get an event batch from the channel (blocks until available)
   * @return EventBatch instance with events or error message
   */
  def getEventBatch: EventBatch

  /**
   * Called when ACK or NACK received from Spark
   * @param success true for ACK, false for NACK
   */
  def batchProcessed(success: Boolean): Unit

  /**
   * Shutdown the transaction processor
   */
  def shutdown(): Unit

  /**
   * Main execution method (implements Callable)
   */
  override def call(): Void
}
```

### Utility Functions

Helper functions for working with event batches.

```scala { .api }
object SparkSinkUtils {
  /**
   * Determine if an event batch represents an error condition
   * Checks if errorMsg is non-empty using !batch.getErrorMsg.toString.equals("")
   * @param batch The EventBatch to check
   * @return true if the batch contains an error, false otherwise
   */
  def isErrorBatch(batch: EventBatch): Boolean
}
```
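
The empty-versus-non-empty `errorMsg` convention can be reproduced on a simplified stand-in. `SimpleBatch` here is a hypothetical case class, not the Avro-generated `EventBatch`:

```scala
// Hypothetical simplified batch; the real EventBatch is Avro-generated
case class SimpleBatch(errorMsg: String, sequenceNumber: String, events: Seq[Array[Byte]])

// Mirrors the SparkSinkUtils.isErrorBatch check: a non-empty errorMsg marks an error batch
def isErrorBatch(batch: SimpleBatch): Boolean = !batch.errorMsg.isEmpty

val ok  = SimpleBatch("", "42", Seq("payload".getBytes))
val bad = SimpleBatch("Error while processing transaction.", "43", Seq.empty)

assert(!isErrorBatch(ok)) // empty errorMsg: success
assert(isErrorBatch(bad)) // non-empty errorMsg: error
```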

### Supporting Classes

Additional utility classes used internally:

```scala { .api }
/**
 * Thread factory for creating daemon threads with specific naming
 */
private[sink] class SparkSinkThreadFactory(nameFormat: String) extends ThreadFactory {
  /**
   * Create a new daemon thread with formatted name
   * @param r Runnable to execute in the thread
   * @return New daemon Thread instance
   */
  override def newThread(r: Runnable): Thread
}

/**
 * Logging trait providing SLF4J-based logging functionality
 */
private[sink] trait Logging {
  protected def logInfo(msg: => String): Unit
  protected def logDebug(msg: => String): Unit
  protected def logWarning(msg: => String): Unit
  protected def logError(msg: => String): Unit
  protected def logTrace(msg: => String): Unit

  // Overloaded versions that accept Throwable
  protected def logInfo(msg: => String, throwable: Throwable): Unit
  protected def logDebug(msg: => String, throwable: Throwable): Unit
  protected def logWarning(msg: => String, throwable: Throwable): Unit
  protected def logError(msg: => String, throwable: Throwable): Unit
  protected def logTrace(msg: => String, throwable: Throwable): Unit

  protected def isTraceEnabled(): Boolean
}
```

## Configuration Options

### Required Parameters

- **port**: Port number for the Avro RPC server (no default)

### Optional Parameters

- **hostname**: Hostname to bind to (default: "0.0.0.0")
- **threads**: Number of processing threads (default: 10)
- **timeout**: Transaction timeout in seconds (default: 60)
- **backoffInterval**: Backoff interval in milliseconds when no events available (default: 200)
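
A sketch of how these parameters might be resolved against their defaults. Flume's `Context` offers typed getters with fallback values; here a plain `Map[String, String]` stands in for it, and `SinkSettings`/`readSettings` are illustrative names, not part of the package:

```scala
// Hypothetical stand-in for Flume's Context: typed lookups with defaults
final case class SinkSettings(hostname: String, port: Int, threads: Int,
                              timeoutSeconds: Int, backoffIntervalMs: Int)

def readSettings(props: Map[String, String]): SinkSettings = {
  def int(key: String, default: Int): Int = props.get(key).map(_.toInt).getOrElse(default)
  SinkSettings(
    hostname          = props.getOrElse("hostname", "0.0.0.0"),
    port              = props("port").toInt, // mandatory: fails fast if missing
    threads           = int("threads", 10),
    timeoutSeconds    = int("timeout", 60),
    backoffIntervalMs = int("backoffInterval", 200)
  )
}

val settings = readSettings(Map("port" -> "9999", "threads" -> "4"))
// settings == SinkSettings("0.0.0.0", 9999, 4, 60, 200)
```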

## Error Handling

The sink provides comprehensive error handling:

- **Transaction Timeouts**: Automatic rollback if Spark doesn't acknowledge within the timeout period
- **NACK Handling**: Transaction rollback when Spark signals processing failure
- **Error Batches**: Invalid batches indicated by a non-empty errorMsg field
- **Resource Cleanup**: Proper shutdown of threads and network resources
- **Channel Errors**: Graceful handling of Flume channel communication failures

Common error scenarios:

- **No Events Available**: Returns an error batch when the channel has no events after multiple attempts
- **Channel Transaction Failure**: Error batch when unable to create a Flume transaction
- **Network Connectivity**: Server startup failures logged with appropriate error messages
- **Thread Pool Exhaustion**: Blocks until threads become available rather than failing
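
The timeout rollback in the first bullet can be sketched with a `CountDownLatch`: publish a batch, wait a bounded time for an ACK or NACK, and treat silence as failure. `PendingBatch` is an illustrative model, not the sink's internal class:

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Illustrative processor: commit only if an ACK arrives before the timeout
class PendingBatch(timeoutMillis: Long) {
  private val latch = new CountDownLatch(1)
  @volatile private var acked = false

  def ack(): Unit  = { acked = true; latch.countDown() }
  def nack(): Unit = latch.countDown()

  /** Returns true to commit, false to roll back (NACK or timeout). */
  def awaitOutcome(): Boolean = {
    val signalled = latch.await(timeoutMillis, TimeUnit.MILLISECONDS)
    signalled && acked
  }
}

val timely = new PendingBatch(timeoutMillis = 1000)
timely.ack()
assert(timely.awaitOutcome()) // ACK arrived: commit

val late = new PendingBatch(timeoutMillis = 50)
assert(!late.awaitOutcome()) // no ACK within 50 ms: roll back
```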

## Thread Safety

The sink ensures thread safety through:

- **Thread-Local Transactions**: Flume transactions handled in dedicated threads
- **Synchronized Access**: Protected access to the shared transaction processor map
- **Atomic Counters**: Thread-safe sequence number generation
- **Proper Shutdown**: Coordinated shutdown of thread pools and network resources
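
The atomic-counter and shared-map points can be illustrated in a few lines. `SequenceRegistry` is a hypothetical model (the sink's actual fields differ): an `AtomicLong` yields unique sequence numbers under contention, and a `ConcurrentHashMap` tracks in-flight processors without external locking:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Thread-safe sequence numbers plus a concurrent processor registry (illustrative)
object SequenceRegistry {
  private val counter    = new AtomicLong(0)
  private val processors = new ConcurrentHashMap[String, String]() // seqNum -> state

  def register(): String = {
    val seq = counter.incrementAndGet().toString // atomic: unique even under contention
    processors.put(seq, "pending")
    seq
  }

  def complete(seq: String): Unit = processors.remove(seq)
  def pendingCount: Int = processors.size
}

// Eight threads register concurrently; all sequence numbers are distinct
val threads = (1 to 8).map(_ => new Thread(() => SequenceRegistry.register()))
threads.foreach(_.start())
threads.foreach(_.join())
assert(SequenceRegistry.pendingCount == 8)
```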