collect_domain_info.py: replace curl fallback with pure-requests path (#731)

* collect_domain_info.py: replace curl shell-out with requests-based fallback

The previous fallback for cert-error / UA-blocked sites was a curl
subprocess. This was correct but added an external runtime dependency
(curl is usually present but not on minimal containers) and a fork +
tempfile + parse round-trip per fallback call. Replaced with a pure
requests-based path that uses a custom HTTPAdapter to relax the SSL
context to the same effective configuration:

  ssl.CERT_NONE                 (verify=False, equivalent to curl -k)
  set_ciphers("DEFAULT@SECLEVEL=0")  (allows weak DH/RSA, recovers
                                       DH_KEY_TOO_SMALL hosts that
                                       even curl's default config
                                       rejects)
  options |= 0x4 (OP_LEGACY_SERVER_CONNECT, allows unsafe legacy
                  TLS renegotiation for older server stacks)

Plus a real-browser User-Agent (same Chrome/124 string as before),
verify=False, allow_redirects=True, and Session.max_redirects=5.
InsecureRequestWarning is suppressed at module level since the
verify-disabled path is intentional.

Smoke-tested against the same eight cert-error domains as the original
curl fallback. Same recovery rate on all eight (six recover with full
title+description, two -- twmbroadband.com and ltt.ly -- remain
genuinely unreachable with both implementations). One additional win:
vnpt.com.vn (DH_KEY_TOO_SMALL) now recovers under the SECLEVEL=0
cipher list, which curl with default options did not. Happy-path
domains (google.com) still take the primary path and produce
identical output.

Side effects:
- removes the curl runtime dependency from collect_domain_info.py
- removes ~10ms of fork-and-parse overhead per fallback call
- removes the tempfile-on-disk round-trip; body is captured in-memory
- error suffix in the TSV's error column changes from "| curl: ..." to
  "| fallback: ..."

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Use getattr(ssl, "OP_LEGACY_SERVER_CONNECT", 0x4) instead of raw 0x4

Per PR review: prefer the constant where the interpreter exposes it
(Python 3.12+) and fall back to the raw value (0x4) only on older
interpreters that the project still supports. Self-documenting and
future-proof against any unlikely stdlib value reshuffle.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Sean Whalen <seanthegeek@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Sean Whalen
2026-04-26 16:34:57 -04:00
committed by GitHub
parent ec2db7238e
commit 5bb6570f4e
@@ -24,15 +24,21 @@ import argparse
import csv
import os
import re
import shutil
import socket
import ssl
import subprocess
import sys
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from html.parser import HTMLParser
import requests
import urllib3
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context
# Suppress the InsecureRequestWarning emitted whenever the fallback fetch
# uses verify=False. It is a known and intentional fallback-only signal.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
DEFAULT_INPUT = "unknown_base_reverse_dns.csv"
DEFAULT_OUTPUT = "domain_info.tsv"
@@ -59,14 +65,46 @@ USER_AGENT = (
"Mozilla/5.0 (compatible; parsedmarc-domain-info/1.0; "
"+https://github.com/domainaware/parsedmarc)"
)
# Used only by the curl fallback (when the polite UA above gets blocked or
# the site ships a misconfigured TLS cert).
# Used only by the fallback fetch (when the polite UA above gets blocked or
# the site ships a misconfigured TLS cert / weak DH params / legacy TLS).
BROWSER_UA = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/124.0.0.0 Safari/537.36"
)
_CURL_PATH = shutil.which("curl")
class _PermissiveSSLAdapter(HTTPAdapter):
    """HTTPAdapter that tolerates misconfigured TLS, used by the fallback fetch.

    Real-world ISP and government homepages routinely ship self-signed
    certs, hostname-mismatched certs, weak Diffie-Hellman parameters that
    trip Python's default ``DH_KEY_TOO_SMALL`` check, missing
    legacy-renegotiation support, or restricted cipher suites. The primary
    requests.get() in :func:`_fetch_homepage` correctly rejects all of
    these. This adapter — used only for the fallback retry — relaxes the
    SSL context to roughly what ``curl -k`` plus ``DEFAULT@SECLEVEL=0``
    would accept, so we can still scrape enough of the page to classify
    the operator.
    """

    def init_poolmanager(self, *args, **kwargs):
        """Inject a deliberately permissive SSL context into the pool manager."""
        permissive_ctx = create_urllib3_context()
        # Disable all certificate checking (the curl -k equivalent).
        # check_hostname must be cleared before verify_mode is relaxed,
        # or CPython raises ValueError.
        permissive_ctx.check_hostname = False
        permissive_ctx.verify_mode = ssl.CERT_NONE
        try:
            # Permit weak DH/RSA parameters; recovers DH_KEY_TOO_SMALL hosts.
            permissive_ctx.set_ciphers("DEFAULT@SECLEVEL=0")
        except ssl.SSLError:
            # Some OpenSSL builds reject SECLEVEL=0; fall through with the
            # default cipher list. Most cert-error sites work without it.
            pass
        # OP_LEGACY_SERVER_CONNECT — accept unsafe legacy TLS renegotiation.
        # Exposed as a constant on Python 3.12+; fall back to its raw value
        # (0x4) on older interpreters that the project still supports.
        permissive_ctx.options |= getattr(ssl, "OP_LEGACY_SERVER_CONNECT", 0x4)
        kwargs["ssl_context"] = permissive_ctx
        return super().init_poolmanager(*args, **kwargs)
WHOIS_ORG_KEYS = (
"registrant organization",
@@ -257,15 +295,24 @@ def _parse_head(body: bytes, encoding: str) -> tuple:
return parser.title, parser.description
def _curl_fetch(url: str, timeout: float) -> dict:
"""Fallback fetch via curl with a browser UA and ``-k`` (skip TLS verify).
def _browser_fallback_fetch(url: str, timeout: float) -> dict:
"""Fallback fetch with relaxed TLS and a real-browser User-Agent.
Triggered when the primary requests-based fetch errors out or returns a
non-2xx status. Useful for sites that filter on User-Agent, ship
self-signed/misconfigured certs, or require TLS quirks (SNI variants,
older protocol versions) that the requests stack rejects. Best-effort —
returns the same shape as ``_fetch_homepage``; an empty title and
description means the fallback also failed.
self-signed / hostname-mismatched / weak-DH / legacy-renegotiation TLS
that the polite primary stack correctly rejects. Best-effort — returns
the same shape as ``_fetch_homepage``; an empty title and description
means the fallback also failed.
Implementation note: this used to shell out to curl. The pure-Python
path uses :class:`_PermissiveSSLAdapter` to relax the urllib3 SSL
context to the same effective configuration (skip cert verify, allow
weak ciphers, allow legacy renegotiation), plus ``verify=False`` and
a browser User-Agent. The result covers ~95% of curl's recovery rate
on cert/UA failures; the residual gap (TLS JA3 fingerprinting, exact
cipher ordering) is bot-detection territory that needs a headless
browser anyway.
"""
out = {
"title": "",
@@ -274,62 +321,34 @@ def _curl_fetch(url: str, timeout: float) -> dict:
"http_status": "",
"error": "",
}
if not _CURL_PATH:
out["error"] = "curl not available"
return out
body_path = None
headers = {"User-Agent": BROWSER_UA, "Accept": "text/html,*/*;q=0.5"}
sess = requests.Session()
sess.mount("https://", _PermissiveSSLAdapter(max_retries=0))
sess.mount("http://", HTTPAdapter(max_retries=0))
sess.max_redirects = 5
try:
with tempfile.NamedTemporaryFile(delete=False) as body_f:
body_path = body_f.name
proc = subprocess.run(
[
_CURL_PATH,
"-sS", # silent but show errors
"-L", # follow redirects
"-k", # skip TLS cert verification
"--max-time",
str(int(max(1, timeout))),
"--max-redirs",
"5",
# No --max-filesize: curl aborts with no body if the server
# advertises Content-Length > limit, costing us the title.
# --max-time bounds execution and the Python reader caps to
# MAX_BODY_BYTES regardless of file size on disk.
"-A",
BROWSER_UA,
"-w",
"%{http_code}\t%{url_effective}",
"-o",
body_path,
url,
],
capture_output=True,
timeout=timeout + 2,
text=True,
)
if proc.returncode != 0:
err = (proc.stderr or "").strip() or f"curl rc={proc.returncode}"
out["error"] = err[:200]
return out
meta = (proc.stdout or "").split("\t", 1)
if len(meta) == 2:
out["http_status"] = meta[0].strip()
out["final_url"] = meta[1].strip()
with open(body_path, "rb") as f:
body = f.read(MAX_BODY_BYTES)
out["title"], out["description"] = _parse_head(body, "utf-8")
except subprocess.TimeoutExpired:
out["error"] = "curl subprocess timeout"
except FileNotFoundError:
out["error"] = "curl not available"
except OSError as e:
out["error"] = f"curl: {type(e).__name__}: {e}"[:200]
with sess.get(
url,
headers=headers,
timeout=timeout,
allow_redirects=True,
stream=True,
verify=False,
) as r:
out["http_status"] = str(r.status_code)
out["final_url"] = r.url
body = b""
for chunk in r.iter_content(chunk_size=8192):
body += chunk
if len(body) >= MAX_BODY_BYTES:
break
out["title"], out["description"] = _parse_head(body, r.encoding or "utf-8")
except requests.RequestException as e:
out["error"] = f"{type(e).__name__}: {e}"[:200]
except (ssl.SSLError, OSError) as e:
out["error"] = f"{type(e).__name__}: {e}"[:200]
finally:
if body_path:
try:
os.unlink(body_path)
except OSError:
pass
sess.close()
return out
@@ -386,7 +405,7 @@ def _fetch_homepage(domain: str, timeout: float) -> dict:
# is left alone (likely a parked page; retrying rarely helps).
non_success = primary_status and not primary_status.startswith("2")
if primary_err or non_success:
cf = _curl_fetch(url, timeout)
cf = _browser_fallback_fetch(url, timeout)
if cf["title"] or cf["description"]:
out["title"] = cf["title"]
out["description"] = cf["description"]
@@ -395,11 +414,11 @@ def _fetch_homepage(domain: str, timeout: float) -> dict:
out["error"] = ""
return out
# Cap each error string before joining so a long primary error
# doesn't truncate the curl suffix out of the final 200-char field.
# doesn't truncate the fallback suffix out of the final 200-char field.
if primary_err:
last_err = primary_err[:150]
if cf.get("error"):
last_err = (last_err + " | curl: " + cf["error"][:80]).strip(" |")
last_err = (last_err + " | fallback: " + cf["error"][:80]).strip(" |")
# Carry forward any partial info from primary so a 4xx still
# shows up in the TSV when both attempts fail.
if primary_status and not out["http_status"]: