场景描述:

在云上环境中,随着业务发展和实例数量的增长,安全组规则往往会逐渐膨胀。许多历史遗留的 IP 白名单或端口规则可能已不再使用,但仍被保留在配置中。这不仅导致规则管理复杂,还可能带来潜在的安全风险。

为了实现精细化的安全管理,可以借助 VPC Flow Logs(VPC 流日志) 对网络流量进行分析,识别哪些安全组规则仍在被访问、哪些长期未使用,从而为规则清理提供数据依据。

方案概述:

为满足用户需求,我们将执行以下步骤:

  1. 开始VPC Flow Logs
  2. 将日志存储方式修改为S3
  3. 使用Athena进行日志分析
  4. 通过shell脚本获取安全组信息、Athena查询结果
  5. 使用Python进行数据分析并导出为execl格式

费用说明:

VPC Flow Logs Pricing

https://aws.amazon.com/cn/cloudwatch/pricing/

Athena

https://aws.amazon.com/cn/athena/pricing/

操作流程:

开启VPC Flow Logs日志

日志记录格式中需选择一下字段

${interface-id} ${srcport} ${dstport} ${protocol} ${bytes} ${packets} ${start} ${end} ${action} ${subnet-id} ${vpc-id} ${instance-id} ${pkt-dstaddr} ${pkt-srcaddr}

使用Athena进行日志分析

设置查询结果保存位置

创建数据库、表

# 创建数据库
CREATE DATABASE huami_vpc_flow_logs

# 创建数据表
CREATE EXTERNAL TABLE IF NOT EXISTS vpc_flow_logs_parquet (
  version int,
  account_id string,
  interface_id string,
  srcaddr string,
  dstaddr string,
  srcport int,
  dstport int,
  protocol int,
  packets bigint,
  bytes bigint,
  start bigint,
  `end` bigint,
  action string,
  log_status string,
  vpc_id string,
  subnet_id string,
  instance_id string,
  tcp_flags int,
  type string,
  pkt_srcaddr string,
  pkt_dstaddr string
)
STORED AS PARQUET
LOCATION 's3://xing-huami-vpclog/AWSLogs/aws-account-id=893420598334/aws-service=vpcflowlogs/aws-region=us-east-1/'
TBLPROPERTIES (
  'classification'='parquet'
);


# Athena识别分区
MSCK REPAIR TABLE vpc_flow_logs_parquet;

# 测试查询语句
SELECT pkt_srcaddr, COUNT(*) AS hits
FROM vpc_flow_logs_parquet
WHERE action = 'ACCEPT'
  AND pkt_srcaddr NOT LIKE '10.%'
  AND pkt_srcaddr NOT LIKE '172.16.%'
  AND pkt_srcaddr NOT LIKE '192.168.%'
GROUP BY pkt_srcaddr
ORDER BY hits DESC
LIMIT 50;

EC2中操作

查询安全组相关信息

#!/bin/bash
set -e

# 安全组 ID (多个用空格分开)
SG_IDS=("sg-0fdc415d85c37c406" "sg-09c4c9941f40d6903")

echo "开始导出安全组信息..."

aws ec2 describe-security-groups \
  --group-ids ${SG_IDS[@]} \
  --query "SecurityGroups[*].{GroupId:GroupId,GroupName:GroupName,IpRanges:IpPermissions[*].IpRanges[*].CidrIp}" \
  --output json > security_groups.json

echo "安全组信息已保存到 security_groups.json"

调用Athena查询访问情况

#!/bin/bash
set -e

DB="huami_vpc_flow_logs"
OUTPUT="s3://xing-huami-vpclog/athena-results/"
QUERY="SELECT DISTINCT pkt_srcaddr FROM vpc_flow_logs_parquet WHERE action = 'ACCEPT' AND pkt_srcaddr NOT LIKE '10.%' AND pkt_srcaddr NOT LIKE '172.16.%' AND pkt_srcaddr NOT LIKE '192.168.%' AND from_unixtime(start) > current_timestamp - interval '30' day;"

echo "开始执行 Athena 查询..."

# 执行查询
QID=$(aws athena start-query-execution \
  --query-string "$QUERY" \
  --query-execution-context Database=$DB \
  --result-configuration OutputLocation=$OUTPUT \
  --query "QueryExecutionId" \
  --output text)

echo "查询已启动,ID: $QID"

#等待查询完成
STATE="RUNNING"
while [ "$STATE" == "RUNNING" ] || [ "$STATE" == "QUEUED" ]; do
  sleep 5
  STATE=$(aws athena get-query-execution --query-execution-id $QID --query "QueryExecution.Status.State" --output text)
  echo "当前状态: $STATE"
done

if [ "$STATE" == "SUCCEEDED" ]; then
  echo "查询成功,下载结果..."
  aws s3 cp ${OUTPUT}${QID}.csv ./used_ips.csv
  echo "已下载到 ./used_ips.csv"
else
  echo "查询失败: $STATE"
fi

使用Python过滤安全组中未使用网段

执行脚本前需要安装openpyxl依赖包

pip3 install openpyxl
import json
import csv
import ipaddress
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment
from datetime import datetime

# === 读取安全组配置 ===
with open('security_groups.json') as f:
    sg_data = json.load(f)

# === 读取访问过的IP ===
with open('used_ips.csv') as f:
    used_ips = [row[0].strip('"') for row in csv.reader(f) if row and row[0] != 'pkt_srcaddr']

used_ips = [ipaddress.ip_address(ip) for ip in used_ips if ip]

used_rules = []
unused_rules = []

# === 匹配安全组规则 ===
for sg in sg_data:
    sg_id = sg["GroupId"]
    sg_name = sg.get("GroupName", "")
    ip_ranges = sg.get("IpRanges", [])

    # 处理嵌套数组(每个元素可能是 ["1.2.3.4/32"])
    flat_ranges = []
    for entry in ip_ranges:
        if isinstance(entry, list):
            flat_ranges.extend(entry)
        elif isinstance(entry, str):
            flat_ranges.append(entry)

    for cidr in flat_ranges:
        try:
            network = ipaddress.ip_network(cidr)
            if any(ip in network for ip in used_ips):
                used_rules.append({
                    "SecurityGroupId": sg_id,
                    "GroupName": sg_name,
                    "CIDR": cidr
                })
            else:
                unused_rules.append({
                    "SecurityGroupId": sg_id,
                    "GroupName": sg_name,
                    "CIDR": cidr
                })
        except ValueError:
            continue

# === 创建 Excel 文件 ===
wb = Workbook()

# 已使用 sheet
ws_used = wb.active
ws_used.title = "已使用规则"
ws_used.append(["SecurityGroupId", "GroupName", "CIDR"])
for rule in used_rules:
    ws_used.append([rule["SecurityGroupId"], rule["GroupName"], rule["CIDR"]])

# 未使用 sheet
ws_unused = wb.create_sheet("未使用规则")
ws_unused.append(["SecurityGroupId", "GroupName", "CIDR"])
for rule in unused_rules:
    ws_unused.append([rule["SecurityGroupId"], rule["GroupName"], rule["CIDR"]])

# === 美化表头 ===
for ws in [ws_used, ws_unused]:
    for cell in ws[1]:
        cell.font = Font(bold=True)
        cell.alignment = Alignment(horizontal="center")
    for col in ws.columns:
        max_length = max(len(str(cell.value)) if cell.value else 0 for cell in col)
        ws.column_dimensions[col[0].column_letter].width = max_length + 2

# === 输出文件名带时间戳 ===
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = f"sg_analysis_{timestamp}.xlsx"

wb.save(output_file)
print(f"分析完成,结果已保存为 {output_file}")

将文件下载到本地查看

结语:

在现代云环境中,安全组管理是网络安全的重要组成部分。
通过将 VPC Flow Logs + Athena + 自动化分析脚本 结合使用,能够实现安全组规则的动态可视化管理:
不仅可以识别出实际使用的规则,也能精准发现长期未被命中的冗余配置,从而降低安全风险、简化维护工作。

在与 ECS、EC2、Auto Scaling 等架构结合时,这种方式同样适用。
即使在无法直接使用原生监控或审计功能的情况下,我们依然可以通过 日志分析与自定义告警仪表盘 的方式,实现同样高效、可追踪的安全治理目标。