Vorschau

/security-log-analyzer-raspberry-pi-linux.py

#!/usr/bin/env python3
import os
import re
from datetime import datetime, timedelta
from collections import defaultdict
import subprocess

# ----------------------------
# CONFIG
# ----------------------------

# Log files that are scanned. The ".1" suffix selects the rotated file.
# NOTE(review): presumably logrotate has already moved yesterday's entries
# there by the time this script runs -- confirm against the rotation schedule.
AUTH_LOG = "/var/log/auth.log.1"
# Not referenced by any function visible in this file.
FAIL2BAN_LOG = "/var/log/fail2ban.log"
UFW_LOG = "/var/log/ufw.log.1"

# Directory that receives the generated reports.
OUTPUT_DIR = os.path.expanduser("~/security_reports")

# Single clock reading shared by DATE and TARGET_DATE. Two independent
# datetime.now() calls could fall on either side of midnight, which would
# make the report date and the analysed date the same day.
_NOW = datetime.now()

# Day the report is written; used in the report file name.
DATE = _NOW.strftime("%Y-%m-%d")
REPORT_FILE = f"security_report_{DATE}.txt"
OUTPUT_PATH = os.path.join(OUTPUT_DIR, REPORT_FILE)

os.makedirs(OUTPUT_DIR, exist_ok=True)

# Day whose log entries are analysed: the day before DATE. Log lines are
# selected by checking whether this string occurs in them.
TARGET_DATE = (_NOW - timedelta(days=1)).strftime("%Y-%m-%d")

# Reports older than this many days are removed by delete_old_reports().
REPORT_RETENTION_DAYS = 30

# IPv4 regex (general); octet values are not range-checked.
IPV4_REGEX = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")

# UFW source IP extractor: captures the address following "SRC=".
UFW_SRC_REGEX = re.compile(r"SRC=([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)")


# ----------------------------
# HELPERS
# ----------------------------

# Dotted-quad prefixes of every range that counts as "not external".
_NON_EXTERNAL_PREFIXES = (
    "10.",                                    # 10.0.0.0/8 private
    "127.",                                   # 127.0.0.0/8 loopback
    "169.254.",                               # 169.254.0.0/16 link-local
    "192.168.",                               # 192.168.0.0/16 private
    *(f"172.{n}." for n in range(16, 32)),    # 172.16.0.0/12 private
    # Multicast is 224.0.0.0/4, i.e. first octets 224 through 239 -- not
    # just 224.x.x.x.
    *(f"{n}." for n in range(224, 240)),
)


def is_private_or_local_ip(ip):
    """Return True if *ip* lies in a private, loopback, link-local or multicast range.

    The check is a plain prefix comparison on the dotted-quad text; the string
    is not validated as an IPv4 address.

    Args:
        ip: IPv4 address in dotted-quad notation.

    Returns:
        bool: True for 10/8, 127/8, 169.254/16, 172.16/12, 192.168/16 and
        224/4 addresses, False otherwise.
    """
    return ip.startswith(_NON_EXTERNAL_PREFIXES)


def delete_old_reports():
    """Remove expired report files from OUTPUT_DIR.

    A file is a candidate when its name starts with "security_report_", ends
    with ".txt" and it is a regular file. It is deleted when its modification
    time is more than REPORT_RETENTION_DAYS days in the past. A failed
    deletion is reported as a warning and does not stop the sweep. A one-line
    summary is printed at the end.
    """
    # Anything modified before this instant has outlived the retention period.
    cutoff = datetime.now() - timedelta(days=REPORT_RETENTION_DAYS)
    removed = []

    for name in os.listdir(OUTPUT_DIR):
        is_report = name.startswith("security_report_") and name.endswith(".txt")
        if not is_report:
            continue

        path = os.path.join(OUTPUT_DIR, name)
        if not os.path.isfile(path):
            continue

        modified = datetime.fromtimestamp(os.path.getmtime(path))
        if modified >= cutoff:
            continue

        try:
            os.remove(path)
            removed.append(name)
        except Exception as e:
            print(f"[WARN] Could not delete old report {name}: {e}")

    if not removed:
        print("[OK] No old reports to delete.")
    else:
        print(f"[OK] Deleted old reports: {', '.join(removed)}")


# ----------------------------
# AUTH LOG ANALYSIS
# ----------------------------

# Peer address as it appears in sshd authentication messages:
# "... from <ipv4> port <n> ...".
_SSH_PEER_REGEX = re.compile(r"\bfrom ((?:\d{1,3}\.){3}\d{1,3}) port \d+")


def _ssh_peer_ip(line):
    """Return the client IPv4 address of an sshd auth line, or None if absent."""
    peers = _SSH_PEER_REGEX.findall(line)
    if peers:
        # The user name is chosen by the client and is logged before the
        # peer address, so a user name that looks like an IP (or like
        # "x from 1.2.3.4 port 22") must not win: take the last match.
        return peers[-1]

    # No "from <ip> port <n>" part: fall back to the first IPv4-looking token.
    candidates = IPV4_REGEX.findall(line)
    return candidates[0] if candidates else None


def parse_auth_log():
    """Summarise SSH password failures and successful logins for TARGET_DATE.

    Reads AUTH_LOG and considers only lines that contain TARGET_DATE. Lines
    containing "Failed password" count as failed attempts; lines containing
    "Accepted password" or "Accepted publickey" count as successful logins.
    Events are grouped by client IPv4 address and listed by descending count.

    Returns:
        str: The report section, terminated by a newline. If the log file is
        missing or cannot be read, a one-line message is returned instead.
    """
    failed = defaultdict(int)
    accepted = defaultdict(int)
    protocols = defaultdict(set)

    try:
        with open(AUTH_LOG, "r", errors="ignore") as f:
            for line in f:
                if TARGET_DATE not in line:
                    continue

                is_failure = "Failed password" in line
                is_success = "Accepted password" in line or "Accepted publickey" in line
                if not (is_failure or is_success):
                    continue

                ip = _ssh_peer_ip(line)
                if ip is None:
                    continue

                protocols[ip].add("sshd")
                if is_failure:
                    failed[ip] += 1
                if is_success:
                    accepted[ip] += 1
    except FileNotFoundError:
        return "Auth log not found.\n"
    except OSError as e:
        # E.g. missing read permission: report it in this section instead of
        # aborting the whole report.
        return f"Auth log could not be read: {e}\n"

    output = ["[AUTH LOG ANALYSIS]\n"]

    if not failed:
        output.append("No failed SSH login attempts found.")
    else:
        for ip, count in sorted(failed.items(), key=lambda x: x[1], reverse=True):
            proto = ", ".join(sorted(protocols[ip])) if protocols[ip] else "unknown"
            output.append(f"{ip} -> {count} failed attempts via {proto}")

    output.append("\n[Successful Logins]")
    if not accepted:
        output.append("No successful SSH logins found.")
    else:
        for ip, count in sorted(accepted.items(), key=lambda x: x[1], reverse=True):
            output.append(f"{ip} -> {count} successful logins")

    return "\n".join(output) + "\n"


# ----------------------------
# FAIL2BAN ANALYSIS
# ----------------------------

def parse_fail2ban():
    """List the IPv4 addresses banned by Fail2Ban on TARGET_DATE.

    Runs ``journalctl -u fail2ban`` restricted to the target day and collects
    the first IPv4 address from every output line that contains " Ban ".

    Returns:
        str: The report section, terminated by a newline. If journalctl
        cannot be run, times out, or its output cannot be decoded, the
        section contains the error message instead of a ban list.
    """
    output = ["[FAIL2BAN ANALYSIS]\n"]

    try:
        # Bound the query on both sides. With --since alone journalctl also
        # returns everything logged after the target day, so bans from the
        # day the script runs would be reported as bans "on target date".
        day_start = datetime.strptime(TARGET_DATE, "%Y-%m-%d")
        day_end = (day_start + timedelta(days=1)).strftime("%Y-%m-%d")

        cmd = [
            "journalctl",
            "-u",
            "fail2ban",
            "--since",
            TARGET_DATE,
            "--until",
            day_end,
            "--no-pager",
        ]

        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=5,
        )
    except (OSError, subprocess.SubprocessError, ValueError) as e:
        # OSError: journalctl missing or not executable; SubprocessError:
        # timeout; ValueError: bad TARGET_DATE or undecodable output.
        output.append(f"Fail2Ban journal read failed: {e}")
        return "\n".join(output) + "\n"

    banned = set()
    for line in result.stdout.splitlines():
        if " Ban " not in line:
            continue
        ips = IPV4_REGEX.findall(line)
        if ips:
            banned.add(ips[0])

    if not banned:
        # NOTE(review): this message is also produced when journalctl returned
        # no fail2ban entries at all; the service state is not checked here.
        output.append("Fail2Ban active. No bans detected on target date.")
    else:
        for ip in sorted(banned):
            output.append(f"Banned IP: {ip}")

    return "\n".join(output) + "\n"


# ----------------------------
# UFW ANALYSIS
# ----------------------------

def parse_ufw():
    """List external source addresses of traffic blocked by UFW on TARGET_DATE.

    Reads UFW_LOG and considers lines that contain TARGET_DATE and either
    "BLOCK" or "DROP". The address after "SRC=" is taken from each such line;
    addresses for which is_private_or_local_ip() is true are skipped.

    Returns:
        str: The report section with one line per distinct address in sorted
        order, terminated by a newline. If the log file is missing or cannot
        be read, a one-line message is returned instead.
    """
    blocked = set()

    try:
        with open(UFW_LOG, "r", errors="ignore") as f:
            for line in f:
                if TARGET_DATE not in line:
                    continue

                if "BLOCK" not in line and "DROP" not in line:
                    continue

                match = UFW_SRC_REGEX.search(line)
                if not match:
                    continue

                ip = match.group(1)
                if not is_private_or_local_ip(ip):
                    blocked.add(ip)
    except FileNotFoundError:
        return "UFW log not found.\n"
    except OSError as e:
        # E.g. missing read permission: report it in this section instead of
        # aborting the whole report.
        return f"UFW log could not be read: {e}\n"

    output = ["[UFW ANALYSIS]\n"]

    if not blocked:
        output.append("No external blocked traffic detected.")
    else:
        for ip in sorted(blocked):
            output.append(f"Blocked IP: {ip}")

    return "\n".join(output) + "\n"


# ----------------------------
# REPORT GENERATION
# ----------------------------

def generate_report():
    """Build the complete report text.

    The report consists of a header (generation timestamp, analysed date and
    a 60-character rule), the auth-log, Fail2Ban and UFW sections in that
    order, and a closing marker. All parts are joined with newlines.

    Returns:
        str: The report text.
    """
    generated_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    header = [
        f"SECURITY REPORT - {generated_at}\n",
        f"ANALYZED DATE    - {TARGET_DATE}\n",
        "=" * 60 + "\n",
    ]
    sections = [
        parse_auth_log(),
        parse_fail2ban(),
        parse_ufw(),
    ]
    footer = ["\nEND OF REPORT\n"]

    return "\n".join(header + sections + footer)


# ----------------------------
# MAIN
# ----------------------------

def main():
    """Prune expired reports, then generate and save the current report.

    The report is built completely before OUTPUT_PATH is opened for writing.
    The file is written with the "utf-8-sig" codec, i.e. UTF-8 preceded by a
    byte-order mark.
    """
    delete_old_reports()

    content = generate_report()

    with open(OUTPUT_PATH, "w", encoding="utf-8-sig") as report_file:
        report_file.write(content)

    print(f"[OK] Security report saved to: {OUTPUT_PATH}")


if __name__ == "__main__":
    main()