Raspberry Pi Secure Web Gateway

Transparent download gate that holds, scans, and either releases or blocks files based on VirusTotal verdicts — in real time.
Raspberry Pi Mitmproxy Linux Alerting VirusTotal API Python Code

This project turns a Raspberry Pi into a Secure Web Gateway (SWG) that intercepts downloads, checks risky files with VirusTotal, and blocks malicious content before it reaches the endpoint. It runs as a mitmproxy add-on and implements a practical “hold → scan → release/deny” policy with caching and rate limiting — fast enough for everyday personal use.

Flow diagram: Client → Pi (mitmproxy add-on) → VirusTotal → Allow/Block → Client mitmdump running on the left. On the right, a dedicated console for files scanned and their results.

Overview

How it works

  1. Identify risky downloads. The add-on inspects response headers and filenames to decide if the file merits scanning (extensions like .exe, .zip; MIME hints like application/x-dosexec). If not risky, the file passes immediately.
  2. Buffer & hash. The response body is buffered, then hashed with SHA-256 — this becomes the stable key for cache, logs, and VT lookups.
  3. Check the cache. A local SQLite cache stores verdict and per-engine stats for 24 hours to avoid re-scanning popular files.
  4. VirusTotal fast path. Query the file report by hash. If present, parse the counts (malicious/suspicious/undetected), build a summary, and make a decision.
  5. Upload & short poll (unknowns). If unknown or undetected only, upload the file and poll for a short window. Fetch the final report and summarize.
  6. Decide. If any engine reports malicious/suspicious → DENY. Unknown/timeout → DENY. Otherwise → ALLOW.
  7. Record & present. Save a JSON summary (VT stats, timestamps, decision). Print a readable console block. If denied, return a HTML “Blocked” page with the reason and SHA-256.

Policy highlights

Logging & artifacts

Running it (transparent mode)

The add-on is a single Python file you pass to mitmdump in transparent mode. Set your API key as an environment variable or inside the script, and route HTTP/HTTPS traffic through the Pi (policy-enforced). Below are typical launch commands and a minimal iptables redirect pattern for a lab setup.

# Install dependencies
sudo apt update && sudo apt install -y mitmproxy python3-requests

# (Example) Redirect HTTP/HTTPS to mitmproxy on the Pi's interface
# HTTPS interception requires configuring trust/certs on clients.
sudo iptables -t nat -C PREROUTING -i wlan1 -p tcp -m multiport --dports 80,443 -j REDIRECT --to-ports 8080 \  || 
sudo iptables -t nat -A PREROUTING -i wlan1 -p tcp -m multiport --dports 80,443 -j REDIRECT --to-ports 8080

# Run mitmproxy in transparent mode with the gate
stdbuf -oL -eL sudo mitmdump -s vt_gate.py --mode transparent -p 8080 --listen-host 0.0.0.0 --showhost \
  2>&1 | sudo tee -a /var/log/pi_watcher/gate.log

#tail command to watch gate.log for vt scans
tail -F /var/log/pi_watcher/gate.log | awk '
/^=+$/ {print; show=!show; next}
show {print}
'

Testing malware samples

I test malware posted online and my own against the SWG. This is a a sample i found online. Downloading it on my MacBook, when the file finishes scanning, i get a blocked page and the file never reaches my device.

Placeholder screenshot of the gateway block page
“Download blocked” page that is shown when a file is blocked.

Testing malware that Windows Defender does not detect

This is a custom reverse shell code loader that is undetected by Windows Defender and can lead to complete control of the machine.

Placeholder screenshot of the gateway block page
“Download blocked” page that is shown when a file is blocked.

The Secure Web Gateway successfully blocks the download. Without the SWG, even having one of the most popular Anti-Virus could still lead to a victim machine being completely compromised with a few other tools available only. Windows Defender can be completely bypassed if this reverse shell is combined with an AMSI bypass which will allow any tool to run undetectable from memory.

Block and error pages

Denied downloads receive a simple HTML page stating that the download was blocked along with the following info: verdict, policy reason, SHA-256, filename, URL, client).

Placeholder screenshot of the gateway block page
“Download blocked” page that is shown when a file is blocked.

If the scanner is failing for any reason, files are blocked as a security measure.

A file blocked because the scanner failed.
“Download blocked” page but because the scanner failed.

mitmproxy addon python code

View mitmproxy add-on source

# vt_gate.py

from mitmproxy import http, ctx
import hashlib, os, time, json, sqlite3, re, threading
from datetime import datetime
from urllib.parse import urlparse, parse_qs, unquote

try:
    import requests
except Exception:
    requests = None

# ---------------- CONFIG ----------------
VT_API_KEY = "---API KEY---"  # your key
VT_RPM = 3                           # requests/min (3 for the limit)
VT_MAX_CONCURRENCY = 2               # max simultaneous VT operations (Free API limits)
VT_TIMEOUT = 20                      # per HTTP call timeout (s)
VT_POLL_TIMEOUT = 12                 # poll this long for analysis completion (s)
MIN_ENGINES_FOR_CLEAN = 60           # require at least this many responders to call harmless
CACHE_TTL_SECS = 24 * 3600           # reuse known verdicts for 24h
MAX_SCAN_BYTES = 32 * 1024 * 1024    # 32 MB cap
FAIL_OPEN = False                    # False = deny on internal errors/timeouts; True = allow

DROP_DIR = "/var/lib/proxy-drops"
VT_LOG_DIR = "/var/log/pi_watcher/vt"
CACHE_DB = "/var/lib/proxy-drops/vt_cache.db"

# Risky extensions — only scanned when:
#   - CD=attachment, or
#   - top-level navigation.
RISKY_EXTS = {
    # Windows executables & components
    ".exe", ".dll", ".sys", ".msi", ".msp", ".scr", ".com", ".cpl", ".ocx", ".drv", ".pif",
    # Windows scripts / PowerShell / misc
    ".bat", ".cmd", ".vbs", ".vbe", ".wsf", ".wsc", ".wsh", ".hta", ".lnk", ".reg",
    ".ps1", ".psm1", ".psd1",
    # Office/docs with active content or common exploit targets
    ".docm", ".xlsm", ".pptm", ".doc", ".xls", ".ppt", ".rtf",
    # Archives / installers / disk images
    ".zip", ".rar", ".7z", ".cab", ".iso", ".img", ".msix", ".msixbundle",
    ".apk", ".apks", ".xapk",
    # Linux/Unix & cross-platform
    ".sh", ".run", ".bin", ".elf", ".so", ".deb", ".rpm", ".jar", ".war", ".ear",
    # macOS
    ".dmg", ".pkg", ".command", ".kext",
}

# MIME prefixes/exacts to skip (page assets / safe-ish plumbing)
SKIP_MIME_PREFIXES = (
    "text/", "image/", "video/", "audio/", "font/",
)
SKIP_MIME_EXACT = {
    # web assets
    "application/javascript", "text/javascript", "application/json", "text/css",
    "application/font-woff", "application/font-woff2",
    # security plumbing / certs / OCSP / CRL / protobuf
    "application/x-protobuf",
    "application/ocsp-response",
    "application/x-x509-ca-cert", "application/pkix-cert", "application/pkix-crl",
    "application/pkcs7-mime", "application/pkcs7-signature",
}

# Sec-Fetch-Dest values that are page assets (skip)
DEST_SKIP = {"script", "style", "image", "font", "track", "embed", "object", "iframe", "worker", "manifest"}

# ----------------------------------------

os.makedirs(DROP_DIR, exist_ok=True)
os.makedirs(VT_LOG_DIR, exist_ok=True)

_vt_times = []  # sliding window timestamps for rate limiting
_vt_sem = threading.BoundedSemaphore(VT_MAX_CONCURRENCY)

def short(h): return (h or "")[:8]
def nowts(): return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

def status(tag, ident, msg):
    ctx.log.info(f"[{nowts()}] [{tag}] #{short(ident)} {msg}")

def pretty_console(summary):
    h = summary.get("sha256","")
    size = summary.get("size")
    fkb = f"{size/1024:.0f} KB" if isinstance(size, int) else "?"
    sev = summary.get("severity","UNKNOWN")
    ftype = summary.get("filetype","?")
    client = summary.get("client","?")
    host = summary.get("host","?")
    url = summary.get("url","?")
    vt = summary.get("vt",{}) or {}
    stats = vt.get("stats",{}) or {}
    link = vt.get("permalink")
    src = vt.get("source") or "n/a"
    total = stats.get("_total", "?")

    lines = []
    lines.append("\n" + "="*72)
    lines.append(f"[{sev}] {short(h)} • {ftype} • {fkb}")
    lines.append(f"Client: {client}    Host: {host}")
    lines.append(f"URL: {url}")
    lines.append("")
    if vt.get("found") or vt.get("status")==200:
        lines.append(f"VT[{src}]: malicious={stats.get('malicious',0)}  suspicious={stats.get('suspicious',0)}  undetected={stats.get('undetected',0)}  harmless={stats.get('harmless',0)}  total={total}")
        fam = summary.get("family_hint")
        if fam:
            lines.append(f"Top family: {fam}")
        if link:
            lines.append(f"Permalink: {link}")
    else:
        lines.append("VT: (no data)")
    lines.append("")
    lines.append(f"Decision: {summary.get('decision','UNKNOWN')}  ({summary.get('policy_reason','')})")
    lines.append(f"Elapsed: {summary.get('elapsed','?')}s")
    lines.append("="*72 + "\n")
    for l in lines:
        ctx.log.info(l)

def sha256_bytes(b: bytes):
    h = hashlib.sha256()
    h.update(b)
    return h.hexdigest()

def vt_rate_gate():
    now = time.time()
    while _vt_times and (now - _vt_times[0] > 60.0):
        _vt_times.pop(0)
    if len(_vt_times) >= VT_RPM:
        wait = 60.0 - (now - _vt_times[0]) + 0.05
        time.sleep(max(0.0, min(wait, 30.0)))
    _vt_times.append(time.time())

def vt_http(method, url, **kw):
    """Wrapper for requests with rate limit + concurrency + basic backoff. Returns (resp or None)."""
    if not requests:
        return None
    with _vt_sem:
        vt_rate_gate()
        try:
            r = requests.request(method, url, timeout=VT_TIMEOUT, **kw)
        except Exception:
            return None
        if r.status_code in (429, 500, 502, 503, 504):
            time.sleep(3)
            return None
        return r

def vt_file_report(sha):
    r = vt_http("GET", f"https://www.virustotal.com/api/v3/files/{sha}",
                headers={"x-apikey": VT_API_KEY})
    if not r:
        return None, None, "http/backoff"
    if r.status_code != 200:
        return None, r.status_code, r.text
    try:
        j = r.json()
    except Exception as e:
        return None, 200, f"json error: {e}"
    return j, 200, None

def vt_upload_and_poll(path, sha, status_cb):
    status_cb("uploading…")
    r = vt_http(
        "POST",
        "https://www.virustotal.com/api/v3/files",
        headers={"x-apikey": VT_API_KEY},
        files={"file": (os.path.basename(path), open(path, "rb"))},
    )
    if not r:
        return None, {"error": "upload http/backoff"}
    if r.status_code not in (200,201,202):
        time.sleep(2)
        return None, {"error": f"upload http {r.status_code}", "text": r.text}
    try:
        up = r.json()
    except Exception as e:
        up = {"error": f"upload json parse: {e}", "text": r.text}

    analysis_id = None
    if isinstance(up, dict):
        data = up.get("data") or {}
        analysis_id = data.get("id") or (data.get("attributes") or {}).get("id")

    analysis = None
    if analysis_id:
        t0 = time.time()
        while time.time() - t0 < VT_POLL_TIMEOUT:
            status_cb("polling…")
            r2 = vt_http("GET", f"https://www.virustotal.com/api/v3/analyses/{analysis_id}",
                         headers={"x-apikey": VT_API_KEY})
            if not r2 or r2.status_code != 200:
                break
            try:
                j2 = r2.json()
            except Exception:
                break
            st = (j2.get("data",{}).get("attributes",{}) or {}).get("status")
            if st == "completed":
                analysis = j2
                break
            time.sleep(2)

    status_cb("fetching report…")
    rep, code, err = vt_file_report(sha)
    return {"upload": up, "analysis": analysis, "report": rep, "code": code, "err": err}, None

def _extract_stats_from_analysis(analysis):
    if not analysis or not isinstance(analysis, dict):
        return {}, 0
    attr = (analysis.get("data", {}) or {}).get("attributes", {}) or {}
    stats = attr.get("stats", {}) or {}
    out = {k:int(stats.get(k,0)) for k in ("malicious","suspicious","undetected","harmless","timeout","failure")}
    total = sum(out.values())
    out["_total"] = total
    return out, total

def _extract_stats_from_report(rep):
    if not rep or not isinstance(rep, dict):
        return {}, 0
    attrs = (rep.get("data",{}) or {}).get("attributes",{}) or {}
    s = attrs.get("last_analysis_stats",{}) or {}
    out = {k:int(s.get(k,0)) for k in ("malicious","suspicious","undetected","harmless","timeout","failure")}
    total = sum(out.values())
    out["_total"] = total
    return out, total

def vt_summarize(sha, vt_bundle):
    rep = vt_bundle.get("report")
    analysis = vt_bundle.get("analysis")
    code = vt_bundle.get("code")

    a_stats, a_total = _extract_stats_from_analysis(analysis)
    r_stats, r_total = _extract_stats_from_report(rep)

    used = None
    stats = {}
    if a_total > 0:
        used = "analysis"
        stats = a_stats
    elif r_total > 0:
        used = "report"
        stats = r_stats
    else:
        return {"enabled": True, "found": False, "status": code, "verdict": "unknown", "source": "none"}

    verdict = "harmless"
    if stats.get("malicious",0) > 0:
        verdict = "malicious"
    elif stats.get("suspicious",0) > 0:
        verdict = "suspicious"
    elif stats.get("_total",0) < MIN_ENGINES_FOR_CLEAN:
        verdict = "unknown"

    attrs = ((rep or {}).get("data",{}) or {}).get("attributes",{}) if isinstance(rep, dict) else {}
    summary = {
        "enabled": True,
        "found": True,
        "status": 200 if used else code,
        "permalink": f"https://www.virustotal.com/gui/file/{sha}",
        "meaningful_name": attrs.get("meaningful_name") if isinstance(attrs, dict) else None,
        "reputation": attrs.get("reputation") if isinstance(attrs, dict) else None,
        "stats": stats,
        "file_attributes": {
            "type_description": (attrs or {}).get("type_description") if isinstance(attrs, dict) else None,
            "size": (attrs or {}).get("size") if isinstance(attrs, dict) else None,
            "first_submission_date": (attrs or {}).get("first_submission_date") if isinstance(attrs, dict) else None,
            "last_analysis_date": (attrs or {}).get("last_analysis_date") if isinstance(attrs, dict) else None,
        },
        "raw": vt_bundle,
        "verdict": verdict,
        "source": used or "none",
    }
    return summary

def save_vt_json(sha, vt_summary):
    try:
        p = os.path.join(VT_LOG_DIR, f"{sha}.json")
        with open(p, "w") as f:
            json.dump(vt_summary, f, indent=2)
    except Exception:
        pass

def block_page(flow, reason_text):
    html = f"""<!doctype html><html><head><meta charset="utf-8"><title>Download blocked</title>
<meta http-equiv="Cache-Control" content="no-store" />
<style>body{{font-family:system-ui,-apple-system,Segoe UI,Roboto,Ubuntu,sans-serif;background:#fafafa;color:#111}}
.card{{max-width:720px;margin:6vh auto;background:#fff;border-radius:12px;box-shadow:0 6px 20px rgba(0,0,0,.08);padding:1.25rem 1.5rem}}
pre{{white-space:pre-wrap;background:#f5f5f5;border-radius:8px;padding:1rem;overflow-x:auto}}
h2{{margin:.2rem 0 .6rem 0}} .muted{{color:#666;font-size:.9rem}}</style></head>
<body><div class="card">
<h2>Download blocked</h2>
<p class="muted">This file was blocked by the Secure Web Gate due to security policy.</p>
<pre>{reason_text}</pre>
</div></body></html>"""
    flow.response = http.Response.make(
        403,
        html.encode("utf-8"),
        {
            "Content-Type": "text/html; charset=utf-8",
            "Cache-Control": "no-store, max-age=0",
            "Pragma": "no-cache",
            "X-Blocked-By": "PiSWG",
        },
    )

def _get_candidate_filename(flow: http.HTTPFlow):
    cd = (flow.response.headers.get("content-disposition","") or "")
    has_attachment = "attachment" in cd.lower()

    # 1) Content-Disposition
    fn = None
    m = re.search(r'filename\*?=(?:UTF-8\'\')?"?([^";]+)"?', cd, flags=re.I)
    if m:
        fn = unquote(m.group(1))

    # 2) Path segment
    if not fn:
        pcs = flow.request.path_components or []
        if pcs:
            cand = pcs[-1]
            if cand and "." in cand:
                fn = cand

    # 3) Query parameters
    if not fn:
        q = parse_qs(urlparse(flow.request.pretty_url).query)
        for k in ("filename","file","name","download","attachment","attname"):
            vals = q.get(k) or []
            if vals:
                v = unquote(vals[-1]).strip('\'"')
                if "." in v:
                    fn = v
                    break

    ext = ""
    if fn and "." in fn:
        ext = "." + fn.rsplit(".", 1)[-1].lower()
    return fn or "", ext, has_attachment

def _is_top_level_navigation(flow: http.HTTPFlow):
    dest = (flow.request.headers.get("sec-fetch-dest","") or "").lower()
    mode = (flow.request.headers.get("sec-fetch-mode","") or "").lower()
    # Strict: must be a document navigation
    return dest == "document" and mode == "navigate"

def _is_asset_request(flow: http.HTTPFlow, ctype_lower: str):
    dest = (flow.request.headers.get("sec-fetch-dest","") or "").lower()
    if dest in DEST_SKIP:
        return f"sec-fetch-dest={dest}"
    if ctype_lower:
        if ctype_lower in SKIP_MIME_EXACT:
            return f"mime={ctype_lower}"
        for pref in SKIP_MIME_PREFIXES:
            if ctype_lower.startswith(pref):
                return f"mime={ctype_lower}"
    return None

def should_scan(flow: http.HTTPFlow):
    """Apply download triggers & skips. Return (True/False, reason_string, filename, ext)."""
    resp = flow.response
    req = flow.request
    ctype = (resp.headers.get("content-type","") or "").lower()

    # Skip obvious page assets early
    skip = _is_asset_request(flow, ctype)
    if skip:
        return (False, f"skip asset ({skip})", "", "")

    # Extract filename/extension and Content-Disposition flag
    filename, ext, has_attachment = _get_candidate_filename(flow)

    # Primary triggers:
    # A) Content-Disposition: attachment => scan
    if has_attachment:
        return (True, "trigger=cd:attachment", filename, ext)

    # B) Top-level navigation + risky extension => scan
    if _is_top_level_navigation(flow) and ext in RISKY_EXTS:
        return (True, "trigger=top-level+risky-ext", filename, ext)

    # (Removed) C) risky-ext anywhere
    # (Removed) D) top-level+binary-ctype heuristic

    return (False, "no trigger matched", filename, ext)

class VTGate:
    def load(self, loader):
        if not requests or not VT_API_KEY:
            ctx.log.warn("VT disabled: 'requests' missing or API key empty. (Set both!)")
        ctx.log.info(
            f"VT gate ready. MAX_SCAN_BYTES={MAX_SCAN_BYTES} bytes; VT_RPM={VT_RPM}/min; "
            f"poll={VT_POLL_TIMEOUT}s; min_engines={MIN_ENGINES_FOR_CLEAN}; concurrency={VT_MAX_CONCURRENCY}"
        )

    def _cache_conn(self):
        con = sqlite3.connect(CACHE_DB)
        con.execute("CREATE TABLE IF NOT EXISTS cache (hash TEXT PRIMARY KEY, verdict TEXT, stats TEXT, ts INTEGER)")
        return con

    def _cache_get(self, sha):
        try:
            con = self._cache_conn()
            cur = con.execute("SELECT verdict, stats, ts FROM cache WHERE hash=?", (sha,))
            row = cur.fetchone()
            con.close()
            if not row: return None
            verdict, stats_json, ts = row
            if time.time() - ts > CACHE_TTL_SECS:
                return None
            stats = json.loads(stats_json) if stats_json else {}
            return {"verdict": verdict, "stats": stats}
        except Exception:
            return None

    def _cache_put(self, sha, verdict, stats):
        if verdict == "unknown":
            return
        try:
            con = self._cache_conn()
            con.execute("INSERT OR REPLACE INTO cache(hash, verdict, stats, ts) VALUES(?,?,?,?)",
                        (sha, verdict, json.dumps(stats or {}), int(time.time())))
            con.commit()
            con.close()
        except Exception:
            pass

    def response(self, flow: http.HTTPFlow):
        # Consider successful responses we can read (200 or 206 partials)
        if not flow.response or flow.response.status_code not in (200, 206):
            return

        # Apply triggers/skips
        scan, why, filename, ext = should_scan(flow)
        if not scan:
            status("SKIP", "----", f"{why} ct={flow.response.headers.get('content-type','?')} url={flow.request.pretty_url}")
            return

        # Basic metadata
        try:
            client = f"{flow.client_conn.address[0]}:{flow.client_conn.address[1]}"
        except Exception:
            client = "unknown"

        url = flow.request.pretty_url
        host = flow.request.host
        ct_hdr = flow.response.headers.get("content-type", "?")
        length_hdr = flow.response.headers.get("content-length")
        size_hdr = int(length_hdr) if (length_hdr and length_hdr.isdigit()) else None

        # Enforce size cap early
        if size_hdr and size_hdr > MAX_SCAN_BYTES:
            reason = f"oversize header: {size_hdr} > {MAX_SCAN_BYTES}"
            status("DENY", "oversize", f"{reason} url={url}")
            block_page(flow, f"Policy: size cap exceeded.\n{reason}\nURL: {url}\nClient: {client}")
            return

        t0 = time.time()
        status("INFO", "pending", f"buffering body… ({why}) filename={filename or '-'} ext={ext or '-'} ct={ct_hdr} url={url}")
        body = flow.response.get_content(strict=False) or b""
        size = len(body)
        if size > MAX_SCAN_BYTES:
            reason = f"oversize body: {size} > {MAX_SCAN_BYTES}"
            status("DENY", "oversize", f"{reason} url={url}")
            block_page(flow, f"Policy: size cap exceeded.\n{reason}\nURL: {url}\nClient: {client}")
            return

        sha = sha256_bytes(body)
        status("INFO", sha, f"hash={sha} size={size}")

        # write temp drop (for audit)
        tmp_path = os.path.join(DROP_DIR, f"{sha}.{int(time.time())}")
        try:
            with open(tmp_path, "wb") as f:
                f.write(body)
        except Exception as e:
            status("WARN", sha, f"could not write temp file: {e}")

        vt_summary = None
        decision = "ALLOW"
        policy_reason = "harmless"
        family_hint = None

        try:
            # cache
            cached = self._cache_get(sha)
            if cached:
                stats_c = cached.get("stats") or {}
                verdict_c = cached.get("verdict") or "unknown"
                status("INFO", sha, f"cache hit: {verdict_c} (mal={stats_c.get('malicious',0)} susp={stats_c.get('suspicious',0)} total={stats_c.get('_total','?')})")
                if verdict_c == "harmless":
                    rep, code, err = vt_file_report(sha)
                    if code == 200 and isinstance(rep, dict):
                        bundle = {"upload": None, "analysis": None, "report": rep, "code": code, "err": err}
                        vt_summary = vt_summarize(sha, bundle)
                        if vt_summary.get("verdict") in ("malicious","suspicious","unknown"):
                            status("INFO", sha, f"cache override after recheck: {vt_summary['verdict']}")
                        else:
                            vt_summary["raw"] = {"source": "cache"}
                    else:
                        vt_summary = {"enabled": True, "found": False, "status": code, "verdict": "unknown", "source": "cache-recheck-failed"}
                else:
                    vt_summary = {
                        "enabled": True, "found": True, "status": 200,
                        "permalink": f"https://www.virustotal.com/gui/file/{sha}",
                        "stats": {k:int(stats_c.get(k,0)) for k in ("malicious","suspicious","undetected","harmless","timeout","failure")},
                        "verdict": verdict_c, "source": "cache",
                    }

            if not vt_summary or vt_summary.get("verdict") == "unknown":
                if not requests or not VT_API_KEY:
                    raise RuntimeError("VT not available (requests missing or API key empty)")

                # 1) Hash lookup
                status("VT", sha, "lookup…")
                rep, code, err = vt_file_report(sha)
                bundle = {"upload": None, "analysis": None, "report": rep, "code": code, "err": err}
                vt_summary = vt_summarize(sha, bundle)

                # 2) If unknown or undetected, upload + poll
                stats = (vt_summary or {}).get("stats", {})
                if vt_summary.get("verdict") in ("unknown","harmless") and stats.get("malicious",0)==0 and stats.get("suspicious",0)==0:
                    status("VT", sha, "upload (unknown/undetected)…")
                    upres, up_err = vt_upload_and_poll(tmp_path, sha, lambda m: status("VT", sha, m))
                    if up_err:
                        raise RuntimeError(f"VT upload/poll error: {up_err}")
                    vt_summary = vt_summarize(sha, upres)

            # decision policy (fail-closed on zeros/unknown)
            stats = (vt_summary or {}).get("stats",{})
            total = int(stats.get("_total",0))
            mal = int(stats.get("malicious",0))
            susp = int(stats.get("suspicious",0))
            if (mal > 0) or (susp > 0):
                decision = "DENY"
                policy_reason = "VT: malicious/suspicious"
            elif vt_summary.get("verdict") == "harmless" and total >= MIN_ENGINES_FOR_CLEAN:
                decision = "ALLOW"
                policy_reason = "harmless"
            else:
                decision = "DENY"
                policy_reason = "timeout/unknown/min-engines"

            # cache store (don’t cache unknown)
            if vt_summary and vt_summary.get("status")==200 and vt_summary.get("verdict") != "unknown":
                self._cache_put(sha, vt_summary.get("verdict","unknown"), vt_summary.get("stats",{}))

            # save full VT JSON
            if vt_summary:
                save_vt_json(sha, vt_summary)

            # family hint from report (best-effort)
            try:
                rep = ((vt_summary or {}).get("raw") or {}).get("report") or {}
                res = (rep.get("data",{}).get("attributes",{}).get("last_analysis_results",{}) or {})
                for v in ("Microsoft","Kaspersky","BitDefender","ESET-NOD32","CrowdStrike","Sophos","Avast","Fortinet"):
                    if v in res and res[v].get("result"):
                        family_hint = res[v]["result"]
                        break
            except Exception:
                pass

        except Exception as e:
            status("ERROR", sha, f"scan error: {e}")
            decision = "ALLOW" if FAIL_OPEN else "DENY"
            policy_reason = "scanner_error (fail-open)" if FAIL_OPEN else "scanner_error (fail-closed)"
            vt_summary = vt_summary or {"enabled": bool(requests and VT_API_KEY), "error": str(e), "found": False}

        elapsed = f"{(time.time()-t0):.1f}"

        # Finalize: allow or block
        summary = {
            "sha256": sha,
            "size": size,
            "filetype": (vt_summary or {}).get("file_attributes",{}).get("type_description") or (ext or "unknown"),
            "client": client,
            "host": host,
            "url": url,
            "vt": vt_summary,
            "decision": decision,
            "policy_reason": policy_reason,
            "elapsed": elapsed,
            "family_hint": family_hint,
            "severity": "HIGH" if decision=="DENY" else "LOW",
        }
        pretty_console(summary)

        if decision == "DENY":
            block_page(
                flow,
                f"{policy_reason}\n"
                f"SHA256: {sha}\n"
                f"Filename: {filename or '-'}\n"
                f"Content-Type: {ct_hdr}\n"
                f"URL: {url}\n"
                f"Client: {client}"
            )

addons = [VTGate()]