Python interface for creating input DStreams that read from Amazon Kinesis through PySpark, with simplified parameter handling and automatic type conversion.
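The receiver lives in the external spark-streaming-kinesis-asl module, so the matching assembly JAR must be on the classpath. A typical invocation looks like the following sketch; the artifact version and the script name my_kinesis_app.py are illustrative and should match your Spark/Scala build:

spark-submit \
  --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.4.1 \
  my_kinesis_app.py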
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream, MetricsLevel
class KinesisUtils:
@staticmethod
def createStream(
ssc: StreamingContext,
kinesisAppName: str,
streamName: str,
endpointUrl: str,
regionName: str,
initialPositionInStream: int,
checkpointInterval: int,
metricsLevel: int = MetricsLevel.DETAILED,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2,
awsAccessKeyId: Optional[str] = None,
awsSecretKey: Optional[str] = None,
decoder: Callable[[Optional[bytes]], T] = utf8_decoder,
stsAssumeRoleArn: Optional[str] = None,
stsSessionName: Optional[str] = None,
stsExternalId: Optional[str] = None,
) -> DStream[T]
class InitialPositionInStream:
LATEST: int = 0
TRIM_HORIZON: int = 1
class MetricsLevel:
DETAILED: int = 0
SUMMARY: int = 1
NONE: int = 2
def utf8_decoder(s: Optional[bytes]) -> Optional[str]

Basic usage:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

# Create a SparkContext and a StreamingContext
sc = SparkContext(appName="KinesisExample")
ssc = StreamingContext(sc, 2)  # 2 second batch interval
# Create Kinesis stream
kinesis_stream = KinesisUtils.createStream(
ssc=ssc,
kinesisAppName="my-python-kinesis-app",
streamName="my-kinesis-stream",
endpointUrl="https://kinesis.us-east-1.amazonaws.com",
regionName="us-east-1",
initialPositionInStream=InitialPositionInStream.LATEST,
checkpointInterval=30 # 30 seconds
)
# Process the stream
kinesis_stream.map(lambda x: x.upper()) \
.flatMap(lambda line: line.split()) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b) \
.pprint()
ssc.start()
ssc.awaitTermination()

Parameters:

- ssc: StreamingContext - Spark StreamingContext object
- kinesisAppName: str - Application name for KCL checkpointing and metrics
- streamName: str - Name of the Kinesis stream to read from
- endpointUrl: str - Kinesis service endpoint URL
- regionName: str - AWS region name for the Kinesis stream
- initialPositionInStream: int - Where to start reading (use InitialPositionInStream constants)
- checkpointInterval: int - Checkpoint interval in seconds
- metricsLevel: int - CloudWatch metrics level (default: MetricsLevel.DETAILED)
- storageLevel: StorageLevel - How to store received data (default: StorageLevel.MEMORY_AND_DISK_2)
- awsAccessKeyId: Optional[str] - AWS access key ID (optional; uses the default credentials provider chain if not specified)
- awsSecretKey: Optional[str] - AWS secret access key (optional; must be provided together with the access key ID)
- decoder: Callable[[Optional[bytes]], T] - Function to decode record byte data (default: utf8_decoder)
- stsAssumeRoleArn: Optional[str] - ARN of the IAM role to assume via STS
- stsSessionName: Optional[str] - Name for the STS session
- stsExternalId: Optional[str] - External ID for the STS assume-role call

Initial position:

The initial position applies only when the application (identified by kinesisAppName) has no prior KCL checkpoint; otherwise reading resumes from the checkpoint.

from pyspark.streaming.kinesis import InitialPositionInStream
# Start from latest records
kinesis_stream = KinesisUtils.createStream(
ssc=ssc,
kinesisAppName="my-app",
streamName="my-stream",
endpointUrl="https://kinesis.us-east-1.amazonaws.com",
regionName="us-east-1",
initialPositionInStream=InitialPositionInStream.LATEST,
checkpointInterval=30
)
# Start from earliest available records
kinesis_stream = KinesisUtils.createStream(
ssc=ssc,
kinesisAppName="my-app",
streamName="my-stream",
endpointUrl="https://kinesis.us-east-1.amazonaws.com",
regionName="us-east-1",
initialPositionInStream=InitialPositionInStream.TRIM_HORIZON,
checkpointInterval=30
)

Metrics levels:

from pyspark.streaming.kinesis import MetricsLevel
# Detailed metrics (default)
kinesis_stream = KinesisUtils.createStream(
ssc=ssc,
kinesisAppName="my-app",
streamName="my-stream",
endpointUrl="https://kinesis.us-east-1.amazonaws.com",
regionName="us-east-1",
initialPositionInStream=InitialPositionInStream.LATEST,
checkpointInterval=30,
metricsLevel=MetricsLevel.DETAILED
)
# Summary metrics only
kinesis_stream = KinesisUtils.createStream(
# ... other parameters ...
metricsLevel=MetricsLevel.SUMMARY
)
# No metrics
kinesis_stream = KinesisUtils.createStream(
# ... other parameters ...
metricsLevel=MetricsLevel.NONE
)

AWS credentials:

# Uses the default AWS credentials provider chain
# (environment variables, shared credentials file, or EC2/ECS instance profile)
kinesis_stream = KinesisUtils.createStream(
ssc=ssc,
kinesisAppName="my-app",
streamName="my-stream",
endpointUrl="https://kinesis.us-east-1.amazonaws.com",
regionName="us-east-1",
initialPositionInStream=InitialPositionInStream.LATEST,
checkpointInterval=30
# No AWS credentials specified - uses default provider chain
)

# Explicit credentials (avoid hard-coding secrets in production code)
kinesis_stream = KinesisUtils.createStream(
ssc=ssc,
kinesisAppName="my-app",
streamName="my-stream",
endpointUrl="https://kinesis.us-east-1.amazonaws.com",
regionName="us-east-1",
initialPositionInStream=InitialPositionInStream.LATEST,
checkpointInterval=30,
awsAccessKeyId="AKIAIOSFODNN7EXAMPLE",
awsSecretKey="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
)

# Credentials via STS assume-role
kinesis_stream = KinesisUtils.createStream(
ssc=ssc,
kinesisAppName="my-app",
streamName="my-stream",
endpointUrl="https://kinesis.us-east-1.amazonaws.com",
regionName="us-east-1",
initialPositionInStream=InitialPositionInStream.LATEST,
checkpointInterval=30,
stsAssumeRoleArn="arn:aws:iam::123456789012:role/KinesisAccessRole",
stsSessionName="python-kinesis-session",
    stsExternalId="unique-external-id"  # must be set together with the other STS parameters
)

Decoders:

from pyspark.streaming.kinesis import utf8_decoder
# Default decoder converts bytes to UTF-8 strings
kinesis_stream = KinesisUtils.createStream(
# ... parameters ...
decoder=utf8_decoder # This is the default
)
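For reference, the default decoder behaves essentially like this sketch, which is consistent with PySpark's utf8_decoder:

from typing import Optional

def utf8_decoder(s: Optional[bytes]) -> Optional[str]:
    # Pass None through; otherwise decode the record bytes as UTF-8
    if s is None:
        return None
    return s.decode("utf-8")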
A custom decoder can be any callable that accepts Optional[bytes] and returns the record type you want. For example, a JSON decoder:

import json

def json_decoder(data):
if data is None:
return None
try:
return json.loads(data.decode('utf-8'))
except (ValueError, UnicodeDecodeError):
return None
kinesis_stream = KinesisUtils.createStream(
# ... parameters ...
decoder=json_decoder
)

def binary_decoder(data):
# Return raw bytes without decoding
return data
kinesis_stream = KinesisUtils.createStream(
# ... parameters ...
decoder=binary_decoder
)

Storage levels:

from pyspark import StorageLevel
# Memory and disk with replication
kinesis_stream = KinesisUtils.createStream(
# ... parameters ...
storageLevel=StorageLevel.MEMORY_AND_DISK_2
)
# Memory only
kinesis_stream = KinesisUtils.createStream(
# ... parameters ...
storageLevel=StorageLevel.MEMORY_ONLY
)
# Disk only
kinesis_stream = KinesisUtils.createStream(
# ... parameters ...
storageLevel=StorageLevel.DISK_ONLY
)

Error handling:

# Missing Kinesis assembly JAR
try:
kinesis_stream = KinesisUtils.createStream(
# ... parameters ...
)
except TypeError as e:
    # PySpark raises TypeError ("'JavaPackage' object is not callable") when the
    # spark-streaming-kinesis-asl assembly JAR is not on the classpath
    if "JavaPackage" in str(e):
        print("Missing Kinesis JAR. Add the spark-streaming-kinesis-asl-assembly JAR to the classpath")
    raise

from py4j.protocol import Py4JJavaError

# Both access key ID and secret key must be provided together
try:
kinesis_stream = KinesisUtils.createStream(
# ... parameters ...
awsAccessKeyId="AKIAIOSFODNN7EXAMPLE",
awsSecretKey=None # Invalid: missing secret key
)
except Py4JJavaError as e:  # the JVM-side helper raises java.lang.IllegalArgumentException
    print(f"Credential error: {e}")

# All STS parameters must be provided together
try:
kinesis_stream = KinesisUtils.createStream(
# ... parameters ...
stsAssumeRoleArn="arn:aws:iam::123456789012:role/MyRole",
stsSessionName=None # Invalid: missing session name
)
except Py4JJavaError as e:  # the JVM-side helper raises java.lang.IllegalArgumentException
    print(f"STS parameter error: {e}")

Complete example:

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream, MetricsLevel
from pyspark import StorageLevel
import json
# Spark configuration
conf = SparkConf().setAppName("KinesisWordCount")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2) # 2 second batch interval
# Enable checkpointing
ssc.checkpoint("s3://my-bucket/checkpoints/")
# JSON decoder for processing structured data
def json_decoder(data):
if data is None:
return None
try:
return json.loads(data.decode('utf-8'))
except (ValueError, UnicodeDecodeError):
return None
# Create Kinesis stream
kinesis_stream = KinesisUtils.createStream(
ssc=ssc,
kinesisAppName="python-kinesis-word-count",
streamName="text-stream",
endpointUrl="https://kinesis.us-west-2.amazonaws.com",
regionName="us-west-2",
initialPositionInStream=InitialPositionInStream.LATEST,
checkpointInterval=30,
metricsLevel=MetricsLevel.SUMMARY,
storageLevel=StorageLevel.MEMORY_AND_DISK_2,
decoder=json_decoder,
stsAssumeRoleArn="arn:aws:iam::123456789012:role/KinesisRole",
    stsSessionName="python-session",
    stsExternalId="unique-external-id"  # all three STS parameters must be set together
)
# Process the stream
word_counts = kinesis_stream \
.filter(lambda record: record is not None and 'text' in record) \
.map(lambda record: record['text']) \
.flatMap(lambda text: text.split()) \
.map(lambda word: (word.lower(), 1)) \
.reduceByKey(lambda a, b: a + b)
word_counts.pprint()
# Start streaming
ssc.start()
ssc.awaitTermination()
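For bounded runs (for example, in testing), one can instead wait with a timeout and then stop gracefully. A sketch using the StreamingContext API, replacing the awaitTermination() call above:

# Wait up to 60 seconds, then stop gracefully so in-flight
# batches finish before the contexts shut down
ssc.awaitTerminationOrTimeout(60)
ssc.stop(stopSparkContext=True, stopGraceFully=True)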