diff --git a/AGENTS.md b/AGENTS.md
index 889c3d7..67617a3 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -225,6 +225,13 @@ When `unknown_base_reverse_dns.csv` has new entries, follow this order rather th
- `find_unknown_base_reverse_dns.py` — regenerates `unknown_base_reverse_dns.csv` from `base_reverse_dns.csv` by subtracting what is already mapped or known-unknown. Enforces the no-full-IP privacy rule at ingest. Translates non-domain-shaped `source_name` rows (raw MMDB `as_name` strings surfaced by the ASN-fallback path in `utils.py:get_ip_address_info` when the IP had no PTR and the `as_domain` was uncategorized) to their corresponding `as_domain` via the bundled MMDB, so the row enters the pipeline as a researchable domain (and drops out automatically if that `as_domain` is already mapped). Run after merging a batch.
- `detect_psl_overrides.py` — scans the lists for clustered IP-containing patterns, auto-adds brand suffixes to `psl_overrides.txt`, folds affected entries to their base, and removes any remaining full-IP entries. Run before the collector on any new batch.
- `collect_domain_info.py` — the bulk enrichment collector described above. Respects `psl_overrides.txt` and skips full-IP entries. Two derived columns surface drift signals that are also useful during initial classification: `rebrand_signal` combines a body-text regex (matches "now X", "formerly known as X", "is now part of X", etc.) with a path/alt-text regex (matches "rebrand", "brand-launch", "brand-announcement", "name-change", "our-new-name") so that image-only acquisition banners — `<img>` tags whose `src` path or `alt` text carries the announcement
— also fire. `external_links` lists the homepage's non-self, non-social outbound link hosts; useful as review context but not a flag trigger by default in the drift sweep (most external links are to partners / customers / vendors and don't indicate a rebrand).
+
+ **Search fallback (`--use-search-fallback`, off by default).** A meaningful share of KU domains return a Cloudflare / DDoS-Guard / "Are you a robot?" / px-captcha interstitial instead of real homepage content — even after the curl-style relaxed-TLS fallback runs. For those rows we have neither homepage signal nor (often) a usable as_name, and they fall through to KU. With `--use-search-fallback` enabled, the collector instead asks DuckDuckGo for `site:<domain>` and uses the top result whose host belongs to the input domain (exact match or subdomain — never a third-party page). The same fallback also fires for parked, placeholder, default-server, and empty-metadata homepages. Title and description from that result populate the row, and `title_source` is set to `search` so reviewers can audit what came from DDG vs. the homepage. Requires `pip install ddgs` (or `pip install .[build]`); the script runs without ddgs as long as the flag isn't passed.
+
+ Two safety rails to be aware of when using this:
+
+ - **Same-domain SEO-spam guard.** Top results that point at a *different* host than the input domain are silently skipped. The classifier's data-not-instructions rule still applies — search-engine snippets are untrusted text — but the same-domain check at least guarantees the snippet was published on a page belonging to the operator we're trying to identify, not a parasitic SEO site that scraped the domain name.
+ - **Stale snippets are real.** DuckDuckGo's index can lag a homepage rebrand by months. When you see a row classified via `title_source=search` whose category disagrees with the current homepage you can reach manually, prefer the manual verification — the search snippet is a recovery aid, not a tiebreaker against fresh content.
- `classify_unknown_domains.py` — regex-based multilingual classifier that consumes a `collect_domain_info.py` TSV and emits map / ambiguous / known-unknown additions. Useful for both lookup paths into `base_reverse_dns_map.csv`: the original PTR-side flow (classifying reverse-DNS base domains discovered from DMARC report source IPs) and the MMDB-coverage flow (classifying ASN domains lifted from the bundled IPinfo Lite MMDB). Detectors cover all 44 industry types in the README, and every detector aims for **concept parity across the same broad language pool** — see the concept-parity rule below. The classifier is the regex baseline of step 4 of the unknown-domain workflow (see "Workflow for classifying unknown domains" above) — it catches the obvious cases at scale and leaves the genuinely ambiguous to manual / LLM review.
**Three output buckets**. Per-row, the classifier returns one of three states:
diff --git a/parsedmarc/resources/maps/collect_domain_info.py b/parsedmarc/resources/maps/collect_domain_info.py
index 8bd5057..6594484 100644
--- a/parsedmarc/resources/maps/collect_domain_info.py
+++ b/parsedmarc/resources/maps/collect_domain_info.py
@@ -45,6 +45,14 @@ import urllib3
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context
+# Optional import — only needed when --use-search-fallback is passed. The
+# script runs without ddgs as long as the flag isn't requested. Install via
+# `pip install ddgs` (or `pip install .[build]` from the repo root).
+try:
+ from ddgs import DDGS as _DDGS
+except ImportError:
+ _DDGS = None
+
# Suppress the InsecureRequestWarning emitted whenever the fallback fetch
# uses verify=False. It is a known and intentional fallback-only signal.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@@ -70,6 +78,7 @@ FIELDS = [
"ip_whois_netname",
"ip_whois_country",
"error",
+ "title_source",
]
USER_AGENT = (
@@ -777,11 +786,174 @@ def _fetch_homepage(domain: str, timeout: float) -> dict:
return out
-def _collect_one(domain: str, whois_timeout: float, http_timeout: float) -> dict:
+# Case-insensitive, unanchored patterns that mark a title/description as a
+# bot-block / WAF interstitial / parked / placeholder page rather than the
+# real operator's content. A match triggers a search-fallback lookup when
+# --use-search-fallback is passed. The patterns intentionally overlap with
+# classify_unknown_domains.py's TITLE_NOISE_RE / PARKED_PAGE_RE — search
+# fallback is the recovery path for exactly the rows that filter excludes.
+_SEARCH_FALLBACK_TRIGGER_RE = re.compile(
+ r"(?i)(?:"
+ # Cloudflare / WAF / bot-detection interstitials
+ r"attention required! \| cloudflare|"
+ r"just a moment|are you a robot|checking your browser|"
+ r"please enable javascript|"
+ r"ddos[- ]guard|px-captcha|vercel security checkpoint|"
+ # Generic blocked / unavailable
+ r"access denied|access to this page has been denied|"
+ r"site is not available|page is not available|"
+ r"403 forbidden|401 unauthorized|"
+ r"bad gateway|503 service|"
+ # Registrar / hosting parking placeholders
+ r"this domain (?:name )?(?:has been |is )registered with|"
+ r"your domain (?:is |has )(?:expired|parked)|"
+ r"domain (?:has )?expired|domain (?:is )?parked|"
+ r"this domain is parked|parked free, courtesy of|"
+ r"domain parking|"
+ # Default-server / unconfigured pages
+ r"automatically generated default|default server page|"
+ r"default landing page|default web page|"
+ r"successfully deployed by|"
+ r"welcome to apache|apache http server test page|welcome to nginx|"
+ r"just another wordpress site|"
+ r"hostinger horizons|"
+ # For-sale parking
+ r"website is for sale|domain is for sale|domain (?:name )?for sale|"
+ r"buy this domain"
+ r")"
+)
+
+
+def _registrable_root(host: str) -> str:
+ """Return a coarse 'registrable' root: the last two labels of *host*.
+
+ This is NOT a Public Suffix List lookup. It maps `www.foo.com` to
+ `foo.com`, but for multi-label suffixes it returns the suffix itself:
+ `sub.foo.co.uk` -> `co.uk` and `foo.com.au` -> `com.au`, so unrelated
+ domains under those suffixes compare equal. Hosts with two or fewer
+ labels come back lower-cased and stripped but otherwise unchanged.
+ NOTE(review): no caller is visible in this diff — confirm it is used.
+ """
+ parts = host.lower().strip().split(".")
+ if len(parts) <= 2:
+ return host.lower().strip()
+ return ".".join(parts[-2:])
+
+
+def _hosts_match(input_domain: str, result_host: str) -> bool:
+ """Return True iff the search-result host belongs to the input domain.
+
+ Anti-SEO-spam guard: a `site:foo.com` query can still return hits on
+ third-party pages that scraped or merely mention the domain. A result
+ is accepted only when its host equals the input domain or is a
+ subdomain of it. Both sides are lower-cased, stripped, and have any
+ trailing dot removed; the "." + domain suffix test rejects `xfoo.com`.
+ """
+ if not result_host:
+ return False
+ a = input_domain.lower().strip().rstrip(".")
+ b = result_host.lower().strip().rstrip(".")
+ if a == b:
+ return True
+ return b.endswith("." + a)
+
+
+def _search_fallback_fetch(domain: str, max_results: int = 5) -> dict:
+ """Recover title + description from a DuckDuckGo search result.
+
+ Returns a dict with `title`, `description`, `final_url` and
+ `title_source` (all "" when nothing usable was found), plus an `error`
+ key only when the search call raised. No rebrand_signal / external_links
+ are produced: both need body HTML that a search result doesn't provide.
+
+ Same-domain guard (`_hosts_match`): results whose host differs from
+ the input domain are skipped, walking down the list until one
+ belongs to the input domain or the results are exhausted.
+ NOTE(review): no sleep is added here on the assumption that ddgs
+ throttles internally — confirm against the ddgs documentation.
+
+ If `_DDGS` is None (ddgs not installed) the function returns an
+ empty result rather than raising — the caller decides how to handle
+ that (the CLI flag check happens upstream).
+ """
+ out = {
+ "title": "",
+ "description": "",
+ "final_url": "",
+ "title_source": "",
+ }
+ if _DDGS is None:
+ return out
+ try:
+ with _DDGS() as engine:
+ results = list(engine.text(f"site:{domain}", max_results=max_results))
+ except Exception as e:
+ # Network / rate-limit / parse errors are recorded under "error" and
+ # otherwise yield an empty result, same as a search with no usable hit.
+ out["error"] = f"search: {type(e).__name__}: {e}"[:200]
+ return out
+ for r in results:
+ href = r.get("href", "") or ""
+ host = _hostname_from_url(href)
+ if not _hosts_match(domain, host):
+ continue
+ out["title"] = (r.get("title") or "").strip()
+ # ddgs's body field is the search snippet — DDG calls it "abstract"
+ # in the JSON API; the python wrapper exposes it as 'body'.
+ out["description"] = (r.get("body") or "").strip()
+ out["final_url"] = href
+ out["title_source"] = "search"
+ return out
+ return out
+
+
+def _looks_bot_blocked(meta: dict) -> bool:
+ """Decide whether a homepage-fetch result warrants a search-fallback.
+
+ Triggers when the title/description match one of the bot-block /
+ parking patterns OR both fields are empty (typical of WAF interstitials
+ that strip `<title>` / `<meta>` entirely). The combined check is broader
+ than just the regex because some interstitials produce no extractable
+ metadata at all.
+ """
+ title = (meta.get("title") or "").strip()
+ desc = (meta.get("description") or "").strip()
+ if not (title or desc):
+ return True
+ return bool(
+ _SEARCH_FALLBACK_TRIGGER_RE.search(title)
+ or _SEARCH_FALLBACK_TRIGGER_RE.search(desc)
+ )
+
+
+def _collect_one(
+ domain: str,
+ whois_timeout: float,
+ http_timeout: float,
+ use_search_fallback: bool = False,
+) -> dict:
row = {k: "" for k in FIELDS}
row["domain"] = domain
row.update(_parse_whois(_run_whois(domain, whois_timeout)))
row.update(_fetch_homepage(domain, http_timeout))
+ if row.get("title") or row.get("description"):
+ row["title_source"] = "homepage"
+ # Search fallback: when the homepage fetch returned a bot-block /
+ # parked / placeholder / empty result, ask DuckDuckGo for the
+ # `site:` snippet. Same-domain guard prevents SEO-spam
+ # contamination (a third-party page that scraped the domain).
+ if use_search_fallback and _looks_bot_blocked(row):
+ sf = _search_fallback_fetch(domain)
+ if sf["title"] or sf["description"]:
+ row["title"] = sf["title"]
+ row["description"] = sf["description"]
+ # Preserve the homepage-fetched final_url if we had one — it
+ # represents what the *server* redirected us to, which is more
+ # useful for redirect-target / rebrand analysis than the search
+ # result's href.
+ if not row.get("final_url"):
+ row["final_url"] = sf["final_url"]
+ row["title_source"] = "search"
ips = _resolve_ips(domain)
row["ips"] = ",".join(ips[:4])
# WHOIS the first resolved IP — usually reveals the hosting ASN / provider,
@@ -898,7 +1070,28 @@ def _main():
default=0,
help="Only process the first N pending domains (0 = all)",
)
+ p.add_argument(
+ "--use-search-fallback",
+ action="store_true",
+ help=(
+ "When the homepage fetch returns a bot-block / parked / "
+ "placeholder / empty page, fall back to a DuckDuckGo "
+ "site: search and use the top result's title and "
+ "description (only if the result host belongs to the input "
+ "domain, anti-SEO-spam guard). Requires the `ddgs` package "
+ "(pip install ddgs, or pip install .[build]). Off by default "
+ "because it adds ~0.5–1s of latency per fallback row and "
+ "depends on a third-party search service."
+ ),
+ )
args = p.parse_args()
+ if args.use_search_fallback and _DDGS is None:
+ print(
+ "error: --use-search-fallback requires the `ddgs` package "
+ "(pip install ddgs, or pip install .[build]).",
+ file=sys.stderr,
+ )
+ sys.exit(1)
mapped = _load_mapped(args.map)
overrides = _load_psl_overrides(args.psl_overrides) if args.psl_overrides else []
@@ -930,7 +1123,13 @@ def _main():
writer.writeheader()
with ThreadPoolExecutor(max_workers=args.workers) as ex:
futures = {
- ex.submit(_collect_one, d, args.whois_timeout, args.http_timeout): d
+ ex.submit(
+ _collect_one,
+ d,
+ args.whois_timeout,
+ args.http_timeout,
+ args.use_search_fallback,
+ ): d
for d in pending
}
for i, fut in enumerate(as_completed(futures), 1):
diff --git a/pyproject.toml b/pyproject.toml
index b29c8ee..0cb7027 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -55,6 +55,12 @@ dependencies = [
[project.optional-dependencies]
build = [
+ # Used only by maintainer tooling under parsedmarc/resources/maps/ —
+ # `collect_domain_info.py --use-search-fallback` falls back to a
+ # DuckDuckGo search when the homepage fetch returns a bot-block / parked
+ # / empty page. Optional import; the script runs without it as long as
+ # the fallback flag isn't passed.
+ "ddgs>=9.0.0",
"hatch>=1.14.0",
"myst-parser[linkify]",
"nose",