# Python API Usage

Python interface for creating Kinesis streams through PySpark with simplified parameter handling and automatic type conversion.

## Core Classes

```python { .api }
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream, MetricsLevel

class KinesisUtils:
    @staticmethod
    def createStream(
        ssc: StreamingContext,
        kinesisAppName: str,
        streamName: str,
        endpointUrl: str,
        regionName: str,
        initialPositionInStream: int,
        checkpointInterval: int,
        metricsLevel: int = MetricsLevel.DETAILED,
        storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2,
        awsAccessKeyId: Optional[str] = None,
        awsSecretKey: Optional[str] = None,
        decoder: Callable[[Optional[bytes]], T] = utf8_decoder,
        stsAssumeRoleArn: Optional[str] = None,
        stsSessionName: Optional[str] = None,
        stsExternalId: Optional[str] = None,
    ) -> DStream[T]

class InitialPositionInStream:
    LATEST: int = 0
    TRIM_HORIZON: int = 1

class MetricsLevel:
    DETAILED: int = 0
    SUMMARY: int = 1
    NONE: int = 2

def utf8_decoder(s: Optional[bytes]) -> Optional[str]
```

## Basic Usage

```python
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

# Create StreamingContext
ssc = StreamingContext(sparkContext, 2)  # 2 second batch interval

# Create Kinesis stream
kinesis_stream = KinesisUtils.createStream(
    ssc=ssc,
    kinesisAppName="my-python-kinesis-app",
    streamName="my-kinesis-stream",
    endpointUrl="https://kinesis.us-east-1.amazonaws.com",
    regionName="us-east-1",
    initialPositionInStream=InitialPositionInStream.LATEST,
    checkpointInterval=30  # 30 seconds
)

# Process the stream
kinesis_stream.map(lambda x: x.upper()) \
    .flatMap(lambda line: line.split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .pprint()

ssc.start()
ssc.awaitTermination()
```

## Parameter Details

### Required Parameters

- **ssc**: `StreamingContext` - Spark StreamingContext object
- **kinesisAppName**: `str` - Application name for KCL checkpointing and metrics
- **streamName**: `str` - Name of the Kinesis stream to read from
- **endpointUrl**: `str` - Kinesis service endpoint URL
- **regionName**: `str` - AWS region name for the Kinesis stream
- **initialPositionInStream**: `int` - Where to start reading (use `InitialPositionInStream` constants)
- **checkpointInterval**: `int` - Checkpoint interval in seconds

### Optional Parameters

#### Metrics Configuration
- **metricsLevel**: `int` - CloudWatch metrics level (default: `MetricsLevel.DETAILED`)
- **storageLevel**: `StorageLevel` - How to store received data (default: `MEMORY_AND_DISK_2`)

#### AWS Credentials
- **awsAccessKeyId**: `str` - AWS access key ID (optional, uses default provider chain if not specified)
- **awsSecretKey**: `str` - AWS secret access key (optional, must be provided with access key ID)

#### STS Assume Role
- **stsAssumeRoleArn**: `str` - ARN of IAM role to assume via STS
- **stsSessionName**: `str` - Name for the STS session
- **stsExternalId**: `str` - External ID for STS assume role

#### Data Processing
- **decoder**: `Callable[[Optional[bytes]], T]` - Function to decode byte data (default: `utf8_decoder`)

## Initial Position Configuration

```python
from pyspark.streaming.kinesis import InitialPositionInStream

# Start from latest records
kinesis_stream = KinesisUtils.createStream(
    ssc=ssc,
    kinesisAppName="my-app",
    streamName="my-stream",
    endpointUrl="https://kinesis.us-east-1.amazonaws.com",
    regionName="us-east-1",
    initialPositionInStream=InitialPositionInStream.LATEST,
    checkpointInterval=30
)

# Start from earliest available records
kinesis_stream = KinesisUtils.createStream(
    ssc=ssc,
    kinesisAppName="my-app",
    streamName="my-stream",
    endpointUrl="https://kinesis.us-east-1.amazonaws.com",
    regionName="us-east-1",
    initialPositionInStream=InitialPositionInStream.TRIM_HORIZON,
    checkpointInterval=30
)
```

## Metrics Configuration

```python
from pyspark.streaming.kinesis import MetricsLevel

# Detailed metrics (default)
kinesis_stream = KinesisUtils.createStream(
    ssc=ssc,
    kinesisAppName="my-app",
    streamName="my-stream",
    endpointUrl="https://kinesis.us-east-1.amazonaws.com",
    regionName="us-east-1",
    initialPositionInStream=InitialPositionInStream.LATEST,
    checkpointInterval=30,
    metricsLevel=MetricsLevel.DETAILED
)

# Summary metrics only
kinesis_stream = KinesisUtils.createStream(
    # ... other parameters ...
    metricsLevel=MetricsLevel.SUMMARY
)

# No metrics
kinesis_stream = KinesisUtils.createStream(
    # ... other parameters ...
    metricsLevel=MetricsLevel.NONE
)
```

## AWS Credentials Configuration

### Default Credentials
```python
# Uses default AWS credentials provider chain
kinesis_stream = KinesisUtils.createStream(
    ssc=ssc,
    kinesisAppName="my-app",
    streamName="my-stream",
    endpointUrl="https://kinesis.us-east-1.amazonaws.com",
    regionName="us-east-1",
    initialPositionInStream=InitialPositionInStream.LATEST,
    checkpointInterval=30
    # No AWS credentials specified - uses default provider chain
)
```

### Basic Credentials
```python
kinesis_stream = KinesisUtils.createStream(
    ssc=ssc,
    kinesisAppName="my-app",
    streamName="my-stream",
    endpointUrl="https://kinesis.us-east-1.amazonaws.com",
    regionName="us-east-1",
    initialPositionInStream=InitialPositionInStream.LATEST,
    checkpointInterval=30,
    awsAccessKeyId="AKIAIOSFODNN7EXAMPLE",
    awsSecretKey="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
)
```

### STS Assume Role
```python
kinesis_stream = KinesisUtils.createStream(
    ssc=ssc,
    kinesisAppName="my-app",
    streamName="my-stream",
    endpointUrl="https://kinesis.us-east-1.amazonaws.com",
    regionName="us-east-1",
    initialPositionInStream=InitialPositionInStream.LATEST,
    checkpointInterval=30,
    stsAssumeRoleArn="arn:aws:iam::123456789012:role/KinesisAccessRole",
    stsSessionName="python-kinesis-session",
    stsExternalId="unique-external-id"  # Optional
)
```

## Custom Data Decoders

### Default UTF-8 Decoder
```python
from pyspark.streaming.kinesis import utf8_decoder

# Default decoder converts bytes to UTF-8 strings
kinesis_stream = KinesisUtils.createStream(
    # ... parameters ...
    decoder=utf8_decoder  # This is the default
)
```

### JSON Decoder
```python
import json

def json_decoder(data):
    if data is None:
        return None
    try:
        return json.loads(data.decode('utf-8'))
    except (ValueError, UnicodeDecodeError):
        return None

kinesis_stream = KinesisUtils.createStream(
    # ... parameters ...
    decoder=json_decoder
)
```

### Binary Decoder
```python
def binary_decoder(data):
    # Return raw bytes without decoding
    return data

kinesis_stream = KinesisUtils.createStream(
    # ... parameters ...
    decoder=binary_decoder
)
```

## Storage Level Configuration

```python
from pyspark import StorageLevel

# Memory and disk with replication
kinesis_stream = KinesisUtils.createStream(
    # ... parameters ...
    storageLevel=StorageLevel.MEMORY_AND_DISK_2
)

# Memory only
kinesis_stream = KinesisUtils.createStream(
    # ... parameters ...
    storageLevel=StorageLevel.MEMORY_ONLY
)

# Disk only
kinesis_stream = KinesisUtils.createStream(
    # ... parameters ...
    storageLevel=StorageLevel.DISK_ONLY
)
```

## Error Handling

### Missing JAR File
```python
try:
    kinesis_stream = KinesisUtils.createStream(
        # ... parameters ...
    )
except Exception as e:
    if "streaming-kinesis-asl" in str(e):
        print("Missing Kinesis JAR file. Add spark-streaming-kinesis-asl to classpath")
    raise
```

### Invalid Credentials
```python
# Both access key ID and secret key must be provided together
try:
    kinesis_stream = KinesisUtils.createStream(
        # ... parameters ...
        awsAccessKeyId="AKIAIOSFODNN7EXAMPLE",
        awsSecretKey=None  # Invalid: missing secret key
    )
except IllegalArgumentException as e:
    print(f"Credential error: {e}")
```

### STS Parameter Validation
```python
# All STS parameters must be provided together
try:
    kinesis_stream = KinesisUtils.createStream(
        # ... parameters ...
        stsAssumeRoleArn="arn:aws:iam::123456789012:role/MyRole",
        stsSessionName=None  # Invalid: missing session name
    )
except IllegalArgumentException as e:
    print(f"STS parameter error: {e}")
```

## Complete Example

```python
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream, MetricsLevel
from pyspark import StorageLevel
import json

# Spark configuration
conf = SparkConf().setAppName("KinesisWordCount")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)  # 2 second batch interval

# Enable checkpointing
ssc.checkpoint("s3://my-bucket/checkpoints/")

# JSON decoder for processing structured data
def json_decoder(data):
    if data is None:
        return None
    try:
        return json.loads(data.decode('utf-8'))
    except (ValueError, UnicodeDecodeError):
        return None

# Create Kinesis stream
kinesis_stream = KinesisUtils.createStream(
    ssc=ssc,
    kinesisAppName="python-kinesis-word-count",
    streamName="text-stream",
    endpointUrl="https://kinesis.us-west-2.amazonaws.com",
    regionName="us-west-2",
    initialPositionInStream=InitialPositionInStream.LATEST,
    checkpointInterval=30,
    metricsLevel=MetricsLevel.SUMMARY,
    storageLevel=StorageLevel.MEMORY_AND_DISK_2,
    decoder=json_decoder,
    stsAssumeRoleArn="arn:aws:iam::123456789012:role/KinesisRole",
    stsSessionName="python-session"
)

# Process the stream
word_counts = kinesis_stream \
    .filter(lambda record: record is not None and 'text' in record) \
    .map(lambda record: record['text']) \
    .flatMap(lambda text: text.split()) \
    .map(lambda word: (word.lower(), 1)) \
    .reduceByKey(lambda a, b: a + b)

word_counts.pprint()

# Start streaming
ssc.start()
ssc.awaitTermination()
```