Core functionality for creating, configuring, and importing Kinesis data streams with support for encryption, capacity modes, and retention settings.
Creates a new Kinesis data stream with configurable properties including shard count, retention period, and encryption.
/**
 * Creates a new Kinesis data stream
 * @param scope - The parent construct (usually 'this')
 * @param id - Construct identifier
 * @param props - Stream configuration properties
 */
class Stream extends StreamBase {
  constructor(scope: Construct, id: string, props?: StreamProps);
}

interface StreamProps {
  /** Enforces a particular physical stream name */
  readonly streamName?: string;
  /** The number of hours for data records to remain accessible (default: 24 hours) */
  readonly retentionPeriod?: Duration;
  /** The number of shards for the stream (default: 1, only for PROVISIONED mode) */
  readonly shardCount?: number;
  /** The kind of server-side encryption to apply */
  readonly encryption?: StreamEncryption;
  /** External KMS key to use for stream encryption */
  readonly encryptionKey?: kms.IKey;
  /** The capacity mode of this stream (default: PROVISIONED) */
  readonly streamMode?: StreamMode;
}

Usage Examples:
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as kms from '@aws-cdk/aws-kms';
import { Duration } from '@aws-cdk/core';

// Basic stream with default settings
const basicStream = new kinesis.Stream(this, 'BasicStream');

// Configured stream with custom properties
const configuredStream = new kinesis.Stream(this, 'ConfiguredStream', {
  streamName: 'my-application-events',
  shardCount: 4,
  retentionPeriod: Duration.hours(168), // 7 days
  streamMode: kinesis.StreamMode.PROVISIONED
});

// On-demand capacity stream
const onDemandStream = new kinesis.Stream(this, 'OnDemandStream', {
  streamName: 'auto-scaling-stream',
  streamMode: kinesis.StreamMode.ON_DEMAND,
  retentionPeriod: Duration.hours(24)
  // Note: shardCount cannot be specified with ON_DEMAND mode
});

Import existing Kinesis streams created outside of CDK for use in your CDK application.
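Because an imported stream is identified purely by its ARN, it can help to see how such an ARN breaks down. The following is a minimal sketch that assumes the standard `arn:<partition>:kinesis:<region>:<account>:stream/<name>` layout; `parseStreamArn` and `ParsedStreamArn` are hypothetical names used for illustration and are not part of the CDK API.

```typescript
// Hypothetical helper, not part of the CDK: splits a Kinesis stream ARN of the form
// arn:<partition>:kinesis:<region>:<account>:stream/<name> into its components.
interface ParsedStreamArn {
  partition: string;
  region: string;
  account: string;
  streamName: string;
}

function parseStreamArn(arn: string): ParsedStreamArn {
  const parts = arn.split(':');
  if (parts.length !== 6 || parts[0] !== 'arn' || parts[2] !== 'kinesis') {
    throw new Error(`Not a Kinesis ARN: ${arn}`);
  }
  const resource = parts[5];
  const prefix = 'stream/';
  // The resource part must start with "stream/" and carry a non-empty name.
  if (resource.indexOf(prefix) !== 0 || resource.length === prefix.length) {
    throw new Error(`ARN does not reference a stream: ${arn}`);
  }
  return {
    partition: parts[1],
    region: parts[3],
    account: parts[4],
    streamName: resource.slice(prefix.length),
  };
}

const parsed = parseStreamArn(
  'arn:aws:kinesis:us-east-1:123456789012:stream/my-existing-stream'
);
console.log(parsed.streamName);
```

A check like this is only useful for literal ARN strings; an ARN that comes from another construct is typically an unresolved token at synthesis time and cannot be split this way.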
/**
 * Import an existing Kinesis Stream by ARN
 * @param scope - The parent construct
 * @param id - Construct identifier
 * @param streamArn - ARN of the existing stream
 * @returns IStream interface for the imported stream
 */
static fromStreamArn(scope: Construct, id: string, streamArn: string): IStream;

/**
 * Import an existing stream with detailed attributes
 * @param scope - The parent construct
 * @param id - Construct identifier
 * @param attrs - Stream attributes including ARN and optional encryption key
 * @returns IStream interface for the imported stream
 */
static fromStreamAttributes(scope: Construct, id: string, attrs: StreamAttributes): IStream;

interface StreamAttributes {
  /** The ARN of the stream */
  readonly streamArn: string;
  /** The KMS key securing the contents if encryption is enabled */
  readonly encryptionKey?: kms.IKey;
}

Usage Examples:
// Import by ARN only
const importedStream = kinesis.Stream.fromStreamArn(
  this,
  'ImportedStream',
  'arn:aws:kinesis:us-east-1:123456789012:stream/my-existing-stream'
);

// Import encrypted stream with KMS key
const importedEncryptedStream = kinesis.Stream.fromStreamAttributes(this, 'ImportedEncryptedStream', {
  streamArn: 'arn:aws:kinesis:us-east-1:123456789012:stream/my-encrypted-stream',
  encryptionKey: kms.Key.fromKeyArn(
    this,
    'ImportedKey',
    'arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789012'
  )
});

Configure server-side encryption for Kinesis streams using various encryption options.
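To make the three options concrete, here is a rough sketch of how each one could translate into the low-level stream encryption setting. It runs without the CDK: the enum and property shape are local stand-ins, `resolveEncryption` is a hypothetical helper, and the placeholder key ARN stands for whatever key the CDK would create. The mapping of the managed option to the `alias/aws/kinesis` key is an assumption based on the regional defaults described later in this document.

```typescript
// Local stand-in for kinesis.StreamEncryption so the sketch runs without the CDK.
enum StreamEncryption {
  UNENCRYPTED = 'NONE',
  KMS = 'KMS',
  MANAGED = 'MANAGED',
}

// Assumed shape of the resulting low-level encryption setting.
interface StreamEncryptionSetting {
  encryptionType: string;
  keyId: string;
}

// Hypothetical helper: returns undefined when no encryption block would be emitted.
function resolveEncryption(
  encryption: StreamEncryption,
  userKeyArn?: string
): StreamEncryptionSetting | undefined {
  switch (encryption) {
    case StreamEncryption.UNENCRYPTED:
      // No encryption block at all.
      return undefined;
    case StreamEncryption.MANAGED:
      // Service-managed key, referenced by its well-known alias.
      return { encryptionType: 'KMS', keyId: 'alias/aws/kinesis' };
    case StreamEncryption.KMS:
      // User-managed key: either the one supplied or one created on the user's behalf.
      return {
        encryptionType: 'KMS',
        keyId: userKeyArn !== undefined ? userKeyArn : '<arn-of-key-created-by-cdk>',
      };
  }
}

console.log(resolveEncryption(StreamEncryption.MANAGED));
console.log(resolveEncryption(StreamEncryption.UNENCRYPTED));
```

The point of the sketch is that both `KMS` and `MANAGED` end up as KMS-based server-side encryption; they differ only in who owns the key.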
enum StreamEncryption {
  /** Records in the stream are not encrypted */
  UNENCRYPTED = 'NONE',
  /** Server-side encryption with a KMS key managed by the user */
  KMS = 'KMS',
  /** Server-side encryption with a master key managed by Amazon Kinesis */
  MANAGED = 'MANAGED'
}

Usage Examples:
// Unencrypted stream (not recommended for production)
const unencryptedStream = new kinesis.Stream(this, 'UnencryptedStream', {
  encryption: kinesis.StreamEncryption.UNENCRYPTED
});

// Managed encryption (uses AWS managed key)
const managedEncryptedStream = new kinesis.Stream(this, 'ManagedEncryptedStream', {
  encryption: kinesis.StreamEncryption.MANAGED
});

// User-managed KMS encryption (CDK creates key automatically)
const kmsEncryptedStream = new kinesis.Stream(this, 'KMSEncryptedStream', {
  encryption: kinesis.StreamEncryption.KMS
});

// User-managed KMS encryption with custom key
const customKey = new kms.Key(this, 'StreamKey', {
  description: 'Key for Kinesis stream encryption'
});
const customEncryptedStream = new kinesis.Stream(this, 'CustomEncryptedStream', {
  encryption: kinesis.StreamEncryption.KMS,
  encryptionKey: customKey
});

Configure stream capacity using provisioned or on-demand modes.
enum StreamMode {
  /** Specify the provisioned capacity mode with fixed shard count */
  PROVISIONED = 'PROVISIONED',
  /** Specify the on-demand capacity mode with automatic scaling */
  ON_DEMAND = 'ON_DEMAND'
}

Usage Examples:
// Provisioned mode with specific shard count
const provisionedStream = new kinesis.Stream(this, 'ProvisionedStream', {
  streamMode: kinesis.StreamMode.PROVISIONED,
  shardCount: 5
});

// On-demand mode (auto-scaling)
const onDemandStream = new kinesis.Stream(this, 'OnDemandStream', {
  streamMode: kinesis.StreamMode.ON_DEMAND
  // shardCount is not allowed with ON_DEMAND mode
});

Access stream properties and attributes after creation or import.
interface IStream extends IResource {
  /** The ARN of the stream */
  readonly streamArn: string;
  /** The name of the stream */
  readonly streamName: string;
  /** Optional KMS encryption key associated with this stream */
  readonly encryptionKey?: kms.IKey;
}

Usage Examples:
import * as iam from '@aws-cdk/aws-iam';

const stream = new kinesis.Stream(this, 'MyStream', {
  streamName: 'my-app-stream'
});

// Access stream properties
// Note: at synthesis time these values are unresolved tokens, so the logged
// strings are placeholders rather than the final ARN and name.
console.log(`Stream ARN: ${stream.streamArn}`);
console.log(`Stream Name: ${stream.streamName}`);

// Use stream ARN in other resources
const policy = new iam.PolicyStatement({
  actions: ['kinesis:PutRecord'],
  resources: [stream.streamArn]
});

For advanced use cases requiring direct CloudFormation resource access.
/**
 * CloudFormation AWS::Kinesis::Stream resource
 */
class CfnStream extends CfnResource {
  constructor(scope: Construct, id: string, props?: CfnStreamProps);
  /** The ARN of the stream */
  readonly attrArn: string;
}

interface CfnStreamProps {
  /** Stream name */
  name?: string;
  /** Retention period in hours */
  retentionPeriodHours?: number;
  /** Number of shards */
  shardCount?: number;
  /** Stream encryption configuration */
  streamEncryption?: CfnStream.StreamEncryptionProperty;
  /** Stream mode configuration */
  streamModeDetails?: CfnStream.StreamModeDetailsProperty;
}

namespace CfnStream {
  interface StreamEncryptionProperty {
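    /** Encryption type (CloudFormation accepts only 'KMS'; omit streamEncryption entirely for an unencrypted stream) */
    encryptionType: string;
    /** KMS key ID, ARN, or alias (required whenever streamEncryption is set) */
    keyId: string;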
  }
  interface StreamModeDetailsProperty {
    /** Stream mode ('PROVISIONED' or 'ON_DEMAND') */
    streamMode: string;
  }
}

The Stream construct validates configuration properties and throws errors for invalid combinations.
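As a rough illustration of these checks (the individual rules and error messages are spelled out in the rest of this section), the sketch below re-implements them as a plain function. It is not the CDK's own code: `StreamConfig`, `validateStreamConfig`, and `validationError` are hypothetical names, retention is given directly in hours instead of as a `Duration`, and it assumes that a key supplied without an explicit `encryption` value is treated as KMS.

```typescript
// Standalone sketch: these types are local stand-ins, not the CDK's own.
type StreamMode = 'PROVISIONED' | 'ON_DEMAND';
type StreamEncryption = 'NONE' | 'KMS' | 'MANAGED';

interface StreamConfig {
  streamMode?: StreamMode;
  shardCount?: number;
  retentionHours?: number; // stands in for Duration.toHours()
  encryption?: StreamEncryption;
  hasEncryptionKey?: boolean;
}

function validateStreamConfig(cfg: StreamConfig): void {
  // Rule 1: shardCount is only meaningful in PROVISIONED mode.
  if (cfg.streamMode === 'ON_DEMAND' && cfg.shardCount !== undefined) {
    throw new Error('streamMode must be set to PROVISIONED (default) when specifying shardCount');
  }
  // Rule 2: retention must lie between 24 hours and 8760 hours (365 days).
  const hours = cfg.retentionHours !== undefined ? cfg.retentionHours : 24;
  if (hours < 24 || hours > 8760) {
    throw new Error(`retentionPeriod must be between 24 and 8760 hours. Received ${hours}`);
  }
  // Rule 3: a key only makes sense with KMS encryption.
  // Assumption: an omitted 'encryption' together with a key counts as KMS.
  if (cfg.hasEncryptionKey && cfg.encryption !== undefined && cfg.encryption !== 'KMS') {
    throw new Error("encryptionKey is specified, so 'encryption' must be set to KMS");
  }
}

// Convenience wrapper: returns the error message, or undefined when the config is valid.
function validationError(cfg: StreamConfig): string | undefined {
  try {
    validateStreamConfig(cfg);
    return undefined;
  } catch (e) {
    return (e as Error).message;
  }
}

console.log(validationError({ streamMode: 'ON_DEMAND', shardCount: 3 }));
console.log(validationError({ retentionHours: 12 }));
console.log(validationError({ shardCount: 5, retentionHours: 168 }));
```

In the real construct these errors surface when the `Stream` is instantiated, that is, during synthesis and before anything is deployed.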
Shard Count Validation:
- shardCount can only be specified when streamMode is PROVISIONED (default)
- shardCount cannot be specified with ON_DEMAND mode
- Default shardCount is 1 for PROVISIONED mode

// Valid: Provisioned mode with shard count
const provisionedStream = new kinesis.Stream(this, 'ValidStream', {
  streamMode: kinesis.StreamMode.PROVISIONED,
  shardCount: 5
});

// Valid: On-demand mode (no shard count)
const onDemandStream = new kinesis.Stream(this, 'ValidOnDemand', {
  streamMode: kinesis.StreamMode.ON_DEMAND
});

// Error: Cannot specify shardCount with ON_DEMAND mode
// This will throw: "streamMode must be set to PROVISIONED (default) when specifying shardCount"
const invalidStream = new kinesis.Stream(this, 'InvalidStream', {
  streamMode: kinesis.StreamMode.ON_DEMAND,
  shardCount: 3 // This causes an error
});

Retention Period Validation:
// Valid retention periods
const validStream = new kinesis.Stream(this, 'ValidRetention', {
  retentionPeriod: Duration.hours(168) // 7 days
});

// Error: Retention period out of range
// This will throw: "retentionPeriod must be between 24 and 8760 hours. Received X"
const invalidRetention = new kinesis.Stream(this, 'InvalidRetention', {
  retentionPeriod: Duration.hours(12) // Less than 24 hours
});

Encryption Key Validation:
- If encryptionKey is specified, encryption must be set to KMS

const customKey = new kms.Key(this, 'MyKey');

// Valid: KMS encryption with custom key
const validEncrypted = new kinesis.Stream(this, 'ValidEncrypted', {
  encryption: kinesis.StreamEncryption.KMS,
  encryptionKey: customKey
});

// Error: Encryption key specified without KMS encryption
// This will throw: "encryptionKey is specified, so 'encryption' must be set to KMS"
const invalidEncryption = new kinesis.Stream(this, 'InvalidEncryption', {
  encryption: kinesis.StreamEncryption.UNENCRYPTED,
  encryptionKey: customKey // This causes an error
});

When no encryption properties are specified, the AWS CDK automatically enables KMS encryption with the AWS managed key in supported regions.
Regional Encryption Defaults:
- Most regions: KMS encryption with the alias/aws/kinesis managed key
- China regions (cn-north-1, cn-northwest-1): Streams remain unencrypted by default

// Default behavior varies by region
const defaultStream = new kinesis.Stream(this, 'DefaultStream', {
  streamName: 'my-stream'
  // No encryption specified - behavior depends on region:
  // - Most regions: KMS encryption with AWS managed key
  // - China regions: Unencrypted
});

// Explicit unencrypted (not recommended for production)
const unencryptedStream = new kinesis.Stream(this, 'UnencryptedStream', {
  encryption: kinesis.StreamEncryption.UNENCRYPTED
});

// Explicit managed encryption (same as default in supported regions)
const managedEncryption = new kinesis.Stream(this, 'ManagedStream', {
  encryption: kinesis.StreamEncryption.MANAGED
});

Implementation Notes:
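- Uses the CloudFormation condition AwsCdkKinesisEncryptedStreamsUnsupportedRegions
- The condition evaluates to true in unsupported regions (China)
- Encryption is applied conditionally using the Fn::If intrinsic function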
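The notes above can be sketched as plain template fragments. The condition name, the China regions, the `alias/aws/kinesis` key, and the use of `Fn::If` come from this document; the exact `Fn::Or`/`Fn::Equals` layout of the condition and the `resolveForRegion` helper are assumptions made for illustration, not a copy of the synthesized template.

```typescript
// Condition name as given in the implementation notes.
const CONDITION_NAME = 'AwsCdkKinesisEncryptedStreamsUnsupportedRegions';
const UNSUPPORTED_REGIONS = ['cn-north-1', 'cn-northwest-1'];
const MANAGED_ENCRYPTION = { EncryptionType: 'KMS', KeyId: 'alias/aws/kinesis' };

// Assumed shape of the Conditions section: true when deploying to an unsupported region.
const conditions = {
  [CONDITION_NAME]: {
    'Fn::Or': UNSUPPORTED_REGIONS.map((region) => ({
      'Fn::Equals': [{ Ref: 'AWS::Region' }, region],
    })),
  },
};

// Assumed shape of the stream's encryption property: dropped (AWS::NoValue)
// when the condition is true, managed KMS encryption otherwise.
const streamEncryption = {
  'Fn::If': [CONDITION_NAME, { Ref: 'AWS::NoValue' }, MANAGED_ENCRYPTION],
};

// Hypothetical local evaluator that mimics what CloudFormation decides at deploy time.
function resolveForRegion(region: string): { EncryptionType: string; KeyId: string } | undefined {
  const unsupported = UNSUPPORTED_REGIONS.indexOf(region) !== -1;
  return unsupported ? undefined : MANAGED_ENCRYPTION;
}

console.log(JSON.stringify({ Conditions: conditions, StreamEncryption: streamEncryption }, null, 2));
console.log(resolveForRegion('us-east-1'));
console.log(resolveForRegion('cn-north-1'));
```

Because the decision is deferred to CloudFormation, the same synthesized template can be deployed to any region without the CDK app needing to know the target region in advance.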