Stream log events to external destinations including Lambda functions, Kinesis streams, and cross-account destinations with automatic IAM role management.
Forward log events from a log group to external destinations like Lambda functions, Kinesis streams, or other AWS services using pattern-based filtering.
/**
* Forwards log events from a source log group to an external destination.
*
* Only events that match the configured filter pattern are forwarded. The
* source log group, the destination, and the pattern are all supplied
* through `SubscriptionFilterProps`.
*/
class SubscriptionFilter extends Construct {
/**
* Create a subscription filter.
* @param scope - Construct scope this filter is defined in
* @param id - Construct ID of the filter within `scope`
* @param props - Source log group, destination, and filter pattern
*/
constructor(scope: Construct, id: string, props: SubscriptionFilterProps);
}
/**
 * Properties for SubscriptionFilter.
 *
 * All three properties are required.
 */
interface SubscriptionFilterProps {
  /** Source log group whose events are forwarded. */
  readonly logGroup: ILogGroup;
  /** Destination that receives the forwarded log events. */
  readonly destination: ILogSubscriptionDestination;
  /** Pattern a log event must match in order to be forwarded. */
  readonly filterPattern: IFilterPattern;
}

Usage Examples:
import * as logs from '@aws-cdk/aws-logs';
// Destination classes ship in their own package; they are not exposed as
// `logs.destinations`.
import * as destinations from '@aws-cdk/aws-logs-destinations';
import * as lambda from '@aws-cdk/aws-lambda';
import * as kinesis from '@aws-cdk/aws-kinesis';

const logGroup = new logs.LogGroup(this, 'MyLogGroup');

// Forward to Lambda function
const processingFunction = new lambda.Function(this, 'LogProcessor', {
  runtime: lambda.Runtime.NODEJS_14_X,
  handler: 'index.handler',
  code: lambda.Code.fromInline(`
exports.handler = async (event) => {
console.log('Processing logs:', JSON.stringify(event, null, 2));
};
`),
});

// Only events containing the literal term ERROR are sent to the function
const lambdaSubscription = new logs.SubscriptionFilter(this, 'LambdaSubscription', {
  logGroup: logGroup,
  destination: new destinations.LambdaDestination(processingFunction),
  filterPattern: logs.FilterPattern.literal('ERROR'),
});

// Forward to Kinesis stream
const logStream = new kinesis.Stream(this, 'LogStream', {
  shardCount: 1,
});

// Every event in the log group is sent to the stream
const kinesisSubscription = new logs.SubscriptionFilter(this, 'KinesisSubscription', {
  logGroup: logGroup,
  destination: new destinations.KinesisDestination(logStream),
  filterPattern: logs.FilterPattern.allEvents(),
});

Enable cross-account log forwarding by creating a destination that can receive log events from other AWS accounts with proper IAM policies.
/**
* Destination that receives log events forwarded from other AWS accounts.
*
* Implements `ILogSubscriptionDestination`, so an instance can be passed as
* the `destination` of a `SubscriptionFilter`.
*/
class CrossAccountDestination extends Construct implements ILogSubscriptionDestination {
/**
* Create a cross-account destination.
* @param scope - Construct scope this destination is defined in
* @param id - Construct ID of the destination within `scope`
* @param props - Optional destination name, IAM role, and target ARN
*/
constructor(scope: Construct, id: string, props: CrossAccountDestinationProps);
/** IAM policy document for the destination; extended through `addToPolicy` */
readonly policyDocument: iam.PolicyDocument;
/** Name of the destination */
readonly destinationName: string;
/** ARN of the destination */
readonly destinationArn: string;
/**
* Add a policy statement to allow access from external accounts.
* @param statement - IAM policy statement to add to the destination policy
*/
addToPolicy(statement: iam.PolicyStatement): void;
/**
* Configure this destination for use with a subscription filter.
* @param scope - Construct scope
* @param sourceLogGroup - Log group whose events will be forwarded
* @returns Destination configuration (destination ARN and optional role)
*/
bind(scope: Construct, sourceLogGroup: ILogGroup): LogSubscriptionDestinationConfig;
}
/**
 * Properties for CrossAccountDestination.
 *
 * `role` and `targetArn` are required; `destinationName` may be omitted.
 */
interface CrossAccountDestinationProps {
  /** Name of the destination (optional). */
  readonly destinationName?: string;
  /** IAM role used for writing to the target service. */
  readonly role: iam.IRole;
  /** ARN of the target service (Kinesis stream, Lambda function, etc.). */
  readonly targetArn: string;
}

Usage Examples:
import * as logs from '@aws-cdk/aws-logs';
import * as iam from '@aws-cdk/aws-iam';
import * as kinesis from '@aws-cdk/aws-kinesis';

// Create target Kinesis stream
const centralLogStream = new kinesis.Stream(this, 'CentralLogStream', {
  shardCount: 2,
});

// Create IAM role for writing to Kinesis
const destinationRole = new iam.Role(this, 'DestinationRole', {
  // CloudWatch Logs assumes this role when it writes to the stream
  assumedBy: new iam.ServicePrincipal('logs.amazonaws.com'),
  inlinePolicies: {
    KinesisWritePolicy: new iam.PolicyDocument({
      statements: [
        new iam.PolicyStatement({
          actions: ['kinesis:PutRecord', 'kinesis:PutRecords'],
          resources: [centralLogStream.streamArn],
        }),
      ],
    }),
  },
});

// Create cross-account destination
const crossAccountDestination = new logs.CrossAccountDestination(this, 'LogDestination', {
  destinationName: 'CentralLoggingDestination',
  role: destinationRole,
  targetArn: centralLogStream.streamArn,
});

// Allow specific accounts to use this destination
crossAccountDestination.addToPolicy(new iam.PolicyStatement({
  principals: [
    new iam.AccountPrincipal('123456789012'),
    new iam.AccountPrincipal('210987654321'),
  ],
  actions: ['logs:PutSubscriptionFilter'],
  resources: [crossAccountDestination.destinationArn],
}));

// Source log group whose events are forwarded. It is declared here so the
// example is self-contained; in practice it usually belongs to a stack
// deployed in one of the accounts allowed above.
const someLogGroup = new logs.LogGroup(this, 'SourceLogGroup');

// Use destination in subscription filter (can be in different account)
const subscription = new logs.SubscriptionFilter(this, 'CrossAccountSubscription', {
  logGroup: someLogGroup,
  destination: crossAccountDestination,
  filterPattern: logs.FilterPattern.allEvents(),
});

Base interface for implementing custom log destinations that can receive log events from subscription filters.
/**
* Interface for log subscription destinations.
*
* Any object that provides `bind` can be used as the `destination` of a
* `SubscriptionFilter`.
*/
interface ILogSubscriptionDestination {
/**
* Configure this destination for use with a subscription filter.
* @param scope - Construct scope
* @param sourceLogGroup - Log group whose events will be forwarded
* @returns Destination configuration (destination ARN and optional role)
*/
bind(scope: Construct, sourceLogGroup: ILogGroup): LogSubscriptionDestinationConfig;
}
/**
 * Configuration for a log subscription destination, as returned by
 * `ILogSubscriptionDestination.bind`.
 */
interface LogSubscriptionDestinationConfig {
  /** ARN of the destination. */
  readonly arn: string;
  /** IAM role for writing to the destination (optional). */
  readonly role?: iam.IRole;
}

Forward log events to Lambda functions for real-time processing, alerting, or custom analytics.
import { LambdaDestination } from '@aws-cdk/aws-logs-destinations';

const processingFunction = new lambda.Function(this, 'LogProcessor', {
  runtime: lambda.Runtime.PYTHON_3_9,
  handler: 'index.handler',
  // The inline source is Python, which is indentation-sensitive: top-level
  // statements must start at column 0 and the function body must be indented.
  code: lambda.Code.fromInline(`
import json
import base64
import gzip

def handler(event, context):
    # Decode and decompress log data
    compressed_payload = base64.b64decode(event['awslogs']['data'])
    uncompressed_payload = gzip.decompress(compressed_payload)
    log_data = json.loads(uncompressed_payload)
    for log_event in log_data['logEvents']:
        print(f"Log: {log_event['message']}")
    return {'statusCode': 200}
`),
});

const lambdaDestination = new LambdaDestination(processingFunction, {
  addPermissions: true, // Automatically add CloudWatch Logs permissions
});

// Only events containing the literal term ERROR are sent to the function
new logs.SubscriptionFilter(this, 'LambdaSubscription', {
  logGroup: logGroup,
  destination: lambdaDestination,
  filterPattern: logs.FilterPattern.literal('ERROR'),
});

Stream log events to Kinesis Data Streams for scalable real-time analytics and processing.
import { KinesisDestination } from '@aws-cdk/aws-logs-destinations';

// Stream that receives the forwarded log events
const analyticsStream = new kinesis.Stream(this, 'AnalyticsStream', {
  shardCount: 3,
  retentionPeriod: cdk.Duration.hours(48),
});

const kinesisDestination = new KinesisDestination(analyticsStream);

// Forward only JSON log events that contain a `requestId` field
new logs.SubscriptionFilter(this, 'KinesisSubscription', {
  logGroup: logGroup,
  destination: kinesisDestination,
  filterPattern: logs.FilterPattern.exists('$.requestId'),
});

Forward log events to Kinesis Data Firehose for delivery to S3, Elasticsearch, or other analytics services.
// NOTE(review): confirm that `KinesisFirehoseDestination` is exported by the
// installed version of @aws-cdk/aws-logs-destinations before relying on it.
import { KinesisFirehoseDestination } from '@aws-cdk/aws-logs-destinations';
import * as firehose from '@aws-cdk/aws-kinesisfirehose';

// NOTE(review): assumes `logsBucket` is an S3 bucket defined elsewhere — confirm.
// NOTE(review): `firehose.destinations.S3Bucket`, `firehose.Compression` and the
// `prefix` option should be checked against the installed CDK version; the S3
// destination may be provided by a separate destinations package — confirm.
const deliveryStream = new firehose.DeliveryStream(this, 'LogDeliveryStream', {
  destinations: [
    new firehose.destinations.S3Bucket(logsBucket, {
      compression: firehose.Compression.GZIP,
      prefix: 'application-logs/',
    }),
  ],
});

const firehoseDestination = new KinesisFirehoseDestination(deliveryStream);

// Every event in the log group is handed to the delivery stream
new logs.SubscriptionFilter(this, 'FirehoseSubscription', {
  logGroup: logGroup,
  destination: firehoseDestination,
  filterPattern: logs.FilterPattern.allEvents(),
});

const logGroup = new logs.LogGroup(this, 'ApplicationLogs');
// Send errors to Lambda for immediate processing
const errorProcessor = new lambda.Function(this, 'ErrorProcessor', {
  runtime: lambda.Runtime.NODEJS_14_X,
  handler: 'error-handler.process',
  code: lambda.Code.fromAsset('lambda/error-processor'),
});

new logs.SubscriptionFilter(this, 'ErrorSubscription', {
  logGroup: logGroup,
  destination: new LambdaDestination(errorProcessor),
  filterPattern: logs.FilterPattern.literal('ERROR'),
});

// Send all logs to Kinesis for analytics
const analyticsStream = new kinesis.Stream(this, 'AnalyticsStream');

// A second filter on the same log group: one source, two destinations
new logs.SubscriptionFilter(this, 'AnalyticsSubscription', {
  logGroup: logGroup,
  destination: new KinesisDestination(analyticsStream),
  filterPattern: logs.FilterPattern.allEvents(),
});

// Central logging account setup
/**
 * Stack for the central logging account.
 *
 * Creates a Kinesis stream, an IAM role that CloudWatch Logs assumes to write
 * to it, and a cross-account destination that accounts of the organization
 * may subscribe to.
 */
class CentralLoggingStack extends cdk.Stack {
  /** Cross-account destination that application accounts forward logs to. */
  public readonly logDestination: logs.CrossAccountDestination;

  /**
   * @param scope - Construct scope
   * @param id - Construct ID
   * @param props - Stack properties
   */
  constructor(scope: Construct, id: string, props: cdk.StackProps) {
    super(scope, id, props);

    // Central log storage
    const centralStream = new kinesis.Stream(this, 'CentralLogStream', {
      shardCount: 5,
    });

    // IAM role for log forwarding
    const logForwardingRole = new iam.Role(this, 'LogForwardingRole', {
      assumedBy: new iam.ServicePrincipal('logs.amazonaws.com'),
      inlinePolicies: {
        KinesisWrite: new iam.PolicyDocument({
          statements: [
            new iam.PolicyStatement({
              actions: ['kinesis:PutRecord', 'kinesis:PutRecords'],
              resources: [centralStream.streamArn],
            }),
          ],
        }),
      },
    });

    // Cross-account destination
    this.logDestination = new logs.CrossAccountDestination(this, 'CentralDestination', {
      // Pin the name so the destination ARN is predictable: consuming accounts
      // refer to it as `...:destination:CentralDestination`.
      destinationName: 'CentralDestination',
      role: logForwardingRole,
      targetArn: centralStream.streamArn,
    });

    // Allow all organization accounts
    this.logDestination.addToPolicy(new iam.PolicyStatement({
      principals: [new iam.OrganizationPrincipal('o-example12345')],
      actions: ['logs:PutSubscriptionFilter'],
      resources: [this.logDestination.destinationArn],
    }));
  }
}
// Application account usage
/**
 * Stack for an application account that forwards all of its log events to a
 * destination owned by the central logging account.
 */
class ApplicationStack extends cdk.Stack {
  /**
   * @param scope - Construct scope
   * @param id - Construct ID
   * @param props - Stack properties
   */
  constructor(scope: Construct, id: string, props: cdk.StackProps) {
    super(scope, id, props);

    const appLogGroup = new logs.LogGroup(this, 'AppLogs');

    // Reference the cross-account destination by ARN. The destination lives in
    // another account, so it is represented by a minimal
    // ILogSubscriptionDestination whose bind() just returns that ARN.
    const centralDestination: logs.ILogSubscriptionDestination = {
      bind: () => ({
        arn: 'arn:aws:logs:us-east-1:CENTRAL-ACCOUNT:destination:CentralDestination',
      }),
    };

    // Forward logs to central account
    new logs.SubscriptionFilter(this, 'CentralSubscription', {
      logGroup: appLogGroup,
      destination: centralDestination,
      filterPattern: logs.FilterPattern.allEvents(),
    });
  }
}

// Multi-stage log processing
const logGroup = new logs.LogGroup(this, 'ServiceLogs');

// Fail fast at synthesis time when the webhook is not configured, instead of
// silencing the compiler with a non-null assertion and passing `undefined`
// into the function's environment.
const slackWebhookUrl = process.env.SLACK_WEBHOOK_URL;
if (!slackWebhookUrl) {
  throw new Error('SLACK_WEBHOOK_URL environment variable must be set');
}

// Stage 1: Real-time alerting for critical errors
const alertFunction = new lambda.Function(this, 'AlertFunction', {
  runtime: lambda.Runtime.PYTHON_3_9,
  handler: 'alerts.handler',
  code: lambda.Code.fromAsset('lambda/alerts'),
  environment: {
    SLACK_WEBHOOK_URL: slackWebhookUrl,
  },
});

// Only events containing FATAL or CRITICAL trigger the alert function
new logs.SubscriptionFilter(this, 'AlertSubscription', {
  logGroup: logGroup,
  destination: new LambdaDestination(alertFunction),
  filterPattern: logs.FilterPattern.anyTerm('FATAL', 'CRITICAL'),
});

// Stage 2: Stream to analytics platform
const analyticsStream = new kinesis.Stream(this, 'AnalyticsStream', {
  shardCount: 2,
});

// Connect to Elasticsearch for search and visualization
const esAnalytics = new lambda.Function(this, 'EsAnalytics', {
  runtime: lambda.Runtime.PYTHON_3_9,
  handler: 'es_processor.handler',
  code: lambda.Code.fromAsset('lambda/elasticsearch'),
});

// Let the processing function read records from the analytics stream
analyticsStream.grantRead(esAnalytics);

new logs.SubscriptionFilter(this, 'AnalyticsSubscription', {
  logGroup: logGroup,
  destination: new KinesisDestination(analyticsStream),
  filterPattern: logs.FilterPattern.allEvents(),
});