or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

cloudwatch-metrics.mdiam-permissions.mdindex.mdstream-management.md
tile.json

stream-management.mddocs/

Stream Management

Core functionality for creating, configuring, and importing Kinesis data streams with support for encryption, capacity modes, and retention settings.

Capabilities

Stream Creation

Creates a new Kinesis data stream with configurable properties including shard count, retention period, and encryption.

/**
 * Creates a new Kinesis data stream
 * @param scope - The parent construct (usually 'this')
 * @param id - Construct identifier
 * @param props - Stream configuration properties
 */
class Stream extends StreamBase {
  constructor(scope: Construct, id: string, props?: StreamProps);
}

interface StreamProps {
  /** Enforces a particular physical stream name */
  readonly streamName?: string;
  /** The number of hours for data records to remain accessible (default: 24 hours) */
  readonly retentionPeriod?: Duration;
  /** The number of shards for the stream (default: 1, only for PROVISIONED mode) */
  readonly shardCount?: number;
  /** The kind of server-side encryption to apply */
  readonly encryption?: StreamEncryption;
  /** External KMS key to use for stream encryption */
  readonly encryptionKey?: kms.IKey;
  /** The capacity mode of this stream (default: PROVISIONED) */
  readonly streamMode?: StreamMode;
}

Usage Examples:

import * as kinesis from '@aws-cdk/aws-kinesis';
import * as kms from '@aws-cdk/aws-kms';
import { Duration } from '@aws-cdk/core';

// Basic stream with default settings
const basicStream = new kinesis.Stream(this, 'BasicStream');

// Configured stream with custom properties
const configuredStream = new kinesis.Stream(this, 'ConfiguredStream', {
  streamName: 'my-application-events',
  shardCount: 4,
  retentionPeriod: Duration.hours(168), // 7 days
  streamMode: kinesis.StreamMode.PROVISIONED
});

// On-demand capacity stream
const onDemandStream = new kinesis.Stream(this, 'OnDemandStream', {
  streamName: 'auto-scaling-stream',
  streamMode: kinesis.StreamMode.ON_DEMAND,
  retentionPeriod: Duration.hours(24)
  // Note: shardCount cannot be specified with ON_DEMAND mode
});

Stream Import

Import existing Kinesis streams created outside of CDK for use in your CDK application.

/**
 * Import an existing Kinesis Stream by ARN
 * @param scope - The parent construct
 * @param id - Construct identifier
 * @param streamArn - ARN of the existing stream
 * @returns IStream interface for the imported stream
 */
static fromStreamArn(scope: Construct, id: string, streamArn: string): IStream;

/**
 * Import an existing stream with detailed attributes
 * @param scope - The parent construct
 * @param id - Construct identifier
 * @param attrs - Stream attributes including ARN and optional encryption key
 * @returns IStream interface for the imported stream
 */
static fromStreamAttributes(scope: Construct, id: string, attrs: StreamAttributes): IStream;

interface StreamAttributes {
  /** The ARN of the stream */
  readonly streamArn: string;
  /** The KMS key securing the contents if encryption is enabled */
  readonly encryptionKey?: kms.IKey;
}

Usage Examples:

// Import by ARN only
const importedStream = kinesis.Stream.fromStreamArn(
  this, 
  'ImportedStream',
  'arn:aws:kinesis:us-east-1:123456789012:stream/my-existing-stream'
);

// Import encrypted stream with KMS key
const importedEncryptedStream = kinesis.Stream.fromStreamAttributes(this, 'ImportedEncryptedStream', {
  streamArn: 'arn:aws:kinesis:us-east-1:123456789012:stream/my-encrypted-stream',
  encryptionKey: kms.Key.fromKeyArn(
    this, 
    'ImportedKey',
    'arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789012'
  )
});

Encryption Configuration

Configure server-side encryption for Kinesis streams using various encryption options.

enum StreamEncryption {
  /** Records in the stream are not encrypted */
  UNENCRYPTED = 'NONE',
  /** Server-side encryption with a KMS key managed by the user */
  KMS = 'KMS',
  /** Server-side encryption with a master key managed by Amazon Kinesis */
  MANAGED = 'MANAGED'
}

Usage Examples:

// Unencrypted stream (not recommended for production)
const unencryptedStream = new kinesis.Stream(this, 'UnencryptedStream', {
  encryption: kinesis.StreamEncryption.UNENCRYPTED
});

// Managed encryption (uses AWS managed key)
const managedEncryptedStream = new kinesis.Stream(this, 'ManagedEncryptedStream', {
  encryption: kinesis.StreamEncryption.MANAGED
});

// User-managed KMS encryption (CDK creates key automatically)
const kmsEncryptedStream = new kinesis.Stream(this, 'KMSEncryptedStream', {
  encryption: kinesis.StreamEncryption.KMS
});

// User-managed KMS encryption with custom key
const customKey = new kms.Key(this, 'StreamKey', {
  description: 'Key for Kinesis stream encryption'
});

const customEncryptedStream = new kinesis.Stream(this, 'CustomEncryptedStream', {
  encryption: kinesis.StreamEncryption.KMS,
  encryptionKey: customKey
});

Capacity Modes

Configure stream capacity using provisioned or on-demand modes.

enum StreamMode {
  /** Specify the provisioned capacity mode with fixed shard count */
  PROVISIONED = 'PROVISIONED',
  /** Specify the on-demand capacity mode with automatic scaling */
  ON_DEMAND = 'ON_DEMAND'
}

Usage Examples:

// Provisioned mode with specific shard count
const provisionedStream = new kinesis.Stream(this, 'ProvisionedStream', {
  streamMode: kinesis.StreamMode.PROVISIONED,
  shardCount: 5
});

// On-demand mode (auto-scaling)
const onDemandStream = new kinesis.Stream(this, 'OnDemandStream', {
  streamMode: kinesis.StreamMode.ON_DEMAND
  // shardCount is not allowed with ON_DEMAND mode
});

Stream Properties

Access stream properties and attributes after creation or import.

interface IStream extends IResource {
  /** The ARN of the stream */
  readonly streamArn: string;
  /** The name of the stream */
  readonly streamName: string;
  /** Optional KMS encryption key associated with this stream */
  readonly encryptionKey?: kms.IKey;
}

Usage Examples:

const stream = new kinesis.Stream(this, 'MyStream', {
  streamName: 'my-app-stream'
});

// Access stream properties
console.log(`Stream ARN: ${stream.streamArn}`);
console.log(`Stream Name: ${stream.streamName}`);

// Use stream ARN in other resources
const policy = new iam.PolicyStatement({
  actions: ['kinesis:PutRecord'],
  resources: [stream.streamArn]
});

CloudFormation Resources

Low-Level Stream Resource

For advanced use cases requiring direct CloudFormation resource access.

/**
 * CloudFormation AWS::Kinesis::Stream resource
 */
class CfnStream extends CfnResource {
  constructor(scope: Construct, id: string, props?: CfnStreamProps);
  /** The ARN of the stream */
  readonly attrArn: string;
}

interface CfnStreamProps {
  /** Stream name */
  name?: string;
  /** Retention period in hours */
  retentionPeriodHours?: number;
  /** Number of shards */
  shardCount?: number;
  /** Stream encryption configuration */
  streamEncryption?: CfnStream.StreamEncryptionProperty;
  /** Stream mode configuration */
  streamModeDetails?: CfnStream.StreamModeDetailsProperty;
}

namespace CfnStream {
  interface StreamEncryptionProperty {
    /** Encryption type ('KMS' or 'NONE') */
    encryptionType: string;
    /** KMS key ID or ARN */
    keyId?: string;
  }

  interface StreamModeDetailsProperty {
    /** Stream mode ('PROVISIONED' or 'ON_DEMAND') */
    streamMode: string;
  }
}

Validation and Error Handling

Stream Configuration Validation

The Stream construct validates configuration properties and throws errors for invalid combinations.

Shard Count Validation:

  • shardCount can only be specified when streamMode is PROVISIONED (default)
  • shardCount cannot be specified with ON_DEMAND mode
  • Default shardCount is 1 for PROVISIONED mode
// Valid: Provisioned mode with shard count
const provisionedStream = new kinesis.Stream(this, 'ValidStream', {
  streamMode: kinesis.StreamMode.PROVISIONED,
  shardCount: 5
});

// Valid: On-demand mode (no shard count)
const onDemandStream = new kinesis.Stream(this, 'ValidOnDemand', {
  streamMode: kinesis.StreamMode.ON_DEMAND
});

// Error: Cannot specify shardCount with ON_DEMAND mode
// This will throw: "streamMode must be set to PROVISIONED (default) when specifying shardCount"
const invalidStream = new kinesis.Stream(this, 'InvalidStream', {
  streamMode: kinesis.StreamMode.ON_DEMAND,
  shardCount: 3 // This causes an error
});

Retention Period Validation:

  • Must be between 24 and 8760 hours (1 day to 365 days)
  • Default is 24 hours if not specified
// Valid retention periods
const validStream = new kinesis.Stream(this, 'ValidRetention', {
  retentionPeriod: Duration.hours(168) // 7 days
});

// Error: Retention period out of range
// This will throw: "retentionPeriod must be between 24 and 8760 hours. Received X"
const invalidRetention = new kinesis.Stream(this, 'InvalidRetention', {
  retentionPeriod: Duration.hours(12) // Less than 24 hours
});

Encryption Key Validation:

  • If encryptionKey is specified, encryption must be set to KMS
  • Throws error if encryption key provided with other encryption types
const customKey = new kms.Key(this, 'MyKey');

// Valid: KMS encryption with custom key
const validEncrypted = new kinesis.Stream(this, 'ValidEncrypted', {
  encryption: kinesis.StreamEncryption.KMS,
  encryptionKey: customKey
});

// Error: Encryption key specified without KMS encryption
// This will throw: "encryptionKey is specified, so 'encryption' must be set to KMS"
const invalidEncryption = new kinesis.Stream(this, 'InvalidEncryption', {
  encryption: kinesis.StreamEncryption.UNENCRYPTED,
  encryptionKey: customKey // This causes an error
});

Automatic Encryption Behavior

When no encryption properties are specified, AWS CDK automatically enables KMS encryption with AWS managed keys in supported regions.

Regional Encryption Defaults:

  • Supported regions: KMS encryption is enabled by default using alias/aws/kinesis managed key
  • Unsupported regions (cn-north-1, cn-northwest-1): Streams remain unencrypted by default
  • Automatic detection: CDK uses CloudFormation conditions to detect region capabilities
// Default behavior varies by region
const defaultStream = new kinesis.Stream(this, 'DefaultStream', {
  streamName: 'my-stream'
  // No encryption specified - behavior depends on region:
  // - Most regions: KMS encryption with AWS managed key
  // - China regions: Unencrypted
});

// Explicit unencrypted (not recommended for production)
const unencryptedStream = new kinesis.Stream(this, 'UnencryptedStream', {
  encryption: kinesis.StreamEncryption.UNENCRYPTED
});

// Explicit managed encryption (same as default in supported regions)
const managedEncryption = new kinesis.Stream(this, 'ManagedStream', {
  encryption: kinesis.StreamEncryption.MANAGED
});

Implementation Notes:

  • CDK creates a CloudFormation condition AwsCdkKinesisEncryptedStreamsUnsupportedRegions
  • The condition evaluates to true in unsupported regions (China)
  • Encryption is automatically applied using Fn::If intrinsic function