Vorschau
/security-log-analyzer-raspberry-pi-linux.py
#!/usr/bin/env python3
import ipaddress
import os
import re
import subprocess
from collections import defaultdict
from datetime import datetime, timedelta
# ----------------------------
# CONFIG
# ----------------------------
# Log files read by the parsers below. The ".1" suffix points at rotated logs.
# NOTE(review): presumably chosen because yesterday's entries live in the
# rotated file — confirm against the host's logrotate schedule.
AUTH_LOG = "/var/log/auth.log.1"
# NOTE(review): not referenced by any function visible in this file;
# parse_fail2ban() queries journalctl instead.
FAIL2BAN_LOG = "/var/log/fail2ban.log"
UFW_LOG = "/var/log/ufw.log.1"
# Directory the reports are written to; created at import time below.
OUTPUT_DIR = os.path.expanduser("~/security_reports")
# Today's date (YYYY-MM-DD); used only to name the report file.
DATE = datetime.now().strftime("%Y-%m-%d")
REPORT_FILE = f"security_report_{DATE}.txt"
OUTPUT_PATH = os.path.join(OUTPUT_DIR, REPORT_FILE)
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Yesterday's date (YYYY-MM-DD). The file parsers keep only log lines that
# contain this exact string, so the logs must carry ISO-style dates for any
# line to match.
TARGET_DATE = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
# Report files whose modification time is older than this many days are
# removed by delete_old_reports().
REPORT_RETENTION_DAYS = 30
# IPv4 regex (general)
# Four dot-separated groups of 1-3 digits; octet values are not range-checked.
IPV4_REGEX = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
# UFW source IP extractor
# Captures the dotted IPv4 address that follows "SRC=" in a UFW log line.
UFW_SRC_REGEX = re.compile(r"SRC=([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)")
# ----------------------------
# HELPERS
# ----------------------------
# Networks whose traffic is not considered "external": the RFC 1918 private
# ranges, loopback, link-local and the IPv4 multicast block.
_NON_EXTERNAL_NETWORKS = tuple(
    ipaddress.ip_network(cidr)
    for cidr in (
        "10.0.0.0/8",
        "127.0.0.0/8",
        "169.254.0.0/16",
        "172.16.0.0/12",
        "192.168.0.0/16",
        "224.0.0.0/4",  # multicast spans 224.x-239.x, not only 224.x
    )
)


def is_private_or_local_ip(ip):
    """Return True if *ip* is a private, loopback, link-local or multicast IPv4.

    Args:
        ip: Dotted-quad IPv4 address as a string.

    Returns:
        bool: True when the address falls inside one of
        ``_NON_EXTERNAL_NETWORKS``. Strings that are not valid IPv4
        addresses return False, so they are never filtered out as "local".
    """
    try:
        address = ipaddress.IPv4Address(ip)
    except ValueError:
        # AddressValueError subclasses ValueError; malformed input is not local.
        return False
    return any(address in network for network in _NON_EXTERNAL_NETWORKS)
def delete_old_reports():
    """Delete expired report files from OUTPUT_DIR and print a summary.

    A file is a candidate when its name starts with ``security_report_`` and
    ends with ``.txt``. It is removed when its modification time is more than
    REPORT_RETENTION_DAYS days in the past. Deletion failures are printed as
    warnings and do not stop the sweep.
    """
    max_age = timedelta(days=REPORT_RETENTION_DAYS)
    reference_time = datetime.now()
    removed = []

    for entry in os.listdir(OUTPUT_DIR):
        is_report = entry.startswith("security_report_") and entry.endswith(".txt")
        if not is_report:
            continue

        path = os.path.join(OUTPUT_DIR, entry)
        if not os.path.isfile(path):
            continue

        modified_at = datetime.fromtimestamp(os.path.getmtime(path))
        if reference_time - modified_at <= max_age:
            # Still inside the retention window.
            continue

        try:
            os.remove(path)
        except Exception as err:
            print(f"[WARN] Could not delete old report {entry}: {err}")
        else:
            removed.append(entry)

    if removed:
        print(f"[OK] Deleted old reports: {', '.join(removed)}")
    else:
        print("[OK] No old reports to delete.")
# ----------------------------
# AUTH LOG ANALYSIS
# ----------------------------
def parse_auth_log():
    """Summarize SSH login activity for TARGET_DATE found in AUTH_LOG.

    Only lines containing TARGET_DATE and at least one IPv4 address are
    considered; the first address on the line is taken as the client.
    "Failed password" lines count as failed attempts, "Accepted password" /
    "Accepted publickey" lines as successful logins.

    Returns:
        str: The "[AUTH LOG ANALYSIS]" report section, newline-terminated.
        If the log file does not exist, ``"Auth log not found.\\n"``.
        If it exists but cannot be opened or read (e.g. missing permission),
        a one-line ``"Auth log read failed: ..."`` message instead of raising,
        so the rest of the report is still produced.
    """
    ip_attempts = defaultdict(int)
    ip_success = defaultdict(int)
    protocols = defaultdict(set)

    # EAFP: open directly rather than checking existence first, so permission
    # errors and files that vanish between check and open are handled too.
    try:
        with open(AUTH_LOG, "r", errors="ignore") as f:
            for line in f:
                if TARGET_DATE not in line:
                    continue
                ips = IPV4_REGEX.findall(line)
                if not ips:
                    continue
                ip = ips[0]
                # failed login attempts
                if "Failed password" in line:
                    ip_attempts[ip] += 1
                    protocols[ip].add("sshd")
                # successful login
                if "Accepted password" in line or "Accepted publickey" in line:
                    ip_success[ip] += 1
                    protocols[ip].add("sshd")
    except FileNotFoundError:
        return "Auth log not found.\n"
    except OSError as e:
        return f"Auth log read failed: {e}\n"

    output = ["[AUTH LOG ANALYSIS]\n"]
    if not ip_attempts:
        output.append("No failed SSH login attempts found.")
    else:
        # Most active offenders first.
        for ip, count in sorted(ip_attempts.items(), key=lambda x: x[1], reverse=True):
            # sorted() keeps the listing deterministic if more protocols are added.
            proto = ", ".join(sorted(protocols[ip])) if protocols[ip] else "unknown"
            output.append(f"{ip} -> {count} failed attempts via {proto}")

    output.append("\n[Successful Logins]")
    if not ip_success:
        output.append("No successful SSH logins found.")
    else:
        for ip, count in sorted(ip_success.items(), key=lambda x: x[1], reverse=True):
            output.append(f"{ip} -> {count} successful logins")

    return "\n".join(output) + "\n"
# ----------------------------
# FAIL2BAN ANALYSIS
# ----------------------------
def parse_fail2ban():
    """List IPs banned by Fail2Ban on TARGET_DATE, read from the systemd journal.

    Runs ``journalctl -u fail2ban`` restricted to the 24 hours of TARGET_DATE
    and collects the first IPv4 address on every line containing " Ban ".

    Returns:
        str: The "[FAIL2BAN ANALYSIS]" report section, newline-terminated.
        If journalctl cannot be run, times out, or exits with a non-zero
        status, the section carries a "Fail2Ban journal read failed" line
        instead of raising.
    """
    output = ["[FAIL2BAN ANALYSIS]\n"]

    # "--since" alone extends to the present, which would mix today's bans
    # into a report about TARGET_DATE; stop at midnight of the following day.
    day_after = (
        datetime.strptime(TARGET_DATE, "%Y-%m-%d") + timedelta(days=1)
    ).strftime("%Y-%m-%d")
    cmd = [
        "journalctl",
        "-u",
        "fail2ban",
        "--since",
        TARGET_DATE,
        "--until",
        day_after,
        "--no-pager",
    ]

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            errors="replace",  # undecodable journal bytes must not abort the read
            timeout=5,
        )
    except (OSError, subprocess.SubprocessError) as e:
        # journalctl missing / not executable, or the 5 s timeout expired.
        output.append(f"Fail2Ban journal read failed: {e}")
        return "\n".join(output) + "\n"

    if result.returncode != 0:
        # Do not report "no bans" when the journal could not actually be read.
        reason = result.stderr.strip() or f"journalctl exited with status {result.returncode}"
        output.append(f"Fail2Ban journal read failed: {reason}")
        return "\n".join(output) + "\n"

    bans = set()
    for line in result.stdout.splitlines():
        if " Ban " not in line:
            continue
        ips = IPV4_REGEX.findall(line)
        if ips:
            bans.add(ips[0])

    if not bans:
        output.append("Fail2Ban active. No bans detected on target date.")
    else:
        for ip in sorted(bans):
            output.append(f"Banned IP: {ip}")

    return "\n".join(output) + "\n"
# ----------------------------
# UFW ANALYSIS
# ----------------------------
def parse_ufw():
    """List external source IPs that UFW blocked on TARGET_DATE.

    Scans UFW_LOG for lines that contain TARGET_DATE and either "BLOCK" or
    "DROP", extracts the ``SRC=`` address, and skips addresses for which
    ``is_private_or_local_ip`` returns True.

    Returns:
        str: The "[UFW ANALYSIS]" report section, newline-terminated.
        If the log file does not exist, ``"UFW log not found.\\n"``.
        If it exists but cannot be opened or read (e.g. missing permission),
        a one-line ``"UFW log read failed: ..."`` message instead of raising,
        so the rest of the report is still produced.
    """
    blocked_ips = set()

    # EAFP: open directly rather than checking existence first, so permission
    # errors and files that vanish between check and open are handled too.
    try:
        with open(UFW_LOG, "r", errors="ignore") as f:
            for line in f:
                if TARGET_DATE not in line:
                    continue
                if "BLOCK" not in line and "DROP" not in line:
                    continue
                match = UFW_SRC_REGEX.search(line)
                if not match:
                    continue
                ip = match.group(1)
                if is_private_or_local_ip(ip):
                    continue
                blocked_ips.add(ip)
    except FileNotFoundError:
        return "UFW log not found.\n"
    except OSError as e:
        return f"UFW log read failed: {e}\n"

    output = ["[UFW ANALYSIS]\n"]
    if not blocked_ips:
        output.append("No external blocked traffic detected.")
    else:
        for ip in sorted(blocked_ips):
            output.append(f"Blocked IP: {ip}")

    return "\n".join(output) + "\n"
# ----------------------------
# REPORT GENERATION
# ----------------------------
def generate_report():
    """Build the full report text.

    The report consists of a header (generation timestamp, analyzed date and
    a 60-character rule), followed by the auth-log, Fail2Ban and UFW sections
    in that order, and a closing marker. Parts are joined with newlines.

    Returns:
        str: The complete report.
    """
    generated_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    sections = [
        f"SECURITY REPORT - {generated_at}\n",
        f"ANALYZED DATE - {TARGET_DATE}\n",
        "=" * 60 + "\n",
        parse_auth_log(),
        parse_fail2ban(),
        parse_ufw(),
        "\nEND OF REPORT\n",
    ]
    return "\n".join(sections)
# ----------------------------
# MAIN
# ----------------------------
def main():
    """Prune expired reports, then generate today's report and save it.

    The report is written to OUTPUT_PATH using the ``utf-8-sig`` encoding,
    and the destination path is printed afterwards.
    """
    delete_old_reports()
    report_text = generate_report()

    with open(OUTPUT_PATH, mode="w", encoding="utf-8-sig") as report_file:
        report_file.write(report_text)

    print(f"[OK] Security report saved to: {OUTPUT_PATH}")


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()