Python Essentials for AWS DevOps: Automating IAM Security Alerts

As an AWS Certified Solutions architect that specializes in backend engineering, Python has become my go-to language for building robust DevOps automation. Its clean syntax, extensive AWS SDK support through boto3, and seamless integration with Lambda make it perfect for infrastructure automation. In this post, I'll walk you through building a real-world security automation: monitoring IAM changes and alerting your security team in real-time.

Why Python for AWS Lambda?

Python strikes an ideal balance for serverless functions. It offers:

  • Fast cold starts compared to JVM-based languages

  • Native AWS support with boto3 pre-installed in Lambda environments

  • Readable code that's easy for teams to maintain

  • Rich ecosystem of libraries for testing and validation

The Use Case: IAM Change Detection

Identity and Access Management (IAM) changes are critical security events. Whether it's a new user creation, policy modification, or role assumption pattern, your security team needs immediate visibility. Let's build a Lambda function that detects these changes via CloudTrail and sends targeted email alerts through SNS.

Architecture Overview

Here's what we're building:

  1. CloudTrail captures all IAM API calls

  2. EventBridge filters for IAM-related events

  3. Lambda function processes events and formats alerts

  4. SNS topic delivers emails to security personnel

Building the Lambda Function

Let's start with the core Lambda handler. This function will receive CloudTrail events, parse them, and trigger SNS notifications:

import json
import boto3
import os
from datetime import datetime

# Initialize AWS clients
sns_client = boto3.client('sns')
SNS_TOPIC_ARN = os.environ['SNS_TOPIC_ARN']

def lambda_handler(event, context):
    """
    Main Lambda handler for processing IAM change events
    """
    try:
        # Parse the CloudTrail event
        detail = event.get('detail', {})
        event_name = detail.get('eventName', 'Unknown')
        user_identity = detail.get('userIdentity', {})
        event_time = detail.get('eventTime', datetime.now().isoformat())
        source_ip = detail.get('sourceIPAddress', 'Unknown')
        
        # Determine the severity based on the action
        severity = determine_severity(event_name)
        
        # Format the alert message
        message = format_alert_message(
            event_name=event_name,
            user_identity=user_identity,
            event_time=event_time,
            source_ip=source_ip,
            severity=severity,
            detail=detail
        )
        
        # Send SNS notification
        response = send_sns_alert(message, severity)
        
        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': 'Alert sent successfully',
                'messageId': response['MessageId']
            })
        }
        
    except Exception as e:
        print(f"Error processing event: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

def determine_severity(event_name):
    """
    Classify IAM events by severity level
    """
    critical_events = [
        'DeleteUserPolicy',
        'PutUserPolicy',
        'AttachUserPolicy',
        'CreateAccessKey',
        'DeleteAccountPasswordPolicy'
    ]
    
    high_events = [
        'CreateUser',
        'DeleteUser',
        'CreateRole',
        'DeleteRole',
        'UpdateAssumeRolePolicy'
    ]
    
    if event_name in critical_events:
        return 'CRITICAL'
    elif event_name in high_events:
        return 'HIGH'
    else:
        return 'MEDIUM'

def format_alert_message(event_name, user_identity, event_time, source_ip, severity, detail):
    """
    Create a human-readable alert message
    """
    principal = user_identity.get('principalId', 'Unknown')
    user_type = user_identity.get('type', 'Unknown')
    account_id = user_identity.get('accountId', 'Unknown')
    
    # Extract relevant parameters from the event
    request_params = detail.get('requestParameters', {})
    
    message = f"""
IAM Security Alert - {severity} Priority

Event Details:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Action Taken: {event_name}
Timestamp: {event_time}
Severity: {severity}

Identity Information:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Principal ID: {principal}
User Type: {user_type}
Account ID: {account_id}
Source IP: {source_ip}

Request Parameters:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
{json.dumps(request_params, indent=2)}

Please review this change immediately and verify it aligns with your security policies.
"""
    
    return message

def send_sns_alert(message, severity):
    """
    Send formatted message to SNS topic
    """
    subject = f"[AWS Security Alert - {severity}] IAM Change Detected"
    
    response = sns_client.publish(
        TopicArn=SNS_TOPIC_ARN,
        Subject=subject,
        Message=message,
        MessageAttributes={
            'severity': {
                'DataType': 'String',
                'StringValue': severity
            }
        }
    )
    
    return response

Setting Up the EventBridge Rule

To trigger this Lambda function, create an EventBridge rule that filters for IAM events:

{
  "source": ["aws.iam"],
  "detail-type": ["AWS API Call via CloudTrail"],
  "detail": {
    "eventSource": ["iam.amazonaws.com"]
  }
}

IAM Permissions for Lambda

Your Lambda execution role needs these permissions:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sns:Publish"
      ],
      "Resource": "arn:aws:sns:us-east-1:123456789012:iam-alerts"
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    }
  ]
}

Testing Your Lambda Function

Before deploying to production, test with a sample CloudTrail event:

sample_event = {
    "detail": {
        "eventName": "CreateUser",
        "eventTime": "2024-12-11T10:30:00Z",
        "userIdentity": {
            "type": "IAMUser",
            "principalId": "AIDAI23EXAMPLE",
            "accountId": "123456789012"
        },
        "sourceIPAddress": "203.0.113.12",
        "requestParameters": {
            "userName": "new-developer"
        }
    }
}

Best Practices

When building production Lambda functions for security monitoring:

  • Use environment variables for configuration (SNS topics, severity thresholds)

  • Implement structured logging with JSON format for better CloudWatch Insights queries

  • Add retry logic for SNS publish failures

  • Set appropriate Lambda timeouts (30 seconds is usually sufficient)

  • Monitor Lambda errors with CloudWatch alarms

  • Use Lambda layers for shared code and dependencies

What's Next: Python in CI/CD Pipelines

Security automation is just one piece of the DevOps puzzle. In my next post, I'll dive into using Python for automated testing in CI/CD pipelines. We'll explore how Playwright, the modern browser automation framework, can revolutionize your end-to-end testing strategy. I'll show you how to write maintainable test suites that catch bugs before they reach production, integrate seamlessly with AWS CodeBuild, and provide fast feedback loops for your development team.

Stay tuned for hands-on examples of parallel test execution, visual regression testing, and building a robust testing infrastructure that scales with your application.

Have questions about implementing IAM monitoring or Lambda automation? Drop a comment below or reach out on LinkedIn. I'd love to hear how you're using Python in your AWS environments!

Previous
Previous

Python & Playwright in AWS CI/CD: End-to-End Testing Made Simple

Next
Next

Holiday Traditions - Egg Nog!