From c752e776de12be0d435a15d3a97fee9179c8ab3c Mon Sep 17 00:00:00 2001 From: Sean Whalen <44679+seanthegeek@users.noreply.github.com> Date: Wed, 6 May 2026 21:22:30 -0400 Subject: [PATCH] Detect map-key rebrands via homepage drift sweep (#752) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds two complementary pieces of M&A drift detection over base_reverse_dns_map.csv: - `collect_domain_info.py` gains two derived columns. `rebrand_signal` combines a body-text regex ("now X" / "formerly known as X" / "we became X" / ...) with a narrow path-and-alt-text regex ("rebrand", "brand-launch", "brand-announcement", "name-change", "our-new-name", ...) that runs against the JSON-unescaped page bytes, so URL slugs and image alt attributes inside Elementor / hydration script blobs are reachable. The two-regex split is what catches image-only acquisition banners like bankonitusa.com's "now Navanta" — an `<a href="https://navanta.com/brand-launch-..."><img alt="Brand announcement"></a>` with no visible text — that pure body-text scanning misses. `external_links` collects the homepage's non-self, non-social outbound link hosts as review context only. - `detect_rebrands.py` is a new sibling drift sweep. It re-fetches every key in base_reverse_dns_map.csv with the same fetch machinery, evaluates two default flag triggers (`rebrand_signal` matched, or final URL host doesn't sit under the input domain), and writes a compact TSV of just the flagged rows. `external_links` is captured into the row as context but is not a default trigger — most outbound links are to partners / customers / vendors, and flagging them would flood review with noise. `--flag-external-links` opts into that signal for thorough sweeps. Resume-safe via `-o`. Output is review fodder, not automated map mutation: a single signal is one corroborating source, and promoting a flagged row into the map still requires a second source per the two-corroborating-sources rule. README and AGENTS.md updated to document the new columns and script. 
Co-authored-by: Sean Whalen Co-authored-by: Claude Opus 4.7 (1M context) --- AGENTS.md | 3 +- parsedmarc/resources/maps/README.md | 20 + .../resources/maps/collect_domain_info.py | 348 ++++++++++++++++-- parsedmarc/resources/maps/detect_rebrands.py | 240 ++++++++++++ 4 files changed, 587 insertions(+), 24 deletions(-) create mode 100644 parsedmarc/resources/maps/detect_rebrands.py diff --git a/AGENTS.md b/AGENTS.md index 8618551..9ed8900 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -224,7 +224,8 @@ When `unknown_base_reverse_dns.csv` has new entries, follow this order rather th - `find_unknown_base_reverse_dns.py` — regenerates `unknown_base_reverse_dns.csv` from `base_reverse_dns.csv` by subtracting what is already mapped or known-unknown. Enforces the no-full-IP privacy rule at ingest. Translates non-domain-shaped `source_name` rows (raw MMDB `as_name` strings surfaced by the ASN-fallback path in `utils.py:get_ip_address_info` when the IP had no PTR and the `as_domain` was uncategorized) to their corresponding `as_domain` via the bundled MMDB, so the row enters the pipeline as a researchable domain (and drops out automatically if that `as_domain` is already mapped). Run after merging a batch. - `detect_psl_overrides.py` — scans the lists for clustered IP-containing patterns, auto-adds brand suffixes to `psl_overrides.txt`, folds affected entries to their base, and removes any remaining full-IP entries. Run before the collector on any new batch. -- `collect_domain_info.py` — the bulk enrichment collector described above. Respects `psl_overrides.txt` and skips full-IP entries. +- `collect_domain_info.py` — the bulk enrichment collector described above. Respects `psl_overrides.txt` and skips full-IP entries. Two derived columns surface drift signals that are also useful during initial classification: `rebrand_signal` combines a body-text regex (matches "now X", "formerly known as X", "is now part of X", etc.) 
with a path/alt-text regex (matches "rebrand", "brand-launch", "brand-announcement", "name-change", "our-new-name") so that image-only acquisition banners — `<a href="https://navanta.com/brand-launch-..."><img alt="Brand announcement"></a>` — also fire. `external_links` lists the homepage's non-self, non-social outbound link hosts; useful as review context but not a flag trigger by default in the drift sweep (most external links are to partners / customers / vendors and don't indicate a rebrand). +- `detect_rebrands.py` — drift sweep that re-fetches every key in `base_reverse_dns_map.csv` with the same machinery as `collect_domain_info.py` and emits a TSV of rows where `rebrand_signal` or `redirect_changed` (final URL host doesn't sit under the input domain) fired. Output is for periodic review — a single signal is one corroborating source; promoting a flagged row still needs a second source per the two-corroborating-sources rule. Resume-safe via `-o`. Use `--limit N` to spot-check a slice; `--include-clean` to also emit non-flagged rows; `--flag-external-links` to additionally flag rows whose only signal is an outbound non-self host (off by default to keep partner/vendor noise out of the review queue). - `find_bad_utf8.py` — locates invalid UTF-8 bytes (used after past encoding corruption). - `sortlists.py` — case-insensitive sort + dedupe + `type`-column validator for the list files; the authoritative sorter run after every batch edit. diff --git a/parsedmarc/resources/maps/README.md b/parsedmarc/resources/maps/README.md index 03a2621..3fbe31f 100644 --- a/parsedmarc/resources/maps/README.md +++ b/parsedmarc/resources/maps/README.md @@ -129,10 +129,30 @@ Scans `unknown_base_reverse_dns.csv` for full-IP-containing entries that share a Bulk enrichment collector. For every domain in `unknown_base_reverse_dns.csv` that is not already in `base_reverse_dns_map.csv`, runs `whois` on the domain, fetches a size-capped `https://` GET, resolves A/AAAA records, and runs `whois` on the first resolved IP. 
Writes a TSV (`domain_info.tsv` by default) with the registrant org/country/registrar, page `<title>`/`<meta description>`, resolved IPs, and IP-WHOIS org/netname/country — the compact metadata a classifier needs to decide each domain in one pass. Respects `psl_overrides.txt`, skips full-IP entries, and is resume-safe (re-running only fetches domains missing from the output file). +The TSV also carries two derived columns that surface drift signals (and double as classification hints when a homepage explicitly names its operator): + +- `rebrand_signal` — first ~120-char excerpt of the page where one of two regexes hit. (a) Body-text phrases: *now X*, *is now part of X*, *formerly known as X*, *we became X*, *rebranded as X*, *acquired by X*, *merged with X*, *joined the X*. Common false-positive trailing words (`Now Available`, `Now Hiring`, etc.) are filtered, and the captured brand must start with an uppercase letter. (b) Path / alt-text phrases: `rebrand`, `brand-launch`, `brand-announcement`, `brand-change`, `name-change`, `our-new-name`, `new-name-for`, `acquisition-announcement`, `merger-announcement`. The path scan runs against the JSON-unescaped page bytes, so it sees URL slugs and image alt attributes embedded in script blobs. Real-world case: bankonitusa.com's "now Navanta" banner is image-only — `<a href="https://navanta.com/brand-launch-..."><img alt="Brand announcement"></a>` — and pure body-text scanning misses it; the path regex matches via the `brand-launch` slug and `Brand announcement` alt attribute. +- `external_links` — comma-separated list of up to 5 distinct outbound link hosts, after stripping the input domain (and its subdomains) and a small noise list (social, CDN, analytics, app stores). Useful as context when reviewing a flagged row, but a noisy *flag* — most external links are to partners / customers / vendors that have no operator relationship — so `detect_rebrands.py` does not treat this column as a flag trigger by default. 
Pass `--flag-external-links` for a thorough sweep. + ## domain_info.tsv The output of `collect_domain_info.py`. Tab-separated, one row per researched domain. Not tracked by Git — it is regenerated on demand and contains transient third-party WHOIS/HTML data. +## detect_rebrands.py + +Drift sweep that re-fetches every key in `base_reverse_dns_map.csv` with the same machinery as `collect_domain_info.py` and writes a TSV (`rebrand_drift.tsv` by default) of rows where a drift signal fired. Two signals are flagged by default: + +- `rebrand_signal` — the collector's body-text and path/alt-text regexes (see above) matched. +- `redirect_changed` — the homepage's final URL host is not the input domain or a subdomain of it (typical case-1 acquisition redirect, e.g. vodafone.is → syn.is). + +`external_links` is captured into the output for context but is not a default trigger — most outbound links are to partners / customers / vendors and would generate noise. Pass `--flag-external-links` to also flag on this column during a thorough sweep where missing an image-only banner that lacks a rebrand-themed slug or alt text is worse than the noise. + +The output is for periodic review, not automated map mutation. Each hit is one corroborating source; promoting a flagged row into the map still requires a second source per the two-corroborating-sources rule in [AGENTS.md](../../../AGENTS.md). Resume-safe: re-running only re-fetches keys not already in the output file; because non-flagged rows are written only with `--include-clean`, a plain re-run fetches the clean keys again. Use `--limit N` to spot-check a slice and `--include-clean` to also write non-flagged rows for inspection of the no-signal majority. + +## rebrand_drift.tsv + +The output of `detect_rebrands.py`. Tab-separated, one row per flagged map key. Not tracked by Git — regenerated on demand. + ## sortlists.py Validation and sorting helper invoked as a module. 
Alphabetically sorts `base_reverse_dns_map.csv` (case-insensitive by first column, preserving CRLF line endings), deduplicates entries, validates that every `type` appears in `base_reverse_dns_types.txt`, and warns on names that contain unescaped commas or stray whitespace. Run it after any batch merge before committing. diff --git a/parsedmarc/resources/maps/collect_domain_info.py b/parsedmarc/resources/maps/collect_domain_info.py index 96aa571..6b69c52 100644 --- a/parsedmarc/resources/maps/collect_domain_info.py +++ b/parsedmarc/resources/maps/collect_domain_info.py @@ -6,7 +6,15 @@ Reads a list of domains (defaults to the unmapped entries in useful for classifying an unknown sender: domain, whois_org, whois_country, registrar, title, description, - final_url, http_status, error + rebrand_signal, external_links, final_url, http_status, ips, + ip_whois_org, ip_whois_netname, ip_whois_country, error + +`rebrand_signal` flags rows whose page text matches a phrase like "now X" or +"formerly known as X" — useful both for classifying an unknown sender ("we +became Newfold Digital") and as a drift signal when re-run against existing +map keys via `detect_rebrands.py`. `external_links` carries the homepage's +non-self, non-social outbound link hosts; it catches image-only acquisition +banners that text scanning misses (e.g. bankonitusa.com → navanta.com). The output is resume-safe: re-running the script only fetches domains that are not already in the output file. 
Designed to produce a small file that an LLM @@ -30,6 +38,7 @@ import subprocess import sys from concurrent.futures import ThreadPoolExecutor, as_completed from html.parser import HTMLParser +from urllib.parse import urlparse import requests import urllib3 @@ -52,6 +61,8 @@ FIELDS = [ "registrar", "title", "description", + "rebrand_signal", + "external_links", "final_url", "http_status", "ips", @@ -135,6 +146,7 @@ IP_WHOIS_NETNAME_KEYS = ("netname", "network-name") IP_WHOIS_COUNTRY_KEYS = ("country",) MAX_BODY_BYTES = 256 * 1024 # truncate responses so a hostile page can't blow up RAM +MAX_BODY_TEXT_CHARS = 100 * 1024 # cap on extracted visible body text # Privacy filter: drop entries containing a full IPv4 address (four dotted or # dashed octets). Full IPs in a reverse-DNS base domain reveal a specific @@ -143,6 +155,227 @@ _FULL_IP_RE = re.compile( r"(?<![\d])(\d{1,3})[-.](\d{1,3})[-.](\d{1,3})[-.](\d{1,3})(?![\d])" ) +# Rebrand-signal scan. Triggered phrases are followed by a captured brand name +# (capitalized, non-noise word). The reviewer ultimately judges whether a hit +# is a real rebrand banner — the regex's job is to not miss the obvious ones. +# Real cases: "now Navanta", "is now part of Lumen", "formerly known as +# Symantec Email Security", "we became Newfold Digital". +REBRAND_RE = re.compile( + r"(?:" + r"(?:now|formerly(?: known as)?) " + r"|" + r"(?:we became|rebranded(?: as| to)?|merged with|" + r"acquired by|previously known as|previously operated as|" + r"is now (?:a )?part of|new name for|joined the) " + r")" + r"([A-Za-z][A-Za-z0-9&]+)", + re.IGNORECASE, +) + +# Path-style rebrand markers that appear in URL slugs and image alt text. +# Real-world image-only rebrand banners (the typical "we got acquired" +# treatment) put the announcement in a slug like +# `/brand-launch-frequently-asked-questions/` and an alt like +# "Brand announcement – Learn more", neither of which the body-text +# REBRAND_RE can see. 
Phrasing here is deliberately narrow — "brand" +# alone is far too common; we require it joined to launch / announcement / +# change / etc. by a space, dash, or underscore, which virtually never +# occurs outside a rebrand context. +REBRAND_PATH_RE = re.compile( + r"(?:" + r"rebrand" + r"|brand[ _-](?:launch|announcement|reveal|refresh|change|update)" + r"|name[ _-]change" + r"|our[ _-]new[ _-](?:name|brand)" + r"|new[ _-]name[ _-]for" + r"|(?:acquisition|merger)[ _-]announcement" + r")", + re.IGNORECASE, +) + +# Words that commonly follow "now"/"formerly" outside a rebrand context. The +# regex would otherwise hit "Now Available", "Formerly Open", etc. Add to +# this set if review surfaces a recurring false positive — keep the set +# narrow so real one-word brand names (Navanta, Lumen, Sykt, etc.) survive. +_REBRAND_NOISE = frozenset( + { + "Available", + "Accepting", + "Active", + "Booking", + "Closed", + "Complete", + "Enrolling", + "Expanding", + "Free", + "Hiring", + "Live", + "Loading", + "Offering", + "Online", + "Open", + "Operating", + "Pending", + "Playing", + "Powered", + "Selling", + "Serving", + "Shipping", + "Showing", + "Streaming", + "Supporting", + "Trending", + "Underway", + "You", + "Your", + } +) + + +# Hostnames that overwhelmingly appear as outbound links on virtually every +# homepage and carry no signal about the operator's identity. Keeping these +# out of `external_links` means the column is dominated by hosts that +# actually tell us something — e.g. an outbound link to navanta.com from +# bankonitusa.com (the rebrand's banner is an image-only `<a href>` with +# no visible "Navanta" text, so href scanning is the only cheap way to +# catch it without rendering JavaScript). 
+_NOISE_LINK_HOSTS = frozenset( + { + "facebook.com", + "fb.com", + "twitter.com", + "x.com", + "linkedin.com", + "instagram.com", + "youtube.com", + "youtu.be", + "tiktok.com", + "pinterest.com", + "vimeo.com", + "reddit.com", + "medium.com", + "github.com", + "gitlab.com", + "bitbucket.org", + "google.com", + "googleapis.com", + "googletagmanager.com", + "googleadservices.com", + "google-analytics.com", + "gstatic.com", + "doubleclick.net", + "play.google.com", + "apps.apple.com", + "apple.com", + "microsoft.com", + "office.com", + "cloudflare.com", + "jsdelivr.net", + "unpkg.com", + "bootstrapcdn.com", + "fontawesome.com", + "wp.com", + "w.org", + "wordpress.org", + "schema.org", + "ogp.me", + } +) + +_HREF_RE = re.compile( + r"""href\s*=\s*['"]https?://([^/'"\s>]+)""", + re.IGNORECASE, +) + + +def _hostname_from_url(url: str) -> str: + try: + return (urlparse(url).hostname or "").lower() + except Exception: + return "" + + +def _is_noise_host(host: str) -> bool: + for noise in _NOISE_LINK_HOSTS: + if host == noise or host.endswith("." + noise): + return True + return False + + +def _external_link_hosts(self_domain: str, text: str, limit: int = 5) -> list: + """Return up to `limit` distinct external hostnames found in <a href> URLs. + + Skips hosts that match the input domain (or any of its subdomains) and + common social/CDN/analytics/utility hosts that appear on practically every + page. Hosts are returned in first-appearance order; a host whose + registered domain matches the input but happens to be a different + subdomain (e.g. login.example.com on example.com's homepage) is treated + as self. + """ + self_domain = (self_domain or "").lower() + seen = [] + seen_set = set() + for m in _HREF_RE.finditer(text): + host = m.group(1).lower() + if not host or host in seen_set: + continue + if self_domain and (host == self_domain or host.endswith("." 
+ self_domain)): + continue + if _is_noise_host(host): + continue + seen_set.add(host) + seen.append(host) + if len(seen) >= limit: + break + return seen + + +def _rebrand_signal(*texts: str) -> str: + """Return first ~120-char context of a rebrand-keyword hit, or ''. + + Scans each input text in order. Returns the first hit whose captured + brand-name token is not on the noise list — keeps the surrounding + sentence so a reviewer can decide at a glance whether the match is a + real banner ("BankOnIT is now Navanta") or residual noise. + """ + for text in texts: + if not text: + continue + for m in REBRAND_RE.finditer(text): + brand = m.group(1) + # Real brand names in rebrand banners are virtually always written + # with an initial capital. Filtering on case lets us match the + # trigger phrase case-insensitively while still rejecting common + # post-trigger noise like "now hiring" / "formerly available". + if not brand or not brand[0].isupper(): + continue + if brand in _REBRAND_NOISE: + continue + start = max(0, m.start() - 30) + end = min(len(text), m.end() + 80) + return _strip_field(text[start:end]) + return "" + + +def _rebrand_path_signal(text: str) -> str: + """Return first ~120-char context of a rebrand-themed path/alt-text hit. + + Runs ``REBRAND_PATH_RE`` against the unescaped page text — the same + blob ``_external_link_hosts`` consumes — so URL slugs (`href= + "https://navanta.com/brand-launch-..."`) and image alt attributes + (`alt="Brand announcement"`) are both visible. The regex's phrasing + is narrow enough that hitting it almost always corresponds to a real + rebrand artifact rather than ordinary marketing copy. 
+ """ + if not text: + return "" + m = REBRAND_PATH_RE.search(text) + if not m: + return "" + start = max(0, m.start() - 40) + end = min(len(text), m.end() + 80) + return _strip_field(text[start:end]) + def _has_full_ip(s: str) -> bool: for m in _FULL_IP_RE.finditer(s): @@ -243,20 +476,32 @@ def _lookup_ip(ip: str, timeout: float) -> dict: return _parse_ip_whois(_run_whois(ip, timeout)) -class _HeadParser(HTMLParser): - """Extract <title> and the first description-like meta tag.""" +class _PageParser(HTMLParser): + """Extract <title>, the first description-like meta tag, and body text. + + Body text excludes the contents of <script>/<style>/<noscript>/<template> + elements — those rarely correspond to anything visible and routinely + contain large embedded JSON blobs that would crowd out the actual page + text under the body-text cap. Whitespace is collapsed at join time. + """ + + _SKIP_TAGS = ("script", "style", "noscript", "template") def __init__(self): super().__init__(convert_charrefs=True) self.title = "" self.description = "" + self._body_parts = [] + self._body_chars = 0 self._in_title = False - self._stop = False + self._in_body = False + self._skip_depth = 0 def handle_starttag(self, tag, attrs): - if self._stop: - return tag = tag.lower() + if tag in self._SKIP_TAGS: + self._skip_depth += 1 + return if tag == "title": self._in_title = True elif tag == "meta": @@ -270,29 +515,72 @@ class _HeadParser(HTMLParser): ): self.description = _strip_field(a.get("content", "")) elif tag == "body": - # everything useful is in <head>; stop parsing once we hit <body> - self._stop = True + self._in_body = True def handle_endtag(self, tag): - if tag.lower() == "title": + tag = tag.lower() + if tag in self._SKIP_TAGS: + if self._skip_depth: + self._skip_depth -= 1 + return + if tag == "title": self._in_title = False + elif tag == "body": + self._in_body = False def handle_data(self, data): + if self._skip_depth: + return if self._in_title and not self.title: self.title 
= _strip_field(data) + if self._in_body and self._body_chars < MAX_BODY_TEXT_CHARS: + self._body_parts.append(data) + self._body_chars += len(data) + + @property + def body_text(self) -> str: + return re.sub(r"\s+", " ", " ".join(self._body_parts)).strip() -def _parse_head(body: bytes, encoding: str) -> tuple: +def _extract_metadata(domain: str, body: bytes, encoding: str) -> dict: + """Decode the response body once and extract every per-page signal. + + Returns ``title``, ``description``, ``rebrand_signal``, ``external_links``. + Decoding once and running both the HTML parser and the href regex on the + same string avoids paying the decode cost twice. + """ + out = { + "title": "", + "description": "", + "rebrand_signal": "", + "external_links": "", + } try: text = body.decode(encoding, errors="replace") except LookupError: text = body.decode("utf-8", errors="replace") - parser = _HeadParser() + parser = _PageParser() try: parser.feed(text) except Exception: pass - return parser.title, parser.description + out["title"] = parser.title + out["description"] = parser.description + # Many sites embed serialized HTML inside <script> blocks (block-editor / + # Elementor templates, JSON-LD, hydration payloads) where quotes and + # slashes are JSON-escaped: `href=\"https:\/\/...\"`. The parser already + # skipped that content for body_text, but the URLs and alt-text inside + # it still signal where the page is pointing — bankonitusa.com's "now + # Navanta" banner is image-only `<a href>` with `alt="Brand + # announcement"` and slug `/brand-launch-.../`, all sitting inside an + # escaped Elementor blob. Unescape so the path-style rebrand regex and + # the link-host regex both see them. 
+ unescaped = text.replace('\\"', '"').replace("\\/", "/").replace("\\'", "'") + text_signal = _rebrand_signal(parser.title, parser.description, parser.body_text) + path_signal = _rebrand_path_signal(unescaped) + out["rebrand_signal"] = text_signal or path_signal + out["external_links"] = ",".join(_external_link_hosts(domain, unescaped)) + return out def _browser_fallback_fetch(url: str, timeout: float) -> dict: @@ -317,6 +605,8 @@ def _browser_fallback_fetch(url: str, timeout: float) -> dict: out = { "title": "", "description": "", + "rebrand_signal": "", + "external_links": "", "final_url": "", "http_status": "", "error": "", @@ -342,7 +632,13 @@ def _browser_fallback_fetch(url: str, timeout: float) -> dict: body += chunk if len(body) >= MAX_BODY_BYTES: break - out["title"], out["description"] = _parse_head(body, r.encoding or "utf-8") + meta = _extract_metadata( + _hostname_from_url(url), body, r.encoding or "utf-8" + ) + out["title"] = meta["title"] + out["description"] = meta["description"] + out["rebrand_signal"] = meta["rebrand_signal"] + out["external_links"] = meta["external_links"] except requests.RequestException as e: out["error"] = f"{type(e).__name__}: {e}"[:200] except (ssl.SSLError, OSError) as e: @@ -356,6 +652,8 @@ def _fetch_homepage(domain: str, timeout: float) -> dict: out = { "title": "", "description": "", + "rebrand_signal": "", + "external_links": "", "final_url": "", "http_status": "", "error": "", @@ -366,8 +664,12 @@ def _fetch_homepage(domain: str, timeout: float) -> dict: url = f"{scheme}://{domain}/" primary_status = "" primary_url = "" - primary_title = "" - primary_description = "" + primary_meta = { + "title": "", + "description": "", + "rebrand_signal": "", + "external_links": "", + } primary_err = "" try: with requests.get( @@ -384,18 +686,17 @@ def _fetch_homepage(domain: str, timeout: float) -> dict: body += chunk if len(body) >= MAX_BODY_BYTES: break - primary_title, primary_description = _parse_head( - body, r.encoding or 
"utf-8" - ) + primary_meta = _extract_metadata(domain, body, r.encoding or "utf-8") except requests.RequestException as e: primary_err = f"{type(e).__name__}: {e}" except socket.error as e: primary_err = f"socket: {e}" # Happy path: requests got a 2xx with parseable head metadata. - if primary_status.startswith("2") and (primary_title or primary_description): - out["title"] = primary_title - out["description"] = primary_description + if primary_status.startswith("2") and ( + primary_meta["title"] or primary_meta["description"] + ): + out.update(primary_meta) out["final_url"] = primary_url out["http_status"] = primary_status out["error"] = "" @@ -409,6 +710,8 @@ def _fetch_homepage(domain: str, timeout: float) -> dict: if cf["title"] or cf["description"]: out["title"] = cf["title"] out["description"] = cf["description"] + out["rebrand_signal"] = cf.get("rebrand_signal", "") + out["external_links"] = cf.get("external_links", "") out["final_url"] = cf["final_url"] or primary_url out["http_status"] = cf["http_status"] or primary_status out["error"] = "" @@ -427,8 +730,7 @@ def _fetch_homepage(domain: str, timeout: float) -> dict: continue # 2xx with empty head — accept whatever we got and stop. - out["title"] = primary_title - out["description"] = primary_description + out.update(primary_meta) out["final_url"] = primary_url out["http_status"] = primary_status out["error"] = "" diff --git a/parsedmarc/resources/maps/detect_rebrands.py b/parsedmarc/resources/maps/detect_rebrands.py new file mode 100644 index 0000000..35215ee --- /dev/null +++ b/parsedmarc/resources/maps/detect_rebrands.py @@ -0,0 +1,240 @@ +#!/usr/bin/env python +"""Re-fetch mapped reverse-DNS base domains and surface possible rebrand signals. 
+ +Walks `base_reverse_dns_map.csv`, fetches each domain's homepage with the same +machinery used by `collect_domain_info.py`, and writes a TSV listing rows where +one of two default drift signals fired: + +- `rebrand_signal` — the homepage's title / description / body text matched a + rebrand-keyword phrase ("is now X", "formerly known as X", "we became X", + ...) *or* a rebrand-themed URL slug or image-alt phrase ("brand-launch", + "brand-announcement", "rebrand", "name-change", "our-new-name", ...). The + path/alt-text scan catches image-only banners — bankonitusa.com's "now + Navanta" banner is an image inside `<a href="https://navanta.com/brand-launch-...">` + with `alt="Brand announcement"` — that pure body-text scanning misses. +- `redirect_changed` — the homepage redirected to a host whose registered + domain is different from the input. Common acquisition pattern (e.g. + vodafone.is → syn.is, apogee.us → boldyn.com) where the original brand is + now served by the acquirer's primary site. + +`external_links` is captured into the output for context — the homepage's +non-self, non-social outbound link hosts — but is *not* a default flag +trigger. Most external links are to partners / customers / vendors and do +not indicate a rebrand; flagging on them would flood review with noise. +Pass `--flag-external-links` to also flag on this signal during a thorough +sweep where missing an image-only banner that lacks rebrand-themed slug +or alt text is worse than the noise. + +The output is meant for periodic review, not automated map mutation. Treat +each hit as a candidate for manual verification per AGENTS.md case-1 / case-2 +rules — a single signal is *one* corroborating source; a real map update +still needs two. + +Run from the `parsedmarc/resources/maps/` directory: + + python detect_rebrands.py [-m base_reverse_dns_map.csv] \\ + [-o rebrand_drift.tsv] [--workers N] [--limit N] + +Resume-safe: re-running only re-fetches domains not already in the output. 
+""" + +import argparse +import csv +import os +import sys +from concurrent.futures import ThreadPoolExecutor, as_completed +from urllib.parse import urlparse + +from collect_domain_info import ( + MAP_FILE, + _fetch_homepage, +) + +DEFAULT_OUTPUT = "rebrand_drift.tsv" + +OUTPUT_FIELDS = [ + "domain", + "current_name", + "current_type", + "rebrand_signal", + "external_links", + "final_url", + "redirect_changed", + "title", + "description", + "http_status", + "error", +] + + +def _final_host(final_url: str) -> str: + if not final_url: + return "" + try: + return (urlparse(final_url).hostname or "").lower() + except Exception: + return "" + + +def _redirect_changed(domain: str, final_url: str) -> bool: + """True when the homepage's final hostname is not under the input domain. + + The map keys are already base domains, so any redirect that lands outside + the input domain's name space is a candidate signal — typical case-1 + acquisition redirect (vodafone.is → syn.is). Subdomain redirects under + the same base (www.example.com → example.com) are not flagged. False + positives from generic CDN / login subdomains on a sister-brand host are + accepted; the reviewer judges per AGENTS.md case-2 rules. + """ + host = _final_host(final_url) + if not host: + return False + if host == domain or host.endswith("." + domain): + return False + return True + + +def _load_map(map_path: str) -> list: + """Return [(domain, name, type), ...] 
from base_reverse_dns_map.csv.""" + rows = [] + with open(map_path, encoding="utf-8", newline="") as f: + reader = csv.DictReader(f) + for row in reader: + d = (row.get("base_reverse_dns") or "").strip().lower() + if d: + rows.append( + ( + d, + (row.get("name") or "").strip(), + (row.get("type") or "").strip(), + ) + ) + return rows + + +def _load_existing(output_path: str) -> set: + done = set() + if not os.path.exists(output_path): + return done + with open(output_path, encoding="utf-8", newline="") as f: + reader = csv.DictReader(f, delimiter="\t") + for row in reader: + d = (row.get("domain") or "").strip().lower() + if d: + done.add(d) + return done + + +def _check_one(domain: str, name: str, type_: str, http_timeout: float) -> dict: + page = _fetch_homepage(domain, http_timeout) + return { + "domain": domain, + "current_name": name, + "current_type": type_, + "rebrand_signal": page.get("rebrand_signal", ""), + "external_links": page.get("external_links", ""), + "final_url": page.get("final_url", ""), + "redirect_changed": "1" + if _redirect_changed(domain, page.get("final_url", "")) + else "", + "title": page.get("title", ""), + "description": page.get("description", ""), + "http_status": page.get("http_status", ""), + "error": page.get("error", ""), + } + + +def _main(): + p = argparse.ArgumentParser(description=(__doc__ or "").splitlines()[0]) + p.add_argument("-m", "--map", default=MAP_FILE) + p.add_argument("-o", "--output", default=DEFAULT_OUTPUT) + p.add_argument("--workers", type=int, default=16) + p.add_argument("--http-timeout", type=float, default=8.0) + p.add_argument( + "--limit", + type=int, + default=0, + help="Only check the first N pending domains (0 = all)", + ) + p.add_argument( + "--include-clean", + action="store_true", + help=( + "Write every fetched row to the output, not just the ones with a " + "rebrand_signal or redirect_changed hit. Useful for spot-checking " + "the no-signal majority." 
+ ), + ) + p.add_argument( + "--flag-external-links", + action="store_true", + help=( + "Also flag rows whose homepage links to any non-self, non-noise " + "external host. Off by default because most external links are " + "to partners / customers / vendors and don't indicate a rebrand " + "— a partner case study would otherwise produce a noisy hit. " + "Useful for thorough sweeps where missing an image-only banner " + "(no rebrand-themed slug or alt text) is worse than the noise." + ), + ) + args = p.parse_args() + + map_rows = _load_map(args.map) + done = _load_existing(args.output) + pending = [r for r in map_rows if r[0] not in done] + if args.limit > 0: + pending = pending[: args.limit] + + print( + f"Map: {len(map_rows)} domains | " + f"already in output: {len(done)} | " + f"to fetch: {len(pending)}", + file=sys.stderr, + ) + if not pending: + return + + write_header = not os.path.exists(args.output) or os.path.getsize(args.output) == 0 + flagged = 0 + with open(args.output, "a", encoding="utf-8", newline="") as out_f: + writer = csv.DictWriter( + out_f, + fieldnames=OUTPUT_FIELDS, + delimiter="\t", + lineterminator="\n", + quoting=csv.QUOTE_MINIMAL, + ) + if write_header: + writer.writeheader() + with ThreadPoolExecutor(max_workers=args.workers) as ex: + futures = { + ex.submit(_check_one, d, n, t, args.http_timeout): d + for (d, n, t) in pending + } + for i, fut in enumerate(as_completed(futures), 1): + d = futures[fut] + try: + row = fut.result() + except Exception as e: + row = {k: "" for k in OUTPUT_FIELDS} + row["domain"] = d + row["error"] = f"unhandled: {type(e).__name__}: {e}"[:200] + hit = bool(row.get("rebrand_signal") or row.get("redirect_changed")) + if args.flag_external_links and row.get("external_links"): + hit = True + if hit or args.include_clean: + writer.writerow(row) + out_f.flush() + if hit: + flagged += 1 + if i % 100 == 0 or i == len(pending): + print( + f" {i}/{len(pending)} fetched, {flagged} flagged: {d}", + file=sys.stderr, + ) + + 
print(f"Done. {flagged} flagged rows written to {args.output}", file=sys.stderr) + + +if __name__ == "__main__": + _main()