Skip to content

feat: Support for Security Hub #242

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 29 commits into from
Mar 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
ba032ab
update: start format aws security hub
kaykhan Nov 14, 2024
4a22905
chore
kaykhan Nov 14, 2024
667cbe6
feat: security hub
kaykhan Nov 14, 2024
1725f62
feat: security hub
kaykhan Nov 14, 2024
c97dfb8
feat: security hub
kaykhan Nov 14, 2024
355524b
feat: security hub
kaykhan Nov 14, 2024
e978494
feat: security hub
kaykhan Nov 14, 2024
550268a
feat: security hub
kaykhan Nov 14, 2024
bf19f81
feat: security hub
kaykhan Nov 14, 2024
abf7972
feat: security hub
kaykhan Nov 14, 2024
ef9e59a
feat: security hub
kaykhan Nov 14, 2024
a0c9c53
feat: security hub
kaykhan Nov 15, 2024
3e98f0a
feat: security hub
kaykhan Nov 15, 2024
78ce6cf
update: automatically switch status from new to notified
kaykhan Nov 15, 2024
04623a3
update: automatically switch status from new to notified
kaykhan Nov 15, 2024
20f55fd
update: automatically switch status from new to notified
kaykhan Nov 15, 2024
ec51114
update: automatically switch status from new to notified
kaykhan Nov 15, 2024
d0dcb6c
update: automatically switch status from new to notified
kaykhan Nov 15, 2024
0610640
update: automatically switch status from new to notified
kaykhan Nov 15, 2024
47efa94
update: automatically switch status from new to notified
kaykhan Nov 15, 2024
eb26ec4
update: automatically switch status from new to notified#
kaykhan Nov 15, 2024
51b7081
Merge branch 'master' into kay/aws-security-hub
bryantbiggs Jan 6, 2025
0357c20
fix: lint
kaykhan Jan 7, 2025
d48acab
refactor: parsing of each notification into its own function
kaykhan Mar 11, 2025
daf5ea8
Merge branch 'master' into kay/aws-security-hub
kaykhan Mar 11, 2025
e140b8f
fix: lint
kaykhan Mar 11, 2025
fef326c
Merge branch 'kay/aws-security-hub' of https://github.com/kaykhan/ter…
kaykhan Mar 11, 2025
b017387
fix: lint
kaykhan Mar 11, 2025
2b128e4
fix: unit tests
kaykhan Mar 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 170 additions & 23 deletions functions/notify_slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@
# Create client so its cached/frozen between invocations
KMS_CLIENT = boto3.client("kms", region_name=REGION)

SECURITY_HUB_CLIENT = boto3.client('securityhub', region_name=REGION)


class AwsService(Enum):
"""AWS service supported by function"""

cloudwatch = "cloudwatch"
guardduty = "guardduty"
securityhub = "securityhub"


def decrypt_url(encrypted_url: str) -> str:
Expand Down Expand Up @@ -123,6 +126,148 @@ def format_cloudwatch_alarm(message: Dict[str, Any], region: str) -> Dict[str, A
}


def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, Any]:
"""
Format AWS Security Hub finding event into Slack message format

:params message: SNS message body containing SecurityHub finding event
:params region: AWS region where the event originated from
:returns: formatted Slack message payload
"""
service_url = get_service_url(region=region, service="securityhub")
finding = message["detail"]["findings"][0]

# Switch Status From New To Notified To Prevent Repeated Messages
try:
compliance_status = finding["Compliance"].get("Status", "UNKNOWN")
workflow_status = finding["Workflow"].get("Status", "UNKNOWN")
if compliance_status == "FAILED" and workflow_status == "NEW":
notified = SECURITY_HUB_CLIENT.batch_update_findings(
FindingIdentifiers=[{
'Id': finding.get('Id'),
'ProductArn': finding.get("ProductArn")
}],
Workflow={"Status": "NOTIFIED"}
)
logging.warning(f"Successfully updated finding status to NOTIFIED: {json.dumps(notified)}")
except Exception as e:
logging.error(f"Failed to update finding status: {str(e)}")
pass

if finding.get("ProductName") == "Inspector":
severity = finding["Severity"].get("Label", "INFORMATIONAL")
compliance_status = finding["Compliance"].get("Status", "UNKNOWN")

Id = finding.get("Id", "No ID Provided")
title = finding.get("Title", "No Title Provided")
description = finding.get("Description", "No Description Provided")
control_id = finding['ProductFields'].get('ControlId', 'N/A')
control_url = service_url + f"#/controls/{control_id}"
aws_account_id = finding.get('AwsAccountId', 'Unknown Account')
first_observed = finding.get('FirstObservedAt', 'Unknown Date')
last_updated = finding.get('UpdatedAt', 'Unknown Date')
affected_resource = finding['Resources'][0].get('Id', 'Unknown Resource')
remediation_url = finding.get("Remediation", {}).get("Recommendation", {}).get("Url", "#")

finding_base_path = "#/findings?search=Id%3D%255Coperator%255C%253AEQUALS%255C%253A"
double_encoded_id = urllib.parse.quote(urllib.parse.quote(Id, safe=''), safe='')
finding_url = f"{service_url}{finding_base_path}{double_encoded_id}"
generator_id = finding.get("GeneratorId", "Unknown Generator")

color = SecurityHubSeverity.get(severity.upper(), SecurityHubSeverity.INFORMATIONAL).value
if compliance_status == "PASSED":
color = "#4BB543"

slack_message = {
"color": color,
"fallback": f"Inspector Finding: {title}",
"fields": [
{"title": "Title", "value": f"`{title}`", "short": False},
{"title": "Description", "value": f"`{description}`", "short": False},
{"title": "Compliance Status", "value": f"`{compliance_status}`", "short": True},
{"title": "Severity", "value": f"`{severity}`", "short": True},
{"title": "Control ID", "value": f"`{control_id}`", "short": True},
{"title": "Account ID", "value": f"`{aws_account_id}`", "short": True},
{"title": "First Observed", "value": f"`{first_observed}`", "short": True},
{"title": "Last Updated", "value": f"`{last_updated}`", "short": True},
{"title": "Affected Resource", "value": f"`{affected_resource}`", "short": False},
{"title": "Generator", "value": f"`{generator_id}`", "short": False},
{"title": "Control Url", "value": f"`{control_url}`", "short": False},
{"title": "Finding Url", "value": f"`{finding_url}`", "short": False},
{"title": "Remediation", "value": f"`{remediation_url}`", "short": False},
],
"text": f"AWS Inspector Finding - {title}",
}

return slack_message

if finding.get("ProductName") == "Security Hub":
severity = finding["Severity"].get("Label", "INFORMATIONAL")
compliance_status = finding["Compliance"].get("Status", "UNKNOWN")

Id = finding.get("Id", "No ID Provided")
title = finding.get("Title", "No Title Provided")
description = finding.get("Description", "No Description Provided")
control_id = finding['ProductFields'].get('ControlId', 'N/A')
control_url = service_url + f"#/controls/{control_id}"
aws_account_id = finding.get('AwsAccountId', 'Unknown Account')
first_observed = finding.get('FirstObservedAt', 'Unknown Date')
last_updated = finding.get('UpdatedAt', 'Unknown Date')
affected_resource = finding['Resources'][0].get('Id', 'Unknown Resource')
remediation_url = finding.get("Remediation", {}).get("Recommendation", {}).get("Url", "#")
generator_id = finding.get("GeneratorId", "Unknown Generator")

finding_base_path = "#/findings?search=Id%3D%255Coperator%255C%253AEQUALS%255C%253A"
double_encoded_id = urllib.parse.quote(urllib.parse.quote(Id, safe=''), safe='')
finding_url = f"{service_url}{finding_base_path}{double_encoded_id}"

color = SecurityHubSeverity.get(severity.upper(), SecurityHubSeverity.INFORMATIONAL).value
if compliance_status == "PASSED":
color = "#4BB543"

slack_message = {
"color": color,
"fallback": f"Security Hub Finding: {title}",
"fields": [
{"title": "Title", "value": f"`{title}`", "short": False},
{"title": "Description", "value": f"`{description}`", "short": False},
{"title": "Compliance Status", "value": f"`{compliance_status}`", "short": True},
{"title": "Severity", "value": f"`{severity}`", "short": True},
{"title": "Control ID", "value": f"`{control_id}`", "short": True},
{"title": "Account ID", "value": f"`{aws_account_id}`", "short": True},
{"title": "First Observed", "value": f"`{first_observed}`", "short": True},
{"title": "Last Updated", "value": f"`{last_updated}`", "short": True},
{"title": "Affected Resource", "value": f"`{affected_resource}`", "short": False},
{"title": "Generator", "value": f"`{generator_id}`", "short": False},
{"title": "Control Url", "value": f"`{control_url}`", "short": False},
{"title": "Finding Url", "value": f"`{finding_url}`", "short": False},
{"title": "Remediation", "value": f"`{remediation_url}`", "short": False},
],
"text": f"AWS Security Hub Finding - {title}",
}

return slack_message

return format_default(message=message)


class SecurityHubSeverity(Enum):
"""Maps Security Hub finding severity to Slack message format color"""

CRITICAL = "danger"
HIGH = "danger"
MEDIUM = "warning"
LOW = "#777777"
INFORMATIONAL = "#439FE0"

@staticmethod
def get(name, default):
try:
return SecurityHubSeverity[name]
except KeyError:
return default


class GuardDutyFindingSeverity(Enum):
"""Maps GuardDuty finding severity to Slack message format color"""

Expand Down Expand Up @@ -358,6 +503,28 @@ def format_default(
return attachments


def parse_notification(message: Dict[str, Any], subject: Optional[str], region: str) -> Optional[Dict]:
"""
Parse notification message and format into Slack message payload

:params message: SNS message body notification payload
:params subject: Optional subject line for Slack notification
:params region: AWS region where the event originated from
:returns: Slack message payload
"""
if "AlarmName" in message:
return format_cloudwatch_alarm(message=message, region=region)
if isinstance(message, Dict) and message.get("detail-type") == "GuardDuty Finding":
return format_guardduty_finding(message=message, region=message["region"])
if isinstance(message, Dict) and message.get("detail-type") == "Security Hub Findings - Imported":
return format_aws_security_hub(message=message, region=message["region"])
if isinstance(message, Dict) and message.get("detail-type") == "AWS Health Event":
return format_aws_health(message=message, region=message["region"])
if subject == "Notification from AWS Backup":
return format_aws_backup(message=str(message))
return format_default(message=message, subject=subject)


def get_slack_message_payload(
message: Union[str, Dict], region: str, subject: Optional[str] = None
) -> Dict:
Expand Down Expand Up @@ -389,31 +556,10 @@ def get_slack_message_payload(

message = cast(Dict[str, Any], message)

if "AlarmName" in message:
notification = format_cloudwatch_alarm(message=message, region=region)
attachment = notification

elif (
isinstance(message, Dict) and message.get("detail-type") == "GuardDuty Finding"
):
notification = format_guardduty_finding(
message=message, region=message["region"]
)
attachment = notification

elif isinstance(message, Dict) and message.get("detail-type") == "AWS Health Event":
notification = format_aws_health(message=message, region=message["region"])
attachment = notification

elif subject == "Notification from AWS Backup":
notification = format_aws_backup(message=str(message))
attachment = notification

elif "attachments" in message or "text" in message:
if "attachments" in message or "text" in message:
payload = {**payload, **message}

else:
attachment = format_default(message=message, subject=subject)
attachment = parse_notification(message, subject, region)

if attachment:
payload["attachments"] = [attachment] # type: ignore
Expand Down Expand Up @@ -453,6 +599,7 @@ def lambda_handler(event: Dict[str, Any], context: Dict[str, Any]) -> str:
:param context: lambda expected context object
:returns: none
"""

if os.environ.get("LOG_EVENTS", "False") == "True":
logging.info("Event logging enabled: %s", json.dumps(event))

Expand Down
10 changes: 9 additions & 1 deletion main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,22 @@ locals {
resources = [var.kms_key_arn]
}

lambda_policy_document_securityhub = {
sid = "AllowSecurityHub"
effect = "Allow"
actions = ["securityhub:BatchUpdateFindings"]
resources = ["*"]
}

lambda_handler = try(split(".", basename(var.lambda_source_path))[0], "notify_slack")
}

data "aws_iam_policy_document" "lambda" {
count = var.create ? 1 : 0

dynamic "statement" {
for_each = concat([local.lambda_policy_document], var.kms_key_arn != "" ? [local.lambda_policy_document_kms] : [])
for_each = concat([local.lambda_policy_document,
local.lambda_policy_document_securityhub], var.kms_key_arn != "" ? [local.lambda_policy_document_kms] : [])
content {
sid = statement.value.sid
effect = statement.value.effect
Expand Down