CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/npm-aws-cdk--aws-events-targets

Event targets for Amazon EventBridge that enable routing events to various AWS services

Overview
Eval results
Files

docs/analytics-targets.md

Analytics and Logging Targets

Targets for data streaming, analytics, and logging services that can receive and process EventBridge events.

Capabilities

Kinesis Stream Target

Send events to Amazon Kinesis Data Streams for real-time processing.

/**
 * Use a Kinesis stream as a target for Amazon EventBridge rules
 *
 * Instances are passed to a rule's `addTarget()`; the signature below lists
 * only the public surface (no implementation is shown in this document).
 */
class KinesisStream implements events.IRuleTarget {
  /**
   * @param stream the Kinesis stream that receives the records
   * @param props optional settings (partition key path and message); see KinesisStreamProps
   */
  constructor(stream: kinesis.IStream, props?: KinesisStreamProps);
  
  /**
   * Returns a RuleTarget that can be used to put records to this Kinesis stream
   * as a result from an EventBridge event
   *
   * @param rule the rule this target is being attached to
   * @param id optional identifier (its exact use is not shown in this document)
   * @returns the target configuration for the rule
   */
  bind(rule: events.IRule, id?: string): events.RuleTargetConfig;
}

/**
 * Customization options for the KinesisStream target.
 * Every property is optional.
 */
interface KinesisStreamProps {
  /**
   * Partition key path for the Kinesis record
   * The partition key determines which shard the record goes to
   * Given as a JSON path into the event, for example "$.detail.userId"
   * @default event ID from EventBridge
   */
  readonly partitionKeyPath?: string;
  
  /**
   * The message to send to the Kinesis stream
   * @default the entire EventBridge event
   */
  readonly message?: events.RuleTargetInput;
}

Usage Example:

// Example: route EventBridge events into a Kinesis data stream.
// The snippet is meant to run inside a construct scope (it uses `this`).
import { Duration } from "@aws-cdk/core"; // needed for Duration.days() below
import * as kinesis from "@aws-cdk/aws-kinesis";
import * as events from "@aws-cdk/aws-events";
import * as targets from "@aws-cdk/aws-events-targets";

// Create Kinesis stream
const stream = new kinesis.Stream(this, "EventStream", {
  streamName: "application-events",
  shardCount: 3,
  retentionPeriod: Duration.days(7),
});

// Rule for user activity events
const userActivityRule = new events.Rule(this, "UserActivityRule", {
  eventPattern: {
    source: ["myapp.users"],
    detailType: ["User Login", "User Logout", "Page View", "Action Performed"],
  },
});

// Send to Kinesis with user ID as partition key, so the partition key (and
// therefore the shard) is derived from the user ID.
// The message is reshaped into a flat object built from selected event fields.
userActivityRule.addTarget(new targets.KinesisStream(stream, {
  partitionKeyPath: "$.detail.userId",
  message: events.RuleTargetInput.fromObject({
    eventId: events.EventField.fromPath("$.id"),
    timestamp: events.EventField.fromPath("$.time"),
    eventType: events.EventField.fromPath("$.detail-type"),
    userId: events.EventField.fromPath("$.detail.userId"),
    sessionId: events.EventField.fromPath("$.detail.sessionId"),
    metadata: events.EventField.fromPath("$.detail.metadata"),
    source: events.EventField.fromPath("$.source"),
    region: events.EventField.fromPath("$.region"),
  }),
}));

// Rule for system metrics with different partition strategy
const metricsRule = new events.Rule(this, "MetricsRule", {
  eventPattern: {
    source: ["myapp.metrics"],
    detailType: ["Performance Metric", "Error Count", "Transaction Volume"],
  },
});

// Partition by metric type and forward only the `detail` portion of the event
metricsRule.addTarget(new targets.KinesisStream(stream, {
  partitionKeyPath: "$.detail.metricType",
  message: events.RuleTargetInput.fromPath("$.detail"),
}));

// Send entire event for audit logging
const auditRule = new events.Rule(this, "AuditRule", {
  eventPattern: {
    source: ["myapp.security"],
  },
});

auditRule.addTarget(new targets.KinesisStream(stream, {
  partitionKeyPath: "$.detail.resourceId",
  // Default message is the entire event
}));

CloudWatch LogGroup Target

Send events to Amazon CloudWatch Logs for logging and monitoring.

/**
 * Use a CloudWatch LogGroup as a target for Amazon EventBridge rules
 *
 * Instances are passed to a rule's `addTarget()`; the signature below lists
 * only the public surface (no implementation is shown in this document).
 */
class CloudWatchLogGroup implements events.IRuleTarget {
  /**
   * @param logGroup the log group that receives the events
   * @param props optional settings; see LogGroupProps
   */
  constructor(logGroup: logs.ILogGroup, props?: LogGroupProps);
  
  /**
   * Returns a RuleTarget that can be used to log events to this CloudWatch LogGroup
   * as a result from an EventBridge event
   *
   * @param rule the rule this target is being attached to
   * @param id optional identifier (its exact use is not shown in this document)
   * @returns the target configuration for the rule
   */
  bind(rule: events.IRule, id?: string): events.RuleTargetConfig;
}

/**
 * Customization options for the CloudWatchLogGroup target.
 *
 * Extends TargetBaseProps, whose members are not listed in this document.
 * The usage example in this document passes `deadLetterQueue`, `retryAttempts`
 * and `maxEventAge` next to `event`; those presumably come from that base
 * interface (confirm against the TargetBaseProps definition).
 */
interface LogGroupProps extends TargetBaseProps {
  /**
   * The event to send to the CloudWatch LogGroup
   * @default the entire EventBridge event
   */
  readonly event?: events.RuleTargetInput;
}

Usage Example:

// Example: write EventBridge events to CloudWatch Logs log groups.
// The snippet is meant to run inside a construct scope (it uses `this`).
import { Duration } from "@aws-cdk/core"; // needed for Duration.hours() below
import * as logs from "@aws-cdk/aws-logs";
import * as events from "@aws-cdk/aws-events";
import * as targets from "@aws-cdk/aws-events-targets";
import * as sqs from "@aws-cdk/aws-sqs";

// NOTE(review): when a custom `event` input is supplied, CloudWatch Logs
// targets may require the transformed payload to be shaped as
// { timestamp, message } — confirm against the EventBridge input
// transformation documentation before relying on the shapes used below.

// Create CloudWatch Log Groups
const applicationLogGroup = new logs.LogGroup(this, "ApplicationLogs", {
  logGroupName: "/aws/events/application",
  retention: logs.RetentionDays.ONE_MONTH,
});

const securityLogGroup = new logs.LogGroup(this, "SecurityLogs", {
  logGroupName: "/aws/events/security",
  retention: logs.RetentionDays.ONE_YEAR,
});

const errorLogGroup = new logs.LogGroup(this, "ErrorLogs", {
  logGroupName: "/aws/events/errors",
  retention: logs.RetentionDays.THREE_MONTHS,
});

// Create dead letter queue for failed log deliveries
const logDlq = new sqs.Queue(this, "LogDeadLetterQueue", {
  queueName: "log-delivery-failures",
});

// Rule for application events with structured logging
const appEventsRule = new events.Rule(this, "ApplicationEventsRule", {
  eventPattern: {
    source: ["myapp"],
    detailType: [
      "Order Created",
      "Payment Processed",
      "User Registration",
      "Data Export",
    ],
  },
});

// Delivery options (dead letter queue, retries, max event age) are passed
// alongside the custom event payload.
appEventsRule.addTarget(new targets.CloudWatchLogGroup(applicationLogGroup, {
  deadLetterQueue: logDlq,
  retryAttempts: 3,
  maxEventAge: Duration.hours(1),
  event: events.RuleTargetInput.fromObject({
    timestamp: events.EventField.fromPath("$.time"),
    level: "INFO",
    eventType: events.EventField.fromPath("$.detail-type"),
    source: events.EventField.fromPath("$.source"),
    account: events.EventField.fromPath("$.account"),
    region: events.EventField.fromPath("$.region"),
    details: events.EventField.fromPath("$.detail"),
    eventId: events.EventField.fromPath("$.id"),
  }),
}));

// Rule for security events with detailed logging
const securityRule = new events.Rule(this, "SecurityEventsRule", {
  eventPattern: {
    source: ["aws.guardduty", "aws.securityhub", "myapp.security"],
    detailType: [
      "GuardDuty Finding",
      "Security Hub Findings - Imported",
      "Authentication Failure",
      "Privilege Escalation",
    ],
  },
});

securityRule.addTarget(new targets.CloudWatchLogGroup(securityLogGroup, {
  event: events.RuleTargetInput.fromObject({
    "@timestamp": events.EventField.fromPath("$.time"),
    level: "WARN",
    eventType: events.EventField.fromPath("$.detail-type"),
    source: events.EventField.fromPath("$.source"),
    account: events.EventField.fromPath("$.account"),
    region: events.EventField.fromPath("$.region"),
    severity: events.EventField.fromPath("$.detail.severity"),
    finding: events.EventField.fromPath("$.detail"),
    eventId: events.EventField.fromPath("$.id"),
  }),
}));

// Rule for error events
const errorRule = new events.Rule(this, "ErrorEventsRule", {
  eventPattern: {
    source: ["myapp"],
    detailType: ["Error Occurred", "Exception Thrown", "Service Failure"],
  },
});

errorRule.addTarget(new targets.CloudWatchLogGroup(errorLogGroup, {
  event: events.RuleTargetInput.fromObject({
    "@timestamp": events.EventField.fromPath("$.time"),
    level: "ERROR",
    eventType: events.EventField.fromPath("$.detail-type"),
    source: events.EventField.fromPath("$.source"),
    errorMessage: events.EventField.fromPath("$.detail.error.message"),
    errorCode: events.EventField.fromPath("$.detail.error.code"),
    stackTrace: events.EventField.fromPath("$.detail.error.stackTrace"),
    context: events.EventField.fromPath("$.detail.context"),
    eventId: events.EventField.fromPath("$.id"),
  }),
}));

// Simple logging for AWS service events
// NOTE(review): the object-literal prefix matcher below may not type-check if
// EventPattern.source is declared as string[] in your CDK version — confirm,
// and consider a matcher helper such as events.Match.prefix("aws.") if available.
const awsEventsRule = new events.Rule(this, "AWSEventsRule", {
  eventPattern: {
    source: [{ prefix: "aws." }],
  },
});

// No props: the default payload (the entire event) is logged
awsEventsRule.addTarget(new targets.CloudWatchLogGroup(applicationLogGroup));

Kinesis Data Firehose Target

Send events to Amazon Kinesis Data Firehose for data delivery to analytics services.

/**
 * Use a Kinesis Data Firehose stream as a target for Amazon EventBridge rules
 *
 * Instances are passed to a rule's `addTarget()`; the signature below lists
 * only the public surface (no implementation is shown in this document).
 */
class KinesisFirehoseStream implements events.IRuleTarget {
  /**
   * @param stream the Firehose delivery stream that receives the records
   * @param props optional settings; see KinesisFirehoseStreamProps
   */
  constructor(stream: firehose.CfnDeliveryStream, props?: KinesisFirehoseStreamProps);
  
  /**
   * Returns a RuleTarget that can be used to put records to this Kinesis Data Firehose stream
   * as a result from an EventBridge event
   *
   * @param rule the rule this target is being attached to
   * @param id optional identifier (its exact use is not shown in this document)
   * @returns the target configuration for the rule
   */
  bind(rule: events.IRule, id?: string): events.RuleTargetConfig;
}

/**
 * Customization options for the KinesisFirehoseStream target.
 * The single property is optional.
 */
interface KinesisFirehoseStreamProps {
  /**
   * The message to send to the Kinesis Data Firehose stream
   * Built with events.RuleTargetInput, for example RuleTargetInput.fromObject({...})
   * @default the entire EventBridge event
   */
  readonly message?: events.RuleTargetInput;
}

Usage Example:

// Example: deliver EventBridge events to S3 through a Kinesis Data Firehose
// delivery stream. The snippet is meant to run inside a construct scope (it uses `this`).
import { Duration } from "@aws-cdk/core"; // needed for Duration.days() below
import * as firehose from "@aws-cdk/aws-kinesisfirehose";
import * as s3 from "@aws-cdk/aws-s3";
import * as iam from "@aws-cdk/aws-iam";
import * as events from "@aws-cdk/aws-events";
import * as targets from "@aws-cdk/aws-events-targets";

// Create S3 bucket for data storage
const dataLakeBucket = new s3.Bucket(this, "DataLakeBucket", {
  bucketName: "my-analytics-data-lake",
  lifecycleRules: [{
    id: "archive-old-data",
    transitions: [{
      storageClass: s3.StorageClass.GLACIER,
      transitionAfter: Duration.days(90),
    }],
  }],
});

// Create IAM role for Firehose
// The inline policy grants the S3 actions on both the bucket and its objects.
const firehoseRole = new iam.Role(this, "FirehoseRole", {
  assumedBy: new iam.ServicePrincipal("firehose.amazonaws.com"),
  inlinePolicies: {
    S3Access: new iam.PolicyDocument({
      statements: [
        new iam.PolicyStatement({
          actions: [
            "s3:AbortMultipartUpload",
            "s3:GetBucketLocation",
            "s3:GetObject",
            "s3:ListBucket",
            "s3:ListBucketMultipartUploads",
            "s3:PutObject",
          ],
          resources: [
            dataLakeBucket.bucketArn,
            `${dataLakeBucket.bucketArn}/*`,
          ],
        }),
      ],
    }),
  },
});

// Create Kinesis Data Firehose delivery stream
// Records are written as GZIP-compressed objects under time-partitioned prefixes.
//
// Record format conversion (e.g. to Parquet) is intentionally not enabled here:
// turning on dataFormatConversionConfiguration additionally requires a
// schemaConfiguration (AWS Glue table), an inputFormatConfiguration
// (deserializer), and larger buffering/compression settings than the ones
// below. Enabling it with only an output serializer fails at deployment.
const deliveryStream = new firehose.CfnDeliveryStream(this, "EventDeliveryStream", {
  deliveryStreamName: "event-analytics-stream",
  deliveryStreamType: "DirectPut",
  extendedS3DestinationConfiguration: {
    bucketArn: dataLakeBucket.bucketArn,
    roleArn: firehoseRole.roleArn,
    prefix: "events/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/",
    errorOutputPrefix: "errors/",
    bufferingHints: {
      sizeInMBs: 5,
      intervalInSeconds: 300,
    },
    compressionFormat: "GZIP",
  },
});

// Rule for analytics events
const analyticsRule = new events.Rule(this, "AnalyticsRule", {
  eventPattern: {
    source: ["myapp"],
    detailType: [
      "User Activity",
      "Transaction",
      "Performance Metric",
      "Business Event",
    ],
  },
});

// Send structured data to Firehose
analyticsRule.addTarget(new targets.KinesisFirehoseStream(deliveryStream, {
  message: events.RuleTargetInput.fromObject({
    eventId: events.EventField.fromPath("$.id"),
    timestamp: events.EventField.fromPath("$.time"),
    eventType: events.EventField.fromPath("$.detail-type"),
    source: events.EventField.fromPath("$.source"),
    account: events.EventField.fromPath("$.account"),
    region: events.EventField.fromPath("$.region"),
    // Flatten the detail for easier querying
    userId: events.EventField.fromPath("$.detail.userId"),
    sessionId: events.EventField.fromPath("$.detail.sessionId"),
    deviceType: events.EventField.fromPath("$.detail.deviceType"),
    userAgent: events.EventField.fromPath("$.detail.userAgent"),
    ipAddress: events.EventField.fromPath("$.detail.ipAddress"),
    // Include the full detail (taken from "$.detail") for complex analysis
    detailJson: events.EventField.fromPath("$.detail"),
  }),
}));

// Send raw events for comprehensive analysis
// NOTE(review): the object-literal prefix matcher below may not type-check if
// EventPattern.source is declared as string[] in your CDK version — confirm,
// and consider a matcher helper such as events.Match.prefix("myapp.") if available.
const rawEventsRule = new events.Rule(this, "RawEventsRule", {
  eventPattern: {
    source: [{ prefix: "myapp." }],
  },
});

// No props: the default message (the entire event) is delivered
rawEventsRule.addTarget(new targets.KinesisFirehoseStream(deliveryStream));

Analytics Patterns

Stream Processing Architecture

// Example: split events across two Kinesis streams by expected volume.
// The snippet is meant to run inside a construct scope (it uses `this`).
import { Duration } from "@aws-cdk/core";
import * as kinesis from "@aws-cdk/aws-kinesis";
import * as events from "@aws-cdk/aws-events";
import * as targets from "@aws-cdk/aws-events-targets";

// Multi-stream setup for different data types
const realTimeStream = new kinesis.Stream(this, "RealTimeStream", {
  shardCount: 5,
  retentionPeriod: Duration.hours(24),
});

const batchStream = new kinesis.Stream(this, "BatchStream", {
  shardCount: 2,
  retentionPeriod: Duration.days(7),
});

// High-frequency events go to real-time stream
const realtimeRule = new events.Rule(this, "RealtimeRule", {
  eventPattern: {
    source: ["myapp.realtime"],
    detailType: ["Click", "Page View", "API Call"],
  },
});

// Partition key is taken from the session ID in the event detail
realtimeRule.addTarget(new targets.KinesisStream(realTimeStream, {
  partitionKeyPath: "$.detail.sessionId",
}));

// Lower-frequency events go to batch stream
const batchRule = new events.Rule(this, "BatchRule", {
  eventPattern: {
    source: ["myapp.batch"],
    detailType: ["Daily Report", "Weekly Summary", "Monthly Metrics"],
  },
});

// Partition key is taken from the report type in the event detail
batchRule.addTarget(new targets.KinesisStream(batchStream, {
  partitionKeyPath: "$.detail.reportType",
}));

Log Aggregation Patterns

// Example: route events to separate log groups according to `detail.level`.
// The snippet is meant to run inside a construct scope (it uses `this`).
import * as logs from "@aws-cdk/aws-logs";
import * as events from "@aws-cdk/aws-events";
import * as targets from "@aws-cdk/aws-events-targets";

// Centralized logging with different retention policies
const debugLogGroup = new logs.LogGroup(this, "DebugLogs", {
  retention: logs.RetentionDays.THREE_DAYS,
});

const infoLogGroup = new logs.LogGroup(this, "InfoLogs", {
  retention: logs.RetentionDays.ONE_WEEK,
});

const errorLogGroup = new logs.LogGroup(this, "ErrorLogs", {
  retention: logs.RetentionDays.SIX_MONTHS,
});

// Route events by severity
const debugRule = new events.Rule(this, "DebugRule", {
  eventPattern: {
    detail: { level: ["DEBUG"] },
  },
});

const infoRule = new events.Rule(this, "InfoRule", {
  eventPattern: {
    detail: { level: ["INFO"] },
  },
});

// ERROR and FATAL share one rule and therefore one log group
const errorRule = new events.Rule(this, "ErrorRule", {
  eventPattern: {
    detail: { level: ["ERROR", "FATAL"] },
  },
});

// No props: each target logs the default payload (the entire event)
debugRule.addTarget(new targets.CloudWatchLogGroup(debugLogGroup));
infoRule.addTarget(new targets.CloudWatchLogGroup(infoLogGroup));
errorRule.addTarget(new targets.CloudWatchLogGroup(errorLogGroup));

Install with Tessl CLI

npx tessl i tessl/npm-aws-cdk--aws-events-targets

docs

analytics-targets.md

api-targets.md

cicd-targets.md

compute-targets.md

index.md

messaging-targets.md

orchestration-targets.md

system-targets.md

tile.json