Apache Spark Streaming integration library for consuming data from Amazon Kinesis streams with fault-tolerant checkpointing and automatic shard management
KinesisUtils is the primary entry point for creating Kinesis input streams in Spark Streaming applications. It provides multiple overloaded createStream methods to accommodate different use cases and type requirements.
Create a stream that transforms Kinesis Records to a custom type using a message handler function.
```scala
def createStream[T: ClassTag](
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    messageHandler: Record => T
): ReceiverInputDStream[T]
```

Parameters:

- `ssc` - StreamingContext object
- `kinesisAppName` - Kinesis application name used by the KCL for DynamoDB coordination
- `streamName` - Kinesis stream name
- `endpointUrl` - Kinesis service URL (e.g., "https://kinesis.us-east-1.amazonaws.com")
- `regionName` - AWS region name for DynamoDB and CloudWatch
- `initialPositionInStream` - Starting position: TRIM_HORIZON or LATEST
- `checkpointInterval` - Checkpoint frequency for fault tolerance
- `storageLevel` - Storage level for received objects (recommended: MEMORY_AND_DISK_2)
- `messageHandler` - Function to transform a Record into type T

Usage Example:
```scala
import com.amazonaws.services.kinesis.model.Record
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Custom message handler for JSON data
def jsonMessageHandler(record: Record): JValue = {
  val data = new String(record.getData.array())
  parse(data)
}

val jsonStream = KinesisUtils.createStream[JValue](
  ssc,
  "json-processor-app",
  "json-events",
  "https://kinesis.us-west-2.amazonaws.com",
  "us-west-2",
  InitialPositionInStream.LATEST,
  Seconds(30),
  StorageLevel.MEMORY_AND_DISK_2,
  jsonMessageHandler
)
```

Create a stream that returns raw byte arrays using the default message handler.
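The handler above decodes the payload with `record.getData.array()`, which is only safe when the ByteBuffer's backing array exactly matches the payload. A defensive decoder copies just the remaining bytes. The sketch below uses a hypothetical `PayloadDecoder` helper (not part of KinesisUtils) to show the idea:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the library: safely decode a record payload.
// A ByteBuffer may be a slice of a larger backing array, so copy exactly
// remaining() bytes instead of calling array() directly.
public class PayloadDecoder {
    public static String decode(ByteBuffer data) {
        ByteBuffer copy = data.duplicate();      // leave the caller's position untouched
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

In a message handler this would be called as `PayloadDecoder.decode(record.getData())` in place of the direct `array()` access.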
```scala
def createStream(
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel
): ReceiverInputDStream[Array[Byte]]
```

Usage Example:
```scala
val byteStream = KinesisUtils.createStream(
  ssc,
  "data-processor",
  "raw-data-stream",
  "https://kinesis.eu-west-1.amazonaws.com",
  "eu-west-1",
  InitialPositionInStream.TRIM_HORIZON,
  Seconds(60),
  StorageLevel.MEMORY_AND_DISK_2
)

// Convert bytes to strings
val stringStream = byteStream.map(new String(_))
```

```scala
@deprecated("use other forms of createStream", "1.4.0")
def createStream(
    ssc: StreamingContext,
    streamName: String,
    endpointUrl: String,
    checkpointInterval: Duration,
    initialPositionInStream: InitialPositionInStream,
    storageLevel: StorageLevel
): ReceiverInputDStream[Array[Byte]]
```

This method uses the SparkConf app name as the Kinesis application name and extracts the region from the endpoint URL.
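For illustration, the region name can be derived from the endpoint host. The sketch below (a hypothetical `EndpointRegion` helper, not the library's actual parser) shows the idea for standard endpoints of the form `https://kinesis.<region>.amazonaws.com`:

```java
// Hypothetical helper: derive the AWS region from a standard Kinesis endpoint URL.
// Assumes the host has the form kinesis.<region>.amazonaws.com; the library's
// real parsing logic may differ.
public class EndpointRegion {
    public static String regionOf(String endpointUrl) {
        String host = endpointUrl.replaceFirst("^https?://", "");
        String[] parts = host.split("\\.");
        return parts[1]; // e.g. "us-east-1"
    }
}
```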
```java
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel,
    Function<Record, T> messageHandler,
    Class<T> recordClass
);
```

Usage Example:
```java
import org.apache.spark.api.java.function.Function;
import com.amazonaws.services.kinesis.model.Record;

// Define message handler
Function<Record, String> messageHandler = new Function<Record, String>() {
    @Override
    public String call(Record record) throws Exception {
        return new String(record.getData().array());
    }
};

// Create stream
JavaReceiverInputDStream<String> stringStream = KinesisUtils.createStream(
    jssc,
    "java-kinesis-app",
    "text-stream",
    "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1",
    InitialPositionInStream.LATEST,
    Durations.seconds(30),
    StorageLevel.MEMORY_AND_DISK_2(),
    messageHandler,
    String.class
);
```

```java
public static JavaReceiverInputDStream<byte[]> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel
);
```

Usage Example:
```java
JavaReceiverInputDStream<byte[]> byteStream = KinesisUtils.createStream(
    jssc,
    "java-byte-processor",
    "binary-data-stream",
    "https://kinesis.ap-southeast-1.amazonaws.com",
    "ap-southeast-1",
    InitialPositionInStream.TRIM_HORIZON,
    Durations.seconds(45),
    StorageLevel.MEMORY_AND_DISK_2()
);

// Convert to strings
JavaDStream<String> stringStream = byteStream.map(
    bytes -> new String(bytes)
);
```

```java
// From the AWS Kinesis Client Library (KCL)
enum InitialPositionInStream {
    LATEST,       // Start from the most recent record
    TRIM_HORIZON  // Start from the oldest available record (default retention: 24 hours)
}
```

```scala
import org.apache.spark.storage.StorageLevel

// Recommended storage levels
StorageLevel.MEMORY_AND_DISK_2 // Replicated in memory and on disk (recommended)
StorageLevel.MEMORY_AND_DISK   // Memory with disk fallback
StorageLevel.MEMORY_ONLY_2     // Memory only, with replication
```

MEMORY_AND_DISK_2 is recommended for fault tolerance: it combines in-memory performance with disk persistence and replication.
Choose a checkpoint interval based on your application's requirements: shorter intervals mean less data is re-read from Kinesis after a failure but more frequent DynamoDB writes, while longer intervals reduce DynamoDB load at the cost of more reprocessing on recovery.
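This tradeoff can be made concrete with a simple heuristic (an assumption, not a rule from the library): checkpoint every few batches, which bounds how many batches a failed worker re-reads. The `CheckpointTuning` helper below is hypothetical, for illustration only:

```java
// Hypothetical helper illustrating a checkpoint-interval heuristic:
// checkpointing every N batches means a failure re-reads at most N batches
// from Kinesis, while DynamoDB sees roughly one write per N batches per shard.
public class CheckpointTuning {
    public static long checkpointIntervalMs(long batchIntervalMs, int batchesPerCheckpoint) {
        return batchIntervalMs * batchesPerCheckpoint;
    }
}
```

With a 10-second batch interval and tolerance for re-reading three batches, this yields 30,000 ms, i.e. the `Seconds(30)` used in the examples above.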
Common errors and their solutions:

IllegalArgumentException: Invalid region name

```scala
// Ensure the region name is a valid AWS region, e.g.:
val validRegions = Seq("us-east-1", "us-west-2", "eu-west-1", "ap-southeast-1")
```

AWS Authentication Errors: Ensure proper AWS credentials are configured (for example, via environment variables or the default credentials provider chain).

DynamoDB Access Errors: Ensure the application has the permissions needed to create and access the DynamoDB table that the KCL uses for checkpointing.
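As a sketch of the permissions involved (the exact action list is an assumption; consult the AWS documentation for your KCL version), an IAM policy for the checkpoint table might look like the following. The KCL names the table after the Kinesis application name, here the `json-processor-app` from the example above; the account ID placeholder is hypothetical:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "KclCheckpointTable",
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem",
        "dynamodb:Scan"
      ],
      "Resource": "arn:aws:dynamodb:us-west-2:ACCOUNT_ID:table/json-processor-app"
    }
  ]
}
```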
Install with Tessl CLI

```shell
npx tessl i tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-assembly
```