Python API Usage

A Python interface for creating Kinesis streams through PySpark, with simplified parameter handling and automatic type conversion between Python arguments and their JVM counterparts.

Core Classes

from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream, MetricsLevel

class KinesisUtils:
    @staticmethod
    def createStream(
        ssc: StreamingContext,
        kinesisAppName: str,
        streamName: str,
        endpointUrl: str,
        regionName: str,
        initialPositionInStream: int,
        checkpointInterval: int,
        metricsLevel: int = MetricsLevel.DETAILED,
        storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2,
        awsAccessKeyId: Optional[str] = None,
        awsSecretKey: Optional[str] = None,
        decoder: Callable[[Optional[bytes]], T] = utf8_decoder,
        stsAssumeRoleArn: Optional[str] = None,
        stsSessionName: Optional[str] = None,
        stsExternalId: Optional[str] = None,
    ) -> DStream[T]

class InitialPositionInStream:
    LATEST: int = 0
    TRIM_HORIZON: int = 1

class MetricsLevel:
    DETAILED: int = 0
    SUMMARY: int = 1
    NONE: int = 2

def utf8_decoder(s: Optional[bytes]) -> Optional[str]
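
For reference, the default decoder simply decodes each record's payload as UTF-8 and passes None through; a sketch equivalent to the shipped implementation:

def utf8_decoder(s):
    # Decode the record payload as UTF-8; None (no payload) passes through
    if s is None:
        return None
    return s.decode('utf-8')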

Basic Usage

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

# Create a SparkContext and a StreamingContext
sc = SparkContext(appName="KinesisExample")
ssc = StreamingContext(sc, 2)  # 2-second batch interval

# Create Kinesis stream
kinesis_stream = KinesisUtils.createStream(
    ssc=ssc,
    kinesisAppName="my-python-kinesis-app",
    streamName="my-kinesis-stream", 
    endpointUrl="https://kinesis.us-east-1.amazonaws.com",
    regionName="us-east-1",
    initialPositionInStream=InitialPositionInStream.LATEST,
    checkpointInterval=30  # 30 seconds
)

# Process the stream (filter out None records from the decoder first)
kinesis_stream.filter(lambda x: x is not None) \
    .map(lambda x: x.upper()) \
    .flatMap(lambda line: line.split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .pprint()

ssc.start()
ssc.awaitTermination()

Parameter Details

Required Parameters

  • ssc: StreamingContext - Spark StreamingContext object
  • kinesisAppName: str - Application name for KCL checkpointing and metrics
  • streamName: str - Name of the Kinesis stream to read from
  • endpointUrl: str - Kinesis service endpoint URL
  • regionName: str - AWS region name for the Kinesis stream
  • initialPositionInStream: int - Where to start reading (use InitialPositionInStream constants)
  • checkpointInterval: int - Checkpoint interval in seconds (a minimal call using only these required parameters is sketched below)
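
The seven required parameters can also be passed positionally, in the order they appear in the signature above; a minimal sketch (the app, stream, and endpoint values are placeholders):

# Minimal call: required parameters only, in signature order
kinesis_stream = KinesisUtils.createStream(
    ssc,                                        # StreamingContext
    "my-app",                                   # kinesisAppName
    "my-stream",                                # streamName
    "https://kinesis.us-east-1.amazonaws.com",  # endpointUrl
    "us-east-1",                                # regionName
    InitialPositionInStream.LATEST,             # initialPositionInStream
    30,                                         # checkpointInterval (seconds)
)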

Optional Parameters

Metrics Configuration

  • metricsLevel: int - CloudWatch metrics level (default: MetricsLevel.DETAILED)
  • storageLevel: StorageLevel - How to store received data (default: MEMORY_AND_DISK_2)

AWS Credentials

  • awsAccessKeyId: str - AWS access key ID (optional, uses default provider chain if not specified)
  • awsSecretKey: str - AWS secret access key (optional, must be provided with access key ID)

STS Assume Role

  • stsAssumeRoleArn: str - ARN of IAM role to assume via STS
  • stsSessionName: str - Name for the STS session
  • stsExternalId: str - External ID for the STS assume-role call

All three STS parameters must be provided together or all omitted (see STS Parameter Validation under Error Handling).

Data Processing

  • decoder: Callable[[Optional[bytes]], T] - Function to decode byte data (default: utf8_decoder)

Initial Position Configuration

from pyspark.streaming.kinesis import InitialPositionInStream

# Start from latest records
kinesis_stream = KinesisUtils.createStream(
    ssc=ssc,
    kinesisAppName="my-app",
    streamName="my-stream",
    endpointUrl="https://kinesis.us-east-1.amazonaws.com",
    regionName="us-east-1", 
    initialPositionInStream=InitialPositionInStream.LATEST,
    checkpointInterval=30
)

# Start from the earliest available records. Note: once the KCL application
# has checkpointed to DynamoDB under this app name, the stored checkpoint
# takes precedence over the initial position.
kinesis_stream = KinesisUtils.createStream(
    ssc=ssc,
    kinesisAppName="my-app",
    streamName="my-stream",
    endpointUrl="https://kinesis.us-east-1.amazonaws.com",
    regionName="us-east-1",
    initialPositionInStream=InitialPositionInStream.TRIM_HORIZON,
    checkpointInterval=30
)

Metrics Configuration

from pyspark.streaming.kinesis import MetricsLevel

# Detailed metrics (default)
kinesis_stream = KinesisUtils.createStream(
    ssc=ssc,
    kinesisAppName="my-app",
    streamName="my-stream",
    endpointUrl="https://kinesis.us-east-1.amazonaws.com",
    regionName="us-east-1",
    initialPositionInStream=InitialPositionInStream.LATEST,
    checkpointInterval=30,
    metricsLevel=MetricsLevel.DETAILED
)

# Summary metrics only
kinesis_stream = KinesisUtils.createStream(
    # ... other parameters ...
    metricsLevel=MetricsLevel.SUMMARY
)

# No metrics
kinesis_stream = KinesisUtils.createStream(
    # ... other parameters ...
    metricsLevel=MetricsLevel.NONE
)

AWS Credentials Configuration

Default Credentials

# Uses default AWS credentials provider chain
kinesis_stream = KinesisUtils.createStream(
    ssc=ssc,
    kinesisAppName="my-app",
    streamName="my-stream",
    endpointUrl="https://kinesis.us-east-1.amazonaws.com",
    regionName="us-east-1",
    initialPositionInStream=InitialPositionInStream.LATEST,
    checkpointInterval=30
    # No AWS credentials specified - uses default provider chain
)

Basic Credentials

# Explicit credentials (example values from the AWS documentation;
# avoid hard-coding real keys in source code)
kinesis_stream = KinesisUtils.createStream(
    ssc=ssc,
    kinesisAppName="my-app",
    streamName="my-stream",
    endpointUrl="https://kinesis.us-east-1.amazonaws.com",
    regionName="us-east-1",
    initialPositionInStream=InitialPositionInStream.LATEST,
    checkpointInterval=30,
    awsAccessKeyId="AKIAIOSFODNN7EXAMPLE",
    awsSecretKey="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
)

STS Assume Role

kinesis_stream = KinesisUtils.createStream(
    ssc=ssc,
    kinesisAppName="my-app",
    streamName="my-stream",
    endpointUrl="https://kinesis.us-east-1.amazonaws.com",
    regionName="us-east-1",
    initialPositionInStream=InitialPositionInStream.LATEST,
    checkpointInterval=30,
    stsAssumeRoleArn="arn:aws:iam::123456789012:role/KinesisAccessRole",
    stsSessionName="python-kinesis-session",
    stsExternalId="unique-external-id"  # required together with the other STS parameters
)

Custom Data Decoders

Default UTF-8 Decoder

from pyspark.streaming.kinesis import utf8_decoder

# Default decoder converts bytes to UTF-8 strings
kinesis_stream = KinesisUtils.createStream(
    # ... parameters ...
    decoder=utf8_decoder  # This is the default
)

JSON Decoder

import json

def json_decoder(data):
    if data is None:
        return None
    try:
        return json.loads(data.decode('utf-8'))
    except (ValueError, UnicodeDecodeError):
        return None

kinesis_stream = KinesisUtils.createStream(
    # ... parameters ...
    decoder=json_decoder
)

Binary Decoder

def binary_decoder(data):
    # Return raw bytes without decoding
    return data

kinesis_stream = KinesisUtils.createStream(
    # ... parameters ...
    decoder=binary_decoder
)
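
A raw-bytes decoder is useful when records carry a binary wire format that is parsed downstream; a sketch for hypothetical fixed-width records (the <qi layout is an assumption for illustration, not part of the API):

import struct

# Hypothetical record layout: an 8-byte little-endian timestamp followed
# by a 4-byte little-endian integer value (12 bytes total)
def parse_record(data):
    if data is None or len(data) != 12:
        return None
    timestamp, value = struct.unpack('<qi', data)
    return (timestamp, value)

parsed = kinesis_stream.map(parse_record).filter(lambda r: r is not None)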

Storage Level Configuration

from pyspark import StorageLevel

# Memory and disk with replication
kinesis_stream = KinesisUtils.createStream(
    # ... parameters ...
    storageLevel=StorageLevel.MEMORY_AND_DISK_2
)

# Memory only
kinesis_stream = KinesisUtils.createStream(
    # ... parameters ...
    storageLevel=StorageLevel.MEMORY_ONLY
)

# Disk only
kinesis_stream = KinesisUtils.createStream(
    # ... parameters ...
    storageLevel=StorageLevel.DISK_ONLY
)

Error Handling

Missing JAR File

try:
    kinesis_stream = KinesisUtils.createStream(
        # ... parameters ...
    )
except Exception as e:
    if "streaming-kinesis-asl" in str(e):
        print("Missing Kinesis JAR. Add spark-streaming-kinesis-asl to the classpath.")
    raise  # re-raise either way so unrelated errors are not swallowed
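
The usual fix is to supply the package at submit time; for example (the Scala suffix and version below are assumptions and must match your Spark build):

spark-submit \
  --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.0 \
  my_kinesis_app.py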

Invalid Credentials

from py4j.protocol import Py4JJavaError

# Both the access key ID and secret key must be provided together; the
# JVM-side IllegalArgumentException surfaces in Python as a Py4JJavaError
try:
    kinesis_stream = KinesisUtils.createStream(
        # ... parameters ...
        awsAccessKeyId="AKIAIOSFODNN7EXAMPLE",
        awsSecretKey=None  # Invalid: missing secret key
    )
except Py4JJavaError as e:
    print(f"Credential error: {e}")

STS Parameter Validation

# All three STS parameters must be provided together
try:
    kinesis_stream = KinesisUtils.createStream(
        # ... parameters ...
        stsAssumeRoleArn="arn:aws:iam::123456789012:role/MyRole",
        stsSessionName=None,  # Invalid: missing session name
        stsExternalId=None    # Invalid: missing external ID
    )
except Py4JJavaError as e:
    print(f"STS parameter error: {e}")

Complete Example

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream, MetricsLevel
from pyspark import StorageLevel
import json

# Spark configuration
conf = SparkConf().setAppName("KinesisWordCount")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)  # 2 second batch interval

# Enable checkpointing
ssc.checkpoint("s3://my-bucket/checkpoints/")

# JSON decoder for processing structured data
def json_decoder(data):
    if data is None:
        return None
    try:
        return json.loads(data.decode('utf-8'))
    except (ValueError, UnicodeDecodeError):
        return None

# Create Kinesis stream
kinesis_stream = KinesisUtils.createStream(
    ssc=ssc,
    kinesisAppName="python-kinesis-word-count",
    streamName="text-stream",
    endpointUrl="https://kinesis.us-west-2.amazonaws.com",
    regionName="us-west-2",
    initialPositionInStream=InitialPositionInStream.LATEST,
    checkpointInterval=30,
    metricsLevel=MetricsLevel.SUMMARY,
    storageLevel=StorageLevel.MEMORY_AND_DISK_2,
    decoder=json_decoder,
    stsAssumeRoleArn="arn:aws:iam::123456789012:role/KinesisRole",
    stsSessionName="python-session",
    stsExternalId="example-external-id"  # all three STS parameters set together
)

# Process the stream
word_counts = kinesis_stream \
    .filter(lambda record: isinstance(record, dict) and 'text' in record) \
    .map(lambda record: record['text']) \
    .flatMap(lambda text: text.split()) \
    .map(lambda word: (word.lower(), 1)) \
    .reduceByKey(lambda a, b: a + b)

word_counts.pprint()

# Start streaming
ssc.start()
ssc.awaitTermination()
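
As an alternative to blocking indefinitely, the run can be bounded and shut down gracefully so in-flight batches finish (awaitTerminationOrTimeout and the stop flags are part of the PySpark StreamingContext API):

# Run for up to 10 minutes, then stop gracefully
ssc.start()
if not ssc.awaitTerminationOrTimeout(600):  # timeout in seconds
    ssc.stop(stopSparkContext=True, stopGraceFully=True)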