Compare commits

...

1 Commits
9.3.1 ... 9.4.0

Author SHA1 Message Date
Sean Whalen
2032438d3b 9.4.0
### Added

- Extracted `load_reverse_dns_map()` utility function in `utils.py` for loading the reverse DNS map independently of individual IP lookups.
- SIGHUP reload now re-downloads/reloads the reverse DNS map, so changes take effect without restarting.
- Add premade OpenSearch index patterns, visualizations, and dashboards

### Changed

- When `index_prefix_domain_map` is configured, SMTP TLS reports for domains not in the map are now silently dropped instead of being output. Unlike DMARC, TLS-RPT has no DNS authorization records, so this filtering prevents processing reports for unrelated domains.
- Bump OpenSearch support to `<= 4.0.0`

### Fixed

- Fixed `get_index_prefix` using wrong key (`domain` instead of `policy_domain`) for SMTP TLS reports, which prevented domain map matching from working for TLS reports.
- Domain matching in `get_index_prefix` now lowercases the domain for case-insensitive comparison.
2026-03-23 17:08:26 -04:00
8 changed files with 369 additions and 57 deletions

View File

@@ -1,5 +1,23 @@
# Changelog
## 9.4.0
### Added
- Extracted `load_reverse_dns_map()` utility function in `utils.py` for loading the reverse DNS map independently of individual IP lookups.
- SIGHUP reload now re-downloads/reloads the reverse DNS map, so changes take effect without restarting.
- Add premade OpenSearch index patterns, visualizations, and dashboards
### Changed
- When `index_prefix_domain_map` is configured, SMTP TLS reports for domains not in the map are now silently dropped instead of being output. Unlike DMARC, TLS-RPT has no DNS authorization records, so this filtering prevents processing reports for unrelated domains.
- Bump OpenSearch support to `<= 4.0.0`
### Fixed
- Fixed `get_index_prefix` using wrong key (`domain` instead of `policy_domain`) for SMTP TLS reports, which prevented domain map matching from working for TLS reports.
- Domain matching in `get_index_prefix` now lowercases the domain for case-insensitive comparison.
## 9.3.1
### Breaking changes

View File

@@ -15,7 +15,7 @@ services:
condition: service_healthy
opensearch-dashboards:
image: opensearchproject/opensearch-dashboards:2
image: opensearchproject/opensearch-dashboards:3
environment:
- OPENSEARCH_HOSTS=["https://opensearch:9200"]
ports:

File diff suppressed because one or more lines are too long

View File

@@ -19,6 +19,7 @@ import yaml
from tqdm import tqdm
from parsedmarc import (
REVERSE_DNS_MAP,
SEEN_AGGREGATE_REPORT_IDS,
InvalidDMARCReport,
ParserError,
@@ -48,7 +49,12 @@ from parsedmarc.mail import (
)
from parsedmarc.mail.graph import AuthMethod
from parsedmarc.types import ParsingResults
from parsedmarc.utils import get_base_domain, get_reverse_dns, is_mbox
from parsedmarc.utils import (
get_base_domain,
get_reverse_dns,
is_mbox,
load_reverse_dns_map,
)
# Increase the max header limit for very large emails. `_MAXHEADERS` is a
# private stdlib attribute and may not exist in type stubs.
@@ -1068,20 +1074,22 @@ def _main():
elif "reported_domain" in report:
domain = report["reported_domain"]
elif "policies" in report:
domain = report["policies"][0]["domain"]
domain = report["policies"][0]["policy_domain"]
if domain:
domain = get_base_domain(domain)
for prefix in index_prefix_domain_map:
if domain in index_prefix_domain_map[prefix]:
prefix = (
prefix.lower()
.strip()
.strip("_")
.replace(" ", "_")
.replace("-", "_")
)
prefix = f"{prefix}_"
return prefix
if domain:
domain = domain.lower()
for prefix in index_prefix_domain_map:
if domain in index_prefix_domain_map[prefix]:
prefix = (
prefix.lower()
.strip()
.strip("_")
.replace(" ", "_")
.replace("-", "_")
)
prefix = f"{prefix}_"
return prefix
return None
def process_reports(reports_):
@@ -1092,6 +1100,22 @@ def _main():
logger.error(message)
output_errors.append(message)
if index_prefix_domain_map is not None:
filtered_tls = []
for report in reports_.get("smtp_tls_reports", []):
if get_index_prefix(report) is not None:
filtered_tls.append(report)
else:
domain = "unknown"
if "policies" in report and report["policies"]:
domain = report["policies"][0].get("policy_domain", "unknown")
logger.debug(
"Ignoring SMTP TLS report for domain not in "
"index_prefix_domain_map: %s",
domain,
)
reports_["smtp_tls_reports"] = filtered_tls
indent_value = 2 if opts.prettify_json else None
output_str = "{0}\n".format(
json.dumps(reports_, ensure_ascii=False, indent=indent_value)
@@ -2087,6 +2111,17 @@ def _main():
_close_output_clients(clients)
clients = new_clients
index_prefix_domain_map = new_index_prefix_domain_map
# Reload the reverse DNS map so changes to the
# map path/URL in the config take effect.
load_reverse_dns_map(
REVERSE_DNS_MAP,
always_use_local_file=new_opts.always_use_local_files,
local_file_path=new_opts.reverse_dns_map_path,
url=new_opts.reverse_dns_map_url,
offline=new_opts.offline,
)
for k, v in vars(new_opts).items():
setattr(opts, k, v)

View File

@@ -1,3 +1,3 @@
__version__ = "9.3.1"
__version__ = "9.4.0"
USER_AGENT = f"parsedmarc/{__version__}"

View File

@@ -335,6 +335,76 @@ def get_ip_address_country(
return country
def load_reverse_dns_map(
    reverse_dns_map: ReverseDNSMap,
    *,
    always_use_local_file: bool = False,
    local_file_path: Optional[str] = None,
    url: Optional[str] = None,
    offline: bool = False,
) -> None:
    """
    Loads the reverse DNS map from a URL or local file.

    Clears and repopulates the given map dict in place. If the map is
    fetched from a URL, that is tried first; on failure (or if offline/local
    mode is selected) the bundled CSV is used as a fallback.

    Args:
        reverse_dns_map (dict): The map dict to populate (modified in place)
        always_use_local_file (bool): Always use a local map file
        local_file_path (str): Path to a local map file
        url (str): URL to a reverse DNS map
        offline (bool): Use the built-in copy of the reverse DNS map
    """
    if url is None:
        url = (
            "https://raw.githubusercontent.com/domainaware"
            "/parsedmarc/master/parsedmarc/"
            "resources/maps/base_reverse_dns_map.csv"
        )
    # Repopulate in place so existing references to the dict stay valid
    # (callers hold a reference to the shared module-level map).
    reverse_dns_map.clear()

    def load_csv(_csv_file):
        # CSV columns: base_reverse_dns, name, type
        reader = csv.DictReader(_csv_file)
        for row in reader:
            key = row["base_reverse_dns"].lower().strip()
            reverse_dns_map[key] = {
                "name": row["name"].strip(),
                "type": row["type"].strip(),
            }

    csv_file = io.StringIO()
    if not (offline or always_use_local_file):
        try:
            logger.debug(f"Trying to fetch reverse DNS map from {url}...")
            headers = {"User-Agent": USER_AGENT}
            # A timeout prevents a hung server from blocking forever;
            # requests.Timeout is a RequestException, so the handler
            # below catches it and we fall through to the local copy.
            response = requests.get(url, headers=headers, timeout=60)
            response.raise_for_status()
            csv_file.write(response.text)
            csv_file.seek(0)
            load_csv(csv_file)
        except requests.exceptions.RequestException as e:
            logger.warning(f"Failed to fetch reverse DNS map: {e}")
        except Exception:
            logger.warning("Not a valid CSV file")
            csv_file.seek(0)
            # Use the module logger (was logging.debug, which bypasses this
            # module's logger configuration and logs via the root logger).
            logger.debug("Response body:")
            logger.debug(csv_file.read())
    # Fall back to the bundled (or explicitly configured local) CSV if the
    # download was skipped or produced nothing.
    if len(reverse_dns_map) == 0:
        logger.info("Loading included reverse DNS map...")
        path = str(
            files(parsedmarc.resources.maps).joinpath("base_reverse_dns_map.csv")
        )
        if local_file_path is not None:
            path = local_file_path
        with open(path) as csv_file:
            load_csv(csv_file)
def get_service_from_reverse_dns_base_domain(
base_domain,
*,
@@ -361,55 +431,21 @@ def get_service_from_reverse_dns_base_domain(
"""
base_domain = base_domain.lower().strip()
if url is None:
url = (
"https://raw.githubusercontent.com/domainaware"
"/parsedmarc/master/parsedmarc/"
"resources/maps/base_reverse_dns_map.csv"
)
reverse_dns_map_value: ReverseDNSMap
if reverse_dns_map is None:
reverse_dns_map_value = {}
else:
reverse_dns_map_value = reverse_dns_map
def load_csv(_csv_file):
reader = csv.DictReader(_csv_file)
for row in reader:
key = row["base_reverse_dns"].lower().strip()
reverse_dns_map_value[key] = {
"name": row["name"],
"type": row["type"],
}
csv_file = io.StringIO()
if not (offline or always_use_local_file) and len(reverse_dns_map_value) == 0:
try:
logger.debug(f"Trying to fetch reverse DNS map from {url}...")
headers = {"User-Agent": USER_AGENT}
response = requests.get(url, headers=headers)
response.raise_for_status()
csv_file.write(response.text)
csv_file.seek(0)
load_csv(csv_file)
except requests.exceptions.RequestException as e:
logger.warning(f"Failed to fetch reverse DNS map: {e}")
except Exception:
logger.warning("Not a valid CSV file")
csv_file.seek(0)
logging.debug("Response body:")
logger.debug(csv_file.read())
if len(reverse_dns_map_value) == 0:
logger.info("Loading included reverse DNS map...")
path = str(
files(parsedmarc.resources.maps).joinpath("base_reverse_dns_map.csv")
load_reverse_dns_map(
reverse_dns_map_value,
always_use_local_file=always_use_local_file,
local_file_path=local_file_path,
url=url,
offline=offline,
)
if local_file_path is not None:
path = local_file_path
with open(path) as csv_file:
load_csv(csv_file)
service: ReverseDNSService
try:
service = reverse_dns_map_value[base_domain]

View File

@@ -50,7 +50,7 @@ dependencies = [
"lxml>=4.4.0",
"mailsuite>=1.11.2",
"msgraph-core==0.2.2",
"opensearch-py>=2.4.2,<=3.0.0",
"opensearch-py>=2.4.2,<=4.0.0",
"publicsuffixlist>=0.10.0",
"pygelf>=0.4.2",
"requests>=2.22.0",

195
tests.py
View File

@@ -4,6 +4,7 @@
from __future__ import absolute_import, print_function, unicode_literals
import io
import json
import os
import signal
import sys
@@ -2278,6 +2279,200 @@ watch = true
# Second init (after reload with v2 config): kafka_hosts should be None
self.assertIsNone(init_opts_captures[1].kafka_hosts)
@unittest.skipUnless(
    hasattr(signal, "SIGHUP"),
    "SIGHUP not available on this platform",
)
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config_file")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.watch_inbox")
@patch("parsedmarc.cli.IMAPConnection")
def testReloadRefreshesReverseDnsMap(
    self,
    mock_imap,
    mock_watch,
    mock_get_reports,
    mock_parse_config,
    mock_init_clients,
):
    """SIGHUP reload repopulates the reverse DNS map so lookups still work."""
    import signal as signal_module
    from parsedmarc import REVERSE_DNS_MAP

    mock_imap.return_value = object()
    # Empty report sets: this test only exercises the reload path, not parsing.
    mock_get_reports.return_value = {
        "aggregate_reports": [],
        "forensic_reports": [],
        "smtp_tls_reports": [],
    }

    def parse_side_effect(config_file, opts):
        # Minimal IMAP config with mailbox_watch=True so _main() enters the
        # watch loop where the SIGHUP handler takes effect.
        opts.imap_host = "imap.example.com"
        opts.imap_user = "user"
        opts.imap_password = "pass"
        opts.mailbox_watch = True
        return None

    mock_parse_config.side_effect = parse_side_effect
    mock_init_clients.return_value = {}

    # Snapshot the map state after each watch_inbox call
    map_snapshots = []
    watch_calls = [0]

    def watch_side_effect(*args, **kwargs):
        watch_calls[0] += 1
        if watch_calls[0] == 1:
            # First watch call: deliver SIGHUP to ourselves to trigger
            # the config (and reverse DNS map) reload.
            if hasattr(signal_module, "SIGHUP"):
                import os

                os.kill(os.getpid(), signal_module.SIGHUP)
            return
        else:
            # Capture the map state after reload, before we stop the loop
            map_snapshots.append(dict(REVERSE_DNS_MAP))
            # Raise to break out of the watch loop; the assertRaises below
            # expects _main() to exit via SystemExit as a result.
            raise FileExistsError("stop")

    mock_watch.side_effect = watch_side_effect

    with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
        cfg.write(self._BASE_CONFIG)
        cfg_path = cfg.name
    self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))

    # Pre-populate the map so we can verify it gets refreshed
    REVERSE_DNS_MAP.clear()
    REVERSE_DNS_MAP["stale.example.com"] = {
        "name": "Stale",
        "type": "stale",
    }
    original_contents = dict(REVERSE_DNS_MAP)

    with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
        with self.assertRaises(SystemExit):
            parsedmarc.cli._main()

    self.assertEqual(mock_watch.call_count, 2)
    # The map should have been repopulated (not empty, not the stale data)
    self.assertEqual(len(map_snapshots), 1)
    refreshed = map_snapshots[0]
    self.assertGreater(len(refreshed), 0, "Map should not be empty after reload")
    self.assertNotEqual(
        refreshed,
        original_contents,
        "Map should have been refreshed, not kept stale data",
    )
    self.assertNotIn(
        "stale.example.com",
        refreshed,
        "Stale entry should have been cleared by reload",
    )
class TestIndexPrefixDomainMapTlsFiltering(unittest.TestCase):
    """Tests that SMTP TLS reports for unmapped domains are filtered out
    when index_prefix_domain_map is configured."""

    @patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
    @patch("parsedmarc.cli.IMAPConnection")
    def testTlsReportsFilteredByDomainMap(
        self,
        mock_imap_connection,
        mock_get_reports,
    ):
        """TLS reports for domains not in the map should be silently dropped."""
        mock_imap_connection.return_value = object()
        # Three TLS reports: one whose base domain is in the map, one that is
        # not, and one whose policy_domain differs only in letter case (to
        # exercise case-insensitive matching).
        mock_get_reports.return_value = {
            "aggregate_reports": [],
            "forensic_reports": [],
            "smtp_tls_reports": [
                {
                    "organization_name": "Allowed Org",
                    "begin_date": "2024-01-01T00:00:00Z",
                    "end_date": "2024-01-01T23:59:59Z",
                    "report_id": "allowed-1",
                    "contact_info": "tls@allowed.example.com",
                    "policies": [
                        {
                            "policy_domain": "allowed.example.com",
                            "policy_type": "sts",
                            "successful_session_count": 1,
                            "failed_session_count": 0,
                        }
                    ],
                },
                {
                    "organization_name": "Unmapped Org",
                    "begin_date": "2024-01-01T00:00:00Z",
                    "end_date": "2024-01-01T23:59:59Z",
                    "report_id": "unmapped-1",
                    "contact_info": "tls@unmapped.example.net",
                    "policies": [
                        {
                            "policy_domain": "unmapped.example.net",
                            "policy_type": "sts",
                            "successful_session_count": 5,
                            "failed_session_count": 0,
                        }
                    ],
                },
                {
                    "organization_name": "Mixed Case Org",
                    "begin_date": "2024-01-01T00:00:00Z",
                    "end_date": "2024-01-01T23:59:59Z",
                    "report_id": "mixed-case-1",
                    "contact_info": "tls@mixedcase.example.com",
                    "policies": [
                        {
                            "policy_domain": "MixedCase.Example.Com",
                            "policy_type": "sts",
                            "successful_session_count": 2,
                            "failed_session_count": 0,
                        }
                    ],
                },
            ],
        }

        # Only "example.com" is mapped; the map file is written to disk
        # because the CLI loads it from a path in the config.
        domain_map = {"tenant_a": ["example.com"]}
        with NamedTemporaryFile("w", suffix=".yaml", delete=False) as map_file:
            import yaml

            yaml.dump(domain_map, map_file)
            map_path = map_file.name
        self.addCleanup(lambda: os.path.exists(map_path) and os.remove(map_path))

        config = f"""[general]
save_smtp_tls = true
silent = false
index_prefix_domain_map = {map_path}
[imap]
host = imap.example.com
user = test-user
password = test-password
"""
        with NamedTemporaryFile("w", suffix=".ini", delete=False) as config_file:
            config_file.write(config)
            config_path = config_file.name
        self.addCleanup(lambda: os.path.exists(config_path) and os.remove(config_path))

        # With silent=false and no output clients, _main() prints the results
        # as JSON to stdout; capture and parse that.
        captured = io.StringIO()
        with patch.object(sys, "argv", ["parsedmarc", "-c", config_path]):
            with patch("sys.stdout", captured):
                parsedmarc.cli._main()

        output = json.loads(captured.getvalue())
        tls_reports = output["smtp_tls_reports"]
        # The unmapped report is dropped; allowed and mixed-case survive.
        self.assertEqual(len(tls_reports), 2)
        report_ids = {r["report_id"] for r in tls_reports}
        self.assertIn("allowed-1", report_ids)
        self.assertIn("mixed-case-1", report_ids)
        self.assertNotIn("unmapped-1", report_ids)
# Allow running this test module directly with verbose output.
if __name__ == "__main__":
    unittest.main(verbosity=2)