Detect map-key rebrands via homepage drift sweep (#752)

Adds two complementary pieces of M&A drift detection over base_reverse_dns_map.csv:

- `collect_domain_info.py` gains two derived columns. `rebrand_signal` combines
  a body-text regex ("now X" / "formerly known as X" / "we became X" / ...)
  with a narrow path-and-alt-text regex ("rebrand", "brand-launch",
  "brand-announcement", "name-change", "our-new-name", ...) that runs against
  the JSON-unescaped page bytes, so URL slugs and image alt attributes inside
  Elementor / hydration script blobs are reachable. The two-regex split is
  what catches image-only acquisition banners like bankonitusa.com's "now
  Navanta" — a `<a href="https://navanta.com/brand-launch-..."><img
  alt="Brand announcement"></a>` with no visible text — that pure body-text
  scanning misses. `external_links` collects the homepage's non-self,
  non-social outbound link hosts as review context only.

- `detect_rebrands.py` is a new sibling drift sweep. It re-fetches every key
  in base_reverse_dns_map.csv with the same fetch machinery, evaluates two
  default flag triggers (`rebrand_signal` matched, or final URL host doesn't
  sit under the input domain), and writes a compact TSV of just the flagged
  rows. `external_links` is captured into the row as context but is not a
  default trigger — most outbound links are to partners / customers / vendors,
  and flagging them would flood review with noise. `--flag-external-links`
  opts into that signal for thorough sweeps. Resume-safe via `-o`.

Output is review fodder, not automated map mutation: a single signal is one
corroborating source, and promoting a flagged row into the map still requires
a second source per the two-corroborating-sources rule.

README and AGENTS.md updated to document the new columns and script.

Co-authored-by: Sean Whalen <seanthegeek@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sean Whalen
2026-05-06 21:22:30 -04:00
committed by GitHub
parent 6fa561d172
commit c752e776de
4 changed files with 587 additions and 24 deletions
+2 -1
@@ -224,7 +224,8 @@ When `unknown_base_reverse_dns.csv` has new entries, follow this order rather th
- `find_unknown_base_reverse_dns.py` — regenerates `unknown_base_reverse_dns.csv` from `base_reverse_dns.csv` by subtracting what is already mapped or known-unknown. Enforces the no-full-IP privacy rule at ingest. Translates non-domain-shaped `source_name` rows (raw MMDB `as_name` strings surfaced by the ASN-fallback path in `utils.py:get_ip_address_info` when the IP had no PTR and the `as_domain` was uncategorized) to their corresponding `as_domain` via the bundled MMDB, so the row enters the pipeline as a researchable domain (and drops out automatically if that `as_domain` is already mapped). Run after merging a batch.
- `detect_psl_overrides.py` — scans the lists for clustered IP-containing patterns, auto-adds brand suffixes to `psl_overrides.txt`, folds affected entries to their base, and removes any remaining full-IP entries. Run before the collector on any new batch.
- `collect_domain_info.py` — the bulk enrichment collector described above. Respects `psl_overrides.txt` and skips full-IP entries.
- `collect_domain_info.py` — the bulk enrichment collector described above. Respects `psl_overrides.txt` and skips full-IP entries. Two derived columns surface drift signals that are also useful during initial classification: `rebrand_signal` combines a body-text regex (matches "now X", "formerly known as X", "is now part of X", etc.) with a path/alt-text regex (matches "rebrand", "brand-launch", "brand-announcement", "name-change", "our-new-name") so that image-only acquisition banners — `<a href="…/brand-launch-…"><img alt="Brand announcement"></a>` — also fire. `external_links` lists the homepage's non-self, non-social outbound link hosts; useful as review context but not a flag trigger by default in the drift sweep (most external links are to partners / customers / vendors and don't indicate a rebrand).
- `detect_rebrands.py` — drift sweep that re-fetches every key in `base_reverse_dns_map.csv` with the same machinery as `collect_domain_info.py` and emits a TSV of rows where `rebrand_signal` or `redirect_changed` (final URL host doesn't sit under the input domain) fired. Output is for periodic review — a single signal is one corroborating source; promoting a flagged row still needs a second source per the two-corroborating-sources rule. Resume-safe via `-o`. Use `--limit N` to spot-check a slice; `--include-clean` to also emit non-flagged rows; `--flag-external-links` to additionally flag rows whose only signal is an outbound non-self host (off by default to keep partner/vendor noise out of the review queue).
- `find_bad_utf8.py` — locates invalid UTF-8 bytes (used after past encoding corruption).
- `sortlists.py` — case-insensitive sort + dedupe + `type`-column validator for the list files; the authoritative sorter run after every batch edit.
+20
@@ -129,10 +129,30 @@ Scans `unknown_base_reverse_dns.csv` for full-IP-containing entries that share a
Bulk enrichment collector. For every domain in `unknown_base_reverse_dns.csv` that is not already in `base_reverse_dns_map.csv`, runs `whois` on the domain, fetches a size-capped `https://` GET, resolves A/AAAA records, and runs `whois` on the first resolved IP. Writes a TSV (`domain_info.tsv` by default) with the registrant org/country/registrar, page `<title>`/`<meta description>`, resolved IPs, and IP-WHOIS org/netname/country — the compact metadata a classifier needs to decide each domain in one pass. Respects `psl_overrides.txt`, skips full-IP entries, and is resume-safe (re-running only fetches domains missing from the output file).
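The size cap on the homepage fetch can be sketched as below. This is a minimal illustration, not the collector's exact code: `read_capped` is a hypothetical helper name, and a plain generator stands in for the streamed HTTP response so the snippet runs without network access. The 256 KiB value matches `MAX_BODY_BYTES` in `collect_domain_info.py`.

```python
from typing import Iterable

MAX_BODY_BYTES = 256 * 1024  # same cap value the collector defines


def read_capped(chunks: Iterable[bytes], cap: int = MAX_BODY_BYTES) -> bytes:
    """Accumulate chunks and stop once at least `cap` bytes are buffered."""
    body = b""
    for chunk in chunks:
        body += chunk
        if len(body) >= cap:
            break
    return body


# Stand-in for a streamed response: ten chunks of 100 KiB each.
fake_stream = (b"x" * (100 * 1024) for _ in range(10))
body = read_capped(fake_stream)
print(len(body))  # 307200: the third chunk crosses the cap, then reading stops
```

Note that the check happens after each chunk is appended, so the buffer can overshoot the cap by up to one chunk; the cap bounds memory use rather than giving an exact length.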
The TSV also carries two derived columns that surface drift signals (and double as classification hints when a homepage explicitly names its operator):
- `rebrand_signal` — first ~120-char excerpt of the page where one of two regexes hit. (a) Body-text phrases: *now X*, *is now part of X*, *formerly known as X*, *we became X*, *rebranded as X*, *acquired by X*, *merged with X*, *joined the X*. Common false-positive trailing words (`Now Available`, `Now Hiring`, etc.) are filtered, and the captured brand must start with an uppercase letter. (b) Path / alt-text phrases: `rebrand`, `brand-launch`, `brand-announcement`, `brand-change`, `name-change`, `our-new-name`, `new-name-for`, `acquisition-announcement`, `merger-announcement`. The path scan runs against the JSON-unescaped page bytes, so it sees URL slugs and image alt attributes embedded in script blobs. Real-world case: bankonitusa.com's "now Navanta" banner is image-only — `<a href="https://navanta.com/brand-launch-..."><img alt="Brand announcement"></a>` — and pure body-text scanning misses it; the path regex matches via the `brand-launch` slug and `Brand announcement` alt attribute.
- `external_links` — comma-separated list of up to 5 distinct outbound link hosts, after stripping the input domain (and its subdomains) and a small noise list (social, CDN, analytics, app stores). Useful as context when reviewing a flagged row, but a noisy *flag* — most external links are to partners / customers / vendors that have no operator relationship — so `detect_rebrands.py` does not treat this column as a flag trigger by default. Pass `--flag-external-links` for a thorough sweep.
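The two-regex split described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the collector's code: both patterns and the noise set are abbreviated, `body_signal` and `path_signal` are hypothetical names, and the page snippet is a made-up reconstruction of the escaped markup the bankonitusa.com example describes.

```python
import re

# Abbreviated body-text pattern: a trigger phrase followed by a brand token.
BODY_RE = re.compile(
    r"(?:is now (?:a )?part of|formerly known as|we became|now) "
    r"([A-Za-z][A-Za-z0-9&]+)",
    re.IGNORECASE,
)
# Small subset of the real noise list.
NOISE = frozenset({"Available", "Hiring", "Open"})

# Abbreviated path / alt-text pattern.
PATH_RE = re.compile(
    r"rebrand|brand[ _-](?:launch|announcement|change)|name[ _-]change",
    re.IGNORECASE,
)


def body_signal(visible_text: str) -> str:
    """Return the first trigger hit whose brand is capitalized and not noise."""
    for m in BODY_RE.finditer(visible_text):
        brand = m.group(1)
        if brand[0].isupper() and brand not in NOISE:
            return m.group(0)
    return ""


def path_signal(raw_page: str) -> str:
    """Scan the whole page, including script blobs, after JSON-unescaping."""
    unescaped = raw_page.replace('\\"', '"').replace("\\/", "/")
    m = PATH_RE.search(unescaped)
    return m.group(0) if m else ""


# Visible text carries no rebrand phrase; the banner lives in an escaped blob.
visible_text = "Now Hiring! Online banking for community banks."
raw_page = (
    r'<script>{"html":"<a href=\"https:\/\/navanta.com\/brand-launch-...\">'
    r'<img alt=\"Brand announcement\"><\/a>"}</script>'
)

print(repr(body_signal(visible_text)))          # '' ("Hiring" is on the noise list)
print(path_signal(raw_page))                    # brand-launch
print(body_signal("BankOnIT is now Navanta"))   # now Navanta
```

The body-text scan only sees what a visitor would read, which keeps false positives down. The path scan trades that precision for reach into markup the HTML parser deliberately skips.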
## domain_info.tsv
The output of `collect_domain_info.py`. Tab-separated, one row per researched domain. Not tracked by Git — it is regenerated on demand and contains transient third-party WHOIS/HTML data.
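For a quick look at which rows carry a rebrand hint, the TSV can be read with the standard `csv` module. The sketch below uses an in-memory sample with only four of the real columns; the domains and values are invented placeholders, not collector output.

```python
import csv
import io

# Invented sample with a subset of the real columns.
sample = (
    "domain\ttitle\trebrand_signal\texternal_links\n"
    "quiet.example\tQuiet Co\t\tpartner.example\n"
    "old-brand.example\tOld Brand\tOld Brand is now NewBrand\tnew-brand.example\n"
)

rows = list(csv.DictReader(io.StringIO(sample), delimiter="\t"))
flagged = [r["domain"] for r in rows if r["rebrand_signal"]]
print(flagged)  # ['old-brand.example']
```

To run this against a real file, replace `io.StringIO(sample)` with `open("domain_info.tsv", encoding="utf-8", newline="")`.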
## detect_rebrands.py
Drift sweep that re-fetches every key in `base_reverse_dns_map.csv` with the same machinery as `collect_domain_info.py` and writes a TSV (`rebrand_drift.tsv` by default) of rows where a drift signal fired. Two signals are flagged by default:
- `rebrand_signal` — the collector's body-text and path/alt-text regexes (see above) matched.
- `redirect_changed` — the homepage's final URL host is not the input domain or a subdomain of it (typical case-1 acquisition redirect, e.g. vodafone.is → syn.is).
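The `redirect_changed` test amounts to a suffix check on the final hostname. A minimal standalone sketch, assuming the map key is already a base domain (the function name here is illustrative):

```python
from urllib.parse import urlparse


def redirect_changed(domain: str, final_url: str) -> bool:
    """True when the final host is neither the input domain nor a subdomain."""
    host = (urlparse(final_url).hostname or "").lower()
    if not host:
        return False  # no final URL recorded, so nothing to compare
    return not (host == domain or host.endswith("." + domain))


print(redirect_changed("vodafone.is", "https://syn.is/"))            # True
print(redirect_changed("example.com", "https://www.example.com/x"))  # False
print(redirect_changed("example.com", "https://notexample.com/"))    # True
```

Matching on `"." + domain` rather than a bare suffix is what keeps `notexample.com` from being mistaken for a subdomain of `example.com`.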
`external_links` is captured into the output for context but is not a default trigger, because most outbound links point to partners, customers, or vendors and would generate noise. Pass `--flag-external-links` for a thorough sweep, when missing an image-only banner that has no rebrand-themed slug or alt text would cost more than the extra noise.
The output is for periodic review, not automated map mutation. Each hit is one corroborating source; promoting a flagged row into the map still requires a second source per the two-corroborating-sources rule in [AGENTS.md](../../../AGENTS.md). Resume-safe: re-running only re-fetches keys not already in the output file. Because only flagged rows are written by default, keys that produced no signal are fetched again on the next run unless `--include-clean` was used. Use `--limit N` to spot-check a slice and `--include-clean` to also write non-flagged rows for inspection of the no-signal majority.
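The flagging and resume rules can be summarized in a few lines. This is a sketch of the decision logic only, with hypothetical helper names and invented rows; it does no fetching.

```python
def is_flagged(row: dict, flag_external_links: bool = False) -> bool:
    """Default triggers are rebrand_signal and redirect_changed."""
    hit = bool(row.get("rebrand_signal") or row.get("redirect_changed"))
    if flag_external_links and row.get("external_links"):
        hit = True
    return hit


def pending_keys(map_keys: list, already_written: set) -> list:
    """Keys still to fetch on a resumed run: those absent from the output."""
    return [k for k in map_keys if k not in already_written]


partner_only = {"rebrand_signal": "", "redirect_changed": "",
                "external_links": "partner.example"}
redirected = {"rebrand_signal": "", "redirect_changed": "1",
              "external_links": ""}

print(is_flagged(partner_only))                            # False
print(is_flagged(partner_only, flag_external_links=True))  # True
print(is_flagged(redirected))                              # True
print(pending_keys(["a.example", "b.example"], {"a.example"}))  # ['b.example']
```

Since resumption keys off the output file, a key that was fetched but not written counts as pending again on the next run.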
## rebrand_drift.tsv
The output of `detect_rebrands.py`. Tab-separated, one row per flagged map key. Not tracked by Git — regenerated on demand.
## sortlists.py
Validation and sorting helper invoked as a module. Alphabetically sorts `base_reverse_dns_map.csv` (case-insensitive by first column, preserving CRLF line endings), deduplicates entries, validates that every `type` appears in `base_reverse_dns_types.txt`, and warns on names that contain unescaped commas or stray whitespace. Run it after any batch merge before committing.
+325 -23
@@ -6,7 +6,15 @@ Reads a list of domains (defaults to the unmapped entries in
useful for classifying an unknown sender:
domain, whois_org, whois_country, registrar, title, description,
final_url, http_status, error
rebrand_signal, external_links, final_url, http_status, ips,
ip_whois_org, ip_whois_netname, ip_whois_country, error
`rebrand_signal` flags rows whose page text matches a phrase like "now X" or
"formerly known as X", or whose raw page contains a rebrand-themed URL slug or
image alt text ("brand-launch", "brand-announcement", ...). It is useful both
for classifying an unknown sender ("we became Newfold Digital") and as a drift
signal when re-run against existing map keys via `detect_rebrands.py`.
`external_links` carries the homepage's non-self, non-social outbound link
hosts as review context (e.g. bankonitusa.com linking to navanta.com).
The output is resume-safe: re-running the script only fetches domains that are
not already in the output file. Designed to produce a small file that an LLM
@@ -30,6 +38,7 @@ import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from html.parser import HTMLParser
from urllib.parse import urlparse
import requests
import urllib3
@@ -52,6 +61,8 @@ FIELDS = [
"registrar",
"title",
"description",
"rebrand_signal",
"external_links",
"final_url",
"http_status",
"ips",
@@ -135,6 +146,7 @@ IP_WHOIS_NETNAME_KEYS = ("netname", "network-name")
IP_WHOIS_COUNTRY_KEYS = ("country",)
MAX_BODY_BYTES = 256 * 1024 # truncate responses so a hostile page can't blow up RAM
MAX_BODY_TEXT_CHARS = 100 * 1024 # cap on extracted visible body text
# Privacy filter: drop entries containing a full IPv4 address (four dotted or
# dashed octets). Full IPs in a reverse-DNS base domain reveal a specific
@@ -143,6 +155,227 @@ _FULL_IP_RE = re.compile(
r"(?<![\d])(\d{1,3})[-.](\d{1,3})[-.](\d{1,3})[-.](\d{1,3})(?![\d])"
)
# Rebrand-signal scan. Triggered phrases are followed by a captured brand name
# (capitalized, non-noise word). The reviewer ultimately judges whether a hit
# is a real rebrand banner — the regex's job is to not miss the obvious ones.
# Real cases: "now Navanta", "is now part of Lumen", "formerly known as
# Symantec Email Security", "we became Newfold Digital".
REBRAND_RE = re.compile(
r"(?:"
r"(?:now|formerly(?: known as)?) "
r"|"
r"(?:we became|rebranded(?: as| to)?|merged with|"
r"acquired by|previously known as|previously operated as|"
r"is now (?:a )?part of|new name for|joined the) "
r")"
r"([A-Za-z][A-Za-z0-9&]+)",
re.IGNORECASE,
)
# Path-style rebrand markers that appear in URL slugs and image alt text.
# Real-world image-only rebrand banners (the typical "we got acquired"
# treatment) put the announcement in a slug like
# `/brand-launch-frequently-asked-questions/` and an alt like
# "Brand announcement Learn more", neither of which the body-text
# REBRAND_RE can see. Phrasing here is deliberately narrow — "brand"
# alone is far too common; we require it joined to launch / announcement /
# change / etc. by a space, dash, or underscore, which virtually never
# occurs outside a rebrand context.
REBRAND_PATH_RE = re.compile(
r"(?:"
r"rebrand"
r"|brand[ _-](?:launch|announcement|reveal|refresh|change|update)"
r"|name[ _-]change"
r"|our[ _-]new[ _-](?:name|brand)"
r"|new[ _-]name[ _-]for"
r"|(?:acquisition|merger)[ _-]announcement"
r")",
re.IGNORECASE,
)
# Words that commonly follow "now"/"formerly" outside a rebrand context. The
# regex would otherwise hit "Now Available", "Formerly Open", etc. Add to
# this set if review surfaces a recurring false positive — keep the set
# narrow so real one-word brand names (Navanta, Lumen, Sykt, etc.) survive.
_REBRAND_NOISE = frozenset(
{
"Available",
"Accepting",
"Active",
"Booking",
"Closed",
"Complete",
"Enrolling",
"Expanding",
"Free",
"Hiring",
"Live",
"Loading",
"Offering",
"Online",
"Open",
"Operating",
"Pending",
"Playing",
"Powered",
"Selling",
"Serving",
"Shipping",
"Showing",
"Streaming",
"Supporting",
"Trending",
"Underway",
"You",
"Your",
}
)
# Hostnames that overwhelmingly appear as outbound links on virtually every
# homepage and carry no signal about the operator's identity. Keeping these
# out of `external_links` means the column is dominated by hosts that
# actually tell us something — e.g. an outbound link to navanta.com from
# bankonitusa.com (the rebrand's banner is an image-only `<a href>` with
# no visible "Navanta" text, so href scanning is the only cheap way to
# catch it without rendering JavaScript).
_NOISE_LINK_HOSTS = frozenset(
{
"facebook.com",
"fb.com",
"twitter.com",
"x.com",
"linkedin.com",
"instagram.com",
"youtube.com",
"youtu.be",
"tiktok.com",
"pinterest.com",
"vimeo.com",
"reddit.com",
"medium.com",
"github.com",
"gitlab.com",
"bitbucket.org",
"google.com",
"googleapis.com",
"googletagmanager.com",
"googleadservices.com",
"google-analytics.com",
"gstatic.com",
"doubleclick.net",
"play.google.com",
"apps.apple.com",
"apple.com",
"microsoft.com",
"office.com",
"cloudflare.com",
"jsdelivr.net",
"unpkg.com",
"bootstrapcdn.com",
"fontawesome.com",
"wp.com",
"w.org",
"wordpress.org",
"schema.org",
"ogp.me",
}
)
_HREF_RE = re.compile(
r"""href\s*=\s*['"]https?://([^/'"\s>]+)""",
re.IGNORECASE,
)
def _hostname_from_url(url: str) -> str:
    try:
        return (urlparse(url).hostname or "").lower()
    except Exception:
        return ""


def _is_noise_host(host: str) -> bool:
    for noise in _NOISE_LINK_HOSTS:
        if host == noise or host.endswith("." + noise):
            return True
    return False


def _external_link_hosts(self_domain: str, text: str, limit: int = 5) -> list:
    """Return up to `limit` distinct external hostnames found in <a href> URLs.

    Skips hosts that match the input domain (or any of its subdomains) and
    common social/CDN/analytics/utility hosts that appear on practically every
    page. Hosts are returned in first-appearance order; a host whose
    registered domain matches the input but happens to be a different
    subdomain (e.g. login.example.com on example.com's homepage) is treated
    as self.
    """
    self_domain = (self_domain or "").lower()
    seen = []
    seen_set = set()
    for m in _HREF_RE.finditer(text):
        host = m.group(1).lower()
        if not host or host in seen_set:
            continue
        if self_domain and (host == self_domain or host.endswith("." + self_domain)):
            continue
        if _is_noise_host(host):
            continue
        seen_set.add(host)
        seen.append(host)
        if len(seen) >= limit:
            break
    return seen


def _rebrand_signal(*texts: str) -> str:
    """Return first ~120-char context of a rebrand-keyword hit, or ''.

    Scans each input text in order. Returns the first hit whose captured
    brand-name token is not on the noise list, and keeps the surrounding
    sentence so a reviewer can decide at a glance whether the match is a
    real banner ("BankOnIT is now Navanta") or residual noise.
    """
    for text in texts:
        if not text:
            continue
        for m in REBRAND_RE.finditer(text):
            brand = m.group(1)
            # Real brand names in rebrand banners are virtually always written
            # with an initial capital. Filtering on case lets us match the
            # trigger phrase case-insensitively while still rejecting common
            # post-trigger noise like "now hiring" / "formerly available".
            if not brand or not brand[0].isupper():
                continue
            if brand in _REBRAND_NOISE:
                continue
            start = max(0, m.start() - 30)
            end = min(len(text), m.end() + 80)
            return _strip_field(text[start:end])
    return ""


def _rebrand_path_signal(text: str) -> str:
    """Return first ~120-char context of a rebrand-themed path/alt-text hit.

    Runs ``REBRAND_PATH_RE`` against the unescaped page text (the same
    blob ``_external_link_hosts`` consumes), so URL slugs (`href=
    "https://navanta.com/brand-launch-..."`) and image alt attributes
    (`alt="Brand announcement"`) are both visible. The regex's phrasing
    is narrow enough that hitting it almost always corresponds to a real
    rebrand artifact rather than ordinary marketing copy.
    """
    if not text:
        return ""
    m = REBRAND_PATH_RE.search(text)
    if not m:
        return ""
    start = max(0, m.start() - 40)
    end = min(len(text), m.end() + 80)
    return _strip_field(text[start:end])
def _has_full_ip(s: str) -> bool:
for m in _FULL_IP_RE.finditer(s):
@@ -243,20 +476,32 @@ def _lookup_ip(ip: str, timeout: float) -> dict:
return _parse_ip_whois(_run_whois(ip, timeout))
class _HeadParser(HTMLParser):
"""Extract <title> and the first description-like meta tag."""
class _PageParser(HTMLParser):
"""Extract <title>, the first description-like meta tag, and body text.
Body text excludes the contents of <script>/<style>/<noscript>/<template>
elements — those rarely correspond to anything visible and routinely
contain large embedded JSON blobs that would crowd out the actual page
text under the body-text cap. Whitespace is collapsed at join time.
"""
_SKIP_TAGS = ("script", "style", "noscript", "template")
def __init__(self):
super().__init__(convert_charrefs=True)
self.title = ""
self.description = ""
self._body_parts = []
self._body_chars = 0
self._in_title = False
self._stop = False
self._in_body = False
self._skip_depth = 0
def handle_starttag(self, tag, attrs):
if self._stop:
return
tag = tag.lower()
if tag in self._SKIP_TAGS:
self._skip_depth += 1
return
if tag == "title":
self._in_title = True
elif tag == "meta":
@@ -270,29 +515,72 @@ class _HeadParser(HTMLParser):
):
self.description = _strip_field(a.get("content", ""))
elif tag == "body":
# everything useful is in <head>; stop parsing once we hit <body>
self._stop = True
self._in_body = True
def handle_endtag(self, tag):
if tag.lower() == "title":
tag = tag.lower()
if tag in self._SKIP_TAGS:
if self._skip_depth:
self._skip_depth -= 1
return
if tag == "title":
self._in_title = False
elif tag == "body":
self._in_body = False
def handle_data(self, data):
if self._skip_depth:
return
if self._in_title and not self.title:
self.title = _strip_field(data)
if self._in_body and self._body_chars < MAX_BODY_TEXT_CHARS:
self._body_parts.append(data)
self._body_chars += len(data)
@property
def body_text(self) -> str:
return re.sub(r"\s+", " ", " ".join(self._body_parts)).strip()
def _parse_head(body: bytes, encoding: str) -> tuple:
def _extract_metadata(domain: str, body: bytes, encoding: str) -> dict:
"""Decode the response body once and extract every per-page signal.
Returns ``title``, ``description``, ``rebrand_signal``, ``external_links``.
Decoding once and running both the HTML parser and the href regex on the
same string avoids paying the decode cost twice.
"""
out = {
"title": "",
"description": "",
"rebrand_signal": "",
"external_links": "",
}
try:
text = body.decode(encoding, errors="replace")
except LookupError:
text = body.decode("utf-8", errors="replace")
parser = _HeadParser()
parser = _PageParser()
try:
parser.feed(text)
except Exception:
pass
return parser.title, parser.description
out["title"] = parser.title
out["description"] = parser.description
# Many sites embed serialized HTML inside <script> blocks (block-editor /
# Elementor templates, JSON-LD, hydration payloads) where quotes and
# slashes are JSON-escaped: `href=\"https:\/\/...\"`. The parser already
# skipped that content for body_text, but the URLs and alt-text inside
# it still signal where the page is pointing — bankonitusa.com's "now
# Navanta" banner is image-only `<a href>` with `alt="Brand
# announcement"` and slug `/brand-launch-.../`, all sitting inside an
# escaped Elementor blob. Unescape so the path-style rebrand regex and
# the link-host regex both see them.
unescaped = text.replace('\\"', '"').replace("\\/", "/").replace("\\'", "'")
text_signal = _rebrand_signal(parser.title, parser.description, parser.body_text)
path_signal = _rebrand_path_signal(unescaped)
out["rebrand_signal"] = text_signal or path_signal
out["external_links"] = ",".join(_external_link_hosts(domain, unescaped))
return out
def _browser_fallback_fetch(url: str, timeout: float) -> dict:
@@ -317,6 +605,8 @@ def _browser_fallback_fetch(url: str, timeout: float) -> dict:
out = {
"title": "",
"description": "",
"rebrand_signal": "",
"external_links": "",
"final_url": "",
"http_status": "",
"error": "",
@@ -342,7 +632,13 @@ def _browser_fallback_fetch(url: str, timeout: float) -> dict:
body += chunk
if len(body) >= MAX_BODY_BYTES:
break
out["title"], out["description"] = _parse_head(body, r.encoding or "utf-8")
meta = _extract_metadata(
_hostname_from_url(url), body, r.encoding or "utf-8"
)
out["title"] = meta["title"]
out["description"] = meta["description"]
out["rebrand_signal"] = meta["rebrand_signal"]
out["external_links"] = meta["external_links"]
except requests.RequestException as e:
out["error"] = f"{type(e).__name__}: {e}"[:200]
except (ssl.SSLError, OSError) as e:
@@ -356,6 +652,8 @@ def _fetch_homepage(domain: str, timeout: float) -> dict:
out = {
"title": "",
"description": "",
"rebrand_signal": "",
"external_links": "",
"final_url": "",
"http_status": "",
"error": "",
@@ -366,8 +664,12 @@ def _fetch_homepage(domain: str, timeout: float) -> dict:
url = f"{scheme}://{domain}/"
primary_status = ""
primary_url = ""
primary_title = ""
primary_description = ""
primary_meta = {
"title": "",
"description": "",
"rebrand_signal": "",
"external_links": "",
}
primary_err = ""
try:
with requests.get(
@@ -384,18 +686,17 @@ def _fetch_homepage(domain: str, timeout: float) -> dict:
body += chunk
if len(body) >= MAX_BODY_BYTES:
break
primary_title, primary_description = _parse_head(
body, r.encoding or "utf-8"
)
primary_meta = _extract_metadata(domain, body, r.encoding or "utf-8")
except requests.RequestException as e:
primary_err = f"{type(e).__name__}: {e}"
except socket.error as e:
primary_err = f"socket: {e}"
# Happy path: requests got a 2xx with parseable head metadata.
if primary_status.startswith("2") and (primary_title or primary_description):
out["title"] = primary_title
out["description"] = primary_description
if primary_status.startswith("2") and (
primary_meta["title"] or primary_meta["description"]
):
out.update(primary_meta)
out["final_url"] = primary_url
out["http_status"] = primary_status
out["error"] = ""
@@ -409,6 +710,8 @@ def _fetch_homepage(domain: str, timeout: float) -> dict:
if cf["title"] or cf["description"]:
out["title"] = cf["title"]
out["description"] = cf["description"]
out["rebrand_signal"] = cf.get("rebrand_signal", "")
out["external_links"] = cf.get("external_links", "")
out["final_url"] = cf["final_url"] or primary_url
out["http_status"] = cf["http_status"] or primary_status
out["error"] = ""
@@ -427,8 +730,7 @@ def _fetch_homepage(domain: str, timeout: float) -> dict:
continue
# 2xx with empty head — accept whatever we got and stop.
out["title"] = primary_title
out["description"] = primary_description
out.update(primary_meta)
out["final_url"] = primary_url
out["http_status"] = primary_status
out["error"] = ""
@@ -0,0 +1,240 @@
#!/usr/bin/env python
"""Re-fetch mapped reverse-DNS base domains and surface possible rebrand signals.
Walks `base_reverse_dns_map.csv`, fetches each domain's homepage with the same
machinery used by `collect_domain_info.py`, and writes a TSV listing rows where
one of two default drift signals fired:
- `rebrand_signal` — the homepage's title / description / body text matched a
rebrand-keyword phrase ("is now X", "formerly known as X", "we became X",
...) *or* a rebrand-themed URL slug or image-alt phrase ("brand-launch",
"brand-announcement", "rebrand", "name-change", "our-new-name", ...). The
path/alt-text scan catches image-only banners — bankonitusa.com's "now
Navanta" banner is an image inside `<a href="https://navanta.com/brand-launch-...">`
with `alt="Brand announcement"` — that pure body-text scanning misses.
- `redirect_changed` — the homepage redirected to a host whose registered
domain is different from the input. Common acquisition pattern (e.g.
vodafone.is → syn.is, apogee.us → boldyn.com) where the original brand is
now served by the acquirer's primary site.
`external_links` is captured into the output for context — the homepage's
non-self, non-social outbound link hosts — but is *not* a default flag
trigger. Most external links are to partners / customers / vendors and do
not indicate a rebrand; flagging on them would flood review with noise.
Pass `--flag-external-links` to also flag on this signal during a thorough
sweep where missing an image-only banner that lacks rebrand-themed slug
or alt text is worse than the noise.
The output is meant for periodic review, not automated map mutation. Treat
each hit as a candidate for manual verification per AGENTS.md case-1 / case-2
rules — a single signal is *one* corroborating source; a real map update
still needs two.
Run from the `parsedmarc/resources/maps/` directory:
python detect_rebrands.py [-m base_reverse_dns_map.csv] \\
[-o rebrand_drift.tsv] [--workers N] [--limit N]
Resume-safe: re-running only re-fetches domains not already in the output.
"""
import argparse
import csv
import os
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse
from collect_domain_info import (
MAP_FILE,
_fetch_homepage,
)
DEFAULT_OUTPUT = "rebrand_drift.tsv"
OUTPUT_FIELDS = [
"domain",
"current_name",
"current_type",
"rebrand_signal",
"external_links",
"final_url",
"redirect_changed",
"title",
"description",
"http_status",
"error",
]
def _final_host(final_url: str) -> str:
    if not final_url:
        return ""
    try:
        return (urlparse(final_url).hostname or "").lower()
    except Exception:
        return ""


def _redirect_changed(domain: str, final_url: str) -> bool:
    """True when the homepage's final hostname is not under the input domain.

    The map keys are already base domains, so any redirect that lands outside
    the input domain's name space is a candidate signal: the typical case-1
    acquisition redirect (vodafone.is → syn.is). Subdomain redirects under
    the same base (www.example.com → example.com) are not flagged. False
    positives from generic CDN / login subdomains on a sister-brand host are
    accepted; the reviewer judges per AGENTS.md case-2 rules.
    """
    host = _final_host(final_url)
    if not host:
        return False
    if host == domain or host.endswith("." + domain):
        return False
    return True


def _load_map(map_path: str) -> list:
    """Return [(domain, name, type), ...] from base_reverse_dns_map.csv."""
    rows = []
    with open(map_path, encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            d = (row.get("base_reverse_dns") or "").strip().lower()
            if d:
                rows.append(
                    (
                        d,
                        (row.get("name") or "").strip(),
                        (row.get("type") or "").strip(),
                    )
                )
    return rows


def _load_existing(output_path: str) -> set:
    done = set()
    if not os.path.exists(output_path):
        return done
    with open(output_path, encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f, delimiter="\t")
        for row in reader:
            d = (row.get("domain") or "").strip().lower()
            if d:
                done.add(d)
    return done


def _check_one(domain: str, name: str, type_: str, http_timeout: float) -> dict:
    page = _fetch_homepage(domain, http_timeout)
    return {
        "domain": domain,
        "current_name": name,
        "current_type": type_,
        "rebrand_signal": page.get("rebrand_signal", ""),
        "external_links": page.get("external_links", ""),
        "final_url": page.get("final_url", ""),
        "redirect_changed": "1"
        if _redirect_changed(domain, page.get("final_url", ""))
        else "",
        "title": page.get("title", ""),
        "description": page.get("description", ""),
        "http_status": page.get("http_status", ""),
        "error": page.get("error", ""),
    }
def _main():
    p = argparse.ArgumentParser(description=(__doc__ or "").splitlines()[0])
    p.add_argument("-m", "--map", default=MAP_FILE)
    p.add_argument("-o", "--output", default=DEFAULT_OUTPUT)
    p.add_argument("--workers", type=int, default=16)
    p.add_argument("--http-timeout", type=float, default=8.0)
    p.add_argument(
        "--limit",
        type=int,
        default=0,
        help="Only check the first N pending domains (0 = all)",
    )
    p.add_argument(
        "--include-clean",
        action="store_true",
        help=(
            "Write every fetched row to the output, not just the ones with a "
            "rebrand_signal or redirect_changed hit. Useful for spot-checking "
            "the no-signal majority."
        ),
    )
    p.add_argument(
        "--flag-external-links",
        action="store_true",
        help=(
            "Also flag rows whose homepage links to any non-self, non-noise "
            "external host. Off by default because most external links are "
            "to partners / customers / vendors and don't indicate a rebrand; "
            "a partner case study would otherwise produce a noisy hit. "
            "Useful for thorough sweeps where missing an image-only banner "
            "(no rebrand-themed slug or alt text) is worse than the noise."
        ),
    )
    args = p.parse_args()
    map_rows = _load_map(args.map)
    done = _load_existing(args.output)
    pending = [r for r in map_rows if r[0] not in done]
    if args.limit > 0:
        pending = pending[: args.limit]
    print(
        f"Map: {len(map_rows)} domains | "
        f"already in output: {len(done)} | "
        f"to fetch: {len(pending)}",
        file=sys.stderr,
    )
    if not pending:
        return
    write_header = not os.path.exists(args.output) or os.path.getsize(args.output) == 0
    flagged = 0
    with open(args.output, "a", encoding="utf-8", newline="") as out_f:
        writer = csv.DictWriter(
            out_f,
            fieldnames=OUTPUT_FIELDS,
            delimiter="\t",
            lineterminator="\n",
            quoting=csv.QUOTE_MINIMAL,
        )
        if write_header:
            writer.writeheader()
        with ThreadPoolExecutor(max_workers=args.workers) as ex:
            futures = {
                ex.submit(_check_one, d, n, t, args.http_timeout): d
                for (d, n, t) in pending
            }
            for i, fut in enumerate(as_completed(futures), 1):
                d = futures[fut]
                try:
                    row = fut.result()
                except Exception as e:
                    row = {k: "" for k in OUTPUT_FIELDS}
                    row["domain"] = d
                    row["error"] = f"unhandled: {type(e).__name__}: {e}"[:200]
                hit = bool(row.get("rebrand_signal") or row.get("redirect_changed"))
                if args.flag_external_links and row.get("external_links"):
                    hit = True
                if hit or args.include_clean:
                    writer.writerow(row)
                    out_f.flush()
                    if hit:
                        flagged += 1
                if i % 100 == 0 or i == len(pending):
                    print(
                        f" {i}/{len(pending)} fetched, {flagged} flagged: {d}",
                        file=sys.stderr,
                    )
    print(f"Done. {flagged} flagged rows written to {args.output}", file=sys.stderr)


if __name__ == "__main__":
    _main()