# Python API Usage

Python interface for creating Kinesis streams through PySpark with simplified parameter handling and automatic type conversion.

## Core Classes

```python { .api }
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream, MetricsLevel

class KinesisUtils:
    @staticmethod
    def createStream(
        ssc: StreamingContext,
        kinesisAppName: str,
        streamName: str,
        endpointUrl: str,
        regionName: str,
        initialPositionInStream: int,
        checkpointInterval: int,
        metricsLevel: int = MetricsLevel.DETAILED,
        storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2,
        awsAccessKeyId: Optional[str] = None,
        awsSecretKey: Optional[str] = None,
        decoder: Callable[[Optional[bytes]], T] = utf8_decoder,
        stsAssumeRoleArn: Optional[str] = None,
        stsSessionName: Optional[str] = None,
        stsExternalId: Optional[str] = None,
    ) -> DStream[T]

class InitialPositionInStream:
    LATEST: int = 0
    TRIM_HORIZON: int = 1

class MetricsLevel:
    DETAILED: int = 0
    SUMMARY: int = 1
    NONE: int = 2

def utf8_decoder(s: Optional[bytes]) -> Optional[str]
```

## Basic Usage

```python
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

# Create StreamingContext
ssc = StreamingContext(sparkContext, 2)  # 2 second batch interval

# Create Kinesis stream
kinesis_stream = KinesisUtils.createStream(
    ssc=ssc,
    kinesisAppName="my-python-kinesis-app",
    streamName="my-kinesis-stream",
    endpointUrl="https://kinesis.us-east-1.amazonaws.com",
    regionName="us-east-1",
    initialPositionInStream=InitialPositionInStream.LATEST,
    checkpointInterval=30  # 30 seconds
)

# Process the stream
kinesis_stream.map(lambda x: x.upper()) \
    .flatMap(lambda line: line.split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .pprint()

ssc.start()
ssc.awaitTermination()
```

## Parameter Details

### Required Parameters

- **ssc**: `StreamingContext` - Spark StreamingContext object
- **kinesisAppName**: `str` - Application name for KCL checkpointing and metrics
- **streamName**: `str` - Name of the Kinesis stream to read from
- **endpointUrl**: `str` - Kinesis service endpoint URL
- **regionName**: `str` - AWS region name for the Kinesis stream
- **initialPositionInStream**: `int` - Where to start reading (use `InitialPositionInStream` constants)
- **checkpointInterval**: `int` - Checkpoint interval in seconds

### Optional Parameters

#### Metrics Configuration

- **metricsLevel**: `int` - CloudWatch metrics level (default: `MetricsLevel.DETAILED`)
- **storageLevel**: `StorageLevel` - How to store received data (default: `MEMORY_AND_DISK_2`)

#### AWS Credentials

- **awsAccessKeyId**: `str` - AWS access key ID (optional, uses default provider chain if not specified)
- **awsSecretKey**: `str` - AWS secret access key (optional, must be provided with access key ID)

#### STS Assume Role

- **stsAssumeRoleArn**: `str` - ARN of IAM role to assume via STS
- **stsSessionName**: `str` - Name for the STS session
- **stsExternalId**: `str` - External ID for STS assume role

#### Data Processing

- **decoder**: `Callable[[Optional[bytes]], T]` - Function to decode byte data (default: `utf8_decoder`)

## Initial Position Configuration

```python
from pyspark.streaming.kinesis import InitialPositionInStream

# Start from latest records
kinesis_stream = KinesisUtils.createStream(
    ssc=ssc,
    kinesisAppName="my-app",
    streamName="my-stream",
    endpointUrl="https://kinesis.us-east-1.amazonaws.com",
    regionName="us-east-1",
    initialPositionInStream=InitialPositionInStream.LATEST,
    checkpointInterval=30
)

# Start from earliest available records
kinesis_stream = KinesisUtils.createStream(
    ssc=ssc,
    kinesisAppName="my-app",
    streamName="my-stream",
    endpointUrl="https://kinesis.us-east-1.amazonaws.com",
    regionName="us-east-1",
    initialPositionInStream=InitialPositionInStream.TRIM_HORIZON,
    checkpointInterval=30
)
```

## Metrics Configuration

```python
from pyspark.streaming.kinesis import MetricsLevel

# Detailed metrics (default)
kinesis_stream = KinesisUtils.createStream(
    ssc=ssc,
    kinesisAppName="my-app",
    streamName="my-stream",
    endpointUrl="https://kinesis.us-east-1.amazonaws.com",
    regionName="us-east-1",
    initialPositionInStream=InitialPositionInStream.LATEST,
    checkpointInterval=30,
    metricsLevel=MetricsLevel.DETAILED
)

# Summary metrics only
kinesis_stream = KinesisUtils.createStream(
    # ... other parameters ...
    metricsLevel=MetricsLevel.SUMMARY
)

# No metrics
kinesis_stream = KinesisUtils.createStream(
    # ... other parameters ...
    metricsLevel=MetricsLevel.NONE
)
```

## AWS Credentials Configuration

### Default Credentials

```python
# Uses default AWS credentials provider chain
kinesis_stream = KinesisUtils.createStream(
    ssc=ssc,
    kinesisAppName="my-app",
    streamName="my-stream",
    endpointUrl="https://kinesis.us-east-1.amazonaws.com",
    regionName="us-east-1",
    initialPositionInStream=InitialPositionInStream.LATEST,
    checkpointInterval=30
    # No AWS credentials specified - uses default provider chain
)
```

### Basic Credentials

```python
kinesis_stream = KinesisUtils.createStream(
    ssc=ssc,
    kinesisAppName="my-app",
    streamName="my-stream",
    endpointUrl="https://kinesis.us-east-1.amazonaws.com",
    regionName="us-east-1",
    initialPositionInStream=InitialPositionInStream.LATEST,
    checkpointInterval=30,
    awsAccessKeyId="AKIAIOSFODNN7EXAMPLE",
    awsSecretKey="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
)
```

### STS Assume Role

```python
kinesis_stream = KinesisUtils.createStream(
    ssc=ssc,
    kinesisAppName="my-app",
    streamName="my-stream",
    endpointUrl="https://kinesis.us-east-1.amazonaws.com",
    regionName="us-east-1",
    initialPositionInStream=InitialPositionInStream.LATEST,
    checkpointInterval=30,
    stsAssumeRoleArn="arn:aws:iam::123456789012:role/KinesisAccessRole",
    stsSessionName="python-kinesis-session",
    stsExternalId="unique-external-id"  # Optional
)
```

## Custom Data Decoders

### Default UTF-8 Decoder

```python
from pyspark.streaming.kinesis import utf8_decoder

# Default decoder converts bytes to UTF-8 strings
kinesis_stream = KinesisUtils.createStream(
    # ... parameters ...
    decoder=utf8_decoder  # This is the default
)
```

### JSON Decoder

```python
import json

def json_decoder(data):
    if data is None:
        return None
    try:
        return json.loads(data.decode('utf-8'))
    except (ValueError, UnicodeDecodeError):
        return None

kinesis_stream = KinesisUtils.createStream(
    # ... parameters ...
    decoder=json_decoder
)
```

### Binary Decoder

```python
def binary_decoder(data):
    # Return raw bytes without decoding
    return data

kinesis_stream = KinesisUtils.createStream(
    # ... parameters ...
    decoder=binary_decoder
)
```

## Storage Level Configuration

```python
from pyspark import StorageLevel

# Memory and disk with replication
kinesis_stream = KinesisUtils.createStream(
    # ... parameters ...
    storageLevel=StorageLevel.MEMORY_AND_DISK_2
)

# Memory only
kinesis_stream = KinesisUtils.createStream(
    # ... parameters ...
    storageLevel=StorageLevel.MEMORY_ONLY
)

# Disk only
kinesis_stream = KinesisUtils.createStream(
    # ... parameters ...
    storageLevel=StorageLevel.DISK_ONLY
)
```

## Error Handling

### Missing JAR File

```python
try:
    kinesis_stream = KinesisUtils.createStream(
        # ... parameters ...
    )
except Exception as e:
    if "streaming-kinesis-asl" in str(e):
        print("Missing Kinesis JAR file. Add spark-streaming-kinesis-asl to classpath")
    raise
```

### Invalid Credentials

```python
# Both access key ID and secret key must be provided together
try:
    kinesis_stream = KinesisUtils.createStream(
        # ... parameters ...
        awsAccessKeyId="AKIAIOSFODNN7EXAMPLE",
        awsSecretKey=None  # Invalid: missing secret key
    )
except IllegalArgumentException as e:
    print(f"Credential error: {e}")
```

### STS Parameter Validation

```python
# All STS parameters must be provided together
try:
    kinesis_stream = KinesisUtils.createStream(
        # ... parameters ...
        stsAssumeRoleArn="arn:aws:iam::123456789012:role/MyRole",
        stsSessionName=None  # Invalid: missing session name
    )
except IllegalArgumentException as e:
    print(f"STS parameter error: {e}")
```

## Complete Example

```python
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream, MetricsLevel
from pyspark import StorageLevel
import json

# Spark configuration
conf = SparkConf().setAppName("KinesisWordCount")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)  # 2 second batch interval

# Enable checkpointing
ssc.checkpoint("s3://my-bucket/checkpoints/")

# JSON decoder for processing structured data
def json_decoder(data):
    if data is None:
        return None
    try:
        return json.loads(data.decode('utf-8'))
    except (ValueError, UnicodeDecodeError):
        return None

# Create Kinesis stream
kinesis_stream = KinesisUtils.createStream(
    ssc=ssc,
    kinesisAppName="python-kinesis-word-count",
    streamName="text-stream",
    endpointUrl="https://kinesis.us-west-2.amazonaws.com",
    regionName="us-west-2",
    initialPositionInStream=InitialPositionInStream.LATEST,
    checkpointInterval=30,
    metricsLevel=MetricsLevel.SUMMARY,
    storageLevel=StorageLevel.MEMORY_AND_DISK_2,
    decoder=json_decoder,
    stsAssumeRoleArn="arn:aws:iam::123456789012:role/KinesisRole",
    stsSessionName="python-session"
)

# Process the stream
word_counts = kinesis_stream \
    .filter(lambda record: record is not None and 'text' in record) \
    .map(lambda record: record['text']) \
    .flatMap(lambda text: text.split()) \
    .map(lambda word: (word.lower(), 1)) \
    .reduceByKey(lambda a, b: a + b)

word_counts.pprint()

# Start streaming
ssc.start()
ssc.awaitTermination()
```