# Apache Spark Streaming Flume Sink

A specialized Flume sink implementation that provides an Avro RPC server for Apache Spark streaming applications to poll data from Flume agents using a reliable, transaction-based approach.

## Package Information

- **Package Name**: spark-streaming-flume-sink_2.11
- **Package Type**: maven
- **Language**: Scala
- **Version**: 2.4.8

### Installation

**Maven:**

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-flume-sink_2.11</artifactId>
  <version>2.4.8</version>
</dependency>
```

**SBT:**

```scala
libraryDependencies += "org.apache.spark" % "spark-streaming-flume-sink_2.11" % "2.4.8"
```

## Core Imports

```scala
// Main sink class
import org.apache.spark.streaming.flume.sink.SparkSink

// Flume configuration
import org.apache.flume.Context
import org.apache.flume.conf.Configurable
import org.apache.flume.sink.AbstractSink
import org.apache.flume.Sink.Status

// Avro protocol (generated from sparkflume.avdl)
import org.apache.spark.streaming.flume.sink.SparkFlumeProtocol
import org.apache.spark.streaming.flume.sink.SparkSinkEvent
import org.apache.spark.streaming.flume.sink.EventBatch

// For Avro RPC client usage
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor

// Java concurrency for testing/monitoring
import java.util.concurrent.CountDownLatch
```

## Basic Usage

The SparkSink is deployed as a Flume sink component and configured through Flume's configuration system. It creates an Avro RPC server that Spark streaming applications connect to for polling events.

### Flume Configuration Example

```properties
# Flume agent configuration
agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark-sink.hostname = 0.0.0.0
agent.sinks.spark-sink.port = 9999
agent.sinks.spark-sink.threads = 5
agent.sinks.spark-sink.timeout = 30
agent.sinks.spark-sink.backoffInterval = 200
agent.sinks.spark-sink.channel = memory-channel
```

### Spark Streaming Integration

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

// sparkConf is an existing SparkConf for the application
val ssc = new StreamingContext(sparkConf, Seconds(1))
val flumeStream = FlumeUtils.createPollingStream(ssc, "hostname", 9999)
```

## Architecture

The SparkSink implements a polling-based architecture instead of the traditional push-based Flume sink pattern:

1. **Avro RPC Server**: SparkSink creates an Avro server that listens for connections from Spark
2. **Transaction Management**: Each batch request creates a Flume channel transaction
3. **Thread Pool**: A configurable thread pool processes concurrent batch requests
4. **Acknowledgment Protocol**: Spark acknowledges successful or failed batch processing
5. **Timeout Handling**: Automatic transaction rollback on timeout or failure

## Configuration

### SparkSink Configuration Parameters

```scala { .api }
object SparkSinkConfig {
  val THREADS: String = "threads"
  val DEFAULT_THREADS: Int = 10

  val CONF_TRANSACTION_TIMEOUT: String = "timeout"
  val DEFAULT_TRANSACTION_TIMEOUT: Int = 60

  val CONF_HOSTNAME: String = "hostname"
  val DEFAULT_HOSTNAME: String = "0.0.0.0"

  val CONF_PORT: String = "port"
  // No default - must be specified

  val CONF_BACKOFF_INTERVAL: String = "backoffInterval"
  val DEFAULT_BACKOFF_INTERVAL: Int = 200
}
```

### Configuration Parameters

- **hostname** (optional): IP address or hostname to bind the server to. Default: "0.0.0.0"
- **port** (required): Port number for the Avro RPC server. No default value.
- **threads** (optional): Number of threads for processing batch requests. Default: 10
- **timeout** (optional): Transaction timeout in seconds. Default: 60 seconds
- **backoffInterval** (optional): Sleep interval in milliseconds when no events are available. Default: 200 ms

## Core API Components

### SparkSink Class

```scala { .api }
class SparkSink extends AbstractSink with Logging with Configurable {
  def start(): Unit
  def stop(): Unit
  def configure(ctx: Context): Unit
  def process(): Status

  // Package-private methods for testing
  private[flume] def getPort(): Int
  private[flume] def countdownWhenBatchReceived(latch: CountDownLatch): Unit
}
```

The main Flume sink implementation that:

- Extends Flume's `AbstractSink` for integration with the Flume framework
- Implements `Configurable` for parameter configuration
- Provides `Logging` capabilities for operational monitoring

**Key Methods:**

- `start()`: Initializes and starts the Avro RPC server
- `stop()`: Shuts down the server and releases all resources
- `configure(ctx: Context)`: Configures the sink from Flume context parameters
- `process()`: Blocks the Flume framework thread (required by the Flume sink interface)
- `getPort()`: Returns the actual port the server is listening on
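
The lifecycle above can be sketched from the Flume side. This is an illustrative sketch, not code from the project: it assumes an already-configured Flume `channel` (e.g. a MemoryChannel), and the use of port `0` to request an ephemeral port is an assumption suggested by the package-private test hooks.

```scala
import org.apache.flume.Context
import org.apache.spark.streaming.flume.sink.SparkSink

// Illustrative lifecycle sketch; `channel` is assumed to be an
// already-configured Flume channel.
val sink = new SparkSink()
val ctx = new Context()
ctx.put("hostname", "0.0.0.0")
ctx.put("port", "0") // 0 requests an ephemeral port (assumption)
sink.configure(ctx)
sink.setChannel(channel)
sink.start()

// getPort() is package-private, so only test code in the same package
// can read the actual bound port:
// val boundPort = sink.getPort()

sink.stop()
```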

### Avro Protocol Interface

```java { .api }
// Generated from sparkflume.avdl - Avro protocol interface
public interface SparkFlumeProtocol {
  EventBatch getEventBatch(int n);
  Void ack(CharSequence sequenceNumber);
  Void nack(CharSequence sequenceNumber);
}
```

The Avro RPC protocol that Spark clients use to interact with the sink:

- `getEventBatch(n)`: Requests up to n events from the Flume channel
- `ack(sequenceNumber)`: Acknowledges successful processing of a batch
- `nack(sequenceNumber)`: Signals failed processing, triggering rollback
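
From the client side, this protocol can be exercised directly with Avro's Netty transport (the same classes listed under Core Imports). A minimal sketch, assuming the sink is reachable at the placeholder address `sink-host:9999`:

```scala
import java.net.InetSocketAddress
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
import org.apache.spark.streaming.flume.sink.SparkFlumeProtocol

// Open an Avro RPC connection to the sink ("sink-host"/9999 are placeholders)
val transceiver = new NettyTransceiver(new InetSocketAddress("sink-host", 9999))
val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol], transceiver)

// Pull up to 100 events, then acknowledge so the Flume transaction commits
val batch = client.getEventBatch(100)
client.ack(batch.getSequenceNumber)

transceiver.close()
```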

### Data Types

#### SparkSinkEvent

```java { .api }
// Generated from sparkflume.avdl - Avro record
public class SparkSinkEvent {
  // Constructors
  public SparkSinkEvent();
  public SparkSinkEvent(java.util.Map<CharSequence, CharSequence> headers, java.nio.ByteBuffer body);

  // Headers accessors
  public java.util.Map<CharSequence, CharSequence> getHeaders();
  public void setHeaders(java.util.Map<CharSequence, CharSequence> headers);

  // Body accessors
  public java.nio.ByteBuffer getBody();
  public void setBody(java.nio.ByteBuffer body);
}
```

Represents a single Flume event:

- Constructor takes `headers` (event metadata) and `body` (event payload)
- `getHeaders()`: Returns event metadata as key-value pairs
- `getBody()`: Returns the event payload as binary data

#### EventBatch

```java { .api }
// Generated from sparkflume.avdl - Avro record
public class EventBatch {
  // Constructors
  public EventBatch();
  public EventBatch(CharSequence errorMsg, CharSequence sequenceNumber, java.util.List<SparkSinkEvent> events);

  // Error message accessors
  public CharSequence getErrorMsg(); // Empty string indicates success
  public void setErrorMsg(CharSequence errorMsg);

  // Sequence number accessors
  public CharSequence getSequenceNumber(); // Unique transaction identifier
  public void setSequenceNumber(CharSequence sequenceNumber);

  // Events list accessors
  public java.util.List<SparkSinkEvent> getEvents();
  public void setEvents(java.util.List<SparkSinkEvent> events);
}
```

Container for a batch of events returned to Spark:

- Constructor takes `errorMsg`, `sequenceNumber`, and an `events` list
- `getErrorMsg()`: Returns the error message if batch creation failed, or an empty string on success
- `getSequenceNumber()`: Returns the unique identifier used for transaction tracking and acknowledgment
- `getEvents()`: Returns the list of `SparkSinkEvent` objects in the batch
- `setErrorMsg(errorMsg)`: Sets the error message for this batch
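
As an illustration of consuming these types, the sketch below (an assumption for this document, not a helper shipped with the package) decodes each event body in a batch to a UTF-8 string:

```scala
import java.nio.charset.StandardCharsets
import scala.collection.JavaConverters._
import org.apache.spark.streaming.flume.sink.EventBatch

// Decode each event body to a UTF-8 string
def decodeBodies(batch: EventBatch): Seq[String] =
  batch.getEvents.asScala.map { event =>
    // Work on a duplicate so the original buffer's position is untouched
    val buf = event.getBody.duplicate()
    val bytes = new Array[Byte](buf.remaining())
    buf.get(bytes)
    new String(bytes, StandardCharsets.UTF_8)
  }
```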

## Error Handling

### Transaction States

- **Success**: Spark calls `ack(sequenceNumber)` → transaction commits
- **Failure**: Spark calls `nack(sequenceNumber)` → transaction rolls back
- **Timeout**: No response within the timeout period → automatic rollback

### Error Conditions

- **No Events Available**: Returns an EventBatch with an empty events list
- **Channel Errors**: Returns an EventBatch with a non-empty errorMsg
- **Connection Issues**: Avro RPC exceptions are propagated to the client
- **Configuration Errors**: ConfigurationException thrown during setup

### Retry Behavior

- Failed transactions (nack or timeout) leave events in the Flume channel for retry
- A configurable backoff interval prevents excessive polling when no events are available
- Thread pool sizing controls concurrent request handling capacity
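
Putting these states together, a client-side processing loop might look like the following sketch (assuming `client` is a `SparkFlumeProtocol` proxy obtained via Avro's `SpecificRequestor`):

```scala
import org.apache.spark.streaming.flume.sink.SparkSinkUtils

val batch = client.getEventBatch(100)
if (!SparkSinkUtils.isErrorBatch(batch)) {
  try {
    // ... process batch.getEvents here ...
    client.ack(batch.getSequenceNumber)    // success: transaction commits
  } catch {
    case e: Exception =>
      client.nack(batch.getSequenceNumber) // failure: rollback, events stay in the channel
  }
}
// Error batches carry a non-empty errorMsg and have no transaction to acknowledge
```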

## Threading Model

The SparkSink uses a multi-threaded architecture:

1. **Main Thread**: Handles the Flume framework lifecycle (start/stop/configure)
2. **Avro Server Threads**: A Netty-based server handles RPC connections
3. **Transaction Processor Threads**: A configurable pool processes batch requests
4. **Channel Transaction Threads**: Each Flume transaction must execute in its originating thread

### Thread Safety

- Transaction processors are isolated per request
- Sequence number generation uses atomic counters
- Shutdown coordination prevents resource leaks
- Flume channel transactions are thread-local by design
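
The atomic sequence-number pattern mentioned above can be illustrated with plain JDK primitives (an illustrative sketch of the technique, not the sink's actual code):

```scala
import java.util.concurrent.atomic.AtomicLong

// Unique, thread-safe sequence numbers: a random per-instance prefix
// plus an atomically incremented counter
val seqBase = java.util.UUID.randomUUID().toString.take(8)
val seqCounter = new AtomicLong(0L)
def nextSequenceNumber(): String = seqBase + "-" + seqCounter.incrementAndGet()
```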

## Integration Patterns

### With Spark Streaming

```scala
// Spark side - polling from multiple sinks
// hostnames and ports are parallel collections describing each sink
val flumeStreams = (0 until numSinks).map { i =>
  FlumeUtils.createPollingStream(ssc, hostnames(i), ports(i))
}
val unionStream = ssc.union(flumeStreams)
```

### High Availability Setup

```properties
# Multiple sink configuration for failover
agent.sinks.spark-sink1.hostname = host1
agent.sinks.spark-sink1.port = 9999

agent.sinks.spark-sink2.hostname = host2
agent.sinks.spark-sink2.port = 9999

agent.sinkgroups.spark-group.sinks = spark-sink1 spark-sink2
agent.sinkgroups.spark-group.processor.type = failover
```

### Performance Tuning

- **Thread Pool Size**: Match to the expected number of concurrent Spark receivers
- **Transaction Timeout**: Balance between reliability and throughput
- **Batch Size**: Configured on the Spark side; affects memory usage
- **Backoff Interval**: Increase to reduce CPU usage when channels are empty
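
For example, a hypothetical agent serving around 16 concurrent receivers with slow Spark batches might be tuned as follows (illustrative values, not recommendations):

```properties
# Size the pool to the expected number of concurrent receivers
agent.sinks.spark-sink.threads = 16
# Give slow Spark batches longer before automatic rollback (seconds)
agent.sinks.spark-sink.timeout = 120
# Poll less aggressively when the channel is empty (milliseconds)
agent.sinks.spark-sink.backoffInterval = 500
```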

## Utility Components

### SparkSinkUtils

```scala { .api }
object SparkSinkUtils {
  def isErrorBatch(batch: EventBatch): Boolean
}
```

Utility methods for working with event batches:

- `isErrorBatch(batch)`: Returns true if the batch represents an error condition (non-empty error message)

## Monitoring and Observability

The sink provides extensive logging through SLF4J:

- Connection establishment and teardown
- Transaction lifecycle events
- Error conditions and timeouts
- Performance metrics (batch sizes, processing times)
- Configuration parameter validation

Log levels can be configured to control verbosity:

- INFO: Operational events, configuration
- DEBUG: Detailed transaction flow
- WARN: Recoverable errors, timeouts
- ERROR: Unrecoverable failures