From 373166cd7e026169eeaf48ecf85e149847733e6d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 21 Feb 2026 19:39:16 +0000 Subject: [PATCH] Add 89 comprehensive tests covering core parsing and utilities Tests cover: - _bucket_interval_by_day: all validation branches and distribution logic - _append_parsed_record: normalize=True/False paths - _parse_report_record: None source_ip, missing auth results, reason handling, identities/identifiers mapping, human_result, envelope_from fallback, alignment - _parse_smtp_tls_failure_details: required/optional fields, missing field errors - _parse_smtp_tls_report_policy: valid/invalid types, policy_strings, failure details - parse_smtp_tls_report_json: valid/bytes/missing fields/non-list policies - Aggregate report: invalid np/testing/discovery_method, pass disposition, multiple records, XML recovery, schema versions, generator, errors, defaults, normalization, MAGIC_XML_TAG detection - utils: timestamp conversions, IP geo lookup, reverse DNS service lookup, IP address info with cache, email address parsing, filename safe strings, mbox/outlook msg detection - Output modules: WebhookClient, KafkaClient static methods, HECClient, SyslogClient, LogAnalyticsConfig/Client, backward-compatible aliases Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- tests.py | 1288 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 1265 insertions(+), 23 deletions(-) diff --git a/tests.py b/tests.py index 0606b08..0bfe363 100755 --- a/tests.py +++ b/tests.py @@ -436,7 +436,7 @@ class Test(unittest.TestCase): mock_imap_connection.return_value = object() mock_get_reports.return_value = { "aggregate_reports": [], - "forensic_reports": [], + "failure_reports": [], "smtp_tls_reports": [], } @@ -486,7 +486,7 @@ aws_service = aoss mock_imap_connection.return_value = object() mock_get_reports.return_value = { "aggregate_reports": [{"policy_published": {"domain": "example.com"}}], - "forensic_reports": [], + "failure_reports": [], "smtp_tls_reports": [], } mock_save_aggregate.side_effect = parsedmarc.elastic.ElasticsearchError( @@ -536,7 +536,7 @@ hosts = localhost mock_imap_connection.return_value = object() mock_get_reports.return_value = { "aggregate_reports": [{"policy_published": {"domain": "example.com"}}], - "forensic_reports": [], + "failure_reports": [], "smtp_tls_reports": [], } mock_save_aggregate.side_effect = parsedmarc.elastic.ElasticsearchError( @@ -568,10 +568,10 @@ hosts = localhost mock_save_aggregate.assert_called_once() - @patch("parsedmarc.cli.opensearch.save_forensic_report_to_opensearch") + @patch("parsedmarc.cli.opensearch.save_failure_report_to_opensearch") @patch("parsedmarc.cli.opensearch.migrate_indexes") @patch("parsedmarc.cli.opensearch.set_hosts") - @patch("parsedmarc.cli.elastic.save_forensic_report_to_elasticsearch") + @patch("parsedmarc.cli.elastic.save_failure_report_to_elasticsearch") @patch("parsedmarc.cli.elastic.save_aggregate_report_to_elasticsearch") @patch("parsedmarc.cli.elastic.migrate_indexes") @patch("parsedmarc.cli.elastic.set_hosts") @@ -584,27 +584,27 @@ hosts = localhost _mock_es_set_hosts, _mock_es_migrate, mock_save_aggregate, - _mock_save_forensic_elastic, + _mock_save_failure_elastic, _mock_os_set_hosts, _mock_os_migrate, - mock_save_forensic_opensearch, + mock_save_failure_opensearch, ): mock_imap_connection.return_value = object() mock_get_reports.return_value = { "aggregate_reports": [{"policy_published": {"domain": "example.com"}}], - "forensic_reports": [{"reported_domain": "example.com"}], + "failure_reports": [{"reported_domain": "example.com"}], "smtp_tls_reports": [], } mock_save_aggregate.side_effect = parsedmarc.elastic.ElasticsearchError( "aggregate sink failed" ) - mock_save_forensic_opensearch.side_effect = ( - parsedmarc.cli.opensearch.OpenSearchError("forensic sink failed") + mock_save_failure_opensearch.side_effect = ( + parsedmarc.cli.opensearch.OpenSearchError("failure sink failed") ) config = """[general] save_aggregate = true -save_forensic = true +save_failure = true fail_on_output_error = true silent = true @@ -632,7 +632,7 @@ hosts = localhost self.assertEqual(ctx.exception.code, 1) mock_save_aggregate.assert_called_once() - mock_save_forensic_opensearch.assert_called_once() + mock_save_failure_opensearch.assert_called_once() class _FakeGraphResponse: @@ -1314,7 +1314,7 @@ class TestGmailAuthModes(unittest.TestCase): mock_gmail_connection.return_value = MagicMock() mock_get_mailbox_reports.return_value = { "aggregate_reports": [], - "forensic_reports": [], + "failure_reports": [], "smtp_tls_reports": [], } config = """[general] @@ -1348,7 +1348,7 @@ scopes = https://www.googleapis.com/auth/gmail.modify mock_gmail_connection.return_value = MagicMock() mock_get_reports.return_value = { "aggregate_reports": [], - "forensic_reports": [], + "failure_reports": [], "smtp_tls_reports": [], } config = """[general] @@ -1480,7 +1480,7 @@ class TestMailboxWatchSince(unittest.TestCase): mock_imap_connection.return_value = object() mock_get_mailbox_reports.return_value = { "aggregate_reports": [], - "forensic_reports": [], + "failure_reports": [], "smtp_tls_reports": [], } mock_watch_inbox.side_effect = FileExistsError("stop-watch-loop") @@ -1574,7 +1574,7 @@ class TestMailboxPerformance(unittest.TestCase): mock_graph_connection.return_value = object() mock_get_mailbox_reports.return_value = { "aggregate_reports": [], - "forensic_reports": [], + "failure_reports": [], "smtp_tls_reports": [], } @@ -1650,7 +1650,7 @@ mailbox = shared@example.com mock_graph_connection.return_value = object() mock_get_mailbox_reports.return_value = { "aggregate_reports": [], - "forensic_reports": [], + "failure_reports": [], "smtp_tls_reports": [], } @@ -1801,7 +1801,7 @@ class TestMSGraphCliValidation(unittest.TestCase): mock_graph_connection.return_value = object() mock_get_mailbox_reports.return_value = { "aggregate_reports": [], - "forensic_reports": [], + "failure_reports": [], "smtp_tls_reports": [], } @@ -1943,7 +1943,7 @@ tenant_id = tenant-id mock_graph_connection.return_value = object() mock_get_mailbox_reports.return_value = { "aggregate_reports": [], - "forensic_reports": [], + "failure_reports": [], "smtp_tls_reports": [], } @@ -2157,7 +2157,7 @@ watch = true mock_imap.return_value = object() mock_get_reports.return_value = { "aggregate_reports": [], - "forensic_reports": [], + "failure_reports": [], "smtp_tls_reports": [], } @@ -2230,7 +2230,7 @@ watch = true mock_imap.return_value = object() mock_get_reports.return_value = { "aggregate_reports": [], - "forensic_reports": [], + "failure_reports": [], "smtp_tls_reports": [], } @@ -2310,7 +2310,7 @@ watch = true mock_imap.return_value = object() mock_get_reports.return_value = { "aggregate_reports": [], - "forensic_reports": [], + "failure_reports": [], "smtp_tls_reports": [], } @@ -2385,7 +2385,7 @@ watch = true mock_imap.return_value = object() mock_get_reports.return_value = { "aggregate_reports": [], - "forensic_reports": [], + "failure_reports": [], "smtp_tls_reports": [], } mock_init_clients.return_value = {} @@ -3181,6 +3181,1248 @@ class TestEnvVarConfig(unittest.TestCase): config.getboolean("general", "debug"), f"Expected falsy for {false_val!r}", ) + # ============================================================ # New tests for _bucket_interval_by_day + # ============================================================ + def testBucketIntervalBeginAfterEnd(self): + """begin > end should raise ValueError""" + from datetime import datetime, timezone + begin = datetime(2024, 1, 2, tzinfo=timezone.utc) + end = datetime(2024, 1, 1, tzinfo=timezone.utc) + with self.assertRaises(ValueError): + parsedmarc._bucket_interval_by_day(begin, end, 100) + + def testBucketIntervalNaiveDatetime(self): + """Non-timezone-aware datetimes should raise ValueError""" + from datetime import datetime + begin = datetime(2024, 1, 1) + end = datetime(2024, 1, 2) + with self.assertRaises(ValueError): + parsedmarc._bucket_interval_by_day(begin, end, 100) + + def testBucketIntervalDifferentTzinfo(self): + """Different tzinfo objects should raise ValueError""" + from datetime import datetime, timezone, timedelta + tz1 = timezone.utc + tz2 = timezone(timedelta(hours=5)) + begin = datetime(2024, 1, 1, tzinfo=tz1) + end = datetime(2024, 1, 2, tzinfo=tz2) + with self.assertRaises(ValueError): + parsedmarc._bucket_interval_by_day(begin, end, 100) + + def testBucketIntervalNegativeCount(self): + """Negative total_count should raise ValueError""" + from datetime import datetime, timezone + begin = datetime(2024, 1, 1, tzinfo=timezone.utc) + end = datetime(2024, 1, 2, tzinfo=timezone.utc) + with self.assertRaises(ValueError): + parsedmarc._bucket_interval_by_day(begin, end, -1) + + def testBucketIntervalZeroCount(self): + """Zero total_count should return empty list""" + from datetime import datetime, timezone + begin = datetime(2024, 1, 1, tzinfo=timezone.utc) + end = datetime(2024, 1, 2, tzinfo=timezone.utc) + result = parsedmarc._bucket_interval_by_day(begin, end, 0) + self.assertEqual(result, []) + + def testBucketIntervalSameBeginEnd(self): + """Same begin and end (zero interval) should return empty list""" + from datetime import datetime, timezone + dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + result = parsedmarc._bucket_interval_by_day(dt, dt, 100) + self.assertEqual(result, []) + + def testBucketIntervalSingleDay(self): + """Single day interval should return one bucket with total count""" + from datetime import datetime, timezone + begin = datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + end = datetime(2024, 1, 1, 23, 59, 59, tzinfo=timezone.utc) + result = parsedmarc._bucket_interval_by_day(begin, end, 100) + self.assertEqual(len(result), 1) + self.assertEqual(result[0]["count"], 100) + self.assertEqual(result[0]["begin"], begin) + + def testBucketIntervalMultiDay(self): + """Multi-day interval should distribute counts proportionally""" + from datetime import datetime, timezone + begin = datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + end = datetime(2024, 1, 3, 0, 0, 0, tzinfo=timezone.utc) + result = parsedmarc._bucket_interval_by_day(begin, end, 100) + self.assertEqual(len(result), 2) + total = sum(b["count"] for b in result) + self.assertEqual(total, 100) + # Equal days => equal distribution + self.assertEqual(result[0]["count"], 50) + self.assertEqual(result[1]["count"], 50) + + def testBucketIntervalRemainderDistribution(self): + """Odd count across equal days distributes remainder correctly""" + from datetime import datetime, timezone + begin = datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + end = datetime(2024, 1, 4, 0, 0, 0, tzinfo=timezone.utc) + result = parsedmarc._bucket_interval_by_day(begin, end, 10) + total = sum(b["count"] for b in result) + self.assertEqual(total, 10) + self.assertEqual(len(result), 3) + + def testBucketIntervalPartialDays(self): + """Partial days: 12h on day1, 24h on day2 => 1/3 vs 2/3 split""" + from datetime import datetime, timezone + begin = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + end = datetime(2024, 1, 3, 0, 0, 0, tzinfo=timezone.utc) + result = parsedmarc._bucket_interval_by_day(begin, end, 90) + total = sum(b["count"] for b in result) + self.assertEqual(total, 90) + # day1: 12h, day2: 24h => 1/3 vs 2/3 + self.assertEqual(result[0]["count"], 30) + self.assertEqual(result[1]["count"], 60) + + # ============================================================ # Tests for _append_parsed_record + # ============================================================ + def testAppendParsedRecordNoNormalize(self): + """No normalization: record appended as-is with interval fields""" + from datetime import datetime, timezone + records = [] + rec = {"count": 10, "source": {"ip_address": "1.2.3.4"}} + begin = datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + end = datetime(2024, 1, 2, 0, 0, 0, tzinfo=timezone.utc) + parsedmarc._append_parsed_record(rec, records, begin, end, False) + self.assertEqual(len(records), 1) + self.assertFalse(records[0]["normalized_timespan"]) + self.assertEqual(records[0]["interval_begin"], "2024-01-01 00:00:00") + self.assertEqual(records[0]["interval_end"], "2024-01-02 00:00:00") + + def testAppendParsedRecordNormalize(self): + """Normalization: record split into daily buckets""" + from datetime import datetime, timezone + records = [] + rec = {"count": 100, "source": {"ip_address": "1.2.3.4"}} + begin = datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + end = datetime(2024, 1, 3, 0, 0, 0, tzinfo=timezone.utc) + parsedmarc._append_parsed_record(rec, records, begin, end, True) + self.assertEqual(len(records), 2) + total = sum(r["count"] for r in records) + self.assertEqual(total, 100) + for r in records: + self.assertTrue(r["normalized_timespan"]) + + def testAppendParsedRecordNormalizeZeroCount(self): + """Normalization with zero count: nothing appended""" + from datetime import datetime, timezone + records = [] + rec = {"count": 0, "source": {"ip_address": "1.2.3.4"}} + begin = datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + end = datetime(2024, 1, 3, 0, 0, 0, tzinfo=timezone.utc) + parsedmarc._append_parsed_record(rec, records, begin, end, True) + self.assertEqual(len(records), 0) + + # ============================================================ # Tests for _parse_report_record + # ============================================================ + def testParseReportRecordNoneSourceIP(self): + """Record with None source_ip should raise ValueError""" + record = { + "row": { + "source_ip": None, + "count": "1", + "policy_evaluated": {"disposition": "none", "dkim": "pass", "spf": "pass"}, + }, + "identifiers": {"header_from": "example.com"}, + "auth_results": {"dkim": [], "spf": []}, + } + with self.assertRaises(ValueError): + parsedmarc._parse_report_record(record, offline=True) + + def testParseReportRecordMissingDkimSpf(self): + """Record with missing dkim/spf auth results defaults correctly""" + record = { + "row": { + "source_ip": "192.0.2.1", + "count": "5", + "policy_evaluated": {"disposition": "none", "dkim": "pass", "spf": "fail"}, + }, + "identifiers": {"header_from": "example.com"}, + "auth_results": {}, + } + result = parsedmarc._parse_report_record(record, offline=True) + self.assertEqual(result["auth_results"]["dkim"], []) + self.assertEqual(result["auth_results"]["spf"], []) + + def testParseReportRecordReasonHandling(self): + """Reasons in policy_evaluated get normalized with comment default""" + record = { + "row": { + "source_ip": "192.0.2.1", + "count": "1", + "policy_evaluated": { + "disposition": "none", + "dkim": "pass", + "spf": "pass", + "reason": {"type": "forwarded"}, + }, + }, + "identifiers": {"header_from": "example.com"}, + "auth_results": {"dkim": [], "spf": []}, + } + result = parsedmarc._parse_report_record(record, offline=True) + reasons = result["policy_evaluated"]["policy_override_reasons"] + self.assertEqual(len(reasons), 1) + self.assertEqual(reasons[0]["type"], "forwarded") + self.assertIsNone(reasons[0]["comment"]) + + def testParseReportRecordReasonList(self): + """Multiple reasons as a list are preserved""" + record = { + "row": { + "source_ip": "192.0.2.1", + "count": "1", + "policy_evaluated": { + "disposition": "none", + "dkim": "pass", + "spf": "pass", + "reason": [ + {"type": "forwarded", "comment": "relay"}, + {"type": "local_policy"}, + ], + }, + }, + "identifiers": {"header_from": "example.com"}, + "auth_results": {"dkim": [], "spf": []}, + } + result = parsedmarc._parse_report_record(record, offline=True) + reasons = result["policy_evaluated"]["policy_override_reasons"] + self.assertEqual(len(reasons), 2) + self.assertEqual(reasons[0]["comment"], "relay") + self.assertIsNone(reasons[1]["comment"]) + + def testParseReportRecordIdentities(self): + """'identities' key is mapped to 'identifiers'""" + record = { + "row": { + "source_ip": "192.0.2.1", + "count": "1", + "policy_evaluated": {"disposition": "none", "dkim": "pass", "spf": "pass"}, + }, + "identities": {"header_from": "Example.COM", "envelope_from": "example.com"}, + "auth_results": {"dkim": [], "spf": []}, + } + result = parsedmarc._parse_report_record(record, offline=True) + self.assertIn("identifiers", result) + self.assertEqual(result["identifiers"]["header_from"], "example.com") + + def testParseReportRecordDkimDefaults(self): + """DKIM result defaults: selector='none', result='none' when missing""" + record = { + "row": { + "source_ip": "192.0.2.1", + "count": "1", + "policy_evaluated": {"disposition": "none", "dkim": "fail", "spf": "fail"}, + }, + "identifiers": {"header_from": "example.com"}, + "auth_results": { + "dkim": {"domain": "example.com"}, + "spf": [], + }, + } + result = parsedmarc._parse_report_record(record, offline=True) + dkim = result["auth_results"]["dkim"][0] + self.assertEqual(dkim["selector"], "none") + self.assertEqual(dkim["result"], "none") + self.assertIsNone(dkim["human_result"]) + + def testParseReportRecordSpfDefaults(self): + """SPF result defaults: scope='mfrom', result='none' when missing""" + record = { + "row": { + "source_ip": "192.0.2.1", + "count": "1", + "policy_evaluated": {"disposition": "none", "dkim": "fail", "spf": "fail"}, + }, + "identifiers": {"header_from": "example.com"}, + "auth_results": { + "dkim": [], + "spf": {"domain": "example.com"}, + }, + } + result = parsedmarc._parse_report_record(record, offline=True) + spf = result["auth_results"]["spf"][0] + self.assertEqual(spf["scope"], "mfrom") + self.assertEqual(spf["result"], "none") + self.assertIsNone(spf["human_result"]) + + def testParseReportRecordHumanResult(self): + """human_result field is included when present""" + record = { + "row": { + "source_ip": "192.0.2.1", + "count": "1", + "policy_evaluated": {"disposition": "none", "dkim": "pass", "spf": "pass"}, + }, + "identifiers": {"header_from": "example.com"}, + "auth_results": { + "dkim": [{"domain": "example.com", "selector": "s1", + "result": "pass", "human_result": "good key"}], + "spf": [{"domain": "example.com", "scope": "mfrom", + "result": "pass", "human_result": "sender valid"}], + }, + } + result = parsedmarc._parse_report_record(record, offline=True) + self.assertEqual(result["auth_results"]["dkim"][0]["human_result"], "good key") + self.assertEqual(result["auth_results"]["spf"][0]["human_result"], "sender valid") + + def testParseReportRecordEnvelopeFromFallback(self): + """envelope_from falls back to last SPF domain when missing""" + record = { + "row": { + "source_ip": "192.0.2.1", + "count": "1", + "policy_evaluated": {"disposition": "none", "dkim": "pass", "spf": "pass"}, + }, + "identifiers": {"header_from": "example.com"}, + "auth_results": { + "dkim": [], + "spf": [{"domain": "Bounce.Example.COM", "scope": "mfrom", "result": "pass"}], + }, + } + result = parsedmarc._parse_report_record(record, offline=True) + self.assertEqual(result["identifiers"]["envelope_from"], "bounce.example.com") + + def testParseReportRecordEnvelopeFromNullFallback(self): + """envelope_from None value falls back to SPF domain""" + record = { + "row": { + "source_ip": "192.0.2.1", + "count": "1", + "policy_evaluated": {"disposition": "none", "dkim": "pass", "spf": "pass"}, + }, + "identifiers": { + "header_from": "example.com", + "envelope_from": None, + }, + "auth_results": { + "dkim": [], + "spf": [{"domain": "SPF.Example.COM", "scope": "mfrom", "result": "pass"}], + }, + } + result = parsedmarc._parse_report_record(record, offline=True) + self.assertEqual(result["identifiers"]["envelope_from"], "spf.example.com") + + def testParseReportRecordEnvelopeTo(self): + """envelope_to is preserved and moved correctly""" + record = { + "row": { + "source_ip": "192.0.2.1", + "count": "1", + "policy_evaluated": {"disposition": "none", "dkim": "pass", "spf": "pass"}, + }, + "identifiers": { + "header_from": "example.com", + "envelope_from": "bounce@example.com", + "envelope_to": "recipient@example.com", + }, + "auth_results": {"dkim": [], "spf": []}, + } + result = parsedmarc._parse_report_record(record, offline=True) + self.assertEqual(result["identifiers"]["envelope_to"], "recipient@example.com") + + def testParseReportRecordAlignment(self): + """Alignment fields computed correctly from policy_evaluated""" + record = { + "row": { + "source_ip": "192.0.2.1", + "count": "1", + "policy_evaluated": {"disposition": "none", "dkim": "pass", "spf": "fail"}, + }, + "identifiers": {"header_from": "example.com"}, + "auth_results": {"dkim": [], "spf": []}, + } + result = parsedmarc._parse_report_record(record, offline=True) + self.assertTrue(result["alignment"]["dkim"]) + self.assertFalse(result["alignment"]["spf"]) + self.assertTrue(result["alignment"]["dmarc"]) + + # ============================================================ # Tests for _parse_smtp_tls_failure_details + # ============================================================ + def testParseSmtpTlsFailureDetailsMinimal(self): + """Minimal failure details with just required fields""" + details = { + "result-type": "certificate-expired", + "failed-session-count": 5, + } + result = parsedmarc._parse_smtp_tls_failure_details(details) + self.assertEqual(result["result_type"], "certificate-expired") + self.assertEqual(result["failed_session_count"], 5) + self.assertNotIn("sending_mta_ip", result) + + def testParseSmtpTlsFailureDetailsAllOptional(self): + """All optional fields included""" + details = { + "result-type": "starttls-not-supported", + "failed-session-count": 3, + "sending-mta-ip": "10.0.0.1", + "receiving-ip": "10.0.0.2", + "receiving-mx-hostname": "mx.example.com", + "receiving-mx-helo": "mx.example.com", + "additional-info-uri": "https://example.com/info", + "failure-reason-code": "TLS_ERROR", + } + result = parsedmarc._parse_smtp_tls_failure_details(details) + self.assertEqual(result["sending_mta_ip"], "10.0.0.1") + self.assertEqual(result["receiving_ip"], "10.0.0.2") + self.assertEqual(result["receiving_mx_hostname"], "mx.example.com") + self.assertEqual(result["receiving_mx_helo"], "mx.example.com") + self.assertEqual(result["additional_info_uri"], "https://example.com/info") + self.assertEqual(result["failure_reason_code"], "TLS_ERROR") + + def testParseSmtpTlsFailureDetailsMissingRequired(self): + """Missing required field raises InvalidSMTPTLSReport""" + with self.assertRaises(parsedmarc.InvalidSMTPTLSReport): + parsedmarc._parse_smtp_tls_failure_details({"result-type": "err"}) + + # ============================================================ # Tests for _parse_smtp_tls_report_policy + # ============================================================ + def testParseSmtpTlsReportPolicyValid(self): + """Valid STS policy parses correctly""" + policy = { + "policy": { + "policy-type": "sts", + "policy-domain": "example.com", + "policy-string": ["version: STSv1", "mode: enforce"], + "mx-host-pattern": ["*.example.com"], + }, + "summary": { + "total-successful-session-count": 100, + "total-failure-session-count": 2, + }, + } + result = parsedmarc._parse_smtp_tls_report_policy(policy) + self.assertEqual(result["policy_type"], "sts") + self.assertEqual(result["policy_domain"], "example.com") + self.assertEqual(result["policy_strings"], ["version: STSv1", "mode: enforce"]) + self.assertEqual(result["mx_host_patterns"], ["*.example.com"]) + self.assertEqual(result["successful_session_count"], 100) + self.assertEqual(result["failed_session_count"], 2) + + def testParseSmtpTlsReportPolicyInvalidType(self): + """Invalid policy type raises InvalidSMTPTLSReport""" + policy = { + "policy": { + "policy-type": "invalid", + "policy-domain": "example.com", + }, + "summary": { + "total-successful-session-count": 0, + "total-failure-session-count": 0, + }, + } + with self.assertRaises(parsedmarc.InvalidSMTPTLSReport): + parsedmarc._parse_smtp_tls_report_policy(policy) + + def testParseSmtpTlsReportPolicyEmptyPolicyString(self): + """Empty policy-string list is not included""" + policy = { + "policy": { + "policy-type": "sts", + "policy-domain": "example.com", + "policy-string": [], + "mx-host-pattern": [], + }, + "summary": { + "total-successful-session-count": 50, + "total-failure-session-count": 0, + }, + } + result = parsedmarc._parse_smtp_tls_report_policy(policy) + self.assertNotIn("policy_strings", result) + self.assertNotIn("mx_host_patterns", result) + + def testParseSmtpTlsReportPolicyWithFailureDetails(self): + """Policy with failure-details parses nested details""" + policy = { + "policy": { + "policy-type": "sts", + "policy-domain": "example.com", + }, + "summary": { + "total-successful-session-count": 10, + "total-failure-session-count": 1, + }, + "failure-details": [ + { + "result-type": "certificate-expired", + "failed-session-count": 1, + } + ], + } + result = parsedmarc._parse_smtp_tls_report_policy(policy) + self.assertEqual(len(result["failure_details"]), 1) + self.assertEqual(result["failure_details"][0]["result_type"], "certificate-expired") + + def testParseSmtpTlsReportPolicyMissingField(self): + """Missing required policy field raises InvalidSMTPTLSReport""" + policy = {"policy": {"policy-type": "sts"}, "summary": {}} + with self.assertRaises(parsedmarc.InvalidSMTPTLSReport): + parsedmarc._parse_smtp_tls_report_policy(policy) + + # ============================================================ # Tests for parse_smtp_tls_report_json + # ============================================================ + def testParseSmtpTlsReportJsonValid(self): + """Valid SMTP TLS JSON report parses correctly""" + import json + report = json.dumps({ + "organization-name": "Example Corp", + "date-range": { + "start-datetime": "2024-01-01T00:00:00Z", + "end-datetime": "2024-01-02T00:00:00Z", + }, + "contact-info": "admin@example.com", + "report-id": "report-123", + "policies": [ + { + "policy": { + "policy-type": "sts", + "policy-domain": "example.com", + }, + "summary": { + "total-successful-session-count": 50, + "total-failure-session-count": 0, + }, + } + ], + }) + result = parsedmarc.parse_smtp_tls_report_json(report) + self.assertEqual(result["organization_name"], "Example Corp") + self.assertEqual(result["report_id"], "report-123") + self.assertEqual(len(result["policies"]), 1) + + def testParseSmtpTlsReportJsonBytes(self): + """SMTP TLS report as bytes parses correctly""" + import json + report = json.dumps({ + "organization-name": "Org", + "date-range": {"start-datetime": "2024-01-01", "end-datetime": "2024-01-02"}, + "contact-info": "a@b.com", + "report-id": "r1", + "policies": [{ + "policy": {"policy-type": "tlsa", "policy-domain": "a.com"}, + "summary": {"total-successful-session-count": 1, "total-failure-session-count": 0}, + }], + }).encode("utf-8") + result = parsedmarc.parse_smtp_tls_report_json(report) + self.assertEqual(result["organization_name"], "Org") + + def testParseSmtpTlsReportJsonMissingField(self): + """Missing required field raises InvalidSMTPTLSReport""" + import json + report = json.dumps({"organization-name": "Org"}) + with self.assertRaises(parsedmarc.InvalidSMTPTLSReport): + parsedmarc.parse_smtp_tls_report_json(report) + + def testParseSmtpTlsReportJsonPoliciesNotList(self): + """Non-list policies raises InvalidSMTPTLSReport""" + import json + report = json.dumps({ + "organization-name": "Org", + "date-range": {"start-datetime": "2024-01-01", "end-datetime": "2024-01-02"}, + "contact-info": "a@b.com", + "report-id": "r1", + "policies": "not-a-list", + }) + with self.assertRaises(parsedmarc.InvalidSMTPTLSReport): + parsedmarc.parse_smtp_tls_report_json(report) + + # ============================================================ # Tests for aggregate report parsing (validation warnings, etc.) + # ============================================================ + def testAggregateReportInvalidNpWarning(self): + """Invalid np value is preserved but logs warning""" + xml = """ + + 1.0 + + Test Org + test@example.com + test-np-invalid + 17040672001704153599 + + + example.com +

none

+ banana + maybe + magic +
+ + + 192.0.2.1 + 1 + + none + pass + pass + + + example.com + + example.compass + + +
""" + report = parsedmarc.parse_aggregate_report_xml(xml, offline=True) + # Invalid values are still stored + self.assertEqual(report["policy_published"]["np"], "banana") + self.assertEqual(report["policy_published"]["testing"], "maybe") + self.assertEqual(report["policy_published"]["discovery_method"], "magic") + + def testAggregateReportPassDisposition(self): + """'pass' as valid disposition is preserved""" + xml = """ + + + TestOrg + test@example.com + test-pass + 17040672001704153599 + + + example.com +

reject

+
+ + + 192.0.2.1 + 1 + + pass + pass + pass + + + example.com + + example.compass + + +
""" + report = parsedmarc.parse_aggregate_report_xml(xml, offline=True) + self.assertEqual(report["records"][0]["policy_evaluated"]["disposition"], "pass") + + def testAggregateReportMultipleRecords(self): + """Reports with multiple records are all parsed""" + xml = """ + + + TestOrg + test@example.com + test-multi + 17040672001704153599 + + + example.com +

none

+
+ + + 192.0.2.1 + 10 + nonepasspass + + example.com + example.compass + + + + 192.0.2.2 + 5 + quarantinefailfail + + example.com + example.comfail + +
""" + report = parsedmarc.parse_aggregate_report_xml(xml, offline=True) + self.assertEqual(len(report["records"]), 2) + self.assertEqual(report["records"][0]["count"], 10) + self.assertEqual(report["records"][1]["count"], 5) + + def testAggregateReportInvalidXmlRecovery(self): + """Badly formed XML is recovered via lxml""" + xml = 'Testt@e.comr117040672001704153599example.com

none

192.0.2.11nonepasspassexample.comexample.compass
' + report = parsedmarc.parse_aggregate_report_xml(xml, offline=True) + self.assertEqual(report["report_metadata"]["report_id"], "r1") + + def testAggregateReportCsvRowsContainDMARCbisFields(self): + """CSV rows include np, testing, discovery_method columns""" + result = parsedmarc.parse_report_file( + "samples/aggregate/dmarcbis-draft-sample.xml", + always_use_local_files=True, offline=True, + ) + report = result["report"] + rows = parsedmarc.parsed_aggregate_reports_to_csv_rows(report) + self.assertTrue(len(rows) > 0) + row = rows[0] + self.assertIn("np", row) + self.assertIn("testing", row) + self.assertIn("discovery_method", row) + self.assertIn("source_ip_address", row) + self.assertIn("dkim_domains", row) + self.assertIn("spf_domains", row) + + def testAggregateReportSchemaVersion(self): + """DMARCbis report with returns correct xml_schema""" + xml = """ + + 1.0 + + TestOrg + test@example.com + test-version + 17040672001704153599 + + + example.com +

none

+
+ + + 192.0.2.1 + 1 + nonepasspass + + example.com + example.compass + +
""" + report = parsedmarc.parse_aggregate_report_xml(xml, offline=True) + self.assertEqual(report["xml_schema"], "1.0") + + def testAggregateReportDraftSchema(self): + """Report without defaults to 'draft' schema""" + xml = """ + + + TestOrg + test@example.com + test-draft + 17040672001704153599 + + + example.com +

none

+
+ + + 192.0.2.1 + 1 + nonepasspass + + example.com + example.compass + +
""" + report = parsedmarc.parse_aggregate_report_xml(xml, offline=True) + self.assertEqual(report["xml_schema"], "draft") + + def testAggregateReportGeneratorField(self): + """Generator field is correctly extracted""" + xml = """ + + + TestOrg + test@example.com + test-gen + My Reporter v1.0 + 17040672001704153599 + + + example.com +

none

+
+ + + 192.0.2.1 + 1 + nonepasspass + + example.com + example.compass + +
""" + report = parsedmarc.parse_aggregate_report_xml(xml, offline=True) + self.assertEqual(report["report_metadata"]["generator"], "My Reporter v1.0") + + def testAggregateReportReportErrors(self): + """Report errors in metadata are captured""" + xml = """ + + + TestOrg + test@example.com + test-err + Some error + 17040672001704153599 + + + example.com +

none

+
+ + + 192.0.2.1 + 1 + nonepasspass + + example.com + example.compass + +
""" + report = parsedmarc.parse_aggregate_report_xml(xml, offline=True) + self.assertIn("Some error", report["report_metadata"]["errors"]) + + def testAggregateReportPolicyDefaults(self): + """Policy defaults: adkim/aspf='r', sp=p, pct/fo=None""" + xml = """ + + + TestOrg + test@example.com + test-defaults + 17040672001704153599 + + + example.com +

reject

+
+ + + 192.0.2.1 + 1 + nonepasspass + + example.com + example.compass + +
""" + report = parsedmarc.parse_aggregate_report_xml(xml, offline=True) + pp = report["policy_published"] + self.assertEqual(pp["adkim"], "r") + self.assertEqual(pp["aspf"], "r") + self.assertEqual(pp["sp"], "reject") # defaults to p + self.assertIsNone(pp["pct"]) + self.assertIsNone(pp["fo"]) + self.assertIsNone(pp["np"]) + self.assertIsNone(pp["testing"]) + self.assertIsNone(pp["discovery_method"]) + + def testMagicXmlTagDetection(self): + """XML without declaration (starting with '<') is extracted""" + xml_no_decl = b"Ta@b.comr117040672001704153599example.com

none

192.0.2.11nonepasspassexample.comexample.compass
" + self.assertTrue(xml_no_decl.startswith(parsedmarc.MAGIC_XML_TAG)) + # Ensure it extracts as XML + result = parsedmarc.extract_report(xml_no_decl) + self.assertIn("", result) + + # ============================================================ # Tests for parsedmarc/utils.py + # ============================================================ + def testTimestampToDatetime(self): + """timestamp_to_datetime converts UNIX timestamp to datetime""" + from datetime import datetime + dt = parsedmarc.utils.timestamp_to_datetime(0) + self.assertIsInstance(dt, datetime) + # Epoch 0 should be Jan 1 1970 in local time + self.assertEqual(dt.year, 1970) + + def testTimestampToHuman(self): + """timestamp_to_human returns formatted string""" + result = parsedmarc.utils.timestamp_to_human(1704067200) + self.assertRegex(result, r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}") + + def testHumanTimestampToDatetime(self): + """human_timestamp_to_datetime parses timestamp string""" + from datetime import datetime + dt = parsedmarc.utils.human_timestamp_to_datetime("2024-01-01 00:00:00") + self.assertIsInstance(dt, datetime) + self.assertEqual(dt.year, 2024) + self.assertEqual(dt.month, 1) + self.assertEqual(dt.day, 1) + + def testHumanTimestampToDatetimeUtc(self): + """human_timestamp_to_datetime with to_utc=True returns UTC""" + from datetime import timezone + dt = parsedmarc.utils.human_timestamp_to_datetime( + "2024-01-01 12:00:00", to_utc=True + ) + self.assertEqual(dt.tzinfo, timezone.utc) + + def testHumanTimestampToDatetimeParenthesisStripping(self): + """Parenthesized content is stripped from timestamps""" + dt = parsedmarc.utils.human_timestamp_to_datetime( + "Mon, 01 Jan 2024 00:00:00 +0000 (UTC)" + ) + self.assertEqual(dt.year, 2024) + + def testHumanTimestampToDatetimeNegativeZero(self): + """-0000 timezone is handled""" + dt = parsedmarc.utils.human_timestamp_to_datetime( + "2024-01-01 00:00:00 -0000" + ) + self.assertEqual(dt.year, 2024) + + def testHumanTimestampToUnixTimestamp(self): + """human_timestamp_to_unix_timestamp converts to int""" + ts = parsedmarc.utils.human_timestamp_to_unix_timestamp("2024-01-01 00:00:00") + self.assertIsInstance(ts, int) + + def testHumanTimestampToUnixTimestampWithT(self): + """T separator in timestamp is handled""" + ts = parsedmarc.utils.human_timestamp_to_unix_timestamp("2024-01-01T00:00:00") + self.assertIsInstance(ts, int) + + def testGetIpAddressCountry(self): + """get_ip_address_country returns country code using bundled DBIP""" + # 8.8.8.8 is a well-known Google DNS IP in US + country = parsedmarc.utils.get_ip_address_country("8.8.8.8") + self.assertEqual(country, "US") + + def testGetIpAddressCountryNotFound(self): + """get_ip_address_country returns None for reserved IP""" + country = parsedmarc.utils.get_ip_address_country("127.0.0.1") + self.assertIsNone(country) + + def testGetServiceFromReverseDnsBaseDomainOffline(self): + """get_service_from_reverse_dns_base_domain in offline mode""" + result = parsedmarc.utils.get_service_from_reverse_dns_base_domain( + "google.com", offline=True + ) + self.assertIn("Google", result["name"]) + self.assertIsNotNone(result["type"]) + + def testGetServiceFromReverseDnsBaseDomainUnknown(self): + """Unknown base domain returns domain as name and None as type""" + result = parsedmarc.utils.get_service_from_reverse_dns_base_domain( + "unknown-domain-xyz.example", offline=True + ) + self.assertEqual(result["name"], "unknown-domain-xyz.example") + self.assertIsNone(result["type"]) + + def testGetIpAddressInfoOffline(self): + """get_ip_address_info in offline mode returns country but no DNS""" + info = parsedmarc.utils.get_ip_address_info("8.8.8.8", offline=True) + self.assertEqual(info["ip_address"], "8.8.8.8") + self.assertEqual(info["country"], "US") + self.assertIsNone(info["reverse_dns"]) + + def testGetIpAddressInfoCache(self): + """get_ip_address_info uses cache on second call""" + from expiringdict import ExpiringDict + cache = ExpiringDict(max_len=100, max_age_seconds=60) + # offline=True means reverse_dns is None, so cache is not populated + # Use offline=False with mock to test cache + from unittest.mock import patch + with patch("parsedmarc.utils.get_reverse_dns", return_value="dns.google"): + info1 = parsedmarc.utils.get_ip_address_info( + "8.8.8.8", offline=False, cache=cache, + always_use_local_files=True, + ) + self.assertIn("8.8.8.8", cache) + info2 = parsedmarc.utils.get_ip_address_info("8.8.8.8", offline=False, cache=cache) + self.assertEqual(info1["ip_address"], info2["ip_address"]) + self.assertEqual(info2["reverse_dns"], "dns.google") + + def testParseEmailAddressWithDisplayName(self): + """parse_email_address with display name""" + result = parsedmarc.utils.parse_email_address(("John Doe", "john@example.com")) + self.assertEqual(result["display_name"], "John Doe") + self.assertEqual(result["address"], "john@example.com") + self.assertEqual(result["local"], "john") + self.assertEqual(result["domain"], "example.com") + + def testParseEmailAddressWithoutDisplayName(self): + """parse_email_address with empty display name""" + result = parsedmarc.utils.parse_email_address(("", "john@example.com")) + self.assertIsNone(result["display_name"]) + self.assertEqual(result["address"], "john@example.com") + + def testParseEmailAddressNoAt(self): + """parse_email_address with no @ returns None local/domain""" + result = parsedmarc.utils.parse_email_address(("", "localonly")) + self.assertIsNone(result["local"]) + self.assertIsNone(result["domain"]) + + def testGetFilenameSafeString(self): + """get_filename_safe_string removes invalid chars""" + result = parsedmarc.utils.get_filename_safe_string('file/name:with"bad*chars') + self.assertNotIn("/", result) + self.assertNotIn(":", result) + self.assertNotIn('"', result) + self.assertNotIn("*", result) + + def testGetFilenameSafeStringNone(self): + """get_filename_safe_string with None returns 'None'""" + result = parsedmarc.utils.get_filename_safe_string(None) + self.assertEqual(result, "None") + + def testGetFilenameSafeStringLong(self): + """get_filename_safe_string truncates to 100 chars""" + result = parsedmarc.utils.get_filename_safe_string("a" * 200) + self.assertEqual(len(result), 100) + + def testGetFilenameSafeStringTrailingDot(self): + """get_filename_safe_string strips trailing dots""" + result = parsedmarc.utils.get_filename_safe_string("filename...") + self.assertFalse(result.endswith(".")) + + def testIsMboxNonMbox(self): + """is_mbox returns False for non-mbox file""" + result = parsedmarc.utils.is_mbox("samples/empty.xml") + self.assertFalse(result) + + def testIsOutlookMsgNonMsg(self): + """is_outlook_msg returns False for non-MSG content""" + self.assertFalse(parsedmarc.utils.is_outlook_msg(b"not an outlook msg")) + self.assertFalse(parsedmarc.utils.is_outlook_msg("string content")) + + def testIsOutlookMsgMagic(self): + """is_outlook_msg returns True for correct magic bytes""" + magic = b"\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1" + b"\x00" * 100 + self.assertTrue(parsedmarc.utils.is_outlook_msg(magic)) + + # ============================================================ # Tests for output modules (mocked) + # ============================================================ + def testWebhookClientInit(self): + """WebhookClient initializes with correct attributes""" + from parsedmarc.webhook import WebhookClient + client = WebhookClient( + aggregate_url="http://agg.example.com", + failure_url="http://fail.example.com", + smtp_tls_url="http://tls.example.com", + ) + self.assertEqual(client.aggregate_url, "http://agg.example.com") + self.assertEqual(client.failure_url, "http://fail.example.com") + self.assertEqual(client.smtp_tls_url, "http://tls.example.com") + self.assertEqual(client.timeout, 60) + + def testWebhookClientSaveMethods(self): + """WebhookClient save methods call _send_to_webhook""" + from unittest.mock import MagicMock + from parsedmarc.webhook import WebhookClient + client = WebhookClient("http://a", "http://f", "http://t") + client.session = MagicMock() + client.save_aggregate_report_to_webhook('{"test": 1}') + client.session.post.assert_called_with("http://a", data='{"test": 1}', timeout=60) + client.save_failure_report_to_webhook('{"fail": 1}') + client.session.post.assert_called_with("http://f", data='{"fail": 1}', timeout=60) + client.save_smtp_tls_report_to_webhook('{"tls": 1}') + client.session.post.assert_called_with("http://t", data='{"tls": 1}', timeout=60) + + def testWebhookBackwardCompatAlias(self): + """WebhookClient forensic alias points to failure method""" + from parsedmarc.webhook import WebhookClient + self.assertIs( + WebhookClient.save_forensic_report_to_webhook, + WebhookClient.save_failure_report_to_webhook, + ) + + def testKafkaStripMetadata(self): + """KafkaClient.strip_metadata extracts metadata to root""" + from parsedmarc.kafkaclient import KafkaClient + report = { + "report_metadata": { + "org_name": "TestOrg", + "org_email": "test@example.com", + "report_id": "r-123", + "begin_date": "2024-01-01", + "end_date": "2024-01-02", + }, + "records": [], + } + result = KafkaClient.strip_metadata(report) + self.assertEqual(result["org_name"], "TestOrg") + self.assertEqual(result["org_email"], "test@example.com") + self.assertEqual(result["report_id"], "r-123") + self.assertNotIn("report_metadata", result) + + def testKafkaGenerateDateRange(self): + """KafkaClient.generate_date_range generates date range list""" + from parsedmarc.kafkaclient import KafkaClient + report = { + "report_metadata": { + "begin_date": "2024-01-01 00:00:00", + "end_date": "2024-01-02 00:00:00", + } + } + result = KafkaClient.generate_date_range(report) + self.assertEqual(len(result), 2) + self.assertIn("2024-01-01", result[0]) + self.assertIn("2024-01-02", result[1]) + + def testSplunkHECClientInit(self): + """HECClient initializes with correct URL and headers""" + from parsedmarc.splunk import HECClient + client = HECClient( + url="https://splunk.example.com:8088", + access_token="my-token", + index="main", + ) + self.assertIn("/services/collector/event/1.0", client.url) + self.assertEqual(client.access_token, "my-token") + self.assertEqual(client.index, "main") + self.assertEqual(client.source, "parsedmarc") + self.assertIn("Splunk my-token", client.session.headers["Authorization"]) + + def testSplunkHECClientStripTokenPrefix(self): + """HECClient strips 'Splunk ' prefix from token""" + from parsedmarc.splunk import HECClient + client = HECClient( + url="https://splunk.example.com", + access_token="Splunk my-token", + index="main", + ) + self.assertEqual(client.access_token, "my-token") + + def testSplunkBackwardCompatAlias(self): + """HECClient forensic alias points to failure method""" + from parsedmarc.splunk import HECClient + self.assertIs( + HECClient.save_forensic_reports_to_splunk, + HECClient.save_failure_reports_to_splunk, + ) + + def testSyslogClientUdpInit(self): + """SyslogClient creates UDP handler""" + from parsedmarc.syslog import SyslogClient + client = SyslogClient("localhost", 514, protocol="udp") + self.assertEqual(client.server_name, "localhost") + self.assertEqual(client.server_port, 514) + self.assertEqual(client.protocol, "udp") + + def testSyslogClientInvalidProtocol(self): + """SyslogClient with invalid protocol raises ValueError""" + from parsedmarc.syslog import SyslogClient + with self.assertRaises(ValueError): + SyslogClient("localhost", 514, protocol="invalid") + + def testSyslogBackwardCompatAlias(self): + """SyslogClient forensic alias points to failure method""" + from parsedmarc.syslog import SyslogClient + self.assertIs( + SyslogClient.save_forensic_report_to_syslog, + SyslogClient.save_failure_report_to_syslog, + ) + + def testLogAnalyticsConfig(self): + """LogAnalyticsConfig stores all fields""" + from parsedmarc.loganalytics import LogAnalyticsConfig + config = LogAnalyticsConfig( + client_id="cid", + client_secret="csec", + tenant_id="tid", + dce="https://dce.example.com", + dcr_immutable_id="dcr-123", + dcr_aggregate_stream="agg-stream", + dcr_failure_stream="fail-stream", + dcr_smtp_tls_stream="tls-stream", + ) + self.assertEqual(config.client_id, "cid") + self.assertEqual(config.client_secret, "csec") + self.assertEqual(config.tenant_id, "tid") + self.assertEqual(config.dce, "https://dce.example.com") + self.assertEqual(config.dcr_immutable_id, "dcr-123") + self.assertEqual(config.dcr_aggregate_stream, "agg-stream") + self.assertEqual(config.dcr_failure_stream, "fail-stream") + self.assertEqual(config.dcr_smtp_tls_stream, "tls-stream") + + def testLogAnalyticsClientValidationError(self): + """LogAnalyticsClient raises on missing required config""" + from parsedmarc.loganalytics import LogAnalyticsClient, LogAnalyticsException + with self.assertRaises(LogAnalyticsException): + LogAnalyticsClient( + client_id="", + client_secret="csec", + tenant_id="tid", + dce="https://dce.example.com", + dcr_immutable_id="dcr-123", + dcr_aggregate_stream="agg", + dcr_failure_stream="fail", + dcr_smtp_tls_stream="tls", + ) + + def testSmtpTlsCsvRows(self): + """parsed_smtp_tls_reports_to_csv_rows produces correct rows""" + import json + report_json = json.dumps({ + "organization-name": "Org", + "date-range": {"start-datetime": "2024-01-01T00:00:00Z", "end-datetime": "2024-01-02T00:00:00Z"}, + "contact-info": "a@b.com", + "report-id": "r1", + "policies": [{ + "policy": {"policy-type": "sts", "policy-domain": "example.com", + "policy-string": ["v: STSv1"], "mx-host-pattern": ["*.example.com"]}, + "summary": {"total-successful-session-count": 10, "total-failure-session-count": 1}, + "failure-details": [{"result-type": "cert-expired", "failed-session-count": 1}], + }], + }) + parsed = parsedmarc.parse_smtp_tls_report_json(report_json) + rows = parsedmarc.parsed_smtp_tls_reports_to_csv_rows(parsed) + self.assertTrue(len(rows) >= 2) + self.assertEqual(rows[0]["organization_name"], "Org") + self.assertEqual(rows[0]["policy_domain"], "example.com") + + def testParsedAggregateReportsToCsvRowsList(self): + """parsed_aggregate_reports_to_csv_rows handles list of reports""" + result = parsedmarc.parse_report_file( + "samples/aggregate/dmarcbis-draft-sample.xml", + always_use_local_files=True, offline=True, + ) + report = result["report"] + # Pass as a list + rows = parsedmarc.parsed_aggregate_reports_to_csv_rows([report]) + self.assertTrue(len(rows) > 0) + # Verify non-str/int/bool values are cleaned + for row in rows: + for v in row.values(): + self.assertIn(type(v), [str, int, bool]) + + def testExceptionHierarchy(self): + """Exception class hierarchy is correct""" + self.assertTrue(issubclass(parsedmarc.ParserError, RuntimeError)) + self.assertTrue(issubclass(parsedmarc.InvalidDMARCReport, parsedmarc.ParserError)) + self.assertTrue(issubclass(parsedmarc.InvalidAggregateReport, parsedmarc.InvalidDMARCReport)) + self.assertTrue(issubclass(parsedmarc.InvalidFailureReport, parsedmarc.InvalidDMARCReport)) + self.assertTrue(issubclass(parsedmarc.InvalidSMTPTLSReport, parsedmarc.ParserError)) + self.assertIs(parsedmarc.InvalidForensicReport, parsedmarc.InvalidFailureReport) + + def testAggregateReportNormalization(self): + """Reports spanning >24h get normalized per day""" + xml = """ + + + TestOrg + test@example.com + test-norm + 17040672001704326400 + + + example.com +

none

+
+ + + 192.0.2.1 + 90 + nonepasspass + + example.com + example.compass + +
""" + # Span is 259200 seconds (3 days), exceeds default 24h threshold + report = parsedmarc.parse_aggregate_report_xml(xml, offline=True) + self.assertTrue(report["report_metadata"]["timespan_requires_normalization"]) + # Records should be split across days + self.assertTrue(len(report["records"]) > 1) + total = sum(r["count"] for r in report["records"]) + self.assertEqual(total, 90) + for r in report["records"]: + self.assertTrue(r["normalized_timespan"]) class TestLoadPSLOverrides(unittest.TestCase):