AWS CDK v1 construct library for Amazon Kinesis Data Streams with encryption, IAM permissions, and CloudWatch metrics
npx @tessl/cli install tessl/npm-aws-cdk--aws-kinesis@1.204.0

AWS CDK Kinesis is a construct library for Amazon Kinesis Data Streams in AWS CDK v1. It provides high-level constructs for creating and managing Kinesis streams, with features including configurable shard counts, retention periods, server-side encryption with KMS keys, and comprehensive IAM permission management.
npm install @aws-cdk/aws-kinesis

import * as kinesis from '@aws-cdk/aws-kinesis';
import { Stream, StreamEncryption, StreamMode, CfnStream } from '@aws-cdk/aws-kinesis';
import { CfnTag } from '@aws-cdk/core';

For CommonJS:
const kinesis = require('@aws-cdk/aws-kinesis');
const { Stream, StreamEncryption, StreamMode } = require('@aws-cdk/aws-kinesis');

import * as kinesis from '@aws-cdk/aws-kinesis';
import * as kms from '@aws-cdk/aws-kms';
import * as iam from '@aws-cdk/aws-iam';
import { Duration } from '@aws-cdk/core';
// Create a basic stream
const stream = new kinesis.Stream(this, 'MyStream', {
streamName: 'my-data-stream',
shardCount: 2,
retentionPeriod: Duration.hours(48)
});
// Create an encrypted stream
const encryptedStream = new kinesis.Stream(this, 'MyEncryptedStream', {
streamName: 'my-encrypted-stream',
encryption: kinesis.StreamEncryption.KMS,
encryptionKey: new kms.Key(this, 'StreamKey')
});
// Create a role for Lambda function
const role = new iam.Role(this, 'StreamConsumerRole', {
assumedBy: new iam.ServicePrincipal('lambda.amazonaws.com')
});
// Grant permissions to a role
stream.grantReadWrite(role);
// Create CloudWatch alarms
const alarm = stream.metricIncomingRecords().createAlarm(this, 'HighVolumeAlarm', {
threshold: 1000,
evaluationPeriods: 2
});

AWS CDK Kinesis is built around several key components:
Stream class and low-level CfnStream CloudFormation resource.

Core functionality for creating, configuring, and importing Kinesis data streams with support for encryption, capacity modes, and retention settings.
/**
 * High-level Kinesis data stream construct. A stream can be created with the
 * constructor or referenced (imported) through one of the static factories.
 */
class Stream extends StreamBase {
/** Creates a stream under `scope` with construct id `id`; `props` may be omitted. */
constructor(scope: Construct, id: string, props?: StreamProps);
/** Imports a stream from `streamArn` and returns it as an `IStream`. */
static fromStreamArn(scope: Construct, id: string, streamArn: string): IStream;
/** Imports a stream from the given `StreamAttributes` and returns it as an `IStream`. */
static fromStreamAttributes(scope: Construct, id: string, attrs: StreamAttributes): IStream;
}
interface StreamProps {
streamName?: string;
retentionPeriod?: Duration;
shardCount?: number;
encryption?: StreamEncryption;
encryptionKey?: kms.IKey;
streamMode?: StreamMode;
}

Permission management system providing granular access control for Kinesis streams with automatic KMS key permissions when encryption is enabled.
interface IStream {
grantRead(grantee: iam.IGrantable): iam.Grant;
grantWrite(grantee: iam.IGrantable): iam.Grant;
grantReadWrite(grantee: iam.IGrantable): iam.Grant;
grant(grantee: iam.IGrantable, ...actions: string[]): iam.Grant;
}

Comprehensive CloudWatch metrics integration with 20+ pre-defined stream metrics for monitoring performance, throughput, and errors.
interface IStream {
metric(metricName: string, props?: cloudwatch.MetricOptions): cloudwatch.Metric;
metricGetRecordsBytes(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
metricGetRecordsIteratorAgeMilliseconds(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
metricGetRecordsLatency(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
metricGetRecords(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
metricGetRecordsSuccess(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
metricIncomingBytes(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
metricIncomingRecords(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
metricPutRecordBytes(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
metricPutRecordLatency(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
metricPutRecordSuccess(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
metricPutRecordsBytes(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
metricPutRecordsLatency(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
metricPutRecordsSuccess(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
metricPutRecordsTotalRecords(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
metricPutRecordsSuccessfulRecords(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
metricPutRecordsFailedRecords(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
metricPutRecordsThrottledRecords(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
metricReadProvisionedThroughputExceeded(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
metricWriteProvisionedThroughputExceeded(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
}

interface IStream extends IResource {
readonly streamArn: string;
readonly streamName: string;
readonly encryptionKey?: kms.IKey;
}
interface StreamAttributes {
readonly streamArn: string;
readonly encryptionKey?: kms.IKey;
}

enum StreamEncryption {
UNENCRYPTED = 'NONE',
KMS = 'KMS',
MANAGED = 'MANAGED'
}
enum StreamMode {
PROVISIONED = 'PROVISIONED',
ON_DEMAND = 'ON_DEMAND'
}

Low-level CloudFormation resource for direct AWS::Kinesis::Stream access.
/**
 * Low-level resource class giving direct access to AWS::Kinesis::Stream.
 */
class CfnStream extends CfnResource {
/** Creates the resource under `scope` with construct id `id`; `props` may be omitted. */
constructor(scope: Construct, id: string, props?: CfnStreamProps);
/** The ARN of the stream */
readonly attrArn: string;
/** Reference to the stream resource */
readonly ref: string;
}
/**
 * Properties accepted by the `CfnStream` constructor. Every field is optional.
 */
interface CfnStreamProps {
/** Stream name */
name?: string;
/** Retention period in hours (24-8760) */
retentionPeriodHours?: number;
/** Number of shards (only for PROVISIONED mode) */
shardCount?: number;
/** Stream encryption configuration */
streamEncryption?: CfnStream.StreamEncryptionProperty;
/** Stream mode configuration */
streamModeDetails?: CfnStream.StreamModeDetailsProperty;
/** Tags for the stream */
tags?: CfnTag[];
}
/**
 * Nested property types referenced by `CfnStreamProps`.
 */
namespace CfnStream {
/** Server-side encryption settings for the stream. */
interface StreamEncryptionProperty {
/** Encryption type: 'KMS' or 'NONE' */
encryptionType: string;
/** KMS key ID or ARN; required whenever an encryption configuration is supplied */
keyId: string;
}
/** Capacity mode settings for the stream. */
interface StreamModeDetailsProperty {
/** Stream mode: 'PROVISIONED' or 'ON_DEMAND' */
streamMode: string;
}
}