mirror of
https://github.com/domainaware/parsedmarc.git
synced 2026-05-20 10:55:24 +00:00
5d816a4e56
mailsuite 2.0.0 extracted the IMAP, Microsoft Graph, Gmail, and Maildir connections out of parsedmarc into mailsuite.mailbox so other projects can reuse the same provider-agnostic interface. Replace the parsedmarc/mail submodules with a thin re-export of mailsuite.mailbox and drop the duplicated implementations. Per the migration note in seanthegeek/mailsuite#22, pass token_cache_name="parsedmarc" so existing AuthenticationRecord caches on disk continue to work without re-prompting users to authenticate. The existing graph_url config knob is forwarded unchanged. Drop direct dependencies that are now installed transitively via mailsuite[gmail,msgraph] (msgraph-core, imapclient, google-*). The extras are pulled in non-optionally so Gmail and Microsoft Graph support remain available out of the box. Drop nine test classes that were exercising mailsuite-side implementation internals (TestGmailConnection, TestGraphConnection, TestImapConnection, the _get_creds/_generate_credential half of TestGmailAuthModes, TestImapFallbacks, TestMSGraphFolderFallback, TestMaildirConnection, TestMaildirReportsFolder, TestMaildirUidHandling, TestTokenParentDirCreation); these are mailsuite's tests now. The CLI integration tests that mock parsedmarc.cli.{IMAP,Gmail,MSGraph}Connection are kept. Co-authored-by: Sean Whalen <seanthegeek@users.noreply.github.com> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2452 lines
89 KiB
Python
Executable File
2452 lines
89 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
from __future__ import absolute_import, print_function, unicode_literals
|
|
|
|
import io
|
|
import json
|
|
import os
|
|
import signal
|
|
import sys
|
|
import tempfile
|
|
import unittest
|
|
from configparser import ConfigParser
|
|
from glob import glob
|
|
from pathlib import Path
|
|
from tempfile import NamedTemporaryFile
|
|
from typing import cast
|
|
from types import SimpleNamespace
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
from lxml import etree # type: ignore[import-untyped]
|
|
|
|
import parsedmarc
|
|
import parsedmarc.cli
|
|
import parsedmarc.elastic
|
|
import parsedmarc.opensearch as opensearch_module
|
|
import parsedmarc.utils
|
|
|
|
# Detect if running in GitHub Actions to skip DNS lookups
|
|
OFFLINE_MODE = os.environ.get("GITHUB_ACTIONS", "false").lower() == "true"
|
|
|
|
|
|
def minify_xml(xml_string):
|
|
parser = etree.XMLParser(remove_blank_text=True)
|
|
tree = etree.fromstring(xml_string.encode("utf-8"), parser)
|
|
return etree.tostring(tree, pretty_print=False).decode("utf-8")
|
|
|
|
|
|
def compare_xml(xml1, xml2):
|
|
parser = etree.XMLParser(remove_blank_text=True)
|
|
tree1 = etree.fromstring(xml1.encode("utf-8"), parser)
|
|
tree2 = etree.fromstring(xml2.encode("utf-8"), parser)
|
|
return etree.tostring(tree1) == etree.tostring(tree2)
|
|
|
|
|
|
class Test(unittest.TestCase):
|
|
def testBase64Decoding(self):
|
|
"""Test base64 decoding"""
|
|
# Example from Wikipedia Base64 article
|
|
b64_str = "YW55IGNhcm5hbCBwbGVhcw"
|
|
decoded_str = parsedmarc.utils.decode_base64(b64_str)
|
|
assert decoded_str == b"any carnal pleas"
|
|
|
|
def testPSLDownload(self):
|
|
subdomain = "foo.example.com"
|
|
result = parsedmarc.utils.get_base_domain(subdomain)
|
|
assert result == "example.com"
|
|
|
|
# psl_overrides.txt intentionally folds CDN-customer PTRs so every
|
|
# sender on the same network clusters under one display key.
|
|
# ``.akamaiedge.net`` is an override, so its subdomains collapse to
|
|
# ``akamaiedge.net`` even though the live PSL carries the finer-grained
|
|
# ``c.akamaiedge.net`` — the override is the design decision.
|
|
subdomain = "e3191.c.akamaiedge.net"
|
|
result = parsedmarc.utils.get_base_domain(subdomain)
|
|
assert result == "akamaiedge.net"
|
|
|
|
def testExtractReportXMLComparator(self):
|
|
"""Test XML comparator function"""
|
|
xmlnice_file = open("samples/extract_report/nice-input.xml")
|
|
xmlnice = xmlnice_file.read()
|
|
xmlnice_file.close()
|
|
xmlchanged_file = open("samples/extract_report/changed-input.xml")
|
|
xmlchanged = minify_xml(xmlchanged_file.read())
|
|
xmlchanged_file.close()
|
|
self.assertTrue(compare_xml(xmlnice, xmlnice))
|
|
self.assertTrue(compare_xml(xmlchanged, xmlchanged))
|
|
self.assertFalse(compare_xml(xmlnice, xmlchanged))
|
|
self.assertFalse(compare_xml(xmlchanged, xmlnice))
|
|
print("Passed!")
|
|
|
|
def testExtractReportBytes(self):
|
|
"""Test extract report function for bytes string input"""
|
|
print()
|
|
file = "samples/extract_report/nice-input.xml"
|
|
with open(file, "rb") as f:
|
|
data = f.read()
|
|
print("Testing {0}: ".format(file), end="")
|
|
xmlout = parsedmarc.extract_report(data)
|
|
xmlin_file = open("samples/extract_report/nice-input.xml")
|
|
xmlin = xmlin_file.read()
|
|
xmlin_file.close()
|
|
self.assertTrue(compare_xml(xmlout, xmlin))
|
|
print("Passed!")
|
|
|
|
def testExtractReportXML(self):
|
|
"""Test extract report function for XML input"""
|
|
print()
|
|
report_path = "samples/extract_report/nice-input.xml"
|
|
print("Testing {0}: ".format(report_path), end="")
|
|
xmlout = parsedmarc.extract_report_from_file_path(report_path)
|
|
xmlin_file = open("samples/extract_report/nice-input.xml")
|
|
xmlin = xmlin_file.read()
|
|
xmlin_file.close()
|
|
self.assertTrue(compare_xml(xmlout, xmlin))
|
|
print("Passed!")
|
|
|
|
def testExtractReportXMLFromPath(self):
|
|
"""Test extract report function for pathlib.Path input"""
|
|
report_path = Path("samples/extract_report/nice-input.xml")
|
|
xmlout = parsedmarc.extract_report_from_file_path(report_path)
|
|
with open("samples/extract_report/nice-input.xml") as xmlin_file:
|
|
xmlin = xmlin_file.read()
|
|
self.assertTrue(compare_xml(xmlout, xmlin))
|
|
|
|
def testExtractReportGZip(self):
|
|
"""Test extract report function for gzip input"""
|
|
print()
|
|
file = "samples/extract_report/nice-input.xml.gz"
|
|
print("Testing {0}: ".format(file), end="")
|
|
xmlout = parsedmarc.extract_report_from_file_path(file)
|
|
xmlin_file = open("samples/extract_report/nice-input.xml")
|
|
xmlin = xmlin_file.read()
|
|
xmlin_file.close()
|
|
self.assertTrue(compare_xml(xmlout, xmlin))
|
|
print("Passed!")
|
|
|
|
def testExtractReportZip(self):
|
|
"""Test extract report function for zip input"""
|
|
print()
|
|
file = "samples/extract_report/nice-input.xml.zip"
|
|
print("Testing {0}: ".format(file), end="")
|
|
xmlout = parsedmarc.extract_report_from_file_path(file)
|
|
xmlin_file = open("samples/extract_report/nice-input.xml")
|
|
xmlin = minify_xml(xmlin_file.read())
|
|
xmlin_file.close()
|
|
self.assertTrue(compare_xml(xmlout, xmlin))
|
|
xmlin_file = open("samples/extract_report/changed-input.xml")
|
|
xmlin = xmlin_file.read()
|
|
xmlin_file.close()
|
|
self.assertFalse(compare_xml(xmlout, xmlin))
|
|
print("Passed!")
|
|
|
|
def testParseReportFileAcceptsPathForXML(self):
|
|
report_path = Path(
|
|
"samples/aggregate/protection.outlook.com!example.com!1711756800!1711843200.xml"
|
|
)
|
|
result = parsedmarc.parse_report_file(
|
|
report_path,
|
|
offline=True,
|
|
)
|
|
assert result["report_type"] == "aggregate"
|
|
self.assertEqual(result["report"]["report_metadata"]["org_name"], "outlook.com")
|
|
|
|
def testParseReportFileAcceptsPathForEmail(self):
|
|
report_path = Path(
|
|
"samples/aggregate/Report domain- borschow.com Submitter- google.com Report-ID- 949348866075514174.eml"
|
|
)
|
|
result = parsedmarc.parse_report_file(
|
|
report_path,
|
|
offline=True,
|
|
)
|
|
assert result["report_type"] == "aggregate"
|
|
self.assertEqual(result["report"]["report_metadata"]["org_name"], "google.com")
|
|
|
|
def testAggregateSamples(self):
|
|
"""Test sample aggregate/rua DMARC reports"""
|
|
print()
|
|
sample_paths = glob("samples/aggregate/*")
|
|
for sample_path in sample_paths:
|
|
if os.path.isdir(sample_path):
|
|
continue
|
|
print("Testing {0}: ".format(sample_path), end="")
|
|
result = parsedmarc.parse_report_file(
|
|
sample_path, always_use_local_files=True, offline=OFFLINE_MODE
|
|
)
|
|
assert result["report_type"] == "aggregate"
|
|
parsedmarc.parsed_aggregate_reports_to_csv(result["report"])
|
|
print("Passed!")
|
|
|
|
def testEmptySample(self):
|
|
"""Test empty/unparasable report"""
|
|
with self.assertRaises(parsedmarc.ParserError):
|
|
parsedmarc.parse_report_file("samples/empty.xml", offline=OFFLINE_MODE)
|
|
|
|
def testForensicSamples(self):
|
|
"""Test sample forensic/ruf/failure DMARC reports"""
|
|
print()
|
|
sample_paths = glob("samples/forensic/*.eml")
|
|
for sample_path in sample_paths:
|
|
print("Testing {0}: ".format(sample_path), end="")
|
|
with open(sample_path) as sample_file:
|
|
sample_content = sample_file.read()
|
|
email_result = parsedmarc.parse_report_email(
|
|
sample_content, offline=OFFLINE_MODE
|
|
)
|
|
assert email_result["report_type"] == "forensic"
|
|
result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)
|
|
assert result["report_type"] == "forensic"
|
|
parsedmarc.parsed_forensic_reports_to_csv(result["report"])
|
|
print("Passed!")
|
|
|
|
def testSmtpTlsSamples(self):
|
|
"""Test sample SMTP TLS reports"""
|
|
print()
|
|
sample_paths = glob("samples/smtp_tls/*")
|
|
for sample_path in sample_paths:
|
|
if os.path.isdir(sample_path):
|
|
continue
|
|
print("Testing {0}: ".format(sample_path), end="")
|
|
result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)
|
|
assert result["report_type"] == "smtp_tls"
|
|
parsedmarc.parsed_smtp_tls_reports_to_csv(result["report"])
|
|
print("Passed!")
|
|
|
|
def testIpAddressInfoSurfacesASNFields(self):
|
|
"""ASN number, name, and domain from the bundled MMDB appear on every
|
|
IP info result, even when no PTR resolves."""
|
|
info = parsedmarc.utils.get_ip_address_info("8.8.8.8", offline=True)
|
|
self.assertEqual(info["asn"], 15169)
|
|
self.assertIsInstance(info["asn"], int)
|
|
self.assertEqual(info["as_domain"], "google.com")
|
|
self.assertTrue(info["as_name"])
|
|
|
|
def testIpAddressInfoFallsBackToASNMapEntryWhenNoPTR(self):
|
|
"""When reverse DNS is absent, the ASN domain should be used as a
|
|
lookup into the reverse_dns_map so the row still gets attributed,
|
|
while reverse_dns and base_domain remain null."""
|
|
info = parsedmarc.utils.get_ip_address_info("8.8.8.8", offline=True)
|
|
self.assertIsNone(info["reverse_dns"])
|
|
self.assertIsNone(info["base_domain"])
|
|
self.assertEqual(info["name"], "Google (Including Gmail and Google Workspace)")
|
|
self.assertEqual(info["type"], "Email Provider")
|
|
|
|
def testIpAddressInfoFallsBackToRawASNameOnMapMiss(self):
|
|
"""When neither PTR nor an ASN-map entry resolves, the raw AS name
|
|
is used as source_name with type left null — better than leaving
|
|
the row unattributed."""
|
|
# 204.79.197.100 is in an ASN whose as_domain is not in the map at
|
|
# the time of this test (msn.com); this exercises the as_name
|
|
# fallback branch without depending on a specific map state.
|
|
from unittest.mock import patch
|
|
|
|
with patch(
|
|
"parsedmarc.utils.get_ip_address_db_record",
|
|
return_value={
|
|
"country": "US",
|
|
"asn": 64496,
|
|
"as_name": "Some Unmapped Org, Inc.",
|
|
"as_domain": "unmapped-for-this-test.example",
|
|
},
|
|
):
|
|
# Bypass cache to avoid prior-test pollution.
|
|
info = parsedmarc.utils.get_ip_address_info(
|
|
"192.0.2.1", offline=True, cache=None
|
|
)
|
|
self.assertIsNone(info["reverse_dns"])
|
|
self.assertIsNone(info["base_domain"])
|
|
self.assertIsNone(info["type"])
|
|
self.assertEqual(info["name"], "Some Unmapped Org, Inc.")
|
|
self.assertEqual(info["as_domain"], "unmapped-for-this-test.example")
|
|
|
|
def testWeakFallbackAttributionIsNotCached(self):
|
|
"""A transient PTR lookup failure that lands on the raw-as_name
|
|
fallback must not poison the cache. ``get_reverse_dns()`` swallows
|
|
every DNSException as ``None``, so a timeout looks identical to a
|
|
real no-PTR case — if we cached the weak attribution, the 4-hour
|
|
TTL would lock in a misattribution even after the PTR returns.
|
|
|
|
PTR-backed matches and ASN-domain matches are stable attributions
|
|
and must still be cached, so we only skip the specific
|
|
``reverse_dns=None AND type=None AND name=as_name`` state."""
|
|
from unittest.mock import patch
|
|
from expiringdict import ExpiringDict
|
|
|
|
cache = ExpiringDict(max_len=100, max_age_seconds=14400)
|
|
|
|
# Scenario 1: weak fallback (no PTR, unmapped as_domain, raw as_name
|
|
# used). Must NOT be cached.
|
|
with patch(
|
|
"parsedmarc.utils.get_ip_address_db_record",
|
|
return_value={
|
|
"country": "US",
|
|
"asn": 64496,
|
|
"as_name": "Some Unmapped Org, Inc.",
|
|
"as_domain": "unmapped-for-this-test.example",
|
|
},
|
|
):
|
|
parsedmarc.utils.get_ip_address_info("192.0.2.1", offline=True, cache=cache)
|
|
self.assertNotIn("192.0.2.1", cache)
|
|
|
|
# Scenario 2: ASN-domain match (no PTR, as_domain IS in the map).
|
|
# Stable attribution — must still be cached.
|
|
with patch(
|
|
"parsedmarc.utils.get_ip_address_db_record",
|
|
return_value={
|
|
"country": "US",
|
|
"asn": 15169,
|
|
"as_name": "Google LLC",
|
|
"as_domain": "google.com",
|
|
},
|
|
):
|
|
parsedmarc.utils.get_ip_address_info("192.0.2.2", offline=True, cache=cache)
|
|
self.assertIn("192.0.2.2", cache)
|
|
|
|
def testIPinfoAPIPrimarySourceAndInvalidKeyIsFatal(self):
|
|
"""With an API token configured, lookups hit the API first via the
|
|
documented ?token= query param. A 401/403 response propagates as
|
|
``InvalidIPinfoAPIKey`` so the CLI can exit fatally. Any other
|
|
non-2xx or network error falls through to the MMDB silently.
|
|
|
|
The IPinfo Lite API is documented as having no request limit, so
|
|
there is no rate-limit/quota handling to test — only the fatal path
|
|
on invalid tokens and the success path."""
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
from parsedmarc.utils import (
|
|
InvalidIPinfoAPIKey,
|
|
configure_ipinfo_api,
|
|
get_ip_address_db_record,
|
|
)
|
|
|
|
def _mock_response(status_code, json_body=None):
|
|
resp = MagicMock()
|
|
resp.status_code = status_code
|
|
resp.ok = 200 <= status_code < 300
|
|
resp.json.return_value = json_body or {}
|
|
return resp
|
|
|
|
try:
|
|
# Success: API returns IPinfo-schema JSON; record comes from API.
|
|
api_json = {
|
|
"ip": "8.8.8.8",
|
|
"asn": "AS15169",
|
|
"as_name": "Google LLC",
|
|
"as_domain": "google.com",
|
|
"country_code": "US",
|
|
}
|
|
with patch(
|
|
"parsedmarc.utils.requests.get",
|
|
return_value=_mock_response(200, api_json),
|
|
) as mock_get:
|
|
configure_ipinfo_api("fake-token", probe=False)
|
|
record = get_ip_address_db_record("8.8.8.8")
|
|
self.assertEqual(record["country"], "US")
|
|
self.assertEqual(record["asn"], 15169)
|
|
self.assertEqual(record["as_domain"], "google.com")
|
|
# Auth must use the documented query param, not a Bearer header.
|
|
_, kwargs = mock_get.call_args
|
|
self.assertEqual(kwargs["params"], {"token": "fake-token"})
|
|
self.assertNotIn("Authorization", kwargs["headers"])
|
|
|
|
# Invalid key: 401 raises a fatal exception even on a random lookup.
|
|
with patch(
|
|
"parsedmarc.utils.requests.get",
|
|
return_value=_mock_response(401),
|
|
):
|
|
configure_ipinfo_api("bad-token", probe=False)
|
|
with self.assertRaises(InvalidIPinfoAPIKey):
|
|
get_ip_address_db_record("8.8.8.8")
|
|
|
|
# Any other non-2xx (e.g. 500, 503) falls back to the MMDB silently.
|
|
configure_ipinfo_api("fake-token", probe=False)
|
|
with patch(
|
|
"parsedmarc.utils.requests.get",
|
|
return_value=_mock_response(500),
|
|
):
|
|
record = get_ip_address_db_record("8.8.8.8")
|
|
# MMDB fallback fills in Google's ASN from the bundled MMDB.
|
|
self.assertEqual(record["asn"], 15169)
|
|
finally:
|
|
configure_ipinfo_api(None)
|
|
|
|
def testAggregateCsvExposesASNColumns(self):
|
|
"""The aggregate CSV output should include source_asn, source_as_name,
|
|
and source_as_domain columns."""
|
|
result = parsedmarc.parse_report_file(
|
|
"samples/aggregate/!example.com!1538204542!1538463818.xml",
|
|
always_use_local_files=True,
|
|
offline=True,
|
|
)
|
|
csv_text = parsedmarc.parsed_aggregate_reports_to_csv(result["report"])
|
|
header = csv_text.splitlines()[0].split(",")
|
|
self.assertIn("source_asn", header)
|
|
self.assertIn("source_as_name", header)
|
|
self.assertIn("source_as_domain", header)
|
|
|
|
def testOpenSearchSigV4RequiresRegion(self):
|
|
with self.assertRaises(opensearch_module.OpenSearchError):
|
|
opensearch_module.set_hosts(
|
|
"https://example.org:9200",
|
|
auth_type="awssigv4",
|
|
)
|
|
|
|
def testOpenSearchSigV4ConfiguresConnectionClass(self):
|
|
fake_credentials = object()
|
|
with patch.object(opensearch_module.boto3, "Session") as session_cls:
|
|
session_cls.return_value.get_credentials.return_value = fake_credentials
|
|
with patch.object(
|
|
opensearch_module, "AWSV4SignerAuth", return_value="auth"
|
|
) as signer:
|
|
with patch.object(
|
|
opensearch_module.connections, "create_connection"
|
|
) as create_connection:
|
|
opensearch_module.set_hosts(
|
|
"https://example.org:9200",
|
|
use_ssl=True,
|
|
auth_type="awssigv4",
|
|
aws_region="eu-west-1",
|
|
)
|
|
signer.assert_called_once_with(fake_credentials, "eu-west-1", "es")
|
|
create_connection.assert_called_once()
|
|
self.assertEqual(
|
|
create_connection.call_args.kwargs.get("connection_class"),
|
|
opensearch_module.RequestsHttpConnection,
|
|
)
|
|
self.assertEqual(create_connection.call_args.kwargs.get("http_auth"), "auth")
|
|
|
|
def testOpenSearchSigV4RejectsUnknownAuthType(self):
|
|
with self.assertRaises(opensearch_module.OpenSearchError):
|
|
opensearch_module.set_hosts(
|
|
"https://example.org:9200",
|
|
auth_type="kerberos",
|
|
)
|
|
|
|
def testOpenSearchSigV4RequiresAwsCredentials(self):
|
|
with patch.object(opensearch_module.boto3, "Session") as session_cls:
|
|
session_cls.return_value.get_credentials.return_value = None
|
|
with self.assertRaises(opensearch_module.OpenSearchError):
|
|
opensearch_module.set_hosts(
|
|
"https://example.org:9200",
|
|
auth_type="awssigv4",
|
|
aws_region="eu-west-1",
|
|
)
|
|
|
|
@patch("parsedmarc.cli.opensearch.migrate_indexes")
|
|
@patch("parsedmarc.cli.opensearch.set_hosts")
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.IMAPConnection")
|
|
def testCliPassesOpenSearchSigV4Settings(
|
|
self,
|
|
mock_imap_connection,
|
|
mock_get_reports,
|
|
mock_set_hosts,
|
|
_mock_migrate_indexes,
|
|
):
|
|
mock_imap_connection.return_value = object()
|
|
mock_get_reports.return_value = {
|
|
"aggregate_reports": [],
|
|
"forensic_reports": [],
|
|
"smtp_tls_reports": [],
|
|
}
|
|
|
|
config = """[general]
|
|
save_aggregate = true
|
|
silent = true
|
|
|
|
[imap]
|
|
host = imap.example.com
|
|
user = test-user
|
|
password = test-password
|
|
|
|
[opensearch]
|
|
hosts = localhost
|
|
authentication_type = awssigv4
|
|
aws_region = eu-west-1
|
|
aws_service = aoss
|
|
"""
|
|
with tempfile.NamedTemporaryFile(
|
|
"w", suffix=".ini", delete=False
|
|
) as config_file:
|
|
config_file.write(config)
|
|
config_path = config_file.name
|
|
self.addCleanup(lambda: os.path.exists(config_path) and os.remove(config_path))
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", config_path]):
|
|
parsedmarc.cli._main()
|
|
|
|
self.assertEqual(mock_set_hosts.call_args.kwargs.get("auth_type"), "awssigv4")
|
|
self.assertEqual(mock_set_hosts.call_args.kwargs.get("aws_region"), "eu-west-1")
|
|
self.assertEqual(mock_set_hosts.call_args.kwargs.get("aws_service"), "aoss")
|
|
|
|
@patch("parsedmarc.cli.elastic.save_aggregate_report_to_elasticsearch")
|
|
@patch("parsedmarc.cli.elastic.migrate_indexes")
|
|
@patch("parsedmarc.cli.elastic.set_hosts")
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.IMAPConnection")
|
|
def testFailOnOutputErrorExits(
|
|
self,
|
|
mock_imap_connection,
|
|
mock_get_reports,
|
|
_mock_set_hosts,
|
|
_mock_migrate_indexes,
|
|
mock_save_aggregate,
|
|
):
|
|
"""CLI should exit with code 1 when fail_on_output_error is enabled"""
|
|
mock_imap_connection.return_value = object()
|
|
mock_get_reports.return_value = {
|
|
"aggregate_reports": [{"policy_published": {"domain": "example.com"}}],
|
|
"forensic_reports": [],
|
|
"smtp_tls_reports": [],
|
|
}
|
|
mock_save_aggregate.side_effect = parsedmarc.elastic.ElasticsearchError(
|
|
"simulated output failure"
|
|
)
|
|
|
|
config = """[general]
|
|
save_aggregate = true
|
|
fail_on_output_error = true
|
|
silent = true
|
|
|
|
[imap]
|
|
host = imap.example.com
|
|
user = test-user
|
|
password = test-password
|
|
|
|
[elasticsearch]
|
|
hosts = localhost
|
|
"""
|
|
with tempfile.NamedTemporaryFile(
|
|
"w", suffix=".ini", delete=False
|
|
) as config_file:
|
|
config_file.write(config)
|
|
config_path = config_file.name
|
|
self.addCleanup(lambda: os.path.exists(config_path) and os.remove(config_path))
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", config_path]):
|
|
with self.assertRaises(SystemExit) as ctx:
|
|
parsedmarc.cli._main()
|
|
|
|
self.assertEqual(ctx.exception.code, 1)
|
|
mock_save_aggregate.assert_called_once()
|
|
|
|
@patch("parsedmarc.cli.elastic.save_aggregate_report_to_elasticsearch")
|
|
@patch("parsedmarc.cli.elastic.migrate_indexes")
|
|
@patch("parsedmarc.cli.elastic.set_hosts")
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.IMAPConnection")
|
|
def testOutputErrorDoesNotExitWhenDisabled(
|
|
self,
|
|
mock_imap_connection,
|
|
mock_get_reports,
|
|
_mock_set_hosts,
|
|
_mock_migrate_indexes,
|
|
mock_save_aggregate,
|
|
):
|
|
mock_imap_connection.return_value = object()
|
|
mock_get_reports.return_value = {
|
|
"aggregate_reports": [{"policy_published": {"domain": "example.com"}}],
|
|
"forensic_reports": [],
|
|
"smtp_tls_reports": [],
|
|
}
|
|
mock_save_aggregate.side_effect = parsedmarc.elastic.ElasticsearchError(
|
|
"simulated output failure"
|
|
)
|
|
|
|
config = """[general]
|
|
save_aggregate = true
|
|
fail_on_output_error = false
|
|
silent = true
|
|
|
|
[imap]
|
|
host = imap.example.com
|
|
user = test-user
|
|
password = test-password
|
|
|
|
[elasticsearch]
|
|
hosts = localhost
|
|
"""
|
|
with tempfile.NamedTemporaryFile(
|
|
"w", suffix=".ini", delete=False
|
|
) as config_file:
|
|
config_file.write(config)
|
|
config_path = config_file.name
|
|
self.addCleanup(lambda: os.path.exists(config_path) and os.remove(config_path))
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", config_path]):
|
|
parsedmarc.cli._main()
|
|
|
|
mock_save_aggregate.assert_called_once()
|
|
|
|
@patch("parsedmarc.cli.opensearch.save_forensic_report_to_opensearch")
|
|
@patch("parsedmarc.cli.opensearch.migrate_indexes")
|
|
@patch("parsedmarc.cli.opensearch.set_hosts")
|
|
@patch("parsedmarc.cli.elastic.save_forensic_report_to_elasticsearch")
|
|
@patch("parsedmarc.cli.elastic.save_aggregate_report_to_elasticsearch")
|
|
@patch("parsedmarc.cli.elastic.migrate_indexes")
|
|
@patch("parsedmarc.cli.elastic.set_hosts")
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.IMAPConnection")
|
|
def testFailOnOutputErrorExitsWithMultipleSinkErrors(
|
|
self,
|
|
mock_imap_connection,
|
|
mock_get_reports,
|
|
_mock_es_set_hosts,
|
|
_mock_es_migrate,
|
|
mock_save_aggregate,
|
|
_mock_save_forensic_elastic,
|
|
_mock_os_set_hosts,
|
|
_mock_os_migrate,
|
|
mock_save_forensic_opensearch,
|
|
):
|
|
mock_imap_connection.return_value = object()
|
|
mock_get_reports.return_value = {
|
|
"aggregate_reports": [{"policy_published": {"domain": "example.com"}}],
|
|
"forensic_reports": [{"reported_domain": "example.com"}],
|
|
"smtp_tls_reports": [],
|
|
}
|
|
mock_save_aggregate.side_effect = parsedmarc.elastic.ElasticsearchError(
|
|
"aggregate sink failed"
|
|
)
|
|
mock_save_forensic_opensearch.side_effect = (
|
|
parsedmarc.cli.opensearch.OpenSearchError("forensic sink failed")
|
|
)
|
|
|
|
config = """[general]
|
|
save_aggregate = true
|
|
save_forensic = true
|
|
fail_on_output_error = true
|
|
silent = true
|
|
|
|
[imap]
|
|
host = imap.example.com
|
|
user = test-user
|
|
password = test-password
|
|
|
|
[elasticsearch]
|
|
hosts = localhost
|
|
|
|
[opensearch]
|
|
hosts = localhost
|
|
"""
|
|
with tempfile.NamedTemporaryFile(
|
|
"w", suffix=".ini", delete=False
|
|
) as config_file:
|
|
config_file.write(config)
|
|
config_path = config_file.name
|
|
self.addCleanup(lambda: os.path.exists(config_path) and os.remove(config_path))
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", config_path]):
|
|
with self.assertRaises(SystemExit) as ctx:
|
|
parsedmarc.cli._main()
|
|
|
|
self.assertEqual(ctx.exception.code, 1)
|
|
mock_save_aggregate.assert_called_once()
|
|
mock_save_forensic_opensearch.assert_called_once()
|
|
|
|
|
|
class _BreakLoop(BaseException):
|
|
pass
|
|
|
|
|
|
class TestGmailAuthModes(unittest.TestCase):
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.GmailConnection")
|
|
def testCliPassesGmailServiceAccountAuthSettings(
|
|
self, mock_gmail_connection, mock_get_mailbox_reports
|
|
):
|
|
mock_gmail_connection.return_value = MagicMock()
|
|
mock_get_mailbox_reports.return_value = {
|
|
"aggregate_reports": [],
|
|
"forensic_reports": [],
|
|
"smtp_tls_reports": [],
|
|
}
|
|
config = """[general]
|
|
silent = true
|
|
|
|
[gmail_api]
|
|
credentials_file = /tmp/service-account.json
|
|
auth_mode = service_account
|
|
service_account_user = dmarc@example.com
|
|
scopes = https://www.googleapis.com/auth/gmail.modify
|
|
"""
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg_file:
|
|
cfg_file.write(config)
|
|
config_path = cfg_file.name
|
|
self.addCleanup(lambda: os.path.exists(config_path) and os.remove(config_path))
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", config_path]):
|
|
parsedmarc.cli._main()
|
|
|
|
self.assertEqual(
|
|
mock_gmail_connection.call_args.kwargs.get("auth_mode"), "service_account"
|
|
)
|
|
self.assertEqual(
|
|
mock_gmail_connection.call_args.kwargs.get("service_account_user"),
|
|
"dmarc@example.com",
|
|
)
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.GmailConnection")
|
|
def testCliAcceptsDelegatedUserAlias(self, mock_gmail_connection, mock_get_reports):
|
|
mock_gmail_connection.return_value = MagicMock()
|
|
mock_get_reports.return_value = {
|
|
"aggregate_reports": [],
|
|
"forensic_reports": [],
|
|
"smtp_tls_reports": [],
|
|
}
|
|
config = """[general]
|
|
silent = true
|
|
|
|
[gmail_api]
|
|
credentials_file = /tmp/service-account.json
|
|
auth_mode = service_account
|
|
delegated_user = delegated@example.com
|
|
scopes = https://www.googleapis.com/auth/gmail.modify
|
|
"""
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg_file:
|
|
cfg_file.write(config)
|
|
config_path = cfg_file.name
|
|
self.addCleanup(lambda: os.path.exists(config_path) and os.remove(config_path))
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", config_path]):
|
|
parsedmarc.cli._main()
|
|
|
|
self.assertEqual(
|
|
mock_gmail_connection.call_args.kwargs.get("service_account_user"),
|
|
"delegated@example.com",
|
|
)
|
|
|
|
|
|
class TestMailboxWatchSince(unittest.TestCase):
|
|
def setUp(self):
|
|
from parsedmarc.log import logger as _logger
|
|
|
|
_logger.disabled = True
|
|
self._stdout_patch = patch("sys.stdout", new_callable=io.StringIO)
|
|
self._stderr_patch = patch("sys.stderr", new_callable=io.StringIO)
|
|
self._stdout_patch.start()
|
|
self._stderr_patch.start()
|
|
|
|
def tearDown(self):
|
|
from parsedmarc.log import logger as _logger
|
|
|
|
_logger.disabled = False
|
|
self._stderr_patch.stop()
|
|
self._stdout_patch.stop()
|
|
|
|
def testWatchInboxPassesSinceToMailboxFetch(self):
|
|
mailbox_connection = SimpleNamespace()
|
|
|
|
def fake_watch(check_callback, check_timeout, config_reloading=None):
|
|
check_callback(mailbox_connection)
|
|
raise _BreakLoop()
|
|
|
|
mailbox_connection.watch = fake_watch
|
|
callback = MagicMock()
|
|
with patch.object(
|
|
parsedmarc, "get_dmarc_reports_from_mailbox", return_value={}
|
|
) as mocked:
|
|
with self.assertRaises(_BreakLoop):
|
|
parsedmarc.watch_inbox(
|
|
mailbox_connection=cast(
|
|
parsedmarc.MailboxConnection, mailbox_connection
|
|
),
|
|
callback=callback,
|
|
check_timeout=1,
|
|
batch_size=10,
|
|
since="1d",
|
|
)
|
|
self.assertEqual(mocked.call_args.kwargs.get("since"), "1d")
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.watch_inbox")
|
|
@patch("parsedmarc.cli.IMAPConnection")
|
|
def testCliPassesSinceToWatchInbox(
|
|
self, mock_imap_connection, mock_watch_inbox, mock_get_mailbox_reports
|
|
):
|
|
mock_imap_connection.return_value = object()
|
|
mock_get_mailbox_reports.return_value = {
|
|
"aggregate_reports": [],
|
|
"forensic_reports": [],
|
|
"smtp_tls_reports": [],
|
|
}
|
|
mock_watch_inbox.side_effect = FileExistsError("stop-watch-loop")
|
|
|
|
config_text = """[general]
|
|
silent = true
|
|
|
|
[imap]
|
|
host = imap.example.com
|
|
user = user
|
|
password = pass
|
|
|
|
[mailbox]
|
|
watch = true
|
|
since = 2d
|
|
"""
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
cfg.write(config_text)
|
|
cfg_path = cfg.name
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
with self.assertRaises(SystemExit) as system_exit:
|
|
parsedmarc.cli._main()
|
|
|
|
self.assertEqual(system_exit.exception.code, 1)
|
|
self.assertEqual(mock_watch_inbox.call_args.kwargs.get("since"), "2d")
|
|
|
|
|
|
class _DummyMailboxConnection(parsedmarc.MailboxConnection):
|
|
def __init__(self):
|
|
self.fetch_calls: list[dict[str, object]] = []
|
|
|
|
def create_folder(self, folder_name: str):
|
|
return None
|
|
|
|
def fetch_messages(self, reports_folder: str, **kwargs):
|
|
self.fetch_calls.append({"reports_folder": reports_folder, **kwargs})
|
|
return []
|
|
|
|
def fetch_message(self, message_id) -> str:
|
|
return ""
|
|
|
|
def delete_message(self, message_id):
|
|
return None
|
|
|
|
def move_message(self, message_id, folder_name: str):
|
|
return None
|
|
|
|
def keepalive(self):
|
|
return None
|
|
|
|
def watch(self, check_callback, check_timeout, config_reloading=None):
|
|
return None
|
|
|
|
|
|
class TestMailboxPerformance(unittest.TestCase):
|
|
def setUp(self):
|
|
from parsedmarc.log import logger as _logger
|
|
|
|
_logger.disabled = True
|
|
self._stdout_patch = patch("sys.stdout", new_callable=io.StringIO)
|
|
self._stderr_patch = patch("sys.stderr", new_callable=io.StringIO)
|
|
self._stdout_patch.start()
|
|
self._stderr_patch.start()
|
|
|
|
def tearDown(self):
|
|
from parsedmarc.log import logger as _logger
|
|
|
|
_logger.disabled = False
|
|
self._stderr_patch.stop()
|
|
self._stdout_patch.stop()
|
|
|
|
def testBatchModeAvoidsExtraFullFetch(self):
|
|
connection = _DummyMailboxConnection()
|
|
parsedmarc.get_dmarc_reports_from_mailbox(
|
|
connection=connection,
|
|
reports_folder="INBOX",
|
|
test=True,
|
|
batch_size=10,
|
|
create_folders=False,
|
|
)
|
|
self.assertEqual(len(connection.fetch_calls), 1)
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.MSGraphConnection")
|
|
def testCliPassesMsGraphCertificateAuthSettings(
|
|
self, mock_graph_connection, mock_get_mailbox_reports
|
|
):
|
|
mock_graph_connection.return_value = object()
|
|
mock_get_mailbox_reports.return_value = {
|
|
"aggregate_reports": [],
|
|
"forensic_reports": [],
|
|
"smtp_tls_reports": [],
|
|
}
|
|
|
|
config_text = """[general]
|
|
silent = true
|
|
|
|
[msgraph]
|
|
auth_method = Certificate
|
|
client_id = client-id
|
|
tenant_id = tenant-id
|
|
mailbox = shared@example.com
|
|
certificate_path = /tmp/msgraph-cert.pem
|
|
certificate_password = cert-pass
|
|
"""
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
cfg.write(config_text)
|
|
cfg_path = cfg.name
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
parsedmarc.cli._main()
|
|
|
|
self.assertEqual(
|
|
mock_graph_connection.call_args.kwargs.get("auth_method"), "Certificate"
|
|
)
|
|
self.assertEqual(
|
|
mock_graph_connection.call_args.kwargs.get("certificate_path"),
|
|
"/tmp/msgraph-cert.pem",
|
|
)
|
|
self.assertEqual(
|
|
mock_graph_connection.call_args.kwargs.get("certificate_password"),
|
|
"cert-pass",
|
|
)
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.MSGraphConnection")
|
|
@patch("parsedmarc.cli.logger")
|
|
def testCliRequiresMsGraphCertificatePath(
|
|
self, mock_logger, mock_graph_connection, mock_get_mailbox_reports
|
|
):
|
|
config_text = """[general]
|
|
silent = true
|
|
|
|
[msgraph]
|
|
auth_method = Certificate
|
|
client_id = client-id
|
|
tenant_id = tenant-id
|
|
mailbox = shared@example.com
|
|
"""
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
cfg.write(config_text)
|
|
cfg_path = cfg.name
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
with self.assertRaises(SystemExit) as system_exit:
|
|
parsedmarc.cli._main()
|
|
|
|
self.assertEqual(system_exit.exception.code, -1)
|
|
mock_logger.critical.assert_called_once_with(
|
|
"certificate_path setting missing from the msgraph config section"
|
|
)
|
|
mock_graph_connection.assert_not_called()
|
|
mock_get_mailbox_reports.assert_not_called()
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.MSGraphConnection")
|
|
def testCliUsesMsGraphUserAsMailboxForUsernamePasswordAuth(
|
|
self, mock_graph_connection, mock_get_mailbox_reports
|
|
):
|
|
mock_graph_connection.return_value = object()
|
|
mock_get_mailbox_reports.return_value = {
|
|
"aggregate_reports": [],
|
|
"forensic_reports": [],
|
|
"smtp_tls_reports": [],
|
|
}
|
|
|
|
config_text = """[general]
|
|
silent = true
|
|
|
|
[msgraph]
|
|
auth_method = UsernamePassword
|
|
client_id = client-id
|
|
client_secret = client-secret
|
|
user = owner@example.com
|
|
password = test-password
|
|
"""
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
cfg.write(config_text)
|
|
cfg_path = cfg.name
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
parsedmarc.cli._main()
|
|
|
|
self.assertEqual(
|
|
mock_graph_connection.call_args.kwargs.get("mailbox"),
|
|
"owner@example.com",
|
|
)
|
|
self.assertEqual(
|
|
mock_graph_connection.call_args.kwargs.get("username"),
|
|
"owner@example.com",
|
|
)
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.MSGraphConnection")
|
|
@patch("parsedmarc.cli.logger")
|
|
def testCliRequiresMsGraphPasswordForUsernamePasswordAuth(
|
|
self, mock_logger, mock_graph_connection, mock_get_mailbox_reports
|
|
):
|
|
config_text = """[general]
|
|
silent = true
|
|
|
|
[msgraph]
|
|
auth_method = UsernamePassword
|
|
client_id = client-id
|
|
client_secret = client-secret
|
|
user = owner@example.com
|
|
"""
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
cfg.write(config_text)
|
|
cfg_path = cfg.name
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
with self.assertRaises(SystemExit) as system_exit:
|
|
parsedmarc.cli._main()
|
|
|
|
self.assertEqual(system_exit.exception.code, -1)
|
|
mock_logger.critical.assert_called_once_with(
|
|
"password setting missing from the msgraph config section"
|
|
)
|
|
mock_graph_connection.assert_not_called()
|
|
mock_get_mailbox_reports.assert_not_called()
|
|
|
|
|
|
class TestMSGraphCliValidation(unittest.TestCase):
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.MSGraphConnection")
|
|
def testCliPassesMsGraphClientSecretAuthSettings(
|
|
self, mock_graph_connection, mock_get_mailbox_reports
|
|
):
|
|
mock_graph_connection.return_value = object()
|
|
mock_get_mailbox_reports.return_value = {
|
|
"aggregate_reports": [],
|
|
"forensic_reports": [],
|
|
"smtp_tls_reports": [],
|
|
}
|
|
|
|
config_text = """[general]
|
|
silent = true
|
|
|
|
[msgraph]
|
|
auth_method = ClientSecret
|
|
client_id = client-id
|
|
client_secret = client-secret
|
|
tenant_id = tenant-id
|
|
mailbox = shared@example.com
|
|
"""
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
cfg.write(config_text)
|
|
cfg_path = cfg.name
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
parsedmarc.cli._main()
|
|
|
|
self.assertEqual(
|
|
mock_graph_connection.call_args.kwargs.get("auth_method"), "ClientSecret"
|
|
)
|
|
self.assertEqual(
|
|
mock_graph_connection.call_args.kwargs.get("client_secret"),
|
|
"client-secret",
|
|
)
|
|
self.assertEqual(
|
|
mock_graph_connection.call_args.kwargs.get("tenant_id"), "tenant-id"
|
|
)
|
|
self.assertEqual(
|
|
mock_graph_connection.call_args.kwargs.get("mailbox"),
|
|
"shared@example.com",
|
|
)
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.MSGraphConnection")
|
|
@patch("parsedmarc.cli.logger")
|
|
def testCliRequiresMsGraphClientSecretForClientSecretAuth(
|
|
self, mock_logger, mock_graph_connection, mock_get_mailbox_reports
|
|
):
|
|
config_text = """[general]
|
|
silent = true
|
|
|
|
[msgraph]
|
|
auth_method = ClientSecret
|
|
client_id = client-id
|
|
tenant_id = tenant-id
|
|
mailbox = shared@example.com
|
|
"""
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
cfg.write(config_text)
|
|
cfg_path = cfg.name
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
with self.assertRaises(SystemExit) as system_exit:
|
|
parsedmarc.cli._main()
|
|
|
|
self.assertEqual(system_exit.exception.code, -1)
|
|
mock_logger.critical.assert_called_once_with(
|
|
"client_secret setting missing from the msgraph config section"
|
|
)
|
|
mock_graph_connection.assert_not_called()
|
|
mock_get_mailbox_reports.assert_not_called()
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.MSGraphConnection")
|
|
@patch("parsedmarc.cli.logger")
|
|
def testCliRequiresMsGraphTenantIdForClientSecretAuth(
|
|
self, mock_logger, mock_graph_connection, mock_get_mailbox_reports
|
|
):
|
|
config_text = """[general]
|
|
silent = true
|
|
|
|
[msgraph]
|
|
auth_method = ClientSecret
|
|
client_id = client-id
|
|
client_secret = client-secret
|
|
mailbox = shared@example.com
|
|
"""
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
cfg.write(config_text)
|
|
cfg_path = cfg.name
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
with self.assertRaises(SystemExit) as system_exit:
|
|
parsedmarc.cli._main()
|
|
|
|
self.assertEqual(system_exit.exception.code, -1)
|
|
mock_logger.critical.assert_called_once_with(
|
|
"tenant_id setting missing from the msgraph config section"
|
|
)
|
|
mock_graph_connection.assert_not_called()
|
|
mock_get_mailbox_reports.assert_not_called()
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.MSGraphConnection")
|
|
@patch("parsedmarc.cli.logger")
|
|
def testCliRequiresMsGraphMailboxForClientSecretAuth(
|
|
self, mock_logger, mock_graph_connection, mock_get_mailbox_reports
|
|
):
|
|
config_text = """[general]
|
|
silent = true
|
|
|
|
[msgraph]
|
|
auth_method = ClientSecret
|
|
client_id = client-id
|
|
client_secret = client-secret
|
|
tenant_id = tenant-id
|
|
"""
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
cfg.write(config_text)
|
|
cfg_path = cfg.name
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
with self.assertRaises(SystemExit) as system_exit:
|
|
parsedmarc.cli._main()
|
|
|
|
self.assertEqual(system_exit.exception.code, -1)
|
|
mock_logger.critical.assert_called_once_with(
|
|
"mailbox setting missing from the msgraph config section"
|
|
)
|
|
mock_graph_connection.assert_not_called()
|
|
mock_get_mailbox_reports.assert_not_called()
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.MSGraphConnection")
|
|
def testCliAllowsMsGraphDeviceCodeWithoutUser(
|
|
self, mock_graph_connection, mock_get_mailbox_reports
|
|
):
|
|
mock_graph_connection.return_value = object()
|
|
mock_get_mailbox_reports.return_value = {
|
|
"aggregate_reports": [],
|
|
"forensic_reports": [],
|
|
"smtp_tls_reports": [],
|
|
}
|
|
|
|
config_text = """[general]
|
|
silent = true
|
|
|
|
[msgraph]
|
|
auth_method = DeviceCode
|
|
client_id = client-id
|
|
tenant_id = tenant-id
|
|
mailbox = shared@example.com
|
|
"""
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
cfg.write(config_text)
|
|
cfg_path = cfg.name
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
parsedmarc.cli._main()
|
|
|
|
self.assertEqual(
|
|
mock_graph_connection.call_args.kwargs.get("auth_method"), "DeviceCode"
|
|
)
|
|
self.assertEqual(
|
|
mock_graph_connection.call_args.kwargs.get("mailbox"),
|
|
"shared@example.com",
|
|
)
|
|
self.assertIsNone(mock_graph_connection.call_args.kwargs.get("username"))
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.MSGraphConnection")
|
|
@patch("parsedmarc.cli.logger")
|
|
def testCliRequiresMsGraphTenantIdForDeviceCodeAuth(
|
|
self, mock_logger, mock_graph_connection, mock_get_mailbox_reports
|
|
):
|
|
config_text = """[general]
|
|
silent = true
|
|
|
|
[msgraph]
|
|
auth_method = DeviceCode
|
|
client_id = client-id
|
|
mailbox = shared@example.com
|
|
"""
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
cfg.write(config_text)
|
|
cfg_path = cfg.name
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
with self.assertRaises(SystemExit) as system_exit:
|
|
parsedmarc.cli._main()
|
|
|
|
self.assertEqual(system_exit.exception.code, -1)
|
|
mock_logger.critical.assert_called_once_with(
|
|
"tenant_id setting missing from the msgraph config section"
|
|
)
|
|
mock_graph_connection.assert_not_called()
|
|
mock_get_mailbox_reports.assert_not_called()
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.MSGraphConnection")
|
|
@patch("parsedmarc.cli.logger")
|
|
def testCliRequiresMsGraphMailboxForDeviceCodeAuth(
|
|
self, mock_logger, mock_graph_connection, mock_get_mailbox_reports
|
|
):
|
|
config_text = """[general]
|
|
silent = true
|
|
|
|
[msgraph]
|
|
auth_method = DeviceCode
|
|
client_id = client-id
|
|
tenant_id = tenant-id
|
|
"""
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
cfg.write(config_text)
|
|
cfg_path = cfg.name
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
with self.assertRaises(SystemExit) as system_exit:
|
|
parsedmarc.cli._main()
|
|
|
|
self.assertEqual(system_exit.exception.code, -1)
|
|
mock_logger.critical.assert_called_once_with(
|
|
"mailbox setting missing from the msgraph config section"
|
|
)
|
|
mock_graph_connection.assert_not_called()
|
|
mock_get_mailbox_reports.assert_not_called()
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.MSGraphConnection")
|
|
@patch("parsedmarc.cli.logger")
|
|
def testCliRequiresMsGraphTenantIdForCertificateAuth(
|
|
self, mock_logger, mock_graph_connection, mock_get_mailbox_reports
|
|
):
|
|
config_text = """[general]
|
|
silent = true
|
|
|
|
[msgraph]
|
|
auth_method = Certificate
|
|
client_id = client-id
|
|
mailbox = shared@example.com
|
|
certificate_path = /tmp/msgraph-cert.pem
|
|
"""
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
cfg.write(config_text)
|
|
cfg_path = cfg.name
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
with self.assertRaises(SystemExit) as system_exit:
|
|
parsedmarc.cli._main()
|
|
|
|
self.assertEqual(system_exit.exception.code, -1)
|
|
mock_logger.critical.assert_called_once_with(
|
|
"tenant_id setting missing from the msgraph config section"
|
|
)
|
|
mock_graph_connection.assert_not_called()
|
|
mock_get_mailbox_reports.assert_not_called()
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.MSGraphConnection")
|
|
@patch("parsedmarc.cli.logger")
|
|
def testCliRequiresMsGraphMailboxForCertificateAuth(
|
|
self, mock_logger, mock_graph_connection, mock_get_mailbox_reports
|
|
):
|
|
config_text = """[general]
|
|
silent = true
|
|
|
|
[msgraph]
|
|
auth_method = Certificate
|
|
client_id = client-id
|
|
tenant_id = tenant-id
|
|
certificate_path = /tmp/msgraph-cert.pem
|
|
"""
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
cfg.write(config_text)
|
|
cfg_path = cfg.name
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
with self.assertRaises(SystemExit) as system_exit:
|
|
parsedmarc.cli._main()
|
|
|
|
self.assertEqual(system_exit.exception.code, -1)
|
|
mock_logger.critical.assert_called_once_with(
|
|
"mailbox setting missing from the msgraph config section"
|
|
)
|
|
mock_graph_connection.assert_not_called()
|
|
mock_get_mailbox_reports.assert_not_called()
|
|
|
|
|
|
class TestSighupReload(unittest.TestCase):
|
|
"""Tests for SIGHUP-driven configuration reload in watch mode."""
|
|
|
|
def setUp(self):
|
|
from parsedmarc.log import logger as _logger
|
|
|
|
_logger.disabled = True
|
|
self._stdout_patch = patch("sys.stdout", new_callable=io.StringIO)
|
|
self._stderr_patch = patch("sys.stderr", new_callable=io.StringIO)
|
|
self._stdout_patch.start()
|
|
self._stderr_patch.start()
|
|
|
|
def tearDown(self):
|
|
from parsedmarc.log import logger as _logger
|
|
|
|
_logger.disabled = False
|
|
self._stderr_patch.stop()
|
|
self._stdout_patch.stop()
|
|
|
|
_BASE_CONFIG = """[general]
|
|
silent = true
|
|
|
|
[imap]
|
|
host = imap.example.com
|
|
user = user
|
|
password = pass
|
|
|
|
[mailbox]
|
|
watch = true
|
|
"""
|
|
|
|
@unittest.skipUnless(
|
|
hasattr(signal, "SIGHUP"),
|
|
"SIGHUP not available on this platform",
|
|
)
|
|
@patch("parsedmarc.cli._init_output_clients")
|
|
@patch("parsedmarc.cli._parse_config")
|
|
@patch("parsedmarc.cli._load_config")
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.watch_inbox")
|
|
@patch("parsedmarc.cli.IMAPConnection")
|
|
def testSighupTriggersReloadAndWatchRestarts(
|
|
self,
|
|
mock_imap,
|
|
mock_watch,
|
|
mock_get_reports,
|
|
mock_load_config,
|
|
mock_parse_config,
|
|
mock_init_clients,
|
|
):
|
|
"""SIGHUP causes watch to return, config is re-parsed, and watch restarts."""
|
|
import signal as signal_module
|
|
|
|
mock_imap.return_value = object()
|
|
mock_get_reports.return_value = {
|
|
"aggregate_reports": [],
|
|
"forensic_reports": [],
|
|
"smtp_tls_reports": [],
|
|
}
|
|
|
|
mock_load_config.return_value = ConfigParser()
|
|
|
|
def parse_side_effect(config, opts):
|
|
opts.imap_host = "imap.example.com"
|
|
opts.imap_user = "user"
|
|
opts.imap_password = "pass"
|
|
opts.mailbox_watch = True
|
|
return None
|
|
|
|
mock_parse_config.side_effect = parse_side_effect
|
|
mock_init_clients.return_value = {}
|
|
|
|
call_count = [0]
|
|
|
|
def watch_side_effect(*args, **kwargs):
|
|
call_count[0] += 1
|
|
if call_count[0] == 1:
|
|
# Simulate SIGHUP arriving while watch is running
|
|
if hasattr(signal_module, "SIGHUP"):
|
|
import os
|
|
|
|
os.kill(os.getpid(), signal_module.SIGHUP)
|
|
return # Normal return — reload loop will continue
|
|
else:
|
|
raise FileExistsError("stop-watch-loop")
|
|
|
|
mock_watch.side_effect = watch_side_effect
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
cfg.write(self._BASE_CONFIG)
|
|
cfg_path = cfg.name
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
with self.assertRaises(SystemExit) as cm:
|
|
parsedmarc.cli._main()
|
|
|
|
# Exited with code 1 (from FileExistsError handler)
|
|
self.assertEqual(cm.exception.code, 1)
|
|
# watch_inbox was called twice: initial run + after reload
|
|
self.assertEqual(mock_watch.call_count, 2)
|
|
# _parse_config called for initial load + reload
|
|
self.assertGreaterEqual(mock_parse_config.call_count, 2)
|
|
|
|
@unittest.skipUnless(
|
|
hasattr(signal, "SIGHUP"),
|
|
"SIGHUP not available on this platform",
|
|
)
|
|
@patch("parsedmarc.cli._init_output_clients")
|
|
@patch("parsedmarc.cli._parse_config")
|
|
@patch("parsedmarc.cli._load_config")
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.watch_inbox")
|
|
@patch("parsedmarc.cli.IMAPConnection")
|
|
def testInvalidConfigOnReloadKeepsPreviousState(
|
|
self,
|
|
mock_imap,
|
|
mock_watch,
|
|
mock_get_reports,
|
|
mock_load_config,
|
|
mock_parse_config,
|
|
mock_init_clients,
|
|
):
|
|
"""A failing reload leaves opts and clients unchanged."""
|
|
import signal as signal_module
|
|
|
|
mock_imap.return_value = object()
|
|
mock_get_reports.return_value = {
|
|
"aggregate_reports": [],
|
|
"forensic_reports": [],
|
|
"smtp_tls_reports": [],
|
|
}
|
|
|
|
mock_load_config.return_value = ConfigParser()
|
|
|
|
# Initial parse sets required opts; reload parse raises
|
|
initial_map = {"prefix_": ["example.com"]}
|
|
call_count = [0]
|
|
|
|
def parse_side_effect(config, opts):
|
|
call_count[0] += 1
|
|
opts.imap_host = "imap.example.com"
|
|
opts.imap_user = "user"
|
|
opts.imap_password = "pass"
|
|
opts.mailbox_watch = True
|
|
if call_count[0] == 1:
|
|
return initial_map
|
|
raise RuntimeError("bad config")
|
|
|
|
mock_parse_config.side_effect = parse_side_effect
|
|
|
|
initial_clients = {"s3_client": MagicMock()}
|
|
mock_init_clients.return_value = initial_clients
|
|
|
|
watch_calls = [0]
|
|
|
|
def watch_side_effect(*args, **kwargs):
|
|
watch_calls[0] += 1
|
|
if watch_calls[0] == 1:
|
|
if hasattr(signal_module, "SIGHUP"):
|
|
import os
|
|
|
|
os.kill(os.getpid(), signal_module.SIGHUP)
|
|
return
|
|
else:
|
|
raise FileExistsError("stop")
|
|
|
|
mock_watch.side_effect = watch_side_effect
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
cfg.write(self._BASE_CONFIG)
|
|
cfg_path = cfg.name
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
with self.assertRaises(SystemExit) as cm:
|
|
parsedmarc.cli._main()
|
|
|
|
self.assertEqual(cm.exception.code, 1)
|
|
# watch was still called twice (reload loop continued after failed reload)
|
|
self.assertEqual(mock_watch.call_count, 2)
|
|
# The failed reload must not have closed the original clients
|
|
initial_clients["s3_client"].close.assert_not_called()
|
|
|
|
@unittest.skipUnless(
|
|
hasattr(signal, "SIGHUP"),
|
|
"SIGHUP not available on this platform",
|
|
)
|
|
@patch("parsedmarc.cli._init_output_clients")
|
|
@patch("parsedmarc.cli._parse_config")
|
|
@patch("parsedmarc.cli._load_config")
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.watch_inbox")
|
|
@patch("parsedmarc.cli.IMAPConnection")
|
|
def testReloadClosesOldClients(
|
|
self,
|
|
mock_imap,
|
|
mock_watch,
|
|
mock_get_reports,
|
|
mock_load_config,
|
|
mock_parse_config,
|
|
mock_init_clients,
|
|
):
|
|
"""Successful reload closes the old output clients before replacing them."""
|
|
import signal as signal_module
|
|
|
|
mock_imap.return_value = object()
|
|
mock_get_reports.return_value = {
|
|
"aggregate_reports": [],
|
|
"forensic_reports": [],
|
|
"smtp_tls_reports": [],
|
|
}
|
|
|
|
mock_load_config.return_value = ConfigParser()
|
|
|
|
def parse_side_effect(config, opts):
|
|
opts.imap_host = "imap.example.com"
|
|
opts.imap_user = "user"
|
|
opts.imap_password = "pass"
|
|
opts.mailbox_watch = True
|
|
return None
|
|
|
|
mock_parse_config.side_effect = parse_side_effect
|
|
|
|
old_client = MagicMock()
|
|
new_client = MagicMock()
|
|
init_call = [0]
|
|
|
|
def init_side_effect(opts):
|
|
init_call[0] += 1
|
|
if init_call[0] == 1:
|
|
return {"kafka_client": old_client}
|
|
return {"kafka_client": new_client}
|
|
|
|
mock_init_clients.side_effect = init_side_effect
|
|
|
|
watch_calls = [0]
|
|
|
|
def watch_side_effect(*args, **kwargs):
|
|
watch_calls[0] += 1
|
|
if watch_calls[0] == 1:
|
|
if hasattr(signal_module, "SIGHUP"):
|
|
import os
|
|
|
|
os.kill(os.getpid(), signal_module.SIGHUP)
|
|
return
|
|
else:
|
|
raise FileExistsError("stop")
|
|
|
|
mock_watch.side_effect = watch_side_effect
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
cfg.write(self._BASE_CONFIG)
|
|
cfg_path = cfg.name
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
with self.assertRaises(SystemExit):
|
|
parsedmarc.cli._main()
|
|
|
|
# Old client must have been closed when reload succeeded
|
|
old_client.close.assert_called_once()
|
|
|
|
@unittest.skipUnless(
|
|
hasattr(signal, "SIGHUP"),
|
|
"SIGHUP not available on this platform",
|
|
)
|
|
@patch("parsedmarc.cli._init_output_clients")
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.watch_inbox")
|
|
@patch("parsedmarc.cli.IMAPConnection")
|
|
def testRemovedConfigSectionTakesEffectOnReload(
|
|
self,
|
|
mock_imap,
|
|
mock_watch,
|
|
mock_get_reports,
|
|
mock_init_clients,
|
|
):
|
|
"""Removing a config section on reload resets that option to its default."""
|
|
import signal as signal_module
|
|
|
|
mock_imap.return_value = object()
|
|
mock_get_reports.return_value = {
|
|
"aggregate_reports": [],
|
|
"forensic_reports": [],
|
|
"smtp_tls_reports": [],
|
|
}
|
|
mock_init_clients.return_value = {}
|
|
|
|
# First config sets kafka_hosts (with required topics); second removes it.
|
|
config_v1 = (
|
|
self._BASE_CONFIG
|
|
+ "\n[kafka]\nhosts = kafka.example.com:9092\n"
|
|
+ "aggregate_topic = dmarc_agg\n"
|
|
+ "forensic_topic = dmarc_forensic\n"
|
|
+ "smtp_tls_topic = smtp_tls\n"
|
|
)
|
|
config_v2 = self._BASE_CONFIG # no [kafka] section
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
cfg.write(config_v1)
|
|
cfg_path = cfg.name
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
watch_calls = [0]
|
|
|
|
def watch_side_effect(*args, **kwargs):
|
|
watch_calls[0] += 1
|
|
if watch_calls[0] == 1:
|
|
# Rewrite config to remove kafka before triggering reload
|
|
with open(cfg_path, "w") as f:
|
|
f.write(config_v2)
|
|
if hasattr(signal_module, "SIGHUP"):
|
|
import os
|
|
|
|
os.kill(os.getpid(), signal_module.SIGHUP)
|
|
return
|
|
else:
|
|
raise FileExistsError("stop")
|
|
|
|
mock_watch.side_effect = watch_side_effect
|
|
|
|
# Capture opts used on each _init_output_clients call
|
|
init_opts_captures = []
|
|
|
|
def init_side_effect(opts):
|
|
from argparse import Namespace as NS
|
|
|
|
init_opts_captures.append(NS(**vars(opts)))
|
|
return {}
|
|
|
|
mock_init_clients.side_effect = init_side_effect
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
with self.assertRaises(SystemExit):
|
|
parsedmarc.cli._main()
|
|
|
|
# First init: kafka_hosts should be set from v1 config
|
|
self.assertIsNotNone(init_opts_captures[0].kafka_hosts)
|
|
# Second init (after reload with v2 config): kafka_hosts should be None
|
|
self.assertIsNone(init_opts_captures[1].kafka_hosts)
|
|
|
|
@unittest.skipUnless(
|
|
hasattr(signal, "SIGHUP"),
|
|
"SIGHUP not available on this platform",
|
|
)
|
|
@patch("parsedmarc.cli._init_output_clients")
|
|
@patch("parsedmarc.cli._parse_config")
|
|
@patch("parsedmarc.cli._load_config")
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.watch_inbox")
|
|
@patch("parsedmarc.cli.IMAPConnection")
|
|
def testReloadRefreshesReverseDnsMap(
|
|
self,
|
|
mock_imap,
|
|
mock_watch,
|
|
mock_get_reports,
|
|
mock_load_config,
|
|
mock_parse_config,
|
|
mock_init_clients,
|
|
):
|
|
"""SIGHUP reload repopulates the reverse DNS map so lookups still work."""
|
|
import signal as signal_module
|
|
|
|
from parsedmarc import REVERSE_DNS_MAP
|
|
|
|
mock_imap.return_value = object()
|
|
mock_get_reports.return_value = {
|
|
"aggregate_reports": [],
|
|
"forensic_reports": [],
|
|
"smtp_tls_reports": [],
|
|
}
|
|
|
|
mock_load_config.return_value = ConfigParser()
|
|
|
|
def parse_side_effect(config, opts):
|
|
opts.imap_host = "imap.example.com"
|
|
opts.imap_user = "user"
|
|
opts.imap_password = "pass"
|
|
opts.mailbox_watch = True
|
|
return None
|
|
|
|
mock_parse_config.side_effect = parse_side_effect
|
|
mock_init_clients.return_value = {}
|
|
|
|
# Snapshot the map state after each watch_inbox call
|
|
map_snapshots = []
|
|
|
|
watch_calls = [0]
|
|
|
|
def watch_side_effect(*args, **kwargs):
|
|
watch_calls[0] += 1
|
|
if watch_calls[0] == 1:
|
|
if hasattr(signal_module, "SIGHUP"):
|
|
import os
|
|
|
|
os.kill(os.getpid(), signal_module.SIGHUP)
|
|
return
|
|
else:
|
|
# Capture the map state after reload, before we stop the loop
|
|
map_snapshots.append(dict(REVERSE_DNS_MAP))
|
|
raise FileExistsError("stop")
|
|
|
|
mock_watch.side_effect = watch_side_effect
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
cfg.write(self._BASE_CONFIG)
|
|
cfg_path = cfg.name
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
# Pre-populate the map so we can verify it gets refreshed
|
|
REVERSE_DNS_MAP.clear()
|
|
REVERSE_DNS_MAP["stale.example.com"] = {
|
|
"name": "Stale",
|
|
"type": "stale",
|
|
}
|
|
original_contents = dict(REVERSE_DNS_MAP)
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
with self.assertRaises(SystemExit):
|
|
parsedmarc.cli._main()
|
|
|
|
self.assertEqual(mock_watch.call_count, 2)
|
|
# The map should have been repopulated (not empty, not the stale data)
|
|
self.assertEqual(len(map_snapshots), 1)
|
|
refreshed = map_snapshots[0]
|
|
self.assertGreater(len(refreshed), 0, "Map should not be empty after reload")
|
|
self.assertNotEqual(
|
|
refreshed,
|
|
original_contents,
|
|
"Map should have been refreshed, not kept stale data",
|
|
)
|
|
self.assertNotIn(
|
|
"stale.example.com",
|
|
refreshed,
|
|
"Stale entry should have been cleared by reload",
|
|
)
|
|
|
|
|
|
class TestIndexPrefixDomainMapTlsFiltering(unittest.TestCase):
|
|
"""Tests that SMTP TLS reports for unmapped domains are filtered out
|
|
when index_prefix_domain_map is configured."""
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
@patch("parsedmarc.cli.IMAPConnection")
|
|
def testTlsReportsFilteredByDomainMap(
|
|
self,
|
|
mock_imap_connection,
|
|
mock_get_reports,
|
|
):
|
|
"""TLS reports for domains not in the map should be silently dropped."""
|
|
mock_imap_connection.return_value = object()
|
|
mock_get_reports.return_value = {
|
|
"aggregate_reports": [],
|
|
"forensic_reports": [],
|
|
"smtp_tls_reports": [
|
|
{
|
|
"organization_name": "Allowed Org",
|
|
"begin_date": "2024-01-01T00:00:00Z",
|
|
"end_date": "2024-01-01T23:59:59Z",
|
|
"report_id": "allowed-1",
|
|
"contact_info": "tls@allowed.example.com",
|
|
"policies": [
|
|
{
|
|
"policy_domain": "allowed.example.com",
|
|
"policy_type": "sts",
|
|
"successful_session_count": 1,
|
|
"failed_session_count": 0,
|
|
}
|
|
],
|
|
},
|
|
{
|
|
"organization_name": "Unmapped Org",
|
|
"begin_date": "2024-01-01T00:00:00Z",
|
|
"end_date": "2024-01-01T23:59:59Z",
|
|
"report_id": "unmapped-1",
|
|
"contact_info": "tls@unmapped.example.net",
|
|
"policies": [
|
|
{
|
|
"policy_domain": "unmapped.example.net",
|
|
"policy_type": "sts",
|
|
"successful_session_count": 5,
|
|
"failed_session_count": 0,
|
|
}
|
|
],
|
|
},
|
|
{
|
|
"organization_name": "Mixed Case Org",
|
|
"begin_date": "2024-01-01T00:00:00Z",
|
|
"end_date": "2024-01-01T23:59:59Z",
|
|
"report_id": "mixed-case-1",
|
|
"contact_info": "tls@mixedcase.example.com",
|
|
"policies": [
|
|
{
|
|
"policy_domain": "MixedCase.Example.Com",
|
|
"policy_type": "sts",
|
|
"successful_session_count": 2,
|
|
"failed_session_count": 0,
|
|
}
|
|
],
|
|
},
|
|
],
|
|
}
|
|
|
|
domain_map = {"tenant_a": ["example.com"]}
|
|
with NamedTemporaryFile("w", suffix=".yaml", delete=False) as map_file:
|
|
import yaml
|
|
|
|
yaml.dump(domain_map, map_file)
|
|
map_path = map_file.name
|
|
self.addCleanup(lambda: os.path.exists(map_path) and os.remove(map_path))
|
|
|
|
config = f"""[general]
|
|
save_smtp_tls = true
|
|
silent = false
|
|
index_prefix_domain_map = {map_path}
|
|
|
|
[imap]
|
|
host = imap.example.com
|
|
user = test-user
|
|
password = test-password
|
|
"""
|
|
with NamedTemporaryFile("w", suffix=".ini", delete=False) as config_file:
|
|
config_file.write(config)
|
|
config_path = config_file.name
|
|
self.addCleanup(lambda: os.path.exists(config_path) and os.remove(config_path))
|
|
|
|
captured = io.StringIO()
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", config_path]):
|
|
with patch("sys.stdout", captured):
|
|
parsedmarc.cli._main()
|
|
|
|
output = json.loads(captured.getvalue())
|
|
tls_reports = output["smtp_tls_reports"]
|
|
self.assertEqual(len(tls_reports), 2)
|
|
report_ids = {r["report_id"] for r in tls_reports}
|
|
self.assertIn("allowed-1", report_ids)
|
|
self.assertIn("mixed-case-1", report_ids)
|
|
self.assertNotIn("unmapped-1", report_ids)
|
|
|
|
|
|
class TestConfigAliases(unittest.TestCase):
|
|
"""Tests for config key aliases (env var friendly short names)."""
|
|
|
|
def test_maildir_create_alias(self):
|
|
"""[maildir] create works as alias for maildir_create."""
|
|
from argparse import Namespace
|
|
from parsedmarc.cli import _load_config, _parse_config
|
|
|
|
env = {
|
|
"PARSEDMARC_MAILDIR_CREATE": "true",
|
|
"PARSEDMARC_MAILDIR_PATH": "/tmp/test",
|
|
}
|
|
with patch.dict(os.environ, env, clear=False):
|
|
config = _load_config(None)
|
|
opts = Namespace()
|
|
_parse_config(config, opts)
|
|
self.assertTrue(opts.maildir_create)
|
|
|
|
def test_maildir_path_alias(self):
|
|
"""[maildir] path works as alias for maildir_path."""
|
|
from argparse import Namespace
|
|
from parsedmarc.cli import _load_config, _parse_config
|
|
|
|
env = {"PARSEDMARC_MAILDIR_PATH": "/var/mail/dmarc"}
|
|
with patch.dict(os.environ, env, clear=False):
|
|
config = _load_config(None)
|
|
opts = Namespace()
|
|
_parse_config(config, opts)
|
|
self.assertEqual(opts.maildir_path, "/var/mail/dmarc")
|
|
|
|
def test_msgraph_url_alias(self):
|
|
"""[msgraph] url works as alias for graph_url."""
|
|
from parsedmarc.cli import _load_config, _parse_config
|
|
from argparse import Namespace
|
|
|
|
env = {
|
|
"PARSEDMARC_MSGRAPH_AUTH_METHOD": "ClientSecret",
|
|
"PARSEDMARC_MSGRAPH_CLIENT_ID": "test-id",
|
|
"PARSEDMARC_MSGRAPH_CLIENT_SECRET": "test-secret",
|
|
"PARSEDMARC_MSGRAPH_TENANT_ID": "test-tenant",
|
|
"PARSEDMARC_MSGRAPH_MAILBOX": "test@example.com",
|
|
"PARSEDMARC_MSGRAPH_URL": "https://custom.graph.example.com",
|
|
}
|
|
with patch.dict(os.environ, env, clear=False):
|
|
config = _load_config(None)
|
|
opts = Namespace()
|
|
_parse_config(config, opts)
|
|
self.assertEqual(opts.graph_url, "https://custom.graph.example.com")
|
|
|
|
def test_original_keys_still_work(self):
|
|
"""Original INI key names (maildir_create, maildir_path) still work."""
|
|
from argparse import Namespace
|
|
from parsedmarc.cli import _parse_config
|
|
|
|
config = ConfigParser(interpolation=None)
|
|
config.add_section("maildir")
|
|
config.set("maildir", "maildir_path", "/original/path")
|
|
config.set("maildir", "maildir_create", "true")
|
|
|
|
opts = Namespace()
|
|
_parse_config(config, opts)
|
|
self.assertEqual(opts.maildir_path, "/original/path")
|
|
self.assertTrue(opts.maildir_create)
|
|
|
|
def test_ipinfo_url_option(self):
|
|
"""[general] ipinfo_url lands on opts.ipinfo_url."""
|
|
from argparse import Namespace
|
|
from parsedmarc.cli import _parse_config
|
|
|
|
config = ConfigParser(interpolation=None)
|
|
config.add_section("general")
|
|
config.set("general", "ipinfo_url", "https://mirror.example/mmdb")
|
|
|
|
opts = Namespace()
|
|
_parse_config(config, opts)
|
|
self.assertEqual(opts.ipinfo_url, "https://mirror.example/mmdb")
|
|
|
|
def test_ip_db_url_deprecated_alias(self):
|
|
"""[general] ip_db_url is accepted as an alias for ipinfo_url but
|
|
emits a deprecation warning."""
|
|
from argparse import Namespace
|
|
from parsedmarc.cli import _parse_config
|
|
|
|
config = ConfigParser(interpolation=None)
|
|
config.add_section("general")
|
|
config.set("general", "ip_db_url", "https://old.example/mmdb")
|
|
|
|
opts = Namespace()
|
|
with self.assertLogs("parsedmarc.log", level="WARNING") as cm:
|
|
_parse_config(config, opts)
|
|
self.assertEqual(opts.ipinfo_url, "https://old.example/mmdb")
|
|
self.assertTrue(
|
|
any("ip_db_url" in line and "deprecated" in line for line in cm.output),
|
|
f"expected deprecation warning, got: {cm.output}",
|
|
)
|
|
|
|
|
|
class TestExpandPath(unittest.TestCase):
|
|
"""Tests for _expand_path config path expansion."""
|
|
|
|
def test_expand_tilde(self):
|
|
from parsedmarc.cli import _expand_path
|
|
|
|
result = _expand_path("~/some/path")
|
|
self.assertFalse(result.startswith("~"))
|
|
self.assertTrue(result.endswith("/some/path"))
|
|
|
|
def test_expand_env_var(self):
|
|
from parsedmarc.cli import _expand_path
|
|
|
|
with patch.dict(os.environ, {"PARSEDMARC_TEST_DIR": "/opt/data"}):
|
|
result = _expand_path("$PARSEDMARC_TEST_DIR/tokens/.token")
|
|
self.assertEqual(result, "/opt/data/tokens/.token")
|
|
|
|
def test_expand_both(self):
|
|
from parsedmarc.cli import _expand_path
|
|
|
|
with patch.dict(os.environ, {"MY_APP": "parsedmarc"}):
|
|
result = _expand_path("~/$MY_APP/config")
|
|
self.assertNotIn("~", result)
|
|
self.assertIn("parsedmarc/config", result)
|
|
|
|
def test_no_expansion_needed(self):
|
|
from parsedmarc.cli import _expand_path
|
|
|
|
self.assertEqual(_expand_path("/absolute/path"), "/absolute/path")
|
|
self.assertEqual(_expand_path("relative/path"), "relative/path")
|
|
|
|
|
|
class TestEnvVarConfig(unittest.TestCase):
|
|
"""Tests for environment variable configuration support."""
|
|
|
|
def test_resolve_section_key_simple(self):
|
|
"""Simple section names resolve correctly."""
|
|
from parsedmarc.cli import _resolve_section_key
|
|
|
|
self.assertEqual(_resolve_section_key("IMAP_PASSWORD"), ("imap", "password"))
|
|
self.assertEqual(_resolve_section_key("GENERAL_DEBUG"), ("general", "debug"))
|
|
self.assertEqual(_resolve_section_key("S3_BUCKET"), ("s3", "bucket"))
|
|
self.assertEqual(_resolve_section_key("GELF_HOST"), ("gelf", "host"))
|
|
|
|
def test_resolve_section_key_underscore_sections(self):
|
|
"""Multi-word section names (splunk_hec, gmail_api, etc.) resolve correctly."""
|
|
from parsedmarc.cli import _resolve_section_key
|
|
|
|
self.assertEqual(
|
|
_resolve_section_key("SPLUNK_HEC_TOKEN"), ("splunk_hec", "token")
|
|
)
|
|
self.assertEqual(
|
|
_resolve_section_key("GMAIL_API_CREDENTIALS_FILE"),
|
|
("gmail_api", "credentials_file"),
|
|
)
|
|
self.assertEqual(
|
|
_resolve_section_key("LOG_ANALYTICS_CLIENT_ID"),
|
|
("log_analytics", "client_id"),
|
|
)
|
|
|
|
def test_resolve_section_key_unknown(self):
|
|
"""Unknown prefixes return (None, None)."""
|
|
from parsedmarc.cli import _resolve_section_key
|
|
|
|
self.assertEqual(_resolve_section_key("UNKNOWN_FOO"), (None, None))
|
|
# Just a section name with no key should not match
|
|
self.assertEqual(_resolve_section_key("IMAP"), (None, None))
|
|
|
|
def test_apply_env_overrides_injects_values(self):
|
|
"""Env vars are injected into an existing ConfigParser."""
|
|
from configparser import ConfigParser
|
|
from parsedmarc.cli import _apply_env_overrides
|
|
|
|
config = ConfigParser()
|
|
config.add_section("imap")
|
|
config.set("imap", "host", "original.example.com")
|
|
|
|
env = {
|
|
"PARSEDMARC_IMAP_HOST": "new.example.com",
|
|
"PARSEDMARC_IMAP_PASSWORD": "secret123",
|
|
}
|
|
with patch.dict(os.environ, env, clear=False):
|
|
_apply_env_overrides(config)
|
|
|
|
self.assertEqual(config.get("imap", "host"), "new.example.com")
|
|
self.assertEqual(config.get("imap", "password"), "secret123")
|
|
|
|
def test_apply_env_overrides_creates_sections(self):
|
|
"""Env vars create new sections when they don't exist."""
|
|
from configparser import ConfigParser
|
|
from parsedmarc.cli import _apply_env_overrides
|
|
|
|
config = ConfigParser()
|
|
|
|
env = {"PARSEDMARC_ELASTICSEARCH_HOSTS": "http://localhost:9200"}
|
|
with patch.dict(os.environ, env, clear=False):
|
|
_apply_env_overrides(config)
|
|
|
|
self.assertTrue(config.has_section("elasticsearch"))
|
|
self.assertEqual(config.get("elasticsearch", "hosts"), "http://localhost:9200")
|
|
|
|
def test_apply_env_overrides_ignores_config_file_var(self):
|
|
"""PARSEDMARC_CONFIG_FILE is not injected as a config key."""
|
|
from configparser import ConfigParser
|
|
from parsedmarc.cli import _apply_env_overrides
|
|
|
|
config = ConfigParser()
|
|
|
|
env = {"PARSEDMARC_CONFIG_FILE": "/some/path.ini"}
|
|
with patch.dict(os.environ, env, clear=False):
|
|
_apply_env_overrides(config)
|
|
|
|
self.assertEqual(config.sections(), [])
|
|
|
|
def test_load_config_with_file_and_env_override(self):
|
|
"""Env vars override values from an INI file."""
|
|
from parsedmarc.cli import _load_config
|
|
|
|
with NamedTemporaryFile(mode="w", suffix=".ini", delete=False) as f:
|
|
f.write(
|
|
"[imap]\nhost = file.example.com\nuser = alice\npassword = fromfile\n"
|
|
)
|
|
f.flush()
|
|
config_path = f.name
|
|
|
|
try:
|
|
env = {"PARSEDMARC_IMAP_PASSWORD": "fromenv"}
|
|
with patch.dict(os.environ, env, clear=False):
|
|
config = _load_config(config_path)
|
|
|
|
self.assertEqual(config.get("imap", "host"), "file.example.com")
|
|
self.assertEqual(config.get("imap", "user"), "alice")
|
|
self.assertEqual(config.get("imap", "password"), "fromenv")
|
|
finally:
|
|
os.unlink(config_path)
|
|
|
|
def test_load_config_env_only(self):
|
|
"""Config can be loaded purely from env vars with no file."""
|
|
from parsedmarc.cli import _load_config
|
|
|
|
env = {
|
|
"PARSEDMARC_GENERAL_DEBUG": "true",
|
|
"PARSEDMARC_ELASTICSEARCH_HOSTS": "http://localhost:9200",
|
|
}
|
|
with patch.dict(os.environ, env, clear=False):
|
|
config = _load_config(None)
|
|
|
|
self.assertEqual(config.get("general", "debug"), "true")
|
|
self.assertEqual(config.get("elasticsearch", "hosts"), "http://localhost:9200")
|
|
|
|
def test_parse_config_from_env(self):
|
|
"""Full round-trip: env vars -> ConfigParser -> opts."""
|
|
from argparse import Namespace
|
|
from parsedmarc.cli import _load_config, _parse_config
|
|
|
|
env = {
|
|
"PARSEDMARC_GENERAL_DEBUG": "true",
|
|
"PARSEDMARC_GENERAL_SAVE_AGGREGATE": "true",
|
|
"PARSEDMARC_GENERAL_OFFLINE": "true",
|
|
}
|
|
with patch.dict(os.environ, env, clear=False):
|
|
config = _load_config(None)
|
|
|
|
opts = Namespace()
|
|
_parse_config(config, opts)
|
|
|
|
self.assertTrue(opts.debug)
|
|
self.assertTrue(opts.save_aggregate)
|
|
self.assertTrue(opts.offline)
|
|
|
|
def test_config_file_env_var(self):
|
|
"""PARSEDMARC_CONFIG_FILE env var specifies the config file path."""
|
|
from argparse import Namespace
|
|
from parsedmarc.cli import _load_config, _parse_config
|
|
|
|
with NamedTemporaryFile(mode="w", suffix=".ini", delete=False) as f:
|
|
f.write("[general]\ndebug = true\noffline = true\n")
|
|
f.flush()
|
|
config_path = f.name
|
|
|
|
try:
|
|
env = {"PARSEDMARC_CONFIG_FILE": config_path}
|
|
with patch.dict(os.environ, env, clear=False):
|
|
config = _load_config(os.environ.get("PARSEDMARC_CONFIG_FILE"))
|
|
|
|
opts = Namespace()
|
|
_parse_config(config, opts)
|
|
self.assertTrue(opts.debug)
|
|
self.assertTrue(opts.offline)
|
|
finally:
|
|
os.unlink(config_path)
|
|
|
|
def test_boolean_values_from_env(self):
|
|
"""Various boolean string representations work through ConfigParser."""
|
|
from configparser import ConfigParser
|
|
from parsedmarc.cli import _apply_env_overrides
|
|
|
|
for true_val in ("true", "yes", "1", "on", "True", "YES"):
|
|
config = ConfigParser()
|
|
env = {"PARSEDMARC_GENERAL_DEBUG": true_val}
|
|
with patch.dict(os.environ, env, clear=False):
|
|
_apply_env_overrides(config)
|
|
self.assertTrue(
|
|
config.getboolean("general", "debug"),
|
|
f"Expected truthy for {true_val!r}",
|
|
)
|
|
|
|
for false_val in ("false", "no", "0", "off", "False", "NO"):
|
|
config = ConfigParser()
|
|
env = {"PARSEDMARC_GENERAL_DEBUG": false_val}
|
|
with patch.dict(os.environ, env, clear=False):
|
|
_apply_env_overrides(config)
|
|
self.assertFalse(
|
|
config.getboolean("general", "debug"),
|
|
f"Expected falsy for {false_val!r}",
|
|
)
|
|
|
|
|
|
class TestLoadPSLOverrides(unittest.TestCase):
|
|
"""Covers `parsedmarc.utils.load_psl_overrides`."""
|
|
|
|
def setUp(self):
|
|
# Snapshot the module-level list so each test leaves it as it found it.
|
|
self._saved = list(parsedmarc.utils.psl_overrides)
|
|
|
|
def tearDown(self):
|
|
parsedmarc.utils.psl_overrides.clear()
|
|
parsedmarc.utils.psl_overrides.extend(self._saved)
|
|
|
|
def test_offline_loads_bundled_file(self):
|
|
"""offline=True populates the list from the bundled file, no network."""
|
|
result = parsedmarc.utils.load_psl_overrides(offline=True)
|
|
self.assertIs(result, parsedmarc.utils.psl_overrides)
|
|
self.assertGreater(len(result), 0)
|
|
# The bundled file is expected to contain at least one well-known entry.
|
|
self.assertIn(".linode.com", result)
|
|
|
|
def test_local_file_path_overrides_bundled(self):
|
|
"""A custom local_file_path takes precedence over the bundled copy."""
|
|
with tempfile.NamedTemporaryFile(
|
|
"w", suffix=".txt", delete=False, encoding="utf-8"
|
|
) as tf:
|
|
tf.write("-custom-brand.com\n.another-brand.net\n\n \n")
|
|
path = tf.name
|
|
try:
|
|
result = parsedmarc.utils.load_psl_overrides(
|
|
offline=True, local_file_path=path
|
|
)
|
|
self.assertEqual(result, ["-custom-brand.com", ".another-brand.net"])
|
|
finally:
|
|
os.unlink(path)
|
|
|
|
def test_clear_before_reload(self):
|
|
"""Re-running load_psl_overrides replaces the list, not appends."""
|
|
parsedmarc.utils.psl_overrides.clear()
|
|
parsedmarc.utils.psl_overrides.append(".stale-entry.com")
|
|
parsedmarc.utils.load_psl_overrides(offline=True)
|
|
self.assertNotIn(".stale-entry.com", parsedmarc.utils.psl_overrides)
|
|
|
|
def test_url_success(self):
|
|
"""A 200 response from the URL populates the list."""
|
|
fake_body = "-fetched-brand.com\n.cdn-fetched.net\n"
|
|
mock_response = MagicMock()
|
|
mock_response.text = fake_body
|
|
mock_response.raise_for_status = MagicMock()
|
|
with patch(
|
|
"parsedmarc.utils.requests.get", return_value=mock_response
|
|
) as mock_get:
|
|
result = parsedmarc.utils.load_psl_overrides(url="https://example.test/ov")
|
|
self.assertEqual(result, ["-fetched-brand.com", ".cdn-fetched.net"])
|
|
mock_get.assert_called_once()
|
|
|
|
def test_url_failure_falls_back_to_local(self):
|
|
"""A network error falls back to the bundled copy."""
|
|
import requests
|
|
|
|
with patch(
|
|
"parsedmarc.utils.requests.get",
|
|
side_effect=requests.exceptions.ConnectionError("nope"),
|
|
):
|
|
result = parsedmarc.utils.load_psl_overrides(url="https://example.test/ov")
|
|
# Bundled file still loaded.
|
|
self.assertGreater(len(result), 0)
|
|
self.assertIn(".linode.com", result)
|
|
|
|
def test_always_use_local_skips_network(self):
|
|
"""always_use_local_file=True must not call requests.get."""
|
|
with patch("parsedmarc.utils.requests.get") as mock_get:
|
|
parsedmarc.utils.load_psl_overrides(always_use_local_file=True)
|
|
mock_get.assert_not_called()
|
|
|
|
|
|
class TestLoadReverseDnsMapReloadsPSLOverrides(unittest.TestCase):
|
|
"""`load_reverse_dns_map` must reload `psl_overrides.txt` in the same call
|
|
so map entries that depend on folded bases resolve correctly."""
|
|
|
|
def setUp(self):
|
|
self._saved = list(parsedmarc.utils.psl_overrides)
|
|
|
|
def tearDown(self):
|
|
parsedmarc.utils.psl_overrides.clear()
|
|
parsedmarc.utils.psl_overrides.extend(self._saved)
|
|
|
|
def test_map_load_triggers_psl_reload(self):
|
|
"""Calling load_reverse_dns_map offline also invokes load_psl_overrides
|
|
with matching flags, and the overrides list is repopulated."""
|
|
rdm = {}
|
|
parsedmarc.utils.psl_overrides.clear()
|
|
parsedmarc.utils.psl_overrides.append(".stale-from-before.com")
|
|
with patch(
|
|
"parsedmarc.utils.load_psl_overrides",
|
|
wraps=parsedmarc.utils.load_psl_overrides,
|
|
) as spy:
|
|
parsedmarc.utils.load_reverse_dns_map(rdm, offline=True)
|
|
spy.assert_called_once()
|
|
kwargs = spy.call_args.kwargs
|
|
self.assertTrue(kwargs["offline"])
|
|
self.assertIsNone(kwargs["url"])
|
|
self.assertIsNone(kwargs["local_file_path"])
|
|
self.assertNotIn(".stale-from-before.com", parsedmarc.utils.psl_overrides)
|
|
|
|
def test_map_load_forwards_psl_overrides_kwargs(self):
|
|
"""psl_overrides_path / psl_overrides_url are forwarded verbatim."""
|
|
rdm = {}
|
|
with patch("parsedmarc.utils.load_psl_overrides") as spy:
|
|
parsedmarc.utils.load_reverse_dns_map(
|
|
rdm,
|
|
offline=True,
|
|
always_use_local_file=True,
|
|
psl_overrides_path="/tmp/custom.txt",
|
|
psl_overrides_url="https://example.test/ov",
|
|
)
|
|
spy.assert_called_once_with(
|
|
always_use_local_file=True,
|
|
local_file_path="/tmp/custom.txt",
|
|
url="https://example.test/ov",
|
|
offline=True,
|
|
)
|
|
|
|
|
|
class TestGetBaseDomainWithOverrides(unittest.TestCase):
|
|
"""`get_base_domain` must honour the current psl_overrides list."""
|
|
|
|
def setUp(self):
|
|
self._saved = list(parsedmarc.utils.psl_overrides)
|
|
parsedmarc.utils.psl_overrides.clear()
|
|
parsedmarc.utils.psl_overrides.extend([".cprapid.com", "-nobre.com.br"])
|
|
|
|
def tearDown(self):
|
|
parsedmarc.utils.psl_overrides.clear()
|
|
parsedmarc.utils.psl_overrides.extend(self._saved)
|
|
|
|
def test_dot_prefixed_override_folds_subdomain(self):
|
|
result = parsedmarc.utils.get_base_domain("74-208-244-234.cprapid.com")
|
|
self.assertEqual(result, "cprapid.com")
|
|
|
|
def test_dash_prefixed_override_folds_subdomain(self):
|
|
result = parsedmarc.utils.get_base_domain("host-1-2-3-4-nobre.com.br")
|
|
self.assertEqual(result, "nobre.com.br")
|
|
|
|
def test_unmatched_domain_falls_through_to_psl(self):
|
|
result = parsedmarc.utils.get_base_domain("sub.example.com")
|
|
self.assertEqual(result, "example.com")
|
|
|
|
|
|
class TestMapScriptsIPDetection(unittest.TestCase):
|
|
"""Full-IP detection and PSL folding in the map-maintenance scripts."""
|
|
|
|
def test_collect_domain_info_detects_full_ips(self):
|
|
import parsedmarc.resources.maps.collect_domain_info as cdi
|
|
|
|
# Dotted and dashed four-octet patterns with valid octets: detected.
|
|
self.assertTrue(cdi._has_full_ip("74-208-244-234.cprapid.com"))
|
|
self.assertTrue(cdi._has_full_ip("host.192.168.1.1.example.com"))
|
|
self.assertTrue(cdi._has_full_ip("a-10-20-30-40-brand.com"))
|
|
# Three octets is NOT a full IP — OVH's reverse-DNS pattern stays safe.
|
|
self.assertFalse(cdi._has_full_ip("ip-147-135-108.us"))
|
|
# Out-of-range octet fails the 0-255 sanity check.
|
|
self.assertFalse(cdi._has_full_ip("999-1-2-3-foo.com"))
|
|
# Pure domain, no IP.
|
|
self.assertFalse(cdi._has_full_ip("example.com"))
|
|
|
|
def test_find_unknown_detects_full_ips(self):
|
|
import parsedmarc.resources.maps.find_unknown_base_reverse_dns as fu
|
|
|
|
self.assertTrue(fu._has_full_ip("170-254-144-204-nobreinternet.com.br"))
|
|
self.assertFalse(fu._has_full_ip("ip-147-135-108.us"))
|
|
self.assertFalse(fu._has_full_ip("cprapid.com"))
|
|
|
|
def test_apply_psl_override_dot_prefix(self):
|
|
import parsedmarc.resources.maps.collect_domain_info as cdi
|
|
|
|
ov = [".cprapid.com", ".linode.com"]
|
|
self.assertEqual(cdi._apply_psl_override("foo.cprapid.com", ov), "cprapid.com")
|
|
self.assertEqual(cdi._apply_psl_override("a.b.linode.com", ov), "linode.com")
|
|
|
|
def test_apply_psl_override_dash_prefix(self):
|
|
import parsedmarc.resources.maps.collect_domain_info as cdi
|
|
|
|
ov = ["-nobre.com.br"]
|
|
self.assertEqual(
|
|
cdi._apply_psl_override("1-2-3-4-nobre.com.br", ov), "nobre.com.br"
|
|
)
|
|
|
|
def test_apply_psl_override_no_match(self):
|
|
import parsedmarc.resources.maps.collect_domain_info as cdi
|
|
|
|
ov = [".cprapid.com"]
|
|
self.assertEqual(cdi._apply_psl_override("example.com", ov), "example.com")
|
|
|
|
|
|
class TestDetectPSLOverrides(unittest.TestCase):
|
|
"""Cluster detection, brand-tail extraction, and full-pipeline behaviour
|
|
for `detect_psl_overrides.py`."""
|
|
|
|
def setUp(self):
|
|
import parsedmarc.resources.maps.detect_psl_overrides as dpo
|
|
|
|
self.dpo = dpo
|
|
|
|
def test_extract_brand_tail_dot_separator(self):
|
|
self.assertEqual(
|
|
self.dpo.extract_brand_tail("74-208-244-234.cprapid.com"),
|
|
".cprapid.com",
|
|
)
|
|
|
|
def test_extract_brand_tail_dash_separator(self):
|
|
self.assertEqual(
|
|
self.dpo.extract_brand_tail("170-254-144-204-nobre.com.br"),
|
|
"-nobre.com.br",
|
|
)
|
|
|
|
def test_extract_brand_tail_no_separator(self):
|
|
self.assertEqual(
|
|
self.dpo.extract_brand_tail("host134-254-143-190tigobusiness.com.ni"),
|
|
"tigobusiness.com.ni",
|
|
)
|
|
|
|
def test_extract_brand_tail_no_ip_returns_none(self):
|
|
self.assertIsNone(self.dpo.extract_brand_tail("plain.example.com"))
|
|
|
|
def test_extract_brand_tail_rejects_short_tail(self):
|
|
"""A tail shorter than MIN_TAIL_LEN is rejected to avoid folding to `.com`."""
|
|
# Four-octet IP followed by only `.br` (2 chars after the dot) — too short.
|
|
self.assertIsNone(self.dpo.extract_brand_tail("1-2-3-4.br"))
|
|
|
|
def test_detect_clusters_meets_threshold(self):
|
|
domains = [
|
|
"1-2-3-4.cprapid.com",
|
|
"5-6-7-8.cprapid.com",
|
|
"9-10-11-12.cprapid.com",
|
|
"1-2-3-4-other.com.br", # not enough of these
|
|
]
|
|
clusters = self.dpo.detect_clusters(domains, threshold=3, known_overrides=set())
|
|
self.assertIn(".cprapid.com", clusters)
|
|
self.assertEqual(len(clusters[".cprapid.com"]), 3)
|
|
self.assertNotIn("-other.com.br", clusters)
|
|
|
|
def test_detect_clusters_honours_threshold(self):
|
|
domains = [
|
|
"1-2-3-4.cprapid.com",
|
|
"5-6-7-8.cprapid.com",
|
|
]
|
|
clusters = self.dpo.detect_clusters(domains, threshold=3, known_overrides=set())
|
|
self.assertEqual(clusters, {})
|
|
|
|
def test_detect_clusters_skips_known_overrides(self):
|
|
"""Tails already in psl_overrides.txt must not be re-proposed."""
|
|
domains = [
|
|
"1-2-3-4.cprapid.com",
|
|
"5-6-7-8.cprapid.com",
|
|
"9-10-11-12.cprapid.com",
|
|
]
|
|
clusters = self.dpo.detect_clusters(
|
|
domains, threshold=3, known_overrides={".cprapid.com"}
|
|
)
|
|
self.assertNotIn(".cprapid.com", clusters)
|
|
|
|
def test_apply_override_matches_first(self):
|
|
"""apply_override iterates in list order and returns on the first match."""
|
|
ov = [".cprapid.com", "-nobre.com.br"]
|
|
self.assertEqual(
|
|
self.dpo.apply_override("1-2-3-4.cprapid.com", ov), "cprapid.com"
|
|
)
|
|
self.assertEqual(
|
|
self.dpo.apply_override("1-2-3-4-nobre.com.br", ov), "nobre.com.br"
|
|
)
|
|
self.assertEqual(self.dpo.apply_override("unrelated.com", ov), "unrelated.com")
|
|
|
|
def test_has_full_ip_shared_with_other_scripts(self):
|
|
"""The detect script's IP check must agree with the other map scripts."""
|
|
self.assertTrue(self.dpo.has_full_ip("74-208-244-234.cprapid.com"))
|
|
self.assertFalse(self.dpo.has_full_ip("ip-147-135-108.us"))
|
|
self.assertFalse(self.dpo.has_full_ip("example.com"))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main(verbosity=2)
|