Surface ASN info and use it for source attribution when a PTR is absent (#715)

* Surface ASN info and fall back to it when a PTR is absent

Adds three new fields to every IP source record — ``asn`` (integer,
e.g. 15169), ``asn_name`` (``"Google LLC"``), ``asn_domain``
(``"google.com"``) — sourced from the bundled IPinfo Lite MMDB. These
flow through to CSV, JSON, Elasticsearch, OpenSearch, and Splunk
outputs as ``source_asn``, ``source_asn_name``, ``source_asn_domain``.

More importantly: when an IP has no reverse DNS (common for many
large senders), source attribution now falls back to the ASN domain
as a lookup key into the same ``reverse_dns_map``. Thanks to #712
and #714, ~85% of routed IPv4 space now has an ``as_domain`` that
hits the map, so rows that were previously unattributable now get a
``source_name``/``source_type`` derived from the ASN. When the ASN
domain misses the map, the raw AS name is used as ``source_name``
with ``source_type`` left null — still better than nothing.

Crucially, ``source_reverse_dns`` and ``source_base_domain`` remain
null on ASN-derived rows, so downstream consumers can still tell a
PTR-resolved attribution apart from an ASN-derived one.

ASN is stored as an integer at the schema level (Elasticsearch /
OpenSearch mappings use ``Integer``) so consumers can do range
queries and numeric sorts; dashboards can prepend ``AS`` at display
time. The MMDB reader normalizes both IPinfo's ``"AS15169"`` string
and MaxMind's ``autonomous_system_number`` int to the same int form.

Also fixes a pre-existing caching bug in ``get_ip_address_info``:
entries without reverse DNS were never written to the IP-info cache,
so every no-PTR IP re-did the MMDB read and DNS attempt on every
call. The cache write is now unconditional.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Bump to 9.9.0 and document the ASN fallback work

Updates the changelog with a 9.9.0 entry covering the ASN-domain
aliases (#712, #714), map-maintenance tooling fixes (#713), and the
ASN-fallback source attribution added in this branch.

Extends AGENTS.md to explain that ``base_reverse_dns_map.csv`` is now
a mixed-namespace map (rDNS bases alongside ASN domains) and adds a
short recipe for finding high-value ASN-domain misses against the
bundled MMDB, so future contributors know where the map's second
lookup path comes from.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Document project conventions previously held only in agent memory

Promotes four conventions out of per-agent memory and into AGENTS.md
so every contributor — human or agent — works from the same baseline:

- Run ruff check + format before committing (Code Style).
- Store natively numeric values as numbers, not pre-formatted strings
  (e.g. ASN as int 15169, not "AS15169"; ES/OS mappings as Integer)
  (Code Style).
- Before rewriting a tracked list/data file from freshly-generated
  content, verify the existing content via git — these files
  accumulate manually-curated entries across sessions (Editing tracked
  data files).
- A release isn't done until hatch-built sdist + wheel are attached to
  the GitHub release page; full 8-step sequence documented (Releases).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Sean Whalen <seanthegeek@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Sean Whalen
2026-04-23 02:13:30 -04:00
committed by GitHub
parent c2678f8e21
commit 2cda5bf59b
11 changed files with 315 additions and 49 deletions
+61
View File
@@ -223,6 +223,67 @@ class Test(unittest.TestCase):
parsedmarc.parsed_smtp_tls_reports_to_csv(result["report"])
print("Passed!")
def testIpAddressInfoSurfacesASNFields(self):
"""ASN number, name, and domain from the bundled MMDB appear on every
IP info result, even when no PTR resolves."""
info = parsedmarc.utils.get_ip_address_info("8.8.8.8", offline=True)
self.assertEqual(info["asn"], 15169)
self.assertIsInstance(info["asn"], int)
self.assertEqual(info["asn_domain"], "google.com")
self.assertTrue(info["asn_name"])
def testIpAddressInfoFallsBackToASNMapEntryWhenNoPTR(self):
"""When reverse DNS is absent, the ASN domain should be used as a
lookup into the reverse_dns_map so the row still gets attributed,
while reverse_dns and base_domain remain null."""
info = parsedmarc.utils.get_ip_address_info("8.8.8.8", offline=True)
self.assertIsNone(info["reverse_dns"])
self.assertIsNone(info["base_domain"])
self.assertEqual(info["name"], "Google (Including Gmail and Google Workspace)")
self.assertEqual(info["type"], "Email Provider")
def testIpAddressInfoFallsBackToRawASNameOnMapMiss(self):
"""When neither PTR nor an ASN-map entry resolves, the raw AS name
is used as source_name with type left null — better than leaving
the row unattributed."""
# 204.79.197.100 is in an ASN whose as_domain is not in the map at
# the time of this test (msn.com); this exercises the asn_name
# fallback branch without depending on a specific map state.
from unittest.mock import patch
with patch(
"parsedmarc.utils.get_ip_address_db_record",
return_value={
"country": "US",
"asn": 64496,
"asn_name": "Some Unmapped Org, Inc.",
"asn_domain": "unmapped-for-this-test.example",
},
):
# Bypass cache to avoid prior-test pollution.
info = parsedmarc.utils.get_ip_address_info(
"192.0.2.1", offline=True, cache=None
)
self.assertIsNone(info["reverse_dns"])
self.assertIsNone(info["base_domain"])
self.assertIsNone(info["type"])
self.assertEqual(info["name"], "Some Unmapped Org, Inc.")
self.assertEqual(info["asn_domain"], "unmapped-for-this-test.example")
def testAggregateCsvExposesASNColumns(self):
"""The aggregate CSV output should include source_asn, source_asn_name,
and source_asn_domain columns."""
result = parsedmarc.parse_report_file(
"samples/aggregate/!example.com!1538204542!1538463818.xml",
always_use_local_files=True,
offline=True,
)
csv_text = parsedmarc.parsed_aggregate_reports_to_csv(result["report"])
header = csv_text.splitlines()[0].split(",")
self.assertIn("source_asn", header)
self.assertIn("source_asn_name", header)
self.assertIn("source_asn_domain", header)
def testOpenSearchSigV4RequiresRegion(self):
with self.assertRaises(opensearch_module.OpenSearchError):
opensearch_module.set_hosts(