Amazon CloudWatch

AWS CloudWatch LogGroup event export

There are three ways to export logs from a log group to an S3 bucket, SIEM, data lake, or other destination (see the AWS documentation):

  • Subscription filter with Kinesis Data Streams
  • Subscription filter with Amazon Data Firehose
  • Subscription filter with AWS Lambda

Fluency Platform supports all three methods.


Subscription filter with Amazon Data Firehose

Step 1: Set up IAM roles for the CloudWatch and Firehose services

  • Create an S3 bucket to hold the log group data (shared by all log groups)
  • Create a new IAM role and allow Firehose to write to the S3 bucket you created
  • Create a new IAM role and allow CloudWatch to write to the Firehose stream
  • CloudFormation Template (a boto3 sketch of the equivalent setup follows the parameter list below)

CloudFormation Parameters:

  • CloudWatchRole: fluencyCloudwatchToFireHose
  • FirehoseRole: fluencyFireHoseToS3
  • S3Bucket: {yourcompany}-fluency-cloudwatch-firehose
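
If you would rather script this step than run the CloudFormation template, a minimal boto3 sketch of the same resources might look like the following. The role and bucket names match the parameters above; the policy scope is an assumption and should be tightened for production use:

import json
import boto3

iam = boto3.client("iam")
s3 = boto3.client("s3")

BUCKET = "yourcompany-fluency-cloudwatch-firehose"  # S3Bucket parameter; replace "yourcompany"

# Shared bucket for all exported log groups. Outside us-east-1 you must also
# pass CreateBucketConfiguration={"LocationConstraint": "<region>"}.
s3.create_bucket(Bucket=BUCKET)

# FirehoseRole: allows Firehose to write objects into the bucket.
iam.create_role(
    RoleName="fluencyFireHoseToS3",
    AssumeRolePolicyDocument=json.dumps({"Version": "2012-10-17", "Statement": [{
        "Effect": "Allow", "Principal": {"Service": "firehose.amazonaws.com"},
        "Action": "sts:AssumeRole"}]}))
iam.put_role_policy(
    RoleName="fluencyFireHoseToS3", PolicyName="writeToS3",
    PolicyDocument=json.dumps({"Version": "2012-10-17", "Statement": [{
        "Effect": "Allow",
        "Action": ["s3:PutObject", "s3:GetBucketLocation", "s3:ListBucket"],
        "Resource": [f"arn:aws:s3:::{BUCKET}", f"arn:aws:s3:::{BUCKET}/*"]}]}))

# CloudWatchRole: allows CloudWatch Logs to put records into Firehose.
iam.create_role(
    RoleName="fluencyCloudwatchToFireHose",
    AssumeRolePolicyDocument=json.dumps({"Version": "2012-10-17", "Statement": [{
        "Effect": "Allow", "Principal": {"Service": "logs.amazonaws.com"},
        "Action": "sts:AssumeRole"}]}))
iam.put_role_policy(
    RoleName="fluencyCloudwatchToFireHose", PolicyName="writeToFirehose",
    PolicyDocument=json.dumps({"Version": "2012-10-17", "Statement": [{
        "Effect": "Allow",
        "Action": ["firehose:PutRecord", "firehose:PutRecordBatch"],
        "Resource": "*"}]}))  # scope to your delivery stream ARNs in production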

Step 2: Add log groups one by one

  • Create a new Firehose stream
  • Create a new subscription filter for the log group and set the destination to the Firehose stream
  • CloudFormation Template (an equivalent boto3 sketch is shown after the parameters below)

CloudFormation Parameters:

  • CloudWatchRole: fluencyCloudwatchToFireHose
  • FilterName: passthrough
  • FilterPattern: ""
  • FirehoseRole: fluencyFireHoseToS3
  • LogGroup:
  • S3Bucket: {yourcompany}-fluency-cloudwatch-firehose
  • Name: (the S3 object prefix)
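
The CloudFormation template above can also be replaced by a short boto3 script. The sketch below makes the same two API calls, with placeholder account, region, and log group values:

import time
import boto3

REGION = "us-east-1"                # placeholder
ACCOUNT_ID = "123456789012"         # placeholder
LOG_GROUP = "/aws/lambda/example"   # LogGroup parameter (placeholder)
STREAM_NAME = "fluency-example-stream"

firehose = boto3.client("firehose", region_name=REGION)
logs = boto3.client("logs", region_name=REGION)

# One Firehose stream per log group, all delivering into the shared S3 bucket.
firehose.create_delivery_stream(
    DeliveryStreamName=STREAM_NAME,
    DeliveryStreamType="DirectPut",
    ExtendedS3DestinationConfiguration={
        "RoleARN": f"arn:aws:iam::{ACCOUNT_ID}:role/fluencyFireHoseToS3",
        "BucketARN": "arn:aws:s3:::yourcompany-fluency-cloudwatch-firehose",
        "Prefix": "example-log-group/",   # Name parameter: the S3 object prefix
    },
)

# The stream must be ACTIVE before a subscription filter can target it.
while firehose.describe_delivery_stream(DeliveryStreamName=STREAM_NAME)[
        "DeliveryStreamDescription"]["DeliveryStreamStatus"] != "ACTIVE":
    time.sleep(5)

# Forward every event (empty filter pattern) from the log group to the stream.
logs.put_subscription_filter(
    logGroupName=LOG_GROUP,
    filterName="passthrough",   # FilterName parameter
    filterPattern="",           # FilterPattern parameter
    destinationArn=f"arn:aws:firehose:{REGION}:{ACCOUNT_ID}:deliverystream/{STREAM_NAME}",
    roleArn=f"arn:aws:iam::{ACCOUNT_ID}:role/fluencyCloudwatchToFireHose",
)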

Step 3: Configure an "S3 bucket" integration in Fluency

  • Set the bucket and region

  • Set the mode to read

  • Select the AWS authentication method:

    • EC2 Instance Role
    • IAM User
    • Access Key (create a new IAM user; a minimal read-only policy sketch follows this list)
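
If you choose the Access Key option, the new IAM user only needs read access to the export bucket. A minimal sketch follows; the user and policy names are hypothetical, and Fluency may need additional permissions depending on how the integration is configured:

import json
import boto3

iam = boto3.client("iam")
BUCKET = "yourcompany-fluency-cloudwatch-firehose"  # S3Bucket parameter

iam.create_user(UserName="fluencyS3Reader")  # hypothetical user name
iam.put_user_policy(
    UserName="fluencyS3Reader", PolicyName="readCloudwatchExports",
    PolicyDocument=json.dumps({"Version": "2012-10-17", "Statement": [{
        "Effect": "Allow",
        "Action": ["s3:GetObject", "s3:ListBucket", "s3:GetBucketLocation"],
        "Resource": [f"arn:aws:s3:::{BUCKET}", f"arn:aws:s3:::{BUCKET}/*"]}]}))

# The access key pair goes into the Fluency integration settings.
key = iam.create_access_key(UserName="fluencyS3Reader")["AccessKey"]
print(key["AccessKeyId"], key["SecretAccessKey"])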

Step 4: Add an "S3" type data source in Fluency

  • Set the Receiver to CloudwatchEventSplit
  • Set input to JSON
function main({obj, size, source, props}) {
    // Skip CloudWatch control records; only DATA_MESSAGE batches carry log events.
    if (obj.messageType != "DATA_MESSAGE") {
      return null
    }
    let list = []
    let logEvents = obj.logEvents
    if (!logEvents) {
      printf("logEvents field not found")
      return null
    }
    // Re-wrap each log event as its own record so events are indexed individually.
    for i, event = range logEvents {
      let envelope = {
         logGroup: obj.logGroup,
         logStream: obj.logStream,
         subscriptionFilters: obj.subscriptionFilters,
         "@message": event.message,
         "@type": "event",
         "@timestamp": event.timestamp,
         "@source": obj.logStream
      }
      list = append(list, envelope)
    }
    return list
}

Pros

  • Easy to set up
  • Cheaper than Kinesis Data Streams

Cons

  • Each Firehose stream delivers to only one destination
  • Limited destination options

Subscription filter with Kinesis Data Streams

Step 1: Set up IAM roles for the CloudWatch and Kinesis services

  • Create a new Kinesis stream
  • Create a new IAM role and allow CloudWatch to write to the Kinesis stream
  • Create a new subscription filter for the log group and set the destination to the Kinesis stream
  • CloudFormation Template (see the boto3 sketch after the parameters below)

CloudFormation Parameters:

  • CloudWatchRole: fluencyCloudwatchToKinesis
  • FilterName: passToKinesis
  • StreamName
  • LogGroup
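
A scripted equivalent of this step, using placeholder names and account values, could look like the following sketch:

import json
import boto3

REGION = "us-east-1"                       # placeholder
ACCOUNT_ID = "123456789012"                # placeholder
LOG_GROUP = "/aws/lambda/example"          # LogGroup parameter (placeholder)
STREAM_NAME = "fluency-cloudwatch-stream"  # StreamName parameter (placeholder)

kinesis = boto3.client("kinesis", region_name=REGION)
logs = boto3.client("logs", region_name=REGION)
iam = boto3.client("iam")

kinesis.create_stream(StreamName=STREAM_NAME, ShardCount=1)

# CloudWatchRole: allows CloudWatch Logs to put records onto the Kinesis stream.
iam.create_role(
    RoleName="fluencyCloudwatchToKinesis",
    AssumeRolePolicyDocument=json.dumps({"Version": "2012-10-17", "Statement": [{
        "Effect": "Allow", "Principal": {"Service": "logs.amazonaws.com"},
        "Action": "sts:AssumeRole"}]}))
iam.put_role_policy(
    RoleName="fluencyCloudwatchToKinesis", PolicyName="writeToKinesis",
    PolicyDocument=json.dumps({"Version": "2012-10-17", "Statement": [{
        "Effect": "Allow", "Action": "kinesis:PutRecord",
        "Resource": f"arn:aws:kinesis:{REGION}:{ACCOUNT_ID}:stream/{STREAM_NAME}"}]}))

# Wait until the stream is ACTIVE (IAM changes may also take a moment to propagate),
# then forward every event from the log group to it.
kinesis.get_waiter("stream_exists").wait(StreamName=STREAM_NAME)
logs.put_subscription_filter(
    logGroupName=LOG_GROUP,
    filterName="passToKinesis",  # FilterName parameter
    filterPattern="",
    destinationArn=f"arn:aws:kinesis:{REGION}:{ACCOUNT_ID}:stream/{STREAM_NAME}",
    roleArn=f"arn:aws:iam::{ACCOUNT_ID}:role/fluencyCloudwatchToKinesis",
)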

Step 2: Configure an "AWS Kinesis" integration in Fluency

  • Set the Kinesis stream name and region

  • Set the mode to read

  • Select the AWS authentication method:

    • EC2 Instance Role
    • IAM User
    • Access Key (create a new IAM user)

Step 3: Add a "Kinesis" type data source in Fluency

  • Set the Receiver to CloudwatchEventSplit
  • Set input to JSON

Pros

  • Easy to set up
  • Flexible destination options
  • 24-hour data retention by default

Cons

  • More expensive (each stream costs about $0.10 per hour, plus data ingestion costs)

Subscription filter with AWS Lambda

Step 1: Set up the S3 bucket, IAM role, and Lambda function

  • Create an S3 bucket for the Lambda function to write to

  • Create an IAM role for the Lambda function (execution role)

    • Allow Lambda to write to the S3 bucket
    • Allow Lambda to write to CloudWatch Logs (if logging is needed)
  • Create a Lambda function (example code below)
  • Create a subscription filter on the log group with the Lambda function as its destination (a boto3 sketch follows the CloudFormation parameters below)

import base64
import datetime
import os
import uuid

import boto3

def lambda_handler(event, context):
    # Environment variables: LOGGROUP (source log group), BUCKET (destination
    # S3 bucket), PREFIX (S3 object prefix).
    log_group = os.environ['LOGGROUP'].replace('/', '_')
    destination_bucket = os.environ['BUCKET']
    prefix = os.environ['PREFIX']

    current_time = datetime.datetime.now()
    object_prefix = os.path.join(prefix, log_group, current_time.strftime('%Y%m%d'))

    # CloudWatch Logs delivers the payload base64-encoded and gzip-compressed.
    # Decode only the base64 layer and store the still-compressed batch, so the
    # object lands in S3 as a .json.gz file.
    encoded_zipped_data = event['awslogs']['data']
    zipped_data = base64.b64decode(encoded_zipped_data)

    basename = current_time.strftime('%H-%M-%S')
    object_path = '{}/{}-{}.json.gz'.format(object_prefix, basename, uuid.uuid1())

    s3 = boto3.client('s3')
    s3.put_object(Body=zipped_data, Bucket=destination_bucket, Key=object_path)

CloudFormation Parameters:

  • S3Bucket
  • S3ObjectPrefix
  • LambdaRole
  • LambdaFunctionName
  • LogGroup
  • FilterPattern
  • FilterName
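
The parameters above indicate that the template also creates the subscription filter that invokes the Lambda function. If you prefer to wire that up yourself, a boto3 sketch with placeholder names (the function is assumed to already exist, with the LOGGROUP, BUCKET, and PREFIX environment variables set) could look like:

import boto3

REGION = "us-east-1"                      # placeholder
ACCOUNT_ID = "123456789012"               # placeholder
LOG_GROUP = "/aws/lambda/example"         # LogGroup parameter (placeholder)
FUNCTION_NAME = "fluencyCloudwatchToS3"   # LambdaFunctionName parameter (placeholder)

logs = boto3.client("logs", region_name=REGION)
lambda_client = boto3.client("lambda", region_name=REGION)

function_arn = lambda_client.get_function(
    FunctionName=FUNCTION_NAME)["Configuration"]["FunctionArn"]

# Allow CloudWatch Logs to invoke the function, restricted to this log group.
lambda_client.add_permission(
    FunctionName=FUNCTION_NAME,
    StatementId="cloudwatch-logs-invoke",
    Action="lambda:InvokeFunction",
    Principal="logs.amazonaws.com",
    SourceArn=f"arn:aws:logs:{REGION}:{ACCOUNT_ID}:log-group:{LOG_GROUP}:*",
)

# Forward every event (empty filter pattern) to the Lambda function; no roleArn
# is needed for a Lambda destination.
logs.put_subscription_filter(
    logGroupName=LOG_GROUP,
    filterName="passToLambda",   # FilterName parameter (placeholder)
    filterPattern="",            # FilterPattern parameter
    destinationArn=function_arn,
)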

Step 2: Configure an "S3 bucket" integration in Fluency

  • Set the bucket and region

  • Set the mode to read

  • Select the AWS authentication method:

    • EC2 Instance Role
    • IAM User
    • Access Key (create a new IAM user)

Step 3: Add an "S3" type data source in Fluency

  • Set the Receiver to CloudwatchEventSplit
  • Set input to JSON

Pros

  • Flexible destination options
  • Cheapest option (you pay only the Lambda invocation cost)

Cons

  • More complex setup