# Spark Streaming Kinesis ASL

Spark Streaming Kinesis ASL integrates Apache Spark Streaming with Amazon Kinesis Data Streams, enabling real-time processing of streaming data from Kinesis. It provides a Kinesis receiver implementation that handles AWS credentials, checkpointing, metrics collection, and fault tolerance; supports configurable initial positions; scales automatically with the Kinesis shard count; and processes data efficiently through Spark's distributed computing capabilities.

## Package Information

- **Package Name**: spark-streaming-kinesis-asl_2.13
- **Package Type**: Maven
- **Language**: Scala/Java
- **Group ID**: org.apache.spark
- **Version**: 3.5.6
- **Installation**:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kinesis-asl_2.13</artifactId>
  <version>3.5.6</version>
</dependency>
```
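
For sbt builds, the same Maven coordinates can be expressed as a single dependency line (using the explicitly versioned `_2.13` artifact named above):

```scala
libraryDependencies += "org.apache.spark" % "spark-streaming-kinesis-asl_2.13" % "3.5.6"
```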

## Core Imports

**Scala:**

```scala
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, SparkAWSCredentials}
import org.apache.spark.streaming.StreamingContext
```

**Java:**

```java
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
import org.apache.spark.streaming.kinesis.SparkAWSCredentials;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import scala.reflect.ClassTag$;
```

**Python:**

```python
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream, MetricsLevel
from pyspark.storagelevel import StorageLevel
```

## Basic Usage

**Scala:**

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

val conf = new SparkConf().setAppName("KinesisExample")
val ssc = new StreamingContext(conf, Seconds(10))

val kinesisStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-spark-app")
  .regionName("us-east-1")
  .initialPosition(new KinesisInitialPositions.Latest())
  .build()

kinesisStream.foreachRDD { rdd =>
  val records = rdd.collect()
  records.foreach(record => println(new String(record)))
}

ssc.start()
ssc.awaitTermination()
```

**Java:**

```java
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import scala.reflect.ClassTag$;

SparkConf conf = new SparkConf().setAppName("JavaKinesisExample");
JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(10000));

JavaDStream<byte[]> kinesisStream = JavaDStream.fromDStream(
    KinesisInputDStream.builder()
        .streamingContext(jssc)
        .streamName("my-kinesis-stream")
        .checkpointAppName("my-spark-app")
        .regionName("us-east-1")
        .initialPosition(new KinesisInitialPositions.Latest())
        .build(),
    ClassTag$.MODULE$.apply(byte[].class)
);

kinesisStream.foreachRDD(rdd -> {
    List<byte[]> records = rdd.collect();
    records.forEach(record -> System.out.println(new String(record)));
});

jssc.start();
jssc.awaitTermination();
```

**Python:**

```python
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

conf = SparkConf().setAppName("PythonKinesisExample")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10)

kinesisStream = KinesisUtils.createStream(
    ssc,
    kinesisAppName="my-spark-app",
    streamName="my-kinesis-stream",
    endpointUrl="https://kinesis.us-east-1.amazonaws.com",
    regionName="us-east-1",
    initialPositionInStream=InitialPositionInStream.LATEST,
    checkpointInterval=10
)

def process_rdd(rdd):
    records = rdd.collect()
    for record in records:
        if record:
            print(record)

kinesisStream.foreachRDD(process_rdd)

ssc.start()
ssc.awaitTermination()
```

## Architecture

The Spark Streaming Kinesis ASL integration is built around several key components:

- **KinesisInputDStream**: The main DStream class, extending Spark's ReceiverInputDStream to handle Kinesis data
- **KinesisReceiver**: Uses the Amazon Kinesis Client Library (KCL) Worker to process data from multiple shards
- **Builder Pattern**: A fluent API for configuring Kinesis DStreams with sensible defaults
- **Fault Tolerance**: KinesisBackedBlockRDD enables recovery from failures using Kinesis sequence numbers
- **Checkpointing**: DynamoDB-based state management for tracking processing progress

## Capabilities

### Stream Creation and Configuration

Create and configure Kinesis DStreams using the builder pattern, available from both the Scala and Java APIs.

```scala { .api }
object KinesisInputDStream {
  def builder: Builder

  class Builder {
    def streamingContext(ssc: StreamingContext): Builder
    def streamingContext(jssc: JavaStreamingContext): Builder
    def streamName(streamName: String): Builder
    def checkpointAppName(appName: String): Builder
    def endpointUrl(url: String): Builder
    def regionName(regionName: String): Builder
    def initialPosition(initialPosition: KinesisInitialPosition): Builder
    @deprecated("use initialPosition(initialPosition: KinesisInitialPosition)", "2.3.0")
    def initialPositionInStream(initialPosition: InitialPositionInStream): Builder
    def checkpointInterval(interval: Duration): Builder
    def storageLevel(storageLevel: StorageLevel): Builder
    def kinesisCredentials(credentials: SparkAWSCredentials): Builder
    def dynamoDBCredentials(credentials: SparkAWSCredentials): Builder
    def cloudWatchCredentials(credentials: SparkAWSCredentials): Builder
    def metricsLevel(metricsLevel: MetricsLevel): Builder
    def metricsEnabledDimensions(dimensions: Set[String]): Builder
    def build(): KinesisInputDStream[Array[Byte]]
    def buildWithMessageHandler[T: ClassTag](handler: Record => T): KinesisInputDStream[T]
  }
}
```
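
`buildWithMessageHandler` lets the receiver deserialize each KCL `Record` into a typed value instead of raw bytes. A minimal sketch, reusing the `ssc` from the Basic Usage example; the stream and app names are placeholders:

```scala
import java.nio.charset.StandardCharsets
import com.amazonaws.services.kinesis.model.Record
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

// Build a DStream of (partitionKey, payload) pairs instead of Array[Byte].
val typedStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-spark-app")
  .regionName("us-east-1")
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .buildWithMessageHandler { record: Record =>
    // record.getData is a ByteBuffer; decode it as UTF-8 text
    (record.getPartitionKey, StandardCharsets.UTF_8.decode(record.getData).toString)
  }
```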

### Initial Position Configuration

Configure where in the Kinesis stream to start reading: the latest records, the oldest available records, or records from a specific timestamp.

```java { .api }
// Interface for initial position
interface KinesisInitialPosition {
    InitialPositionInStream getPosition();
}

// Position implementations
class KinesisInitialPositions {
    static class Latest implements KinesisInitialPosition
    static class TrimHorizon implements KinesisInitialPosition
    static class AtTimestamp implements KinesisInitialPosition {
        public AtTimestamp(Date timestamp)
        public Date getTimestamp()
    }
}
```
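
For example, `AtTimestamp` can replay records from a point in time. A sketch, assuming an existing `ssc`; the timestamp value is illustrative:

```scala
import java.util.Date
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

// Start reading from records written on or after this (illustrative) timestamp.
val replayFrom = new Date(System.currentTimeMillis() - 60 * 60 * 1000) // one hour ago

val replayStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-spark-app-replay")
  .regionName("us-east-1")
  .initialPosition(new KinesisInitialPositions.AtTimestamp(replayFrom))
  .build()
```

Note that with the KCL, the initial position generally applies only when no DynamoDB checkpoint exists yet for the given checkpoint app name; an existing checkpoint takes precedence.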

### AWS Credentials Management

Handle AWS authentication through multiple credential providers, including basic access keys, STS assume-role credentials, and the default AWS credential provider chain.

```scala { .api }
sealed trait SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}

case object DefaultCredentials extends SparkAWSCredentials
case class BasicCredentials(awsAccessKeyId: String, awsSecretKey: String) extends SparkAWSCredentials
case class STSCredentials(
  stsRoleArn: String,
  stsSessionName: String,
  stsExternalId: Option[String] = None,
  longLivedCreds: SparkAWSCredentials = DefaultCredentials
) extends SparkAWSCredentials

object SparkAWSCredentials {
  def builder: Builder

  class Builder {
    def basicCredentials(accessKeyId: String, secretKey: String): Builder
    def stsCredentials(roleArn: String, sessionName: String): Builder
    def stsCredentials(roleArn: String, sessionName: String, externalId: String): Builder
    def build(): SparkAWSCredentials
  }
}
```
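
As a sketch, STS assume-role credentials can be built and passed to the stream builder; the role ARN and session name below are placeholders, and `ssc` is assumed to exist:

```scala
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, SparkAWSCredentials}

// Assume an IAM role for Kinesis access (placeholder ARN and session name).
val stsCreds = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/my-kinesis-role", "my-spark-session")
  .build()

// Pass the credentials to the stream builder; DynamoDB and CloudWatch
// can be given different credentials via dynamoDBCredentials/cloudWatchCredentials.
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-spark-app")
  .regionName("us-east-1")
  .kinesisCredentials(stsCreds)
  .build()
```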

### Python API

Python integration is provided through the KinesisUtils class, which offers a simpler interface for Kinesis streaming.

```python { .api }
class KinesisUtils:
    @staticmethod
    def createStream(
        ssc: StreamingContext,
        kinesisAppName: str,
        streamName: str,
        endpointUrl: str,
        regionName: str,
        initialPositionInStream: int,
        checkpointInterval: int,
        metricsLevel: int = MetricsLevel.DETAILED,
        storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2,
        awsAccessKeyId: Optional[str] = None,
        awsSecretKey: Optional[str] = None,
        decoder: Callable[[Optional[bytes]], T] = utf8_decoder,
        stsAssumeRoleArn: Optional[str] = None,
        stsSessionName: Optional[str] = None,
        stsExternalId: Optional[str] = None
    ) -> DStream[T]
```

```python { .api }
class InitialPositionInStream:
    """Constants for the initial position in a Kinesis stream"""
    LATEST = 0
    TRIM_HORIZON = 1

class MetricsLevel:
    """Constants for the CloudWatch metrics level"""
    DETAILED = 0
    SUMMARY = 1
    NONE = 2

def utf8_decoder(s: Optional[bytes]) -> Optional[str]:
    """Default decoder function for Kinesis records"""
    if s is None:
        return None
    return s.decode("utf-8")
```
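
The `decoder` parameter lets a custom deserializer replace `utf8_decoder`. A minimal sketch of a JSON decoder (the JSON payload format is an assumption about your producer):

```python
import json
from typing import Any, Optional

def json_decoder(b: Optional[bytes]) -> Optional[Any]:
    """Decode a Kinesis record payload as UTF-8 JSON; None records stay None."""
    if b is None:
        return None
    return json.loads(b.decode("utf-8"))
```

It can then be passed as `KinesisUtils.createStream(..., decoder=json_decoder)` so the resulting DStream yields parsed objects rather than strings.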

### Advanced Configuration

Configure retry behavior, timeout settings, and CloudWatch metrics for production deployments.

```scala { .api }
// Kinesis read configuration
case class KinesisReadConfigurations(
  maxRetries: Int,
  retryWaitTimeMs: Long,
  retryTimeoutMs: Long
)

object KinesisReadConfigurations {
  def apply(): KinesisReadConfigurations
  def apply(ssc: StreamingContext): KinesisReadConfigurations

  val RETRY_MAX_ATTEMPTS_KEY: String = "spark.streaming.kinesis.retry.maxAttempts"
  val RETRY_WAIT_TIME_KEY: String = "spark.streaming.kinesis.retry.waitTime"
  val DEFAULT_MAX_RETRIES: Int = 3
  val DEFAULT_RETRY_WAIT_TIME: String = "100ms"
  val DEFAULT_RETRY_TIMEOUT: Long = 10000
}
```
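
Since these settings are read from the SparkConf, retry behavior can be tuned when the context is created. A sketch using the configuration keys above (the values shown are illustrative):

```scala
import org.apache.spark.SparkConf

// Tune Kinesis read retries; keys match RETRY_MAX_ATTEMPTS_KEY and RETRY_WAIT_TIME_KEY.
val conf = new SparkConf()
  .setAppName("KinesisExample")
  .set("spark.streaming.kinesis.retry.maxAttempts", "5")
  .set("spark.streaming.kinesis.retry.waitTime", "250ms")
```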

## Types

```scala { .api }
// Core DStream type
class KinesisInputDStream[T: ClassTag](
  _ssc: StreamingContext,
  streamName: String,
  endpointUrl: String,
  regionName: String,
  initialPosition: KinesisInitialPosition,
  checkpointAppName: String,
  checkpointInterval: Duration,
  _storageLevel: StorageLevel,
  messageHandler: Record => T,
  kinesisCreds: SparkAWSCredentials,
  dynamoDBCreds: Option[SparkAWSCredentials],
  cloudWatchCreds: Option[SparkAWSCredentials],
  metricsLevel: MetricsLevel,
  metricsEnabledDimensions: Set[String]
) extends ReceiverInputDStream[T]

// Message handler function type
type MessageHandler[T] = Record => T

// Default constants
object KinesisInputDStream {
  val DEFAULT_KINESIS_ENDPOINT_URL: String = "https://kinesis.us-east-1.amazonaws.com"
  val DEFAULT_KINESIS_REGION_NAME: String = "us-east-1"
  val DEFAULT_INITIAL_POSITION: KinesisInitialPosition = new Latest()
  val DEFAULT_STORAGE_LEVEL: StorageLevel = StorageLevel.MEMORY_AND_DISK_2
  val DEFAULT_METRICS_LEVEL: MetricsLevel = KinesisClientLibConfiguration.DEFAULT_METRICS_LEVEL
  val DEFAULT_METRICS_ENABLED_DIMENSIONS: Set[String] = KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet
}
```