Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
import json
import csv
import ipaddress
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment
from datetime import datetime

# === 读取安全组配置 ===
with open('security_groups.json') as f:
    sg_data = json.load(f)

# === 读取访问过的IP ===
with open('used_ips.csv') as f:
    used_ips = [row[0].strip('"') for row in csv.reader(f) if row and row[0] != 'pkt_srcaddr']

used_ips = [ipaddress.ip_address(ip) for ip in used_ips if ip]

used_rules = []
unused_rules = []

# === 匹配安全组规则 ===
for sg in sg_data:
    sg_id = sg["GroupId"]
    sg_name = sg.get("GroupName", "")
    ip_ranges = sg.get("IpRanges", [])

    # 处理嵌套数组(每个元素可能是 ["1.2.3.4/32"])
    flat_ranges = []
    for entry in ip_ranges:
        if isinstance(entry, list):
            flat_ranges.extend(entry)
        elif isinstance(entry, str):
            flat_ranges.append(entry)

    for cidr in flat_ranges:
        try:
            network = ipaddress.ip_network(cidr)
            if any(ip in network for ip in used_ips):
                used_rules.append({
                    "SecurityGroupId": sg_id,
                    "GroupName": sg_name,
                    "CIDR": cidr
                })
            else:
                unused_rules.append({
                    "SecurityGroupId": sg_id,
                    "GroupName": sg_name,
                    "CIDR": cidr
                })
        except ValueError:
            continue

# === 创建 Excel 文件 ===
wb = Workbook()

# 已使用 sheet
ws_used = wb.active
ws_used.title = "已使用规则"
ws_used.append(["SecurityGroupId", "GroupName", "CIDR"])
for rule in used_rules:
    ws_used.append([rule["SecurityGroupId"], rule["GroupName"], rule["CIDR"]])

# 未使用 sheet
ws_unused = wb.create_sheet("未使用规则")
ws_unused.append(["SecurityGroupId", "GroupName", "CIDR"])
for rule in unused_rules:
    ws_unused.append([rule["SecurityGroupId"], rule["GroupName"], rule["CIDR"]])

# === 美化表头 ===
for ws in [ws_used, ws_unused]:
    for cell in ws[1]:
        cell.font = Font(bold=True)
        cell.alignment = Alignment(horizontal="center")
    for col in ws.columns:
        max_length = max(len(str(cell.value)) if cell.value else 0 for cell in col)
        ws.column_dimensions[col[0].column_letter].width = max_length + 2

# === 输出文件名带时间戳 ===
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = f"sg_analysis_{timestamp}.xlsx"

wb.save(output_file)
print(f"分析完成,结果已保存为 {output_file}")

将文件下载到本地查看

结语:

在现代云环境中,安全组管理是网络安全的重要组成部分。
通过将 VPC Flow Logs + Athena + 自动化分析脚本 结合使用,能够实现安全组规则的动态可视化管理:
不仅可以识别出实际使用的规则,也能精准发现长期未被命中的冗余配置,从而降低安全风险、简化维护工作。

...