Raspberry Pi Secure Web Gateway
This project turns a Raspberry Pi into a Secure Web Gateway (SWG) that intercepts downloads, checks risky files with VirusTotal, and blocks malicious content before it reaches the endpoint. It runs as a mitmproxy add-on and implements a practical “hold → scan → release/deny” policy with caching and rate limiting — fast enough for everyday personal use.
Overview
- Targeted scanning: Only risky file types are intercepted, selected by extension/MIME (e.g., .exe, .dll, .msi, .jar, archives, scripts).
- Size cap: Files up to 32 MB are scanned (policy default). Oversize downloads are denied with a clear message. (VirusTotal's standard upload endpoint accepts at most 32 MB.)
- Fast path: Hash (SHA-256) lookup on VirusTotal first. If the hash is known, the verdict comes back without an upload.
- Unknown path: Upload the file and short-poll analysis for a quick verdict; fall back to policy (DENY) if analysis isn’t ready.
- Decision policy: Deny if any engine marks the file as malicious or suspicious. Deny on timeouts or scanner errors (as a security measure).
- Cache & rate limit: 24h verdict cache (SQLite) and a conservative rate gate in front of VT. The free VT API allows 4 requests/minute; the script's VT_RPM setting defaults to 3.
- Auditability: Per-file JSON full analysis summaries saved on disk; console prints a readable multi-line decision log for a quick read.
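The targeted-scanning idea can be sketched as a small predicate. This is a simplified illustration, not the add-on's exact trigger logic; the names RISKY_EXTS_SAMPLE, RISKY_MIME_SAMPLE, and looks_risky are hypothetical, and the lists are deliberately short.

```python
import os

# Hypothetical, shortened lists for illustration only.
RISKY_EXTS_SAMPLE = {".exe", ".dll", ".msi", ".jar", ".zip", ".ps1"}
RISKY_MIME_SAMPLE = {
    "application/x-dosexec",
    "application/java-archive",
    "application/zip",
}


def looks_risky(filename: str, content_type: str) -> bool:
    """Return True if the extension or the MIME type is on a risky list."""
    ext = os.path.splitext(filename)[1].lower()
    # Drop parameters such as "; charset=binary" before comparing.
    mime = content_type.split(";", 1)[0].strip().lower()
    return ext in RISKY_EXTS_SAMPLE or mime in RISKY_MIME_SAMPLE
```

For example, looks_risky("Setup.EXE", "application/octet-stream") is true because of the extension, while looks_risky("photo.png", "image/png") is false.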
How it works
- Identify risky downloads. The add-on inspects response headers and filenames to decide if the file merits scanning (extensions like .exe, .zip; MIME hints like application/x-dosexec). If not risky, the file passes immediately.
- Buffer & hash. The response body is buffered, then hashed with SHA-256; the hash becomes the stable key for cache, logs, and VT lookups.
- Check the cache. A local SQLite cache stores verdict and per-engine stats for 24 hours to avoid re-scanning popular files.
- VirusTotal fast path. Query the file report by hash. If present, parse the counts (malicious/suspicious/undetected), build a summary, and make a decision.
- Upload & short poll (unknowns). If the hash is unknown, or the existing report shows no detections, upload the file and poll for a short window. Then fetch the final report and summarize it.
- Decide. If any engine reports malicious/suspicious → DENY. Unknown/timeout → DENY. Otherwise → ALLOW.
- Record & present. Save a JSON summary (VT stats, timestamps, decision). Print a readable console block. If denied, return an HTML “Blocked” page with the reason and SHA-256.
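The "Decide" step above can be sketched as a pure function over VirusTotal-style engine counts. This is a minimal sketch, not the add-on's code: the function name decide is hypothetical, and the default of 60 engines mirrors the script's MIN_ENGINES_FOR_CLEAN setting.

```python
def decide(stats: dict, min_engines: int = 60) -> tuple:
    """Map engine counts to (decision, reason), failing closed."""
    keys = ("malicious", "suspicious", "undetected",
            "harmless", "timeout", "failure")
    total = sum(int(stats.get(k, 0)) for k in keys)
    mal = int(stats.get("malicious", 0))
    susp = int(stats.get("suspicious", 0))

    # Any single detection is enough to block.
    if mal > 0 or susp > 0:
        return "DENY", "malicious/suspicious"
    # A clean verdict only counts if enough engines responded.
    if total >= min_engines:
        return "ALLOW", "harmless"
    # No data, timeout, or too few engines: deny.
    return "DENY", "unknown or too few engines"
```

Keeping the policy in one side-effect-free function makes it easy to unit-test the fail-closed behavior separately from the network calls.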
Policy highlights
- Risky file filters: extensions include .exe, .dll, .msi, .vbs, .ps1, .jar, .bat, .scr, .apk, .elf, archives like .zip/.rar/.7z, and macro-enabled docs. MIME hints include PE, JAR, ZIP, 7z, CAB, application/octet-stream.
- Max scan size: 32 MB. Larger files are denied with a size-cap message.
- Rate limiting: VirusTotal requests are gated per minute (the free API allows 4; the script's VT_RPM is set to 3); extra calls wait before sending.
- Cache TTL: 24 hours (SQLite file on disk; the value of 24 was chosen for testing purposes).
- Fail-open/closed: By default (FAIL_OPEN = False), scanner errors result in a deny. Letting files pass whenever the scanner fails would be a serious security problem.
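The "extra calls wait before sending" behavior can be illustrated with a sliding-window gate. This is a sketch under stated assumptions, not the add-on's implementation: the class name SlidingWindowGate and its injectable clock and sleep parameters are hypothetical, added so the logic can be tested without real delays.

```python
import threading
import time
from collections import deque


class SlidingWindowGate:
    """Allow at most `limit` calls per `window` seconds; block until a slot frees."""

    def __init__(self, limit=4, window=60.0, clock=time.monotonic, sleep=time.sleep):
        self.limit = limit
        self.window = window
        self._clock = clock
        self._sleep = sleep
        self._stamps = deque()
        self._lock = threading.Lock()

    def acquire(self):
        while True:
            with self._lock:
                now = self._clock()
                # Drop timestamps that have left the window.
                while self._stamps and now - self._stamps[0] >= self.window:
                    self._stamps.popleft()
                if len(self._stamps) < self.limit:
                    self._stamps.append(now)
                    return
                # Time until the oldest call leaves the window.
                wait = self.window - (now - self._stamps[0])
            # Sleep outside the lock so other threads are not blocked.
            self._sleep(max(wait, 0.01))
```

A caller would invoke gate.acquire() immediately before each VirusTotal request. The lock keeps the timestamp queue consistent if several flows are scanned at once.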
Logging & artifacts
- Drop dir: temporary file copies for audit (e.g., /var/lib/proxy-drops/).
- Verdict cache: /var/lib/proxy-drops/vt_cache.db (SQLite).
- VirusTotal summaries: per-file JSON in /var/log/pi_watcher/vt/ with stats and a permalink.
- Console summaries: easy-to-read blocks for quick response, showing severity, file type, client, URL, stats, decision, and elapsed time.
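A verdict cache with a time-to-live can be sketched with the standard sqlite3 module. The table layout below follows the one the add-on creates (hash, verdict, stats, ts); the function names cache_open, cache_put, and cache_get, the in-memory default path, and the `now` parameter are assumptions made so the example is self-contained and testable.

```python
import json
import sqlite3
import time


def cache_open(path=":memory:"):
    """Open (or create) the cache database and ensure the table exists."""
    con = sqlite3.connect(path)
    con.execute(
        "CREATE TABLE IF NOT EXISTS cache "
        "(hash TEXT PRIMARY KEY, verdict TEXT, stats TEXT, ts INTEGER)"
    )
    return con


def cache_put(con, sha, verdict, stats, now=None):
    """Store or refresh a verdict with the current timestamp."""
    ts = int(time.time() if now is None else now)
    con.execute(
        "INSERT OR REPLACE INTO cache(hash, verdict, stats, ts) VALUES (?,?,?,?)",
        (sha, verdict, json.dumps(stats), ts),
    )
    con.commit()


def cache_get(con, sha, ttl=24 * 3600, now=None):
    """Return the cached entry, or None if it is missing or older than ttl."""
    now = time.time() if now is None else now
    row = con.execute(
        "SELECT verdict, stats, ts FROM cache WHERE hash=?", (sha,)
    ).fetchone()
    if row is None or now - row[2] > ttl:
        return None
    return {"verdict": row[0], "stats": json.loads(row[1])}
```

Expired rows are simply ignored on read here; a periodic DELETE of old rows would keep the file small on a long-running Pi.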
Running it (transparent mode)
The add-on is a single Python file you pass to mitmdump in transparent mode. Set your API key as an environment variable or inside the script, and route HTTP/HTTPS traffic through the Pi (policy-enforced). Below are typical launch commands and a minimal iptables redirect pattern for a lab setup.
# Install dependencies
sudo apt update && sudo apt install -y mitmproxy python3-requests
# (Example) Redirect HTTP/HTTPS to mitmproxy on the Pi's interface
# HTTPS interception requires configuring trust/certs on clients.
sudo iptables -t nat -C PREROUTING -i wlan1 -p tcp -m multiport --dports 80,443 -j REDIRECT --to-ports 8080 || \
sudo iptables -t nat -A PREROUTING -i wlan1 -p tcp -m multiport --dports 80,443 -j REDIRECT --to-ports 8080
# Run mitmproxy in transparent mode with the gate
stdbuf -oL -eL sudo mitmdump -s vt_gate.py --mode transparent -p 8080 --listen-host 0.0.0.0 --showhost \
2>&1 | sudo tee -a /var/log/pi_watcher/gate.log
#tail command to watch gate.log for vt scans
tail -F /var/log/pi_watcher/gate.log | awk '
/^=+$/ {print; show=!show; next}
show {print}
'
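The script as published reads the key from the VT_API_KEY constant. If you prefer the environment-variable route, a loader along these lines could replace the hard-coded value. This is a sketch: the function name load_api_key and the variable name VT_API_KEY in the environment are assumptions. Because mitmdump is launched through sudo above, the variable may also need to be preserved explicitly, since sudo commonly resets the environment.

```python
import os


def load_api_key(env=os.environ, var="VT_API_KEY"):
    """Return the API key from the environment, or raise if it is missing or blank."""
    key = (env.get(var) or "").strip()
    if not key:
        raise RuntimeError(f"{var} is not set")
    return key
```

Failing loudly at startup is consistent with the fail-closed policy: a gateway without a key cannot scan anything.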
Testing malware samples
I test the SWG against malware samples posted online as well as my own. This is a sample I found online. When I download it on my MacBook and the scan finishes, I get a blocked page and the file never reaches my device.
Testing malware that Windows Defender does not detect
This is a custom reverse shell code loader that Windows Defender does not detect and that can lead to complete control of the machine.
The Secure Web Gateway successfully blocks the download. Without the SWG, even a machine running one of the most popular antivirus products could be completely compromised with a few other tools available online. Windows Defender can be bypassed entirely if this reverse shell is combined with an AMSI bypass, which allows tools to run from memory undetected.
Block and error pages
Denied downloads receive a simple HTML page stating that the download was blocked, along with the following info: verdict, policy reason, SHA-256, filename, URL, and client.
If the scanner is failing for any reason, files are blocked as a security measure.
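Because the block page echoes request-derived values such as the URL and filename, escaping them before embedding them in HTML is a sensible precaution. The helper below is a hypothetical sketch (render_reason is not part of the add-on) using the standard library's html.escape.

```python
from html import escape


def render_reason(fields: dict) -> str:
    """Build the <pre> body of a block page with every key and value HTML-escaped."""
    lines = [f"{escape(str(k))}: {escape(str(v))}" for k, v in fields.items()]
    return "<pre>" + "\n".join(lines) + "</pre>"
```

With this, a URL containing markup is shown as text on the block page instead of being interpreted by the browser.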
mitmproxy add-on Python code
# vt_gate.py
from mitmproxy import http, ctx
import hashlib, os, time, json, sqlite3, re, threading
from datetime import datetime
from urllib.parse import urlparse, parse_qs, unquote
try:
    import requests
except Exception:
    requests = None
# ---------------- CONFIG ----------------
VT_API_KEY = "---API KEY---" # your key
VT_RPM = 3 # requests/min (3 for the limit)
VT_MAX_CONCURRENCY = 2 # max simultaneous VT operations (Free API limits)
VT_TIMEOUT = 20 # per HTTP call timeout (s)
VT_POLL_TIMEOUT = 12 # poll this long for analysis completion (s)
MIN_ENGINES_FOR_CLEAN = 60 # require at least this many responders to call harmless
CACHE_TTL_SECS = 24 * 3600 # reuse known verdicts for 24h
MAX_SCAN_BYTES = 32 * 1024 * 1024 # 32 MB cap
FAIL_OPEN = False # False = deny on internal errors/timeouts; True = allow
DROP_DIR = "/var/lib/proxy-drops"
VT_LOG_DIR = "/var/log/pi_watcher/vt"
CACHE_DB = "/var/lib/proxy-drops/vt_cache.db"
# Risky extensions — only scanned when:
# - CD=attachment, or
# - top-level navigation.
RISKY_EXTS = {
    # Windows executables & components
    ".exe", ".dll", ".sys", ".msi", ".msp", ".scr", ".com", ".cpl", ".ocx", ".drv", ".pif",
    # Windows scripts / PowerShell / misc
    ".bat", ".cmd", ".vbs", ".vbe", ".wsf", ".wsc", ".wsh", ".hta", ".lnk", ".reg",
    ".ps1", ".psm1", ".psd1",
    # Office/docs with active content or common exploit targets
    ".docm", ".xlsm", ".pptm", ".doc", ".xls", ".ppt", ".rtf",
    # Archives / installers / disk images
    ".zip", ".rar", ".7z", ".cab", ".iso", ".img", ".msix", ".msixbundle",
    ".apk", ".apks", ".xapk",
    # Linux/Unix & cross-platform
    ".sh", ".run", ".bin", ".elf", ".so", ".deb", ".rpm", ".jar", ".war", ".ear",
    # macOS
    ".dmg", ".pkg", ".command", ".kext",
}
# MIME prefixes/exacts to skip (page assets / safe-ish plumbing)
SKIP_MIME_PREFIXES = (
    "text/", "image/", "video/", "audio/", "font/",
)
SKIP_MIME_EXACT = {
    # web assets
    "application/javascript", "text/javascript", "application/json", "text/css",
    "application/font-woff", "application/font-woff2",
    # security plumbing / certs / OCSP / CRL / protobuf
    "application/x-protobuf",
    "application/ocsp-response",
    "application/x-x509-ca-cert", "application/pkix-cert", "application/pkix-crl",
    "application/pkcs7-mime", "application/pkcs7-signature",
}
# Sec-Fetch-Dest values that are page assets (skip)
DEST_SKIP = {"script", "style", "image", "font", "track", "embed", "object", "iframe", "worker", "manifest"}
# ----------------------------------------
os.makedirs(DROP_DIR, exist_ok=True)
os.makedirs(VT_LOG_DIR, exist_ok=True)
_vt_times = [] # sliding window timestamps for rate limiting
_vt_sem = threading.BoundedSemaphore(VT_MAX_CONCURRENCY)
def short(h): return (h or "")[:8]
def nowts(): return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def status(tag, ident, msg):
    ctx.log.info(f"[{nowts()}] [{tag}] #{short(ident)} {msg}")

def pretty_console(summary):
    h = summary.get("sha256","")
    size = summary.get("size")
    fkb = f"{size/1024:.0f} KB" if isinstance(size, int) else "?"
    sev = summary.get("severity","UNKNOWN")
    ftype = summary.get("filetype","?")
    client = summary.get("client","?")
    host = summary.get("host","?")
    url = summary.get("url","?")
    vt = summary.get("vt",{}) or {}
    stats = vt.get("stats",{}) or {}
    link = vt.get("permalink")
    src = vt.get("source") or "n/a"
    total = stats.get("_total", "?")
    lines = []
    lines.append("\n" + "="*72)
    lines.append(f"[{sev}] {short(h)} • {ftype} • {fkb}")
    lines.append(f"Client: {client} Host: {host}")
    lines.append(f"URL: {url}")
    lines.append("")
    if vt.get("found") or vt.get("status")==200:
        lines.append(f"VT[{src}]: malicious={stats.get('malicious',0)} suspicious={stats.get('suspicious',0)} undetected={stats.get('undetected',0)} harmless={stats.get('harmless',0)} total={total}")
        fam = summary.get("family_hint")
        if fam:
            lines.append(f"Top family: {fam}")
        if link:
            lines.append(f"Permalink: {link}")
    else:
        lines.append("VT: (no data)")
    lines.append("")
    lines.append(f"Decision: {summary.get('decision','UNKNOWN')} ({summary.get('policy_reason','')})")
    lines.append(f"Elapsed: {summary.get('elapsed','?')}s")
    lines.append("="*72 + "\n")
    for l in lines:
        ctx.log.info(l)

def sha256_bytes(b: bytes):
    h = hashlib.sha256()
    h.update(b)
    return h.hexdigest()
def vt_rate_gate():
    # Sliding window: forget calls older than 60 s, then wait if the window is full.
    now = time.time()
    while _vt_times and (now - _vt_times[0] > 60.0):
        _vt_times.pop(0)
    if len(_vt_times) >= VT_RPM:
        wait = 60.0 - (now - _vt_times[0]) + 0.05
        time.sleep(max(0.0, min(wait, 30.0)))
    _vt_times.append(time.time())

def vt_http(method, url, **kw):
    """Wrapper for requests with rate limit + concurrency + basic backoff. Returns (resp or None)."""
    if not requests:
        return None
    with _vt_sem:
        vt_rate_gate()
        try:
            r = requests.request(method, url, timeout=VT_TIMEOUT, **kw)
        except Exception:
            return None
        if r.status_code in (429, 500, 502, 503, 504):
            time.sleep(3)
            return None
        return r

def vt_file_report(sha):
    r = vt_http("GET", f"https://www.virustotal.com/api/v3/files/{sha}",
                headers={"x-apikey": VT_API_KEY})
    # Compare against None explicitly: a requests.Response is falsy for any
    # 4xx/5xx status, so "not r" would also swallow a plain 404 (unknown hash).
    if r is None:
        return None, None, "http/backoff"
    if r.status_code != 200:
        return None, r.status_code, r.text
    try:
        j = r.json()
    except Exception as e:
        return None, 200, f"json error: {e}"
    return j, 200, None
def vt_upload_and_poll(path, sha, status_cb):
    status_cb("uploading…")
    # Use a context manager so the file handle is closed after the upload.
    with open(path, "rb") as fh:
        r = vt_http(
            "POST",
            "https://www.virustotal.com/api/v3/files",
            headers={"x-apikey": VT_API_KEY},
            files={"file": (os.path.basename(path), fh)},
        )
    # A requests.Response is falsy for 4xx/5xx, so test for None explicitly.
    if r is None:
        return None, {"error": "upload http/backoff"}
    if r.status_code not in (200,201,202):
        time.sleep(2)
        return None, {"error": f"upload http {r.status_code}", "text": r.text}
    try:
        up = r.json()
    except Exception as e:
        up = {"error": f"upload json parse: {e}", "text": r.text}
    analysis_id = None
    if isinstance(up, dict):
        data = up.get("data") or {}
        analysis_id = data.get("id") or (data.get("attributes") or {}).get("id")
    analysis = None
    if analysis_id:
        # Short poll: give up after VT_POLL_TIMEOUT seconds.
        t0 = time.time()
        while time.time() - t0 < VT_POLL_TIMEOUT:
            status_cb("polling…")
            r2 = vt_http("GET", f"https://www.virustotal.com/api/v3/analyses/{analysis_id}",
                         headers={"x-apikey": VT_API_KEY})
            if r2 is None or r2.status_code != 200:
                break
            try:
                j2 = r2.json()
            except Exception:
                break
            st = (j2.get("data",{}).get("attributes",{}) or {}).get("status")
            if st == "completed":
                analysis = j2
                break
            time.sleep(2)
    status_cb("fetching report…")
    rep, code, err = vt_file_report(sha)
    return {"upload": up, "analysis": analysis, "report": rep, "code": code, "err": err}, None
def _extract_stats_from_analysis(analysis):
    if not analysis or not isinstance(analysis, dict):
        return {}, 0
    attr = (analysis.get("data", {}) or {}).get("attributes", {}) or {}
    stats = attr.get("stats", {}) or {}
    out = {k:int(stats.get(k,0)) for k in ("malicious","suspicious","undetected","harmless","timeout","failure")}
    total = sum(out.values())
    out["_total"] = total
    return out, total

def _extract_stats_from_report(rep):
    if not rep or not isinstance(rep, dict):
        return {}, 0
    attrs = (rep.get("data",{}) or {}).get("attributes",{}) or {}
    s = attrs.get("last_analysis_stats",{}) or {}
    out = {k:int(s.get(k,0)) for k in ("malicious","suspicious","undetected","harmless","timeout","failure")}
    total = sum(out.values())
    out["_total"] = total
    return out, total

def vt_summarize(sha, vt_bundle):
    rep = vt_bundle.get("report")
    analysis = vt_bundle.get("analysis")
    code = vt_bundle.get("code")
    a_stats, a_total = _extract_stats_from_analysis(analysis)
    r_stats, r_total = _extract_stats_from_report(rep)
    used = None
    stats = {}
    if a_total > 0:
        used = "analysis"
        stats = a_stats
    elif r_total > 0:
        used = "report"
        stats = r_stats
    else:
        return {"enabled": True, "found": False, "status": code, "verdict": "unknown", "source": "none"}
    verdict = "harmless"
    if stats.get("malicious",0) > 0:
        verdict = "malicious"
    elif stats.get("suspicious",0) > 0:
        verdict = "suspicious"
    elif stats.get("_total",0) < MIN_ENGINES_FOR_CLEAN:
        verdict = "unknown"
    attrs = ((rep or {}).get("data",{}) or {}).get("attributes",{}) if isinstance(rep, dict) else {}
    summary = {
        "enabled": True,
        "found": True,
        "status": 200 if used else code,
        "permalink": f"https://www.virustotal.com/gui/file/{sha}",
        "meaningful_name": attrs.get("meaningful_name") if isinstance(attrs, dict) else None,
        "reputation": attrs.get("reputation") if isinstance(attrs, dict) else None,
        "stats": stats,
        "file_attributes": {
            "type_description": (attrs or {}).get("type_description") if isinstance(attrs, dict) else None,
            "size": (attrs or {}).get("size") if isinstance(attrs, dict) else None,
            "first_submission_date": (attrs or {}).get("first_submission_date") if isinstance(attrs, dict) else None,
            "last_analysis_date": (attrs or {}).get("last_analysis_date") if isinstance(attrs, dict) else None,
        },
        "raw": vt_bundle,
        "verdict": verdict,
        "source": used or "none",
    }
    return summary
def save_vt_json(sha, vt_summary):
    try:
        p = os.path.join(VT_LOG_DIR, f"{sha}.json")
        with open(p, "w") as f:
            json.dump(vt_summary, f, indent=2)
    except Exception:
        pass

def block_page(flow, reason_text):
    # Escape request-derived text (URL, filename) before embedding it in HTML.
    from html import escape
    safe_reason = escape(reason_text)
    html = f"""<!doctype html><html><head><meta charset="utf-8"><title>Download blocked</title>
<meta http-equiv="Cache-Control" content="no-store" />
<style>body{{font-family:system-ui,-apple-system,Segoe UI,Roboto,Ubuntu,sans-serif;background:#fafafa;color:#111}}
.card{{max-width:720px;margin:6vh auto;background:#fff;border-radius:12px;box-shadow:0 6px 20px rgba(0,0,0,.08);padding:1.25rem 1.5rem}}
pre{{white-space:pre-wrap;background:#f5f5f5;border-radius:8px;padding:1rem;overflow-x:auto}}
h2{{margin:.2rem 0 .6rem 0}} .muted{{color:#666;font-size:.9rem}}</style></head>
<body><div class="card">
<h2>Download blocked</h2>
<p class="muted">This file was blocked by the Secure Web Gate due to security policy.</p>
<pre>{safe_reason}</pre>
</div></body></html>"""
    flow.response = http.Response.make(
        403,
        html.encode("utf-8"),
        {
            "Content-Type": "text/html; charset=utf-8",
            "Cache-Control": "no-store, max-age=0",
            "Pragma": "no-cache",
            "X-Blocked-By": "PiSWG",
        },
    )
def _get_candidate_filename(flow: http.HTTPFlow):
    cd = (flow.response.headers.get("content-disposition","") or "")
    has_attachment = "attachment" in cd.lower()
    # 1) Content-Disposition
    fn = None
    m = re.search(r'filename\*?=(?:UTF-8\'\')?"?([^";]+)"?', cd, flags=re.I)
    if m:
        fn = unquote(m.group(1))
    # 2) Path segment
    if not fn:
        pcs = flow.request.path_components or []
        if pcs:
            cand = pcs[-1]
            if cand and "." in cand:
                fn = cand
    # 3) Query parameters
    if not fn:
        q = parse_qs(urlparse(flow.request.pretty_url).query)
        for k in ("filename","file","name","download","attachment","attname"):
            vals = q.get(k) or []
            if vals:
                v = unquote(vals[-1]).strip('\'"')
                if "." in v:
                    fn = v
                    break
    ext = ""
    if fn and "." in fn:
        ext = "." + fn.rsplit(".", 1)[-1].lower()
    return fn or "", ext, has_attachment

def _is_top_level_navigation(flow: http.HTTPFlow):
    dest = (flow.request.headers.get("sec-fetch-dest","") or "").lower()
    mode = (flow.request.headers.get("sec-fetch-mode","") or "").lower()
    # Strict: must be a document navigation
    return dest == "document" and mode == "navigate"

def _is_asset_request(flow: http.HTTPFlow, ctype_lower: str):
    dest = (flow.request.headers.get("sec-fetch-dest","") or "").lower()
    if dest in DEST_SKIP:
        return f"sec-fetch-dest={dest}"
    if ctype_lower:
        if ctype_lower in SKIP_MIME_EXACT:
            return f"mime={ctype_lower}"
        for pref in SKIP_MIME_PREFIXES:
            if ctype_lower.startswith(pref):
                return f"mime={ctype_lower}"
    return None

def should_scan(flow: http.HTTPFlow):
    """Apply download triggers & skips. Return (True/False, reason_string, filename, ext)."""
    resp = flow.response
    req = flow.request
    ctype = (resp.headers.get("content-type","") or "").lower()
    # Skip obvious page assets early
    skip = _is_asset_request(flow, ctype)
    if skip:
        return (False, f"skip asset ({skip})", "", "")
    # Extract filename/extension and Content-Disposition flag
    filename, ext, has_attachment = _get_candidate_filename(flow)
    # Primary triggers:
    # A) Content-Disposition: attachment => scan
    if has_attachment:
        return (True, "trigger=cd:attachment", filename, ext)
    # B) Top-level navigation + risky extension => scan
    if _is_top_level_navigation(flow) and ext in RISKY_EXTS:
        return (True, "trigger=top-level+risky-ext", filename, ext)
    # (Removed) C) risky-ext anywhere
    # (Removed) D) top-level+binary-ctype heuristic
    return (False, "no trigger matched", filename, ext)
class VTGate:
    def load(self, loader):
        if not requests or not VT_API_KEY:
            ctx.log.warn("VT disabled: 'requests' missing or API key empty. (Set both!)")
        ctx.log.info(
            f"VT gate ready. MAX_SCAN_BYTES={MAX_SCAN_BYTES} bytes; VT_RPM={VT_RPM}/min; "
            f"poll={VT_POLL_TIMEOUT}s; min_engines={MIN_ENGINES_FOR_CLEAN}; concurrency={VT_MAX_CONCURRENCY}"
        )

    def _cache_conn(self):
        con = sqlite3.connect(CACHE_DB)
        con.execute("CREATE TABLE IF NOT EXISTS cache (hash TEXT PRIMARY KEY, verdict TEXT, stats TEXT, ts INTEGER)")
        return con

    def _cache_get(self, sha):
        try:
            con = self._cache_conn()
            cur = con.execute("SELECT verdict, stats, ts FROM cache WHERE hash=?", (sha,))
            row = cur.fetchone()
            con.close()
            if not row:
                return None
            verdict, stats_json, ts = row
            if time.time() - ts > CACHE_TTL_SECS:
                return None
            stats = json.loads(stats_json) if stats_json else {}
            return {"verdict": verdict, "stats": stats}
        except Exception:
            return None

    def _cache_put(self, sha, verdict, stats):
        if verdict == "unknown":
            return
        try:
            con = self._cache_conn()
            con.execute("INSERT OR REPLACE INTO cache(hash, verdict, stats, ts) VALUES(?,?,?,?)",
                        (sha, verdict, json.dumps(stats or {}), int(time.time())))
            con.commit()
            con.close()
        except Exception:
            pass
    def response(self, flow: http.HTTPFlow):
        # Consider successful responses we can read (200 or 206 partials)
        if not flow.response or flow.response.status_code not in (200, 206):
            return
        # Apply triggers/skips
        scan, why, filename, ext = should_scan(flow)
        if not scan:
            status("SKIP", "----", f"{why} ct={flow.response.headers.get('content-type','?')} url={flow.request.pretty_url}")
            return
        # Basic metadata
        try:
            client = f"{flow.client_conn.address[0]}:{flow.client_conn.address[1]}"
        except Exception:
            client = "unknown"
        url = flow.request.pretty_url
        host = flow.request.host
        ct_hdr = flow.response.headers.get("content-type", "?")
        length_hdr = flow.response.headers.get("content-length")
        size_hdr = int(length_hdr) if (length_hdr and length_hdr.isdigit()) else None
        # Enforce size cap early
        if size_hdr and size_hdr > MAX_SCAN_BYTES:
            reason = f"oversize header: {size_hdr} > {MAX_SCAN_BYTES}"
            status("DENY", "oversize", f"{reason} url={url}")
            block_page(flow, f"Policy: size cap exceeded.\n{reason}\nURL: {url}\nClient: {client}")
            return
        t0 = time.time()
        status("INFO", "pending", f"buffering body… ({why}) filename={filename or '-'} ext={ext or '-'} ct={ct_hdr} url={url}")
        body = flow.response.get_content(strict=False) or b""
        size = len(body)
        if size > MAX_SCAN_BYTES:
            reason = f"oversize body: {size} > {MAX_SCAN_BYTES}"
            status("DENY", "oversize", f"{reason} url={url}")
            block_page(flow, f"Policy: size cap exceeded.\n{reason}\nURL: {url}\nClient: {client}")
            return
        sha = sha256_bytes(body)
        status("INFO", sha, f"hash={sha} size={size}")
        # write temp drop (for audit)
        tmp_path = os.path.join(DROP_DIR, f"{sha}.{int(time.time())}")
        try:
            with open(tmp_path, "wb") as f:
                f.write(body)
        except Exception as e:
            status("WARN", sha, f"could not write temp file: {e}")
        vt_summary = None
        decision = "ALLOW"
        policy_reason = "harmless"
        family_hint = None
        try:
            # cache
            cached = self._cache_get(sha)
            if cached:
                stats_c = cached.get("stats") or {}
                verdict_c = cached.get("verdict") or "unknown"
                status("INFO", sha, f"cache hit: {verdict_c} (mal={stats_c.get('malicious',0)} susp={stats_c.get('suspicious',0)} total={stats_c.get('_total','?')})")
                if verdict_c == "harmless":
                    rep, code, err = vt_file_report(sha)
                    if code == 200 and isinstance(rep, dict):
                        bundle = {"upload": None, "analysis": None, "report": rep, "code": code, "err": err}
                        vt_summary = vt_summarize(sha, bundle)
                        if vt_summary.get("verdict") in ("malicious","suspicious","unknown"):
                            status("INFO", sha, f"cache override after recheck: {vt_summary['verdict']}")
                        else:
                            vt_summary["raw"] = {"source": "cache"}
                    else:
                        vt_summary = {"enabled": True, "found": False, "status": code, "verdict": "unknown", "source": "cache-recheck-failed"}
                else:
                    vt_summary = {
                        "enabled": True, "found": True, "status": 200,
                        "permalink": f"https://www.virustotal.com/gui/file/{sha}",
                        "stats": {k:int(stats_c.get(k,0)) for k in ("malicious","suspicious","undetected","harmless","timeout","failure")},
                        "verdict": verdict_c, "source": "cache",
                    }
            if not vt_summary or vt_summary.get("verdict") == "unknown":
                if not requests or not VT_API_KEY:
                    raise RuntimeError("VT not available (requests missing or API key empty)")
                # 1) Hash lookup
                status("VT", sha, "lookup…")
                rep, code, err = vt_file_report(sha)
                bundle = {"upload": None, "analysis": None, "report": rep, "code": code, "err": err}
                vt_summary = vt_summarize(sha, bundle)
                # 2) If unknown or undetected, upload + poll
                stats = (vt_summary or {}).get("stats", {})
                if vt_summary.get("verdict") in ("unknown","harmless") and stats.get("malicious",0)==0 and stats.get("suspicious",0)==0:
                    status("VT", sha, "upload (unknown/undetected)…")
                    upres, up_err = vt_upload_and_poll(tmp_path, sha, lambda m: status("VT", sha, m))
                    if up_err:
                        raise RuntimeError(f"VT upload/poll error: {up_err}")
                    vt_summary = vt_summarize(sha, upres)
            # decision policy (fail-closed on zeros/unknown)
            stats = (vt_summary or {}).get("stats",{})
            total = int(stats.get("_total",0))
            mal = int(stats.get("malicious",0))
            susp = int(stats.get("suspicious",0))
            if (mal > 0) or (susp > 0):
                decision = "DENY"
                policy_reason = "VT: malicious/suspicious"
            elif vt_summary.get("verdict") == "harmless" and total >= MIN_ENGINES_FOR_CLEAN:
                decision = "ALLOW"
                policy_reason = "harmless"
            else:
                decision = "DENY"
                policy_reason = "timeout/unknown/min-engines"
            # cache store (don’t cache unknown)
            if vt_summary and vt_summary.get("status")==200 and vt_summary.get("verdict") != "unknown":
                self._cache_put(sha, vt_summary.get("verdict","unknown"), vt_summary.get("stats",{}))
            # save full VT JSON
            if vt_summary:
                save_vt_json(sha, vt_summary)
            # family hint from report (best-effort)
            try:
                rep = ((vt_summary or {}).get("raw") or {}).get("report") or {}
                res = (rep.get("data",{}).get("attributes",{}).get("last_analysis_results",{}) or {})
                for v in ("Microsoft","Kaspersky","BitDefender","ESET-NOD32","CrowdStrike","Sophos","Avast","Fortinet"):
                    if v in res and res[v].get("result"):
                        family_hint = res[v]["result"]
                        break
            except Exception:
                pass
        except Exception as e:
            status("ERROR", sha, f"scan error: {e}")
            decision = "ALLOW" if FAIL_OPEN else "DENY"
            policy_reason = "scanner_error (fail-open)" if FAIL_OPEN else "scanner_error (fail-closed)"
            vt_summary = vt_summary or {"enabled": bool(requests and VT_API_KEY), "error": str(e), "found": False}
        elapsed = f"{(time.time()-t0):.1f}"
        # Finalize: allow or block
        summary = {
            "sha256": sha,
            "size": size,
            "filetype": (vt_summary or {}).get("file_attributes",{}).get("type_description") or (ext or "unknown"),
            "client": client,
            "host": host,
            "url": url,
            "vt": vt_summary,
            "decision": decision,
            "policy_reason": policy_reason,
            "elapsed": elapsed,
            "family_hint": family_hint,
            "severity": "HIGH" if decision=="DENY" else "LOW",
        }
        pretty_console(summary)
        if decision == "DENY":
            block_page(
                flow,
                f"{policy_reason}\n"
                f"SHA256: {sha}\n"
                f"Filename: {filename or '-'}\n"
                f"Content-Type: {ct_hdr}\n"
                f"URL: {url}\n"
                f"Client: {client}"
            )

addons = [VTGate()]