or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

core-log-management.md, index.md, log-forwarding.md, metric-extraction.md, pattern-matching.md, query-definitions.md, resource-policies.md
tile.json

docs/log-forwarding.md

Log Forwarding and Destinations

Stream log events to external destinations including Lambda functions, Kinesis streams, and cross-account destinations with automatic IAM role management.

Capabilities

SubscriptionFilter

Forward log events from a log group to external destinations like Lambda functions, Kinesis streams, or other AWS services using pattern-based filtering.

/**
 * Forwards log events from a log group to an external destination,
 * restricted to the events matched by a filter pattern.
 *
 * Declared as an ambient (`declare`) class because this block is an API
 * signature, not an implementation: the constructor intentionally has no body.
 */
declare class SubscriptionFilter extends Construct {
  /**
   * Create a subscription filter.
   * @param scope - Construct scope the filter is created in
   * @param id - Construct ID
   * @param props - Source log group, destination and filter pattern
   */
  constructor(scope: Construct, id: string, props: SubscriptionFilterProps);
}

/**
 * Configuration accepted by the SubscriptionFilter constructor.
 * All three properties are required.
 */
interface SubscriptionFilterProps {
  /** Log group whose events are forwarded. */
  readonly logGroup: ILogGroup;

  /** Where the matching log events are delivered. */
  readonly destination: ILogSubscriptionDestination;

  /** Only events matching this pattern are forwarded. */
  readonly filterPattern: IFilterPattern;
}

Usage Examples:

import * as logs from '@aws-cdk/aws-logs';
// The built-in destinations are published in their own package; they are not
// reachable through a `destinations` namespace on '@aws-cdk/aws-logs'.
import * as destinations from '@aws-cdk/aws-logs-destinations';
import * as lambda from '@aws-cdk/aws-lambda';
import * as kinesis from '@aws-cdk/aws-kinesis';

// Source log group shared by both subscriptions below.
const logGroup = new logs.LogGroup(this, 'MyLogGroup');

// Forward to Lambda function: the handler simply logs the event it receives.
const processingFunction = new lambda.Function(this, 'LogProcessor', {
  runtime: lambda.Runtime.NODEJS_14_X,
  handler: 'index.handler',
  code: lambda.Code.fromInline(`
    exports.handler = async (event) => {
      console.log('Processing logs:', JSON.stringify(event, null, 2));
    };
  `),
});

// Only events matching the literal pattern 'ERROR' reach the function.
const lambdaSubscription = new logs.SubscriptionFilter(this, 'LambdaSubscription', {
  logGroup: logGroup,
  destination: new destinations.LambdaDestination(processingFunction),
  filterPattern: logs.FilterPattern.literal('ERROR'),
});

// Forward to Kinesis stream
const logStream = new kinesis.Stream(this, 'LogStream', {
  shardCount: 1,
});

// Every event in the log group is forwarded to the stream.
const kinesisSubscription = new logs.SubscriptionFilter(this, 'KinesisSubscription', {
  logGroup: logGroup,
  destination: new destinations.KinesisDestination(logStream),
  filterPattern: logs.FilterPattern.allEvents(),
});

CrossAccountDestination

Enable cross-account log forwarding by creating a destination that can receive log events from other AWS accounts with proper IAM policies.

/**
 * A destination that can receive log events from other AWS accounts.
 * It can itself be passed as the `destination` of a subscription filter,
 * since it implements ILogSubscriptionDestination.
 *
 * Declared as an ambient (`declare`) class because this block is an API
 * signature, not an implementation: members intentionally have no bodies
 * or initialisers.
 */
declare class CrossAccountDestination extends Construct implements ILogSubscriptionDestination {
  /**
   * Create a cross-account destination.
   * @param scope - Construct scope
   * @param id - Construct ID
   * @param props - Role, target ARN and optional destination name
   */
  constructor(scope: Construct, id: string, props: CrossAccountDestinationProps);

  /** IAM policy document attached to the destination. */
  readonly policyDocument: iam.PolicyDocument;

  /** Name of the destination. */
  readonly destinationName: string;

  /** ARN of the destination. */
  readonly destinationArn: string;

  /**
   * Add a statement to the destination's policy, e.g. to allow access from
   * external accounts.
   * @param statement - IAM policy statement to add
   */
  addToPolicy(statement: iam.PolicyStatement): void;

  /**
   * Configure this destination for use with a subscription filter.
   * @param scope - Construct scope
   * @param sourceLogGroup - Log group the events originate from
   * @returns Destination configuration (ARN and optional role)
   */
  bind(scope: Construct, sourceLogGroup: ILogGroup): LogSubscriptionDestinationConfig;
}

/**
 * Configuration accepted by the CrossAccountDestination constructor.
 */
interface CrossAccountDestinationProps {
  /** Name to give the destination. Optional. */
  readonly destinationName?: string;

  /** IAM role used to write to the target service. */
  readonly role: iam.IRole;

  /** ARN of the service that receives the events (Kinesis stream, Lambda function, etc.). */
  readonly targetArn: string;
}

Usage Examples:

import * as logs from '@aws-cdk/aws-logs';
import * as iam from '@aws-cdk/aws-iam';
import * as kinesis from '@aws-cdk/aws-kinesis';

// Create target Kinesis stream
const centralLogStream = new kinesis.Stream(this, 'CentralLogStream', {
  shardCount: 2,
});

// Create IAM role that CloudWatch Logs assumes to write to the Kinesis stream
const destinationRole = new iam.Role(this, 'DestinationRole', {
  assumedBy: new iam.ServicePrincipal('logs.amazonaws.com'),
  inlinePolicies: {
    KinesisWritePolicy: new iam.PolicyDocument({
      statements: [
        new iam.PolicyStatement({
          actions: ['kinesis:PutRecord', 'kinesis:PutRecords'],
          resources: [centralLogStream.streamArn],
        }),
      ],
    }),
  },
});

// Create cross-account destination
const crossAccountDestination = new logs.CrossAccountDestination(this, 'LogDestination', {
  destinationName: 'CentralLoggingDestination',
  role: destinationRole,
  targetArn: centralLogStream.streamArn,
});

// Allow specific accounts to use this destination.
// NOTE(review): this statement references the destination's own ARN from
// inside the destination's policy; confirm with `cdk synth`/deploy that this
// does not produce a self-referencing resource in the template.
crossAccountDestination.addToPolicy(new iam.PolicyStatement({
  principals: [
    new iam.AccountPrincipal('123456789012'),
    new iam.AccountPrincipal('210987654321'),
  ],
  actions: ['logs:PutSubscriptionFilter'],
  resources: [crossAccountDestination.destinationArn],
}));

// Source log group for the subscription. It is declared here so the example
// is self-contained; in a real cross-account setup it would presumably live
// in the sending account's stack.
const someLogGroup = new logs.LogGroup(this, 'SourceLogGroup');

// Use destination in subscription filter (can be in different account)
const subscription = new logs.SubscriptionFilter(this, 'CrossAccountSubscription', {
  logGroup: someLogGroup,
  destination: crossAccountDestination,
  filterPattern: logs.FilterPattern.allEvents(),
});

Destination Interface

Base interface for implementing custom log destinations that can receive log events from subscription filters.

/**
 * Contract implemented by anything that can be the target of a
 * subscription filter, including custom destinations.
 */
interface ILogSubscriptionDestination {
  /**
   * Prepare this destination to receive events from a subscription filter.
   * @param scope - Construct scope
   * @param sourceLogGroup - Log group the events originate from
   * @returns The destination's ARN and, optionally, the role used to write to it
   */
  bind(scope: Construct, sourceLogGroup: ILogGroup): LogSubscriptionDestinationConfig;
}

/**
 * Result of binding a destination: what a subscription filter needs in
 * order to deliver events to it.
 */
interface LogSubscriptionDestinationConfig {
  /** ARN of the destination. */
  readonly arn: string;

  /** IAM role for writing to the destination. Optional. */
  readonly role?: iam.IRole;
}

Built-in Destination Types

Lambda Destination

Forward log events to Lambda functions for real-time processing, alerting, or custom analytics.

import { LambdaDestination } from '@aws-cdk/aws-logs-destinations';

// Inline Python handler: base64-decodes and gunzips event['awslogs']['data'],
// parses the result as JSON, and prints the message of every log event.
const handlerSource = `
import json
import base64
import gzip

def handler(event, context):
    # Decode and decompress log data
    compressed_payload = base64.b64decode(event['awslogs']['data'])
    uncompressed_payload = gzip.decompress(compressed_payload)
    log_data = json.loads(uncompressed_payload)
    
    for log_event in log_data['logEvents']:
        print(f"Log: {log_event['message']}")
    
    return {'statusCode': 200}
  `;

const processingFunction = new lambda.Function(this, 'LogProcessor', {
  code: lambda.Code.fromInline(handlerSource),
  handler: 'index.handler',
  runtime: lambda.Runtime.PYTHON_3_9,
});

// addPermissions: true automatically adds the CloudWatch Logs permissions.
const lambdaDestination = new LambdaDestination(processingFunction, { addPermissions: true });

// Only events matching the literal pattern 'ERROR' are sent to the function.
new logs.SubscriptionFilter(this, 'LambdaSubscription', {
  logGroup,
  destination: lambdaDestination,
  filterPattern: logs.FilterPattern.literal('ERROR'),
});

Kinesis Destination

Stream log events to Kinesis Data Streams for scalable real-time analytics and processing.

import { KinesisDestination } from '@aws-cdk/aws-logs-destinations';

// Three-shard stream that retains records for 48 hours.
const analyticsStream = new kinesis.Stream(this, 'AnalyticsStream', {
  retentionPeriod: cdk.Duration.hours(48),
  shardCount: 3,
});

// Forward only events that contain a `$.requestId` JSON field.
const requestIdPattern = logs.FilterPattern.exists('$.requestId');

new logs.SubscriptionFilter(this, 'KinesisSubscription', {
  logGroup,
  destination: new KinesisDestination(analyticsStream),
  filterPattern: requestIdPattern,
});

Kinesis Data Firehose Destination

Forward log events to Kinesis Data Firehose for delivery to S3, Elasticsearch, or other analytics services.

// NOTE(review): confirm that '@aws-cdk/aws-logs-destinations' actually exports
// `KinesisFirehoseDestination` in the CDK version in use; the other examples on
// this page only use `LambdaDestination` and `KinesisDestination` from it.
import { KinesisFirehoseDestination } from '@aws-cdk/aws-logs-destinations';
import * as firehose from '@aws-cdk/aws-kinesisfirehose';

// Delivery stream that writes GZIP-compressed objects under 'application-logs/'.
// `logsBucket` is not declared in this snippet; it is assumed to be an existing
// S3 bucket defined elsewhere.
// NOTE(review): verify `firehose.destinations.S3Bucket`, `firehose.Compression`
// and the `prefix` property against the installed package — TODO confirm whether
// these live in a separate '@aws-cdk/aws-kinesisfirehose-destinations' package
// under different names.
const deliveryStream = new firehose.DeliveryStream(this, 'LogDeliveryStream', {
  destinations: [
    new firehose.destinations.S3Bucket(logsBucket, {
      compression: firehose.Compression.GZIP,
      prefix: 'application-logs/',
    }),
  ],
});

const firehoseDestination = new KinesisFirehoseDestination(deliveryStream);

// Forward every event in the log group to the delivery stream.
new logs.SubscriptionFilter(this, 'FirehoseSubscription', {
  logGroup: logGroup,
  destination: firehoseDestination,
  filterPattern: logs.FilterPattern.allEvents(),
});

Advanced Log Forwarding Patterns

Multi-Destination Forwarding

// One log group feeding two independent subscriptions.
const logGroup = new logs.LogGroup(this, 'ApplicationLogs');

// Subscription 1: events matching 'ERROR' go to a Lambda function for
// immediate processing.
const errorProcessor = new lambda.Function(this, 'ErrorProcessor', {
  code: lambda.Code.fromAsset('lambda/error-processor'),
  handler: 'error-handler.process',
  runtime: lambda.Runtime.NODEJS_14_X,
});
const errorDestination = new LambdaDestination(errorProcessor);

new logs.SubscriptionFilter(this, 'ErrorSubscription', {
  logGroup,
  destination: errorDestination,
  filterPattern: logs.FilterPattern.literal('ERROR'),
});

// Subscription 2: every event goes to a Kinesis stream for analytics.
const analyticsStream = new kinesis.Stream(this, 'AnalyticsStream');
const analyticsDestination = new KinesisDestination(analyticsStream);

new logs.SubscriptionFilter(this, 'AnalyticsSubscription', {
  logGroup,
  destination: analyticsDestination,
  filterPattern: logs.FilterPattern.allEvents(),
});

Centralized Logging Architecture

/**
 * Central logging account setup: a Kinesis stream for storage, a role that
 * CloudWatch Logs assumes to write to it, and a cross-account destination
 * opened to an AWS organization.
 */
class CentralLoggingStack extends cdk.Stack {
  /** Destination that subscription filters in other accounts deliver to. */
  public readonly logDestination: logs.CrossAccountDestination;

  constructor(scope: Construct, id: string, props: cdk.StackProps) {
    super(scope, id, props);

    // Central log storage
    const centralStream = new kinesis.Stream(this, 'CentralLogStream', { shardCount: 5 });

    // Write access to the stream, granted to the forwarding role below.
    const kinesisWrite = new iam.PolicyDocument({
      statements: [
        new iam.PolicyStatement({
          actions: ['kinesis:PutRecord', 'kinesis:PutRecords'],
          resources: [centralStream.streamArn],
        }),
      ],
    });

    // Role assumed by the CloudWatch Logs service principal.
    const logForwardingRole = new iam.Role(this, 'LogForwardingRole', {
      assumedBy: new iam.ServicePrincipal('logs.amazonaws.com'),
      inlinePolicies: { KinesisWrite: kinesisWrite },
    });

    // Cross-account destination targeting the central stream.
    const destination = new logs.CrossAccountDestination(this, 'CentralDestination', {
      role: logForwardingRole,
      targetArn: centralStream.streamArn,
    });

    // Allow every account in the organization to create subscription filters
    // against this destination.
    // NOTE(review): the statement references the destination's own ARN from
    // inside its policy; confirm at synth/deploy time that this is accepted.
    const organizationAccess = new iam.PolicyStatement({
      principals: [new iam.OrganizationPrincipal('o-example12345')],
      actions: ['logs:PutSubscriptionFilter'],
      resources: [destination.destinationArn],
    });
    destination.addToPolicy(organizationAccess);

    this.logDestination = destination;
  }
}

/**
 * Application account usage: forwards every event from the application's
 * log group to a destination that lives in the central logging account.
 */
class ApplicationStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props: cdk.StackProps) {
    super(scope, id, props);

    const appLogGroup = new logs.LogGroup(this, 'AppLogs');

    // Reference the existing cross-account destination by ARN. A subscription
    // filter only needs an ILogSubscriptionDestination, i.e. an object whose
    // bind() returns the destination ARN, so a small inline implementation is
    // enough; no construct is created in this account.
    // Replace the placeholder with the real ARN of the central destination.
    const centralDestination: logs.ILogSubscriptionDestination = {
      bind: () => ({
        arn: 'arn:aws:logs:us-east-1:CENTRAL-ACCOUNT:destination:CentralDestination',
      }),
    };

    // Forward logs to central account
    new logs.SubscriptionFilter(this, 'CentralSubscription', {
      logGroup: appLogGroup,
      destination: centralDestination,
      filterPattern: logs.FilterPattern.allEvents(),
    });
  }
}

Real-time Log Processing Pipeline

// Multi-stage log processing
const logGroup = new logs.LogGroup(this, 'ServiceLogs');

// Fail fast at synth time if the webhook URL is missing, instead of silencing
// the compiler with a non-null assertion and passing `undefined` through.
const slackWebhookUrl = process.env.SLACK_WEBHOOK_URL;
if (!slackWebhookUrl) {
  throw new Error('SLACK_WEBHOOK_URL environment variable must be set');
}

// Stage 1: Real-time alerting for critical errors
const alertFunction = new lambda.Function(this, 'AlertFunction', {
  runtime: lambda.Runtime.PYTHON_3_9,
  handler: 'alerts.handler',
  code: lambda.Code.fromAsset('lambda/alerts'),
  environment: {
    SLACK_WEBHOOK_URL: slackWebhookUrl,
  },
});

// Only events containing 'FATAL' or 'CRITICAL' reach the alert function.
new logs.SubscriptionFilter(this, 'AlertSubscription', {
  logGroup: logGroup,
  destination: new LambdaDestination(alertFunction),
  filterPattern: logs.FilterPattern.anyTerm('FATAL', 'CRITICAL'),
});

// Stage 2: Stream to analytics platform
const analyticsStream = new kinesis.Stream(this, 'AnalyticsStream', {
  shardCount: 2,
});

// Connect to Elasticsearch for search and visualization
const esAnalytics = new lambda.Function(this, 'EsAnalytics', {
  runtime: lambda.Runtime.PYTHON_3_9,
  handler: 'es_processor.handler',
  code: lambda.Code.fromAsset('lambda/elasticsearch'),
});

// NOTE(review): this grants the function read access to the stream, but
// nothing in this snippet subscribes the function to it; an event source
// mapping is presumably configured elsewhere. Confirm.
analyticsStream.grantRead(esAnalytics);

// Every event in the log group is forwarded to the analytics stream.
new logs.SubscriptionFilter(this, 'AnalyticsSubscription', {
  logGroup: logGroup,
  destination: new KinesisDestination(analyticsStream),
  filterPattern: logs.FilterPattern.allEvents(),
});