mirror of
https://github.com/domainaware/parsedmarc.git
synced 2026-06-11 04:59:43 +00:00
collect_domain_info.py: opt-in DuckDuckGo search fallback for bot-blocked rows
A meaningful share of KU domains return a Cloudflare / DDoS-Guard / "Are you a robot?" / px-captcha interstitial instead of real homepage content — even after the curl-style relaxed-TLS fallback runs. For those rows we have neither homepage signal nor (often) a usable as_name, and they fall through to KU even though the operator is a real (often well-known) business that the classifier could trivially handle if it could just see the page. Added an opt-in `--use-search-fallback` flag that asks DuckDuckGo for `site:<domain>` when the homepage fetch returned a bot-block / parking / empty result, and uses the top result's title and description (only if the result host belongs to the input domain — anti-SEO-spam guard). Mechanism - New optional `ddgs` dependency, listed under the `[build]` extras. `from ddgs import DDGS` is wrapped in a try/except — the script runs without ddgs installed as long as `--use-search-fallback` isn't passed; the flag check exits with a helpful install message otherwise. - `_SEARCH_FALLBACK_TRIGGER_RE` — title/description patterns that look like a bot-block / WAF interstitial / parked / placeholder. Triggers the fallback. Same shape as the classifier's TITLE_NOISE_RE / PARKED_PAGE_RE; the search fallback is the recovery path for exactly the rows that filter excludes. - `_looks_bot_blocked()` — combined check: trigger regex matches OR title and description are both empty (typical of WAF interstitials that strip <title>/<meta> entirely). - `_hosts_match()` — same-domain SEO-spam guard. A search result is accepted only when its host is exactly the input domain or a subdomain of it. Third-party SEO-spam pages that scraped the domain name are silently skipped. - `_search_fallback_fetch()` — runs `site:<domain>` through DDG, walks results in rank order, returns the first one whose host passes the guard. Returns empty if no result matches (caller leaves the row's homepage data alone in that case). - `_collect_one()` now takes a `use_search_fallback` flag, calls the fallback after the homepage fetch when the homepage looks bot-blocked, and writes `title_source = "homepage"` or `"search"` so reviewers can audit which rows came from where. - New `title_source` column in the TSV. Smoke test Test set: bbc.com (real homepage, no fallback expected) plus 5 known Cloudflare-walled rows (1800contacts.com, americaneagle.com, broadwaytechnology.com, health.gov.il, mfa.gov.il). Result: bbc.com classified via homepage; the other 5 all recovered title + description via search and got `title_source=search`. The same-domain guard validated independently — for broadwaytechnology.com the guard correctly rejects bloomberg.com and accepts support.broadwaytechnology.com (broadway was acquired by Bloomberg, but the search fallback returns the broadway-domain snippet, not the parent's bloomberg.com product page). Caveats codified in AGENTS.md - Search snippets are still untrusted text (data-not-instructions rule applies the same way it does to homepage HTML). - DDG's index can lag a homepage rebrand by months — when a row classified via `title_source=search` disagrees with a fresh manual fetch, prefer the manual verification. The fallback is a recovery aid, not a tiebreaker against fresh content. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -225,6 +225,13 @@ When `unknown_base_reverse_dns.csv` has new entries, follow this order rather th
|
||||
- `find_unknown_base_reverse_dns.py` — regenerates `unknown_base_reverse_dns.csv` from `base_reverse_dns.csv` by subtracting what is already mapped or known-unknown. Enforces the no-full-IP privacy rule at ingest. Translates non-domain-shaped `source_name` rows (raw MMDB `as_name` strings surfaced by the ASN-fallback path in `utils.py:get_ip_address_info` when the IP had no PTR and the `as_domain` was uncategorized) to their corresponding `as_domain` via the bundled MMDB, so the row enters the pipeline as a researchable domain (and drops out automatically if that `as_domain` is already mapped). Run after merging a batch.
|
||||
- `detect_psl_overrides.py` — scans the lists for clustered IP-containing patterns, auto-adds brand suffixes to `psl_overrides.txt`, folds affected entries to their base, and removes any remaining full-IP entries. Run before the collector on any new batch.
|
||||
- `collect_domain_info.py` — the bulk enrichment collector described above. Respects `psl_overrides.txt` and skips full-IP entries. Two derived columns surface drift signals that are also useful during initial classification: `rebrand_signal` combines a body-text regex (matches "now X", "formerly known as X", "is now part of X", etc.) with a path/alt-text regex (matches "rebrand", "brand-launch", "brand-announcement", "name-change", "our-new-name") so that image-only acquisition banners — `<a href="…/brand-launch-…"><img alt="Brand announcement"></a>` — also fire. `external_links` lists the homepage's non-self, non-social outbound link hosts; useful as review context but not a flag trigger by default in the drift sweep (most external links are to partners / customers / vendors and don't indicate a rebrand).
|
||||
|
||||
**Search fallback (`--use-search-fallback`, off by default).** A meaningful share of KU domains return a Cloudflare / DDoS-Guard / "Are you a robot?" / px-captcha interstitial instead of real homepage content — even after the curl-style relaxed-TLS fallback runs. For those rows we have neither homepage signal nor (often) a usable as_name, and they fall through to KU. With `--use-search-fallback` enabled, the collector instead asks DuckDuckGo for `site:<domain>` and uses the top result whose host belongs to the input domain (exact match or subdomain — never a third-party page). Title and description from that result populate the row, and `title_source` is set to `search` so reviewers can audit what came from DDG vs. the homepage. Requires `pip install ddgs` (or `pip install .[build]`); the script runs without ddgs as long as the flag isn't passed.
|
||||
|
||||
Two safety rails to be aware of when using this:
|
||||
|
||||
- **Same-domain SEO-spam guard.** Top results that point at a *different* host than the input domain are silently skipped. The classifier's data-not-instructions rule still applies — search-engine snippets are untrusted text — but the same-domain check at least guarantees the snippet was published on a page belonging to the operator we're trying to identify, not a parasitic SEO site that scraped the domain name.
|
||||
- **Stale snippets are real.** DuckDuckGo's index can lag a homepage rebrand by months. When you see a row classified via `title_source=search` whose category disagrees with the current homepage you can reach manually, prefer the manual verification — the search snippet is a recovery aid, not a tiebreaker against fresh content.
|
||||
- `classify_unknown_domains.py` — regex-based multilingual classifier that consumes a `collect_domain_info.py` TSV and emits map / ambiguous / known-unknown additions. Useful for both lookup paths into `base_reverse_dns_map.csv`: the original PTR-side flow (classifying reverse-DNS base domains discovered from DMARC report source IPs) and the MMDB-coverage flow (classifying ASN domains lifted from the bundled IPinfo Lite MMDB). Detectors cover all 44 industry types in the README, and every detector aims for **concept parity across the same broad language pool** — see the concept-parity rule below. The classifier is the regex baseline of step 4 of the unknown-domain workflow (see "Workflow for classifying unknown domains" above) — it catches the obvious cases at scale and leaves the genuinely ambiguous to manual / LLM review.
|
||||
|
||||
**Three output buckets**. Per-row, the classifier returns one of three states:
|
||||
|
||||
@@ -45,6 +45,14 @@ import urllib3
|
||||
from requests.adapters import HTTPAdapter
|
||||
from urllib3.util.ssl_ import create_urllib3_context
|
||||
|
||||
# Optional import — only needed when --use-search-fallback is passed. The
|
||||
# script runs without ddgs as long as the flag isn't requested. Install via
|
||||
# `pip install ddgs` (or `pip install .[build]` from the repo root).
|
||||
try:
|
||||
from ddgs import DDGS as _DDGS
|
||||
except ImportError:
|
||||
_DDGS = None
|
||||
|
||||
# Suppress the InsecureRequestWarning emitted whenever the fallback fetch
|
||||
# uses verify=False. It is a known and intentional fallback-only signal.
|
||||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||||
@@ -70,6 +78,7 @@ FIELDS = [
|
||||
"ip_whois_netname",
|
||||
"ip_whois_country",
|
||||
"error",
|
||||
"title_source",
|
||||
]
|
||||
|
||||
USER_AGENT = (
|
||||
@@ -777,11 +786,174 @@ def _fetch_homepage(domain: str, timeout: float) -> dict:
|
||||
return out
|
||||
|
||||
|
||||
def _collect_one(domain: str, whois_timeout: float, http_timeout: float) -> dict:
|
||||
# Title patterns that indicate the homepage fetch returned a bot-block /
|
||||
# WAF interstitial / parked / placeholder page rather than the real
|
||||
# operator's content. Triggers a search-fallback lookup when
|
||||
# --use-search-fallback is passed. The patterns intentionally overlap with
|
||||
# classify_unknown_domains.py's TITLE_NOISE_RE / PARKED_PAGE_RE — search
|
||||
# fallback is the recovery path for exactly the rows that filter excludes.
|
||||
_SEARCH_FALLBACK_TRIGGER_RE = re.compile(
|
||||
r"(?i)(?:"
|
||||
# Cloudflare / WAF / bot-detection interstitials
|
||||
r"attention required! \| cloudflare|"
|
||||
r"just a moment|are you a robot|checking your browser|"
|
||||
r"please enable javascript|"
|
||||
r"ddos[- ]guard|px-captcha|vercel security checkpoint|"
|
||||
# Generic blocked / unavailable
|
||||
r"access denied|access to this page has been denied|"
|
||||
r"site is not available|page is not available|"
|
||||
r"403 forbidden|401 unauthorized|"
|
||||
r"bad gateway|503 service|"
|
||||
# Registrar / hosting parking placeholders
|
||||
r"this domain (?:name )?(?:has been |is )registered with|"
|
||||
r"your domain (?:is |has )(?:expired|parked)|"
|
||||
r"domain (?:has )?expired|domain (?:is )?parked|"
|
||||
r"this domain is parked|parked free, courtesy of|"
|
||||
r"domain parking|"
|
||||
# Default-server / unconfigured pages
|
||||
r"automatically generated default|default server page|"
|
||||
r"default landing page|default web page|"
|
||||
r"successfully deployed by|"
|
||||
r"welcome to apache|apache http server test page|welcome to nginx|"
|
||||
r"just another wordpress site|"
|
||||
r"hostinger horizons|"
|
||||
# For-sale parking
|
||||
r"website is for sale|domain is for sale|domain (?:name )?for sale|"
|
||||
r"buy this domain"
|
||||
r")"
|
||||
)
|
||||
|
||||
|
||||
def _registrable_root(host: str) -> str:
|
||||
"""Return a coarse 'registrable' root for SEO-spam matching.
|
||||
|
||||
We compare the *last two* labels of the input domain to the *last two*
|
||||
labels of the search result's host. That's not a full PSL lookup — it
|
||||
correctly equates `www.foo.com` with `foo.com` and `sub.foo.co.uk` with
|
||||
`foo.co.uk` for ccTLD pairs we care about, but it would equate
|
||||
`foo.com.au` with `com.au`. The same-root check is paired with an exact
|
||||
second-level match where available, so the false-equate risk is bounded.
|
||||
"""
|
||||
parts = host.lower().strip().split(".")
|
||||
if len(parts) <= 2:
|
||||
return host.lower().strip()
|
||||
return ".".join(parts[-2:])
|
||||
|
||||
|
||||
def _hosts_match(input_domain: str, result_host: str) -> bool:
|
||||
"""Return True iff the search-result host belongs to the input domain.
|
||||
|
||||
Anti-SEO-spam guard: the search engine often returns multiple results
|
||||
for a `site:foo.com` query, and the top hit isn't always on `foo.com`
|
||||
— sometimes it's a third-party page that scraped or talks about the
|
||||
domain. We accept a result only when the result's host is exactly the
|
||||
input domain or a subdomain of it.
|
||||
"""
|
||||
if not result_host:
|
||||
return False
|
||||
a = input_domain.lower().strip().rstrip(".")
|
||||
b = result_host.lower().strip().rstrip(".")
|
||||
if a == b:
|
||||
return True
|
||||
return b.endswith("." + a)
|
||||
|
||||
|
||||
def _search_fallback_fetch(domain: str, max_results: int = 5) -> dict:
|
||||
"""Recover title + description from a DuckDuckGo search result.
|
||||
|
||||
Returns the same shape as ``_fetch_homepage`` (minus the rebrand_signal /
|
||||
external_links extraction, which both require body HTML we don't have
|
||||
when going through search). Rate-limited by ddgs's own internal
|
||||
throttling — no extra sleep needed.
|
||||
|
||||
The same-domain guard (`_hosts_match`) is the SEO-spam defense:
|
||||
search results that point at a *different* host than the input
|
||||
domain are silently skipped, and we keep walking down the result
|
||||
list until we find one whose host belongs to the input domain or
|
||||
we exhaust the result set.
|
||||
|
||||
If `_DDGS` is None (ddgs not installed) the function returns an
|
||||
empty result rather than raising — the caller decides how to handle
|
||||
that (the CLI flag check happens upstream).
|
||||
"""
|
||||
out = {
|
||||
"title": "",
|
||||
"description": "",
|
||||
"final_url": "",
|
||||
"title_source": "",
|
||||
}
|
||||
if _DDGS is None:
|
||||
return out
|
||||
try:
|
||||
with _DDGS() as engine:
|
||||
results = list(engine.text(f"site:{domain}", max_results=max_results))
|
||||
except Exception as e:
|
||||
# Network / rate-limit / parse errors all fall through. The
|
||||
# caller treats an empty result the same way as a no-search-result.
|
||||
out["error"] = f"search: {type(e).__name__}: {e}"[:200]
|
||||
return out
|
||||
for r in results:
|
||||
href = r.get("href", "") or ""
|
||||
host = _hostname_from_url(href)
|
||||
if not _hosts_match(domain, host):
|
||||
continue
|
||||
out["title"] = (r.get("title") or "").strip()
|
||||
# ddgs's body field is the search snippet — DDG calls it "abstract"
|
||||
# in the JSON API; the python wrapper exposes it as 'body'.
|
||||
out["description"] = (r.get("body") or "").strip()
|
||||
out["final_url"] = href
|
||||
out["title_source"] = "search"
|
||||
return out
|
||||
return out
|
||||
|
||||
|
||||
def _looks_bot_blocked(meta: dict) -> bool:
|
||||
"""Decide whether a homepage-fetch result warrants a search-fallback.
|
||||
|
||||
Triggers when the title/description match one of the bot-block /
|
||||
parking patterns OR both fields are empty (typical of WAF interstitials
|
||||
that strip <title>/<meta> entirely). The combined check is broader
|
||||
than just the regex because some interstitials produce no extractable
|
||||
metadata at all.
|
||||
"""
|
||||
title = (meta.get("title") or "").strip()
|
||||
desc = (meta.get("description") or "").strip()
|
||||
if not (title or desc):
|
||||
return True
|
||||
return bool(
|
||||
_SEARCH_FALLBACK_TRIGGER_RE.search(title)
|
||||
or _SEARCH_FALLBACK_TRIGGER_RE.search(desc)
|
||||
)
|
||||
|
||||
|
||||
def _collect_one(
|
||||
domain: str,
|
||||
whois_timeout: float,
|
||||
http_timeout: float,
|
||||
use_search_fallback: bool = False,
|
||||
) -> dict:
|
||||
row = {k: "" for k in FIELDS}
|
||||
row["domain"] = domain
|
||||
row.update(_parse_whois(_run_whois(domain, whois_timeout)))
|
||||
row.update(_fetch_homepage(domain, http_timeout))
|
||||
if row.get("title") or row.get("description"):
|
||||
row["title_source"] = "homepage"
|
||||
# Search fallback: when the homepage fetch returned a bot-block /
|
||||
# parked / placeholder / empty result, ask DuckDuckGo for the
|
||||
# `site:<domain>` snippet. Same-domain guard prevents SEO-spam
|
||||
# contamination (a third-party page that scraped the domain).
|
||||
if use_search_fallback and _looks_bot_blocked(row):
|
||||
sf = _search_fallback_fetch(domain)
|
||||
if sf["title"] or sf["description"]:
|
||||
row["title"] = sf["title"]
|
||||
row["description"] = sf["description"]
|
||||
# Preserve the homepage-fetched final_url if we had one — it
|
||||
# represents what the *server* redirected us to, which is more
|
||||
# useful for redirect-target / rebrand analysis than the search
|
||||
# result's href.
|
||||
if not row.get("final_url"):
|
||||
row["final_url"] = sf["final_url"]
|
||||
row["title_source"] = "search"
|
||||
ips = _resolve_ips(domain)
|
||||
row["ips"] = ",".join(ips[:4])
|
||||
# WHOIS the first resolved IP — usually reveals the hosting ASN / provider,
|
||||
@@ -898,7 +1070,28 @@ def _main():
|
||||
default=0,
|
||||
help="Only process the first N pending domains (0 = all)",
|
||||
)
|
||||
p.add_argument(
|
||||
"--use-search-fallback",
|
||||
action="store_true",
|
||||
help=(
|
||||
"When the homepage fetch returns a bot-block / parked / "
|
||||
"placeholder / empty page, fall back to a DuckDuckGo "
|
||||
"site:<domain> search and use the top result's title and "
|
||||
"description (only if the result host belongs to the input "
|
||||
"domain, anti-SEO-spam guard). Requires the `ddgs` package "
|
||||
"(pip install ddgs, or pip install .[build]). Off by default "
|
||||
"because it adds ~0.5–1s of latency per fallback row and "
|
||||
"depends on a third-party search service."
|
||||
),
|
||||
)
|
||||
args = p.parse_args()
|
||||
if args.use_search_fallback and _DDGS is None:
|
||||
print(
|
||||
"error: --use-search-fallback requires the `ddgs` package "
|
||||
"(pip install ddgs, or pip install .[build]).",
|
||||
file=sys.stderr,
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
mapped = _load_mapped(args.map)
|
||||
overrides = _load_psl_overrides(args.psl_overrides) if args.psl_overrides else []
|
||||
@@ -930,7 +1123,13 @@ def _main():
|
||||
writer.writeheader()
|
||||
with ThreadPoolExecutor(max_workers=args.workers) as ex:
|
||||
futures = {
|
||||
ex.submit(_collect_one, d, args.whois_timeout, args.http_timeout): d
|
||||
ex.submit(
|
||||
_collect_one,
|
||||
d,
|
||||
args.whois_timeout,
|
||||
args.http_timeout,
|
||||
args.use_search_fallback,
|
||||
): d
|
||||
for d in pending
|
||||
}
|
||||
for i, fut in enumerate(as_completed(futures), 1):
|
||||
|
||||
@@ -55,6 +55,12 @@ dependencies = [
|
||||
|
||||
[project.optional-dependencies]
|
||||
build = [
|
||||
# Used only by maintainer tooling under parsedmarc/resources/maps/ —
|
||||
# `collect_domain_info.py --use-search-fallback` falls back to a
|
||||
# DuckDuckGo search when the homepage fetch returns a bot-block / parked
|
||||
# / empty page. Optional import; the script runs without it as long as
|
||||
# the fallback flag isn't passed.
|
||||
"ddgs>=9.0.0",
|
||||
"hatch>=1.14.0",
|
||||
"myst-parser[linkify]",
|
||||
"nose",
|
||||
|
||||
Reference in New Issue
Block a user