A Pulumi package for creating and managing Amazon Web Services (AWS) cloud resources with infrastructure-as-code.
Amazon Kinesis makes it easy to collect, process, and analyze real-time streaming data.
Real-time data streaming service.

```typescript
import * as aws from "@pulumi/aws";

const stream = new aws.kinesis.Stream("data-stream", {
    name: "application-events",
    shardCount: 2,
    retentionPeriod: 24,
    streamModeDetails: {
        streamMode: "PROVISIONED",
    },
    tags: {
        Environment: "production",
    },
});
```

Deliver streaming data to destinations.
```typescript
// firehoseRole and bucket are assumed to be an aws.iam.Role and an
// aws.s3.Bucket defined elsewhere in the program.
const deliveryStream = new aws.kinesis.FirehoseDeliveryStream("s3-delivery", {
    name: "event-delivery-stream",
    destination: "extended_s3",
    extendedS3Configuration: {
        roleArn: firehoseRole.arn,
        bucketArn: bucket.arn,
        prefix: "events/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/",
        errorOutputPrefix: "errors/",
        bufferingSize: 5,
        bufferingInterval: 300,
        compressionFormat: "GZIP",
    },
});
```

Real-time analytics on streaming data.
```typescript
// analyticsRole and codeBucket are assumed to be defined elsewhere.
const analyticsApp = new aws.kinesisanalyticsv2.Application("analytics-app", {
    name: "stream-analytics",
    runtimeEnvironment: "FLINK-1_15",
    serviceExecutionRole: analyticsRole.arn,
    applicationConfiguration: {
        applicationCodeConfiguration: {
            codeContent: {
                s3ContentLocation: {
                    bucketArn: codeBucket.arn,
                    fileKey: "analytics-app.jar",
                },
            },
            codeContentType: "ZIPFILE",
        },
        environmentProperties: {
            propertyGroups: [{
                propertyGroupId: "ProducerConfigProperties",
                propertyMap: {
                    "aws.region": "us-west-2",
                },
            }],
        },
    },
});
```

Streams can also run in on-demand capacity mode, which scales throughput automatically instead of using a fixed shard count:

```typescript
const onDemandStream = new aws.kinesis.Stream("on-demand", {
    name: "on-demand-stream",
    streamModeDetails: {
        streamMode: "ON_DEMAND",
    },
    retentionPeriod: 168, // 7 days
});
```

A Lambda function can transform records in flight before Firehose delivers them:

```typescript
import * as pulumi from "@pulumi/pulumi";

// lambdaRole is assumed to be an aws.iam.Role defined elsewhere.
const transformLambda = new aws.lambda.Function("transform", {
    runtime: "python3.11",
    handler: "index.handler",
    role: lambdaRole.arn,
    code: new pulumi.asset.AssetArchive({
        "index.py": new pulumi.asset.StringAsset(`
import base64
import json

def handler(event, context):
    output = []
    for record in event['records']:
        # Decode and transform
        payload = base64.b64decode(record['data'])
        data = json.loads(payload)
        # Transform logic here
        transformed = json.dumps(data)
        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(transformed.encode()).decode()
        })
    return {'records': output}
`),
    }),
});
```
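The transform function above follows the Firehose data-transformation contract: each incoming record carries a base64-encoded payload, and each result must echo the `recordId` with a status and a re-encoded payload. Below is a hedged, standalone sketch of that same decode/transform/encode round-trip in TypeScript, so the logic can be exercised without deploying a Lambda; the type and function names are illustrative, not part of the package API.

```typescript
interface FirehoseRecord {
    recordId: string;
    data: string; // base64-encoded payload
}

interface TransformedRecord {
    recordId: string;
    result: "Ok" | "Dropped" | "ProcessingFailed";
    data: string; // base64-encoded transformed payload
}

// Pass-through transform: decode, parse, (transform), re-encode.
function transformRecords(records: FirehoseRecord[]): TransformedRecord[] {
    return records.map((record) => {
        // Firehose hands the payload over base64-encoded
        const payload = Buffer.from(record.data, "base64").toString("utf8");
        const data = JSON.parse(payload);
        // Real transform logic would go here; this sketch passes data through
        const transformed = JSON.stringify(data);
        return {
            recordId: record.recordId,
            result: "Ok",
            data: Buffer.from(transformed, "utf8").toString("base64"),
        };
    });
}
```

Records that fail to parse should instead be returned with `result: "ProcessingFailed"` so Firehose routes them to the error output prefix.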
```typescript
const transformedDeliveryStream = new aws.kinesis.FirehoseDeliveryStream("transformed", {
    name: "transformed-delivery",
    destination: "extended_s3",
    extendedS3Configuration: {
        roleArn: firehoseRole.arn,
        bucketArn: bucket.arn,
        processingConfiguration: {
            enabled: true,
            processors: [{
                type: "Lambda",
                parameters: [{
                    parameterName: "LambdaArn",
                    parameterValue: transformLambda.arn,
                }],
            }],
        },
    },
});
```

A stream consumer provides enhanced fan-out with dedicated read throughput per consumer:

```typescript
const consumer = new aws.kinesis.StreamConsumer("consumer", {
    name: "event-processor",
    streamArn: stream.arn,
});
```
The consumer can then trigger a Lambda function through an event source mapping:

```typescript
// processorFunction is assumed to be an aws.lambda.Function defined elsewhere.
const eventSourceMapping = new aws.lambda.EventSourceMapping("kinesis-trigger", {
    eventSourceArn: consumer.arn,
    functionName: processorFunction.arn,
    startingPosition: "LATEST",
    batchSize: 100,
    maximumBatchingWindowInSeconds: 10,
});
```

Kinesis Stream arguments:

- `name` - Stream name
- `shardCount` - Number of shards (provisioned mode)
- `streamModeDetails` - Stream mode (PROVISIONED or ON_DEMAND)
- `retentionPeriod` - Data retention in hours (24-8760)
- `encryptionType` - Encryption type (NONE or KMS)
- `kmsKeyId` - KMS key ID for encryption

Firehose delivery stream arguments:

- `name` - Delivery stream name
- `destination` - Destination type (s3, redshift, elasticsearch, etc.)
- `extendedS3Configuration` - S3 destination configuration
- `bufferingSize` - Buffer size in MB (1-128)
- `bufferingInterval` - Buffer interval in seconds (60-900)

Outputs:

- `id` - Resource identifier
- `arn` - Resource ARN
- `name` - Resource name

Install with Tessl CLI:

```shell
npx tessl i tessl/npm-pulumi--aws
```
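The outputs listed above (`id`, `arn`, `name`) are exposed on every resource as Pulumi outputs and can be exported so they appear on the stack after `pulumi up`. A minimal sketch, assuming a single provisioned stream (the resource names are illustrative):

```typescript
import * as aws from "@pulumi/aws";

const stream = new aws.kinesis.Stream("data-stream", {
    shardCount: 1,
});

// Stack outputs; the values resolve once the resource is created
export const streamId = stream.id;
export const streamArn = stream.arn;
export const streamName = stream.name;
```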