Comprehensive CloudWatch metrics integration with 20+ pre-defined stream metrics for monitoring performance, throughput, and errors.
Create custom CloudWatch metrics for Kinesis streams with specific metric names and properties.
/**
* Return stream metric based on its metric name
* @param metricName - Name of the stream metric
* @param props - Optional properties for the metric configuration
* @returns CloudWatch Metric instance
*/
metric(metricName: string, props?: cloudwatch.MetricOptions): cloudwatch.Metric;

Usage Examples:
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as cloudwatch from '@aws-cdk/aws-cloudwatch';
// Duration is used for the metric period below, so it must be imported explicitly.
import { Duration } from '@aws-cdk/core';
const stream = new kinesis.Stream(this, 'MyStream');
// Create custom metric: the average of GetRecords.Success over 5-minute periods
const customMetric = stream.metric('GetRecords.Success', {
  statistic: 'Average',
  period: Duration.minutes(5)
});
// Create alarm from custom metric.
// With the Average statistic, GetRecords.Success is reported as a fraction
// between 0 and 1, so a 90% success rate corresponds to a threshold of 0.9.
customMetric.createAlarm(this, 'GetRecordsAlarm', {
  threshold: 0.9,
  evaluationPeriods: 2,
  comparisonOperator: cloudwatch.ComparisonOperator.LESS_THAN_THRESHOLD
});

Monitor incoming data flow to Kinesis streams including bytes and record counts.
/**
* The number of bytes successfully put to the stream over the specified time period
* Includes bytes from PutRecord and PutRecords operations
* @param props - Optional metric configuration properties
* @returns CloudWatch Metric with default average over 5 minutes
*/
metricIncomingBytes(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
/**
* The number of records successfully put to the stream over the specified time period
* Includes record counts from PutRecord and PutRecords operations
* @param props - Optional metric configuration properties
* @returns CloudWatch Metric with default average over 5 minutes
*/
metricIncomingRecords(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

Usage Examples:
const stream = new kinesis.Stream(this, 'MyStream');
// Monitor incoming bytes with custom configuration
const incomingBytesMetric = stream.metricIncomingBytes({
statistic: 'Sum',
period: Duration.minutes(1)
});
// Create dashboard widget
new cloudwatch.GraphWidget({
title: 'Stream Throughput',
left: [incomingBytesMetric],
right: [stream.metricIncomingRecords()]
});
// Create high volume alarm.
// The default statistic is Average; request Sum so the alarm compares the
// total number of records in each period against the threshold.
stream.metricIncomingRecords({ statistic: 'Sum' }).createAlarm(this, 'HighVolumeAlarm', {
  threshold: 10000,
  evaluationPeriods: 3,
  comparisonOperator: cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD
});

Monitor single record put operations including success rates, latency, and data volume.
/**
* The number of bytes put to the stream using PutRecord operation
* @param props - Optional metric configuration properties
* @returns CloudWatch Metric with default average over 5 minutes
*/
metricPutRecordBytes(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
/**
* The time taken per PutRecord operation
* @param props - Optional metric configuration properties
* @returns CloudWatch Metric with default average over 5 minutes
*/
metricPutRecordLatency(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
/**
* The number of successful PutRecord operations per stream
* Average reflects the percentage of successful writes
* @param props - Optional metric configuration properties
* @returns CloudWatch Metric with default average over 5 minutes
*/
metricPutRecordSuccess(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

Usage Examples:
const stream = new kinesis.Stream(this, 'MyStream');
// Monitor PutRecord performance
const putRecordLatency = stream.metricPutRecordLatency({
statistic: 'Average'
});
const putRecordSuccess = stream.metricPutRecordSuccess({
statistic: 'Average'
});
// Create composite alarm for PutRecord health.
// createAlarm requires evaluationPeriods, so each child alarm sets it explicitly.
const putRecordAlarm = new cloudwatch.CompositeAlarm(this, 'PutRecordHealthAlarm', {
  alarmRule: cloudwatch.AlarmRule.anyOf(
    putRecordLatency.createAlarm(this, 'HighLatencyAlarm', {
      threshold: 1000, // milliseconds
      evaluationPeriods: 2,
      comparisonOperator: cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD
    }),
    putRecordSuccess.createAlarm(this, 'LowSuccessAlarm', {
      // The Average of PutRecord.Success is a fraction between 0 and 1 (0.95 = 95%)
      threshold: 0.95,
      evaluationPeriods: 2,
      comparisonOperator: cloudwatch.ComparisonOperator.LESS_THAN_THRESHOLD
    })
  )
});

Monitor batch record put operations including success rates, latency, and failure analysis.
/**
* The number of bytes put to the stream using PutRecords operation
* @param props - Optional metric configuration properties
* @returns CloudWatch Metric with default average over 5 minutes
*/
metricPutRecordsBytes(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
/**
* The time taken per PutRecords operation
* @param props - Optional metric configuration properties
* @returns CloudWatch Metric with default average over 5 minutes
*/
metricPutRecordsLatency(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
/**
* The number of PutRecords operations where at least one record succeeded
* @param props - Optional metric configuration properties
* @returns CloudWatch Metric with default average over 5 minutes
*/
metricPutRecordsSuccess(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
/**
* The total number of records sent in a PutRecords operation
* @param props - Optional metric configuration properties
* @returns CloudWatch Metric with default average over 5 minutes
*/
metricPutRecordsTotalRecords(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
/**
* The number of successful records in a PutRecords operation
* @param props - Optional metric configuration properties
* @returns CloudWatch Metric with default average over 5 minutes
*/
metricPutRecordsSuccessfulRecords(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
/**
* The number of records rejected due to internal failures in PutRecords operation
* Occasional internal failures are expected and should be retried
* @param props - Optional metric configuration properties
* @returns CloudWatch Metric with default average over 5 minutes
*/
metricPutRecordsFailedRecords(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
/**
* The number of records rejected due to throttling in PutRecords operation
* @param props - Optional metric configuration properties
* @returns CloudWatch Metric with default average over 5 minutes
*/
metricPutRecordsThrottledRecords(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

Usage Examples:
const stream = new kinesis.Stream(this, 'MyStream');
// Comprehensive PutRecords monitoring
const totalRecords = stream.metricPutRecordsTotalRecords();
const successfulRecords = stream.metricPutRecordsSuccessfulRecords();
const failedRecords = stream.metricPutRecordsFailedRecords();
const throttledRecords = stream.metricPutRecordsThrottledRecords();
// Create success rate metric
const successRate = new cloudwatch.MathExpression({
expression: '(successful / total) * 100',
usingMetrics: {
successful: successfulRecords,
total: totalRecords
}
});
// Monitor batch operation health
const batchHealthDashboard = new cloudwatch.Dashboard(this, 'BatchHealthDashboard', {
widgets: [
[
new cloudwatch.GraphWidget({
title: 'PutRecords Success Rate',
left: [successRate],
leftYAxis: { min: 0, max: 100 }
})
],
[
new cloudwatch.GraphWidget({
title: 'PutRecords Failures',
left: [failedRecords],
right: [throttledRecords]
})
]
]
});

Monitor data consumption from Kinesis streams including throughput, latency, and iterator age.
/**
* The number of bytes retrieved from the stream
* @param props - Optional metric configuration properties
* @returns CloudWatch Metric with default average over 5 minutes
*/
metricGetRecordsBytes(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
/**
* The number of records retrieved from the shard
* @param props - Optional metric configuration properties
* @returns CloudWatch Metric with default average over 5 minutes
*/
metricGetRecords(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
/**
* The time taken per GetRecords operation
* @param props - Optional metric configuration properties
* @returns CloudWatch Metric with default average over 5 minutes
*/
metricGetRecordsLatency(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
/**
* The number of successful GetRecords operations per stream
* @param props - Optional metric configuration properties
* @returns CloudWatch Metric with default average over 5 minutes
*/
metricGetRecordsSuccess(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
/**
* The age of the last record in all GetRecords calls
* Age is the difference between current time and when the last record was written
* A value of zero indicates records are completely caught up with the stream
* @param props - Optional metric configuration properties
* @returns CloudWatch Metric with default maximum over 5 minutes
*/
metricGetRecordsIteratorAgeMilliseconds(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

Usage Examples:
const stream = new kinesis.Stream(this, 'MyStream');
// Monitor consumer lag
const iteratorAge = stream.metricGetRecordsIteratorAgeMilliseconds({
statistic: 'Maximum'
});
// Create lag alarm
const lagAlarm = iteratorAge.createAlarm(this, 'ConsumerLagAlarm', {
threshold: 60000, // 1 minute in milliseconds
evaluationPeriods: 3,
comparisonOperator: cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD,
alarmDescription: 'Consumer is falling behind stream'
});
// Monitor consumption efficiency
const getRecordsSuccess = stream.metricGetRecordsSuccess();
const getRecordsLatency = stream.metricGetRecordsLatency();
// Create consumer performance dashboard
const consumerDashboard = new cloudwatch.Dashboard(this, 'ConsumerDashboard', {
widgets: [
[
new cloudwatch.GraphWidget({
title: 'Consumer Performance',
left: [getRecordsSuccess],
right: [getRecordsLatency]
})
],
[
new cloudwatch.SingleValueWidget({
title: 'Iterator Age (ms)',
metrics: [iteratorAge],
period: Duration.minutes(5)
})
]
]
});

Monitor throttling events for both read and write operations to identify capacity issues.
/**
* The number of GetRecords calls throttled for the stream
* When Minimum statistic is 1, all records were throttled during the time period
* When Maximum statistic is 0, no records were throttled during the time period
* @param props - Optional metric configuration properties
* @returns CloudWatch Metric with default average over 5 minutes
*/
metricReadProvisionedThroughputExceeded(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
/**
* The number of records rejected due to throttling for the stream
* Includes throttling from PutRecord and PutRecords operations
* When Minimum statistic is non-zero, records were being throttled during the time period
* When Maximum statistic is 0, no records were being throttled during the time period
* @param props - Optional metric configuration properties
* @returns CloudWatch Metric with default average over 5 minutes
*/
metricWriteProvisionedThroughputExceeded(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

Usage Examples:
const stream = new kinesis.Stream(this, 'MyStream', {
streamMode: kinesis.StreamMode.PROVISIONED,
shardCount: 2
});
// Monitor throttling events
const readThrottling = stream.metricReadProvisionedThroughputExceeded({
statistic: 'Sum'
});
const writeThrottling = stream.metricWriteProvisionedThroughputExceeded({
statistic: 'Sum'
});
// Create throttling alarms
const readThrottleAlarm = readThrottling.createAlarm(this, 'ReadThrottleAlarm', {
threshold: 0,
evaluationPeriods: 2,
comparisonOperator: cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD,
alarmDescription: 'Read operations are being throttled'
});
const writeThrottleAlarm = writeThrottling.createAlarm(this, 'WriteThrottleAlarm', {
threshold: 0,
evaluationPeriods: 2,
comparisonOperator: cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD,
alarmDescription: 'Write operations are being throttled'
});
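// Auto-scaling based on throttling
// NOTE(review): illustrative only — `applicationautoscaling` is not imported in these examples,
// and TargetTrackingScalingPolicy appears to also require a `scalingTarget`; confirm before use.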
const scalingPolicy = new applicationautoscaling.TargetTrackingScalingPolicy(this, 'ScalingPolicy', {
// Configure auto-scaling based on throttling metrics
targetValue: 0,
customMetric: writeThrottling
});

Configure metric properties for customized monitoring and alerting.
interface cloudwatch.MetricOptions {
/** The AWS account ID of the metric */
account?: string;
/** The AWS region of the metric */
region?: string;
/** The statistic to apply (Average, Sum, Maximum, Minimum, SampleCount) */
statistic?: string;
/** The period over which the metric is aggregated */
period?: Duration;
/** Additional dimensions for the metric */
dimensionsMap?: { [key: string]: string };
/** Label for the metric */
label?: string;
/** Color for the metric when displayed */
color?: string;
}

Usage Examples:
const stream = new kinesis.Stream(this, 'MyStream');
// Customize metric properties
const customMetric = stream.metricIncomingRecords({
statistic: 'Sum',
period: Duration.minutes(1),
label: 'Records/minute',
color: cloudwatch.Color.BLUE
});
// Regional metric
const regionalMetric = stream.metricGetRecordsSuccess({
region: 'us-west-2',
account: '123456789012'
});
// Multi-dimensional metric
const dimensionalMetric = stream.metric('CustomMetric', {
dimensionsMap: {
StreamName: stream.streamName,
Environment: 'Production',
Application: 'DataProcessor'
}
});

Create comprehensive monitoring dashboards with Kinesis stream metrics.
const stream = new kinesis.Stream(this, 'MyStream');
// Create comprehensive monitoring dashboard
const dashboard = new cloudwatch.Dashboard(this, 'KinesisDashboard', {
dashboardName: 'Kinesis-Stream-Monitoring',
widgets: [
[
new cloudwatch.GraphWidget({
title: 'Throughput',
left: [
stream.metricIncomingRecords({ label: 'Incoming Records' }),
stream.metricGetRecords({ label: 'Retrieved Records' })
],
period: Duration.minutes(5),
width: 12
})
],
[
new cloudwatch.GraphWidget({
title: 'Latency',
left: [
stream.metricPutRecordLatency({ label: 'PutRecord Latency' }),
stream.metricPutRecordsLatency({ label: 'PutRecords Latency' }),
stream.metricGetRecordsLatency({ label: 'GetRecords Latency' })
],
period: Duration.minutes(5),
width: 12
})
],
[
new cloudwatch.SingleValueWidget({
title: 'Iterator Age',
metrics: [stream.metricGetRecordsIteratorAgeMilliseconds()],
width: 6
}),
new cloudwatch.SingleValueWidget({
title: 'Success Rate',
metrics: [stream.metricPutRecordSuccess()],
width: 6
})
]
]
});