From 73f4e90fdb9cf0368d3b0926bfe22075f9bf38fa Mon Sep 17 00:00:00 2001 From: Sean Whalen Date: Sun, 22 Mar 2026 14:38:32 -0400 Subject: [PATCH] 9.3.1 Elasticsearch and OpenSearch now verify SSL certificates by default when `ssl = True`, even without a `cert_path` - Added `skip_certificate_verification` option to the `elasticsearch` and `opensearch` configuration sections for consistency with `splunk_hec` - Splunk HEC `skip_certificate_verification` now works correctly with self-signed certificates - SMTP TLS reports no longer fail when saving to multiple output targets (e.g. Elasticsearch and OpenSearch) due to in-place mutation of the report dict - Output client initialization errors now identify which module failed (e.g. "OpenSearch: ConnectionError..." instead of generic "Output client error") - Enhanced error handling for output client initialization --- tests.py | 366 +++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 249 insertions(+), 117 deletions(-) diff --git a/tests.py b/tests.py index 37e33dd..e2c231a 100755 --- a/tests.py +++ b/tests.py @@ -184,7 +184,9 @@ class Test(unittest.TestCase): sample_path, always_use_local_files=True, offline=OFFLINE_MODE ) assert result["report_type"] == "aggregate" - parsedmarc.parsed_aggregate_reports_to_csv(cast(AggregateReport, result["report"])) + parsedmarc.parsed_aggregate_reports_to_csv( + cast(AggregateReport, result["report"]) + ) print("Passed!") def testEmptySample(self): @@ -205,11 +207,11 @@ class Test(unittest.TestCase): sample_content, offline=OFFLINE_MODE ) assert email_result["report_type"] == "failure" - result = parsedmarc.parse_report_file( - sample_path, offline=OFFLINE_MODE - ) + result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE) assert result["report_type"] == "failure" - parsedmarc.parsed_failure_reports_to_csv(cast(FailureReport, result["report"])) + parsedmarc.parsed_failure_reports_to_csv( + cast(FailureReport, result["report"]) + ) print("Passed!") def testFailureReportBackwardCompat(self): @@ -234,9 +236,7 @@ class Test(unittest.TestCase): def testDMARCbisDraftSample(self): """Test parsing the sample report from the DMARCbis aggregate draft""" print() - sample_path = ( - "samples/aggregate/dmarcbis-draft-sample.xml" - ) + sample_path = "samples/aggregate/dmarcbis-draft-sample.xml" print("Testing {0}: ".format(sample_path), end="") result = parsedmarc.parse_report_file( sample_path, always_use_local_files=True, offline=True @@ -252,15 +252,9 @@ class Test(unittest.TestCase): # Verify report_metadata metadata = report["report_metadata"] self.assertEqual(metadata["org_name"], "Sample Reporter") - self.assertEqual( - metadata["org_email"], "report_sender@example-reporter.com" - ) - self.assertEqual( - metadata["org_extra_contact_info"], "..." - ) - self.assertEqual( - metadata["report_id"], "3v98abbp8ya9n3va8yr8oa3ya" - ) + self.assertEqual(metadata["org_email"], "report_sender@example-reporter.com") + self.assertEqual(metadata["org_extra_contact_info"], "...") + self.assertEqual(metadata["report_id"], "3v98abbp8ya9n3va8yr8oa3ya") self.assertEqual( metadata["generator"], "Example DMARC Aggregate Reporter v1.2", @@ -286,9 +280,7 @@ class Test(unittest.TestCase): rec = report["records"][0] self.assertEqual(rec["source"]["ip_address"], "192.0.2.123") self.assertEqual(rec["count"], 123) - self.assertEqual( - rec["policy_evaluated"]["disposition"], "pass" - ) + self.assertEqual(rec["policy_evaluated"]["disposition"], "pass") self.assertEqual(rec["policy_evaluated"]["dkim"], "pass") self.assertEqual(rec["policy_evaluated"]["spf"], "fail") @@ -319,8 +311,7 @@ class Test(unittest.TestCase): """Test that RFC 7489 reports have None for DMARCbis-only fields""" print() sample_path = ( - "samples/aggregate/" - "example.net!example.com!1529366400!1529452799.xml" + "samples/aggregate/example.net!example.com!1529366400!1529452799.xml" ) print("Testing {0}: ".format(sample_path), end="") result = parsedmarc.parse_report_file( @@ -370,11 +361,11 @@ class Test(unittest.TestCase): continue print("Testing {0}: ".format(sample_path), end="") with self.subTest(sample=sample_path): - result = parsedmarc.parse_report_file( - sample_path, offline=OFFLINE_MODE - ) + result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE) assert result["report_type"] == "smtp_tls" - parsedmarc.parsed_smtp_tls_reports_to_csv(cast(SMTPTLSReport, result["report"])) + parsedmarc.parsed_smtp_tls_reports_to_csv( + cast(SMTPTLSReport, result["report"]) + ) print("Passed!") def testOpenSearchSigV4RequiresRegion(self): @@ -3314,7 +3305,11 @@ class TestEnvVarConfig(unittest.TestCase): "row": { "source_ip": None, "count": "1", - "policy_evaluated": {"disposition": "none", "dkim": "pass", "spf": "pass"}, + "policy_evaluated": { + "disposition": "none", + "dkim": "pass", + "spf": "pass", + }, }, "identifiers": {"header_from": "example.com"}, "auth_results": {"dkim": [], "spf": []}, @@ -3328,7 +3323,11 @@ class TestEnvVarConfig(unittest.TestCase): "row": { "source_ip": "192.0.2.1", "count": "5", - "policy_evaluated": {"disposition": "none", "dkim": "pass", "spf": "fail"}, + "policy_evaluated": { + "disposition": "none", + "dkim": "pass", + "spf": "fail", + }, }, "identifiers": {"header_from": "example.com"}, "auth_results": {}, @@ -3390,9 +3389,16 @@ class TestEnvVarConfig(unittest.TestCase): "row": { "source_ip": "192.0.2.1", "count": "1", - "policy_evaluated": {"disposition": "none", "dkim": "pass", "spf": "pass"}, + "policy_evaluated": { + "disposition": "none", + "dkim": "pass", + "spf": "pass", + }, + }, + "identities": { + "header_from": "Example.COM", + "envelope_from": "example.com", }, - "identities": {"header_from": "Example.COM", "envelope_from": "example.com"}, "auth_results": {"dkim": [], "spf": []}, } result = parsedmarc._parse_report_record(record, offline=True) @@ -3405,7 +3411,11 @@ class TestEnvVarConfig(unittest.TestCase): "row": { "source_ip": "192.0.2.1", "count": "1", - "policy_evaluated": {"disposition": "none", "dkim": "fail", "spf": "fail"}, + "policy_evaluated": { + "disposition": "none", + "dkim": "fail", + "spf": "fail", + }, }, "identifiers": {"header_from": "example.com"}, "auth_results": { @@ -3425,7 +3435,11 @@ class TestEnvVarConfig(unittest.TestCase): "row": { "source_ip": "192.0.2.1", "count": "1", - "policy_evaluated": {"disposition": "none", "dkim": "fail", "spf": "fail"}, + "policy_evaluated": { + "disposition": "none", + "dkim": "fail", + "spf": "fail", + }, }, "identifiers": {"header_from": "example.com"}, "auth_results": { @@ -3445,19 +3459,37 @@ class TestEnvVarConfig(unittest.TestCase): "row": { "source_ip": "192.0.2.1", "count": "1", - "policy_evaluated": {"disposition": "none", "dkim": "pass", "spf": "pass"}, + "policy_evaluated": { + "disposition": "none", + "dkim": "pass", + "spf": "pass", + }, }, "identifiers": {"header_from": "example.com"}, "auth_results": { - "dkim": [{"domain": "example.com", "selector": "s1", - "result": "pass", "human_result": "good key"}], - "spf": [{"domain": "example.com", "scope": "mfrom", - "result": "pass", "human_result": "sender valid"}], + "dkim": [ + { + "domain": "example.com", + "selector": "s1", + "result": "pass", + "human_result": "good key", + } + ], + "spf": [ + { + "domain": "example.com", + "scope": "mfrom", + "result": "pass", + "human_result": "sender valid", + } + ], }, } result = parsedmarc._parse_report_record(record, offline=True) self.assertEqual(result["auth_results"]["dkim"][0]["human_result"], "good key") - self.assertEqual(result["auth_results"]["spf"][0]["human_result"], "sender valid") + self.assertEqual( + result["auth_results"]["spf"][0]["human_result"], "sender valid" + ) def testParseReportRecordEnvelopeFromFallback(self): """envelope_from falls back to last SPF domain when missing""" @@ -3465,12 +3497,18 @@ class TestEnvVarConfig(unittest.TestCase): "row": { "source_ip": "192.0.2.1", "count": "1", - "policy_evaluated": {"disposition": "none", "dkim": "pass", "spf": "pass"}, + "policy_evaluated": { + "disposition": "none", + "dkim": "pass", + "spf": "pass", + }, }, "identifiers": {"header_from": "example.com"}, "auth_results": { "dkim": [], - "spf": [{"domain": "Bounce.Example.COM", "scope": "mfrom", "result": "pass"}], + "spf": [ + {"domain": "Bounce.Example.COM", "scope": "mfrom", "result": "pass"} + ], }, } result = parsedmarc._parse_report_record(record, offline=True) @@ -3482,7 +3520,11 @@ class TestEnvVarConfig(unittest.TestCase): "row": { "source_ip": "192.0.2.1", "count": "1", - "policy_evaluated": {"disposition": "none", "dkim": "pass", "spf": "pass"}, + "policy_evaluated": { + "disposition": "none", + "dkim": "pass", + "spf": "pass", + }, }, "identifiers": { "header_from": "example.com", @@ -3490,7 +3532,9 @@ class TestEnvVarConfig(unittest.TestCase): }, "auth_results": { "dkim": [], - "spf": [{"domain": "SPF.Example.COM", "scope": "mfrom", "result": "pass"}], + "spf": [ + {"domain": "SPF.Example.COM", "scope": "mfrom", "result": "pass"} + ], }, } result = parsedmarc._parse_report_record(record, offline=True) @@ -3502,7 +3546,11 @@ class TestEnvVarConfig(unittest.TestCase): "row": { "source_ip": "192.0.2.1", "count": "1", - "policy_evaluated": {"disposition": "none", "dkim": "pass", "spf": "pass"}, + "policy_evaluated": { + "disposition": "none", + "dkim": "pass", + "spf": "pass", + }, }, "identifiers": { "header_from": "example.com", @@ -3520,7 +3568,11 @@ class TestEnvVarConfig(unittest.TestCase): "row": { "source_ip": "192.0.2.1", "count": "1", - "policy_evaluated": {"disposition": "none", "dkim": "pass", "spf": "fail"}, + "policy_evaluated": { + "disposition": "none", + "dkim": "pass", + "spf": "fail", + }, }, "identifiers": {"header_from": "example.com"}, "auth_results": {"dkim": [], "spf": []}, @@ -3645,7 +3697,9 @@ class TestEnvVarConfig(unittest.TestCase): } result = parsedmarc._parse_smtp_tls_report_policy(policy) self.assertEqual(len(result["failure_details"]), 1) - self.assertEqual(result["failure_details"][0]["result_type"], "certificate-expired") + self.assertEqual( + result["failure_details"][0]["result_type"], "certificate-expired" + ) def testParseSmtpTlsReportPolicyMissingField(self): """Missing required policy field raises InvalidSMTPTLSReport""" @@ -3657,27 +3711,29 @@ class TestEnvVarConfig(unittest.TestCase): # ============================================================ def testParseSmtpTlsReportJsonValid(self): """Valid SMTP TLS JSON report parses correctly""" - report = json.dumps({ - "organization-name": "Example Corp", - "date-range": { - "start-datetime": "2024-01-01T00:00:00Z", - "end-datetime": "2024-01-02T00:00:00Z", - }, - "contact-info": "admin@example.com", - "report-id": "report-123", - "policies": [ - { - "policy": { - "policy-type": "sts", - "policy-domain": "example.com", - }, - "summary": { - "total-successful-session-count": 50, - "total-failure-session-count": 0, - }, - } - ], - }) + report = json.dumps( + { + "organization-name": "Example Corp", + "date-range": { + "start-datetime": "2024-01-01T00:00:00Z", + "end-datetime": "2024-01-02T00:00:00Z", + }, + "contact-info": "admin@example.com", + "report-id": "report-123", + "policies": [ + { + "policy": { + "policy-type": "sts", + "policy-domain": "example.com", + }, + "summary": { + "total-successful-session-count": 50, + "total-failure-session-count": 0, + }, + } + ], + } + ) result = parsedmarc.parse_smtp_tls_report_json(report) self.assertEqual(result["organization_name"], "Example Corp") self.assertEqual(result["report_id"], "report-123") @@ -3685,16 +3741,26 @@ class TestEnvVarConfig(unittest.TestCase): def testParseSmtpTlsReportJsonBytes(self): """SMTP TLS report as bytes parses correctly""" - report = json.dumps({ - "organization-name": "Org", - "date-range": {"start-datetime": "2024-01-01", "end-datetime": "2024-01-02"}, - "contact-info": "a@b.com", - "report-id": "r1", - "policies": [{ - "policy": {"policy-type": "tlsa", "policy-domain": "a.com"}, - "summary": {"total-successful-session-count": 1, "total-failure-session-count": 0}, - }], - }).encode("utf-8") + report = json.dumps( + { + "organization-name": "Org", + "date-range": { + "start-datetime": "2024-01-01", + "end-datetime": "2024-01-02", + }, + "contact-info": "a@b.com", + "report-id": "r1", + "policies": [ + { + "policy": {"policy-type": "tlsa", "policy-domain": "a.com"}, + "summary": { + "total-successful-session-count": 1, + "total-failure-session-count": 0, + }, + } + ], + } + ).encode("utf-8") result = parsedmarc.parse_smtp_tls_report_json(report) self.assertEqual(result["organization_name"], "Org") @@ -3706,13 +3772,18 @@ class TestEnvVarConfig(unittest.TestCase): def testParseSmtpTlsReportJsonPoliciesNotList(self): """Non-list policies raises InvalidSMTPTLSReport""" - report = json.dumps({ - "organization-name": "Org", - "date-range": {"start-datetime": "2024-01-01", "end-datetime": "2024-01-02"}, - "contact-info": "a@b.com", - "report-id": "r1", - "policies": "not-a-list", - }) + report = json.dumps( + { + "organization-name": "Org", + "date-range": { + "start-datetime": "2024-01-01", + "end-datetime": "2024-01-02", + }, + "contact-info": "a@b.com", + "report-id": "r1", + "policies": "not-a-list", + } + ) with self.assertRaises(parsedmarc.InvalidSMTPTLSReport): parsedmarc.parse_smtp_tls_report_json(report) @@ -3789,7 +3860,9 @@ class TestEnvVarConfig(unittest.TestCase): """ report = parsedmarc.parse_aggregate_report_xml(xml, offline=True) - self.assertEqual(report["records"][0]["policy_evaluated"]["disposition"], "pass") + self.assertEqual( + report["records"][0]["policy_evaluated"]["disposition"], "pass" + ) def testAggregateReportMultipleRecords(self): """Reports with multiple records are all parsed""" @@ -3839,7 +3912,8 @@ class TestEnvVarConfig(unittest.TestCase): """CSV rows include np, testing, discovery_method columns""" result = parsedmarc.parse_report_file( "samples/aggregate/dmarcbis-draft-sample.xml", - always_use_local_files=True, offline=True, + always_use_local_files=True, + offline=True, ) report = cast(AggregateReport, result["report"]) rows = parsedmarc.parsed_aggregate_reports_to_csv_rows(report) @@ -4047,9 +4121,7 @@ class TestEnvVarConfig(unittest.TestCase): def testHumanTimestampToDatetimeNegativeZero(self): """-0000 timezone is handled""" - dt = parsedmarc.utils.human_timestamp_to_datetime( - "2024-01-01 00:00:00 -0000" - ) + dt = parsedmarc.utils.human_timestamp_to_datetime("2024-01-01 00:00:00 -0000") self.assertEqual(dt.year, 2024) def testHumanTimestampToUnixTimestamp(self): @@ -4099,14 +4171,19 @@ class TestEnvVarConfig(unittest.TestCase): def testGetIpAddressInfoCache(self): """get_ip_address_info uses cache on second call""" from expiringdict import ExpiringDict + cache = ExpiringDict(max_len=100, max_age_seconds=60) with patch("parsedmarc.utils.get_reverse_dns", return_value="dns.google"): info1 = parsedmarc.utils.get_ip_address_info( - "8.8.8.8", offline=False, cache=cache, + "8.8.8.8", + offline=False, + cache=cache, always_use_local_files=True, ) self.assertIn("8.8.8.8", cache) - info2 = parsedmarc.utils.get_ip_address_info("8.8.8.8", offline=False, cache=cache) + info2 = parsedmarc.utils.get_ip_address_info( + "8.8.8.8", offline=False, cache=cache + ) self.assertEqual(info1["ip_address"], info2["ip_address"]) self.assertEqual(info2["reverse_dns"], "dns.google") @@ -4173,6 +4250,7 @@ class TestEnvVarConfig(unittest.TestCase): def testWebhookClientInit(self): """WebhookClient initializes with correct attributes""" from parsedmarc.webhook import WebhookClient + client = WebhookClient( aggregate_url="http://agg.example.com", failure_url="http://fail.example.com", @@ -4186,18 +4264,26 @@ class TestEnvVarConfig(unittest.TestCase): def testWebhookClientSaveMethods(self): """WebhookClient save methods call _send_to_webhook""" from parsedmarc.webhook import WebhookClient + client = WebhookClient("http://a", "http://f", "http://t") client.session = MagicMock() client.save_aggregate_report_to_webhook('{"test": 1}') - client.session.post.assert_called_with("http://a", data='{"test": 1}', timeout=60) + client.session.post.assert_called_with( + "http://a", data='{"test": 1}', timeout=60 + ) client.save_failure_report_to_webhook('{"fail": 1}') - client.session.post.assert_called_with("http://f", data='{"fail": 1}', timeout=60) + client.session.post.assert_called_with( + "http://f", data='{"fail": 1}', timeout=60 + ) client.save_smtp_tls_report_to_webhook('{"tls": 1}') - client.session.post.assert_called_with("http://t", data='{"tls": 1}', timeout=60) + client.session.post.assert_called_with( + "http://t", data='{"tls": 1}', timeout=60 + ) def testWebhookBackwardCompatAlias(self): """WebhookClient forensic alias points to failure method""" from parsedmarc.webhook import WebhookClient + self.assertIs( WebhookClient.save_forensic_report_to_webhook, # type: ignore[attr-defined] WebhookClient.save_failure_report_to_webhook, @@ -4206,6 +4292,7 @@ class TestEnvVarConfig(unittest.TestCase): def testKafkaStripMetadata(self): """KafkaClient.strip_metadata extracts metadata to root""" from parsedmarc.kafkaclient import KafkaClient + report = { "report_metadata": { "org_name": "TestOrg", @@ -4225,6 +4312,7 @@ class TestEnvVarConfig(unittest.TestCase): def testKafkaGenerateDateRange(self): """KafkaClient.generate_date_range generates date range list""" from parsedmarc.kafkaclient import KafkaClient + report = { "report_metadata": { "begin_date": "2024-01-01 00:00:00", @@ -4239,6 +4327,7 @@ class TestEnvVarConfig(unittest.TestCase): def testSplunkHECClientInit(self): """HECClient initializes with correct URL and headers""" from parsedmarc.splunk import HECClient + client = HECClient( url="https://splunk.example.com:8088", access_token="my-token", @@ -4253,6 +4342,7 @@ class TestEnvVarConfig(unittest.TestCase): def testSplunkHECClientStripTokenPrefix(self): """HECClient strips 'Splunk ' prefix from token""" from parsedmarc.splunk import HECClient + client = HECClient( url="https://splunk.example.com", access_token="Splunk my-token", @@ -4263,6 +4353,7 @@ class TestEnvVarConfig(unittest.TestCase): def testSplunkBackwardCompatAlias(self): """HECClient forensic alias points to failure method""" from parsedmarc.splunk import HECClient + self.assertIs( HECClient.save_forensic_reports_to_splunk, # type: ignore[attr-defined] HECClient.save_failure_reports_to_splunk, @@ -4271,6 +4362,7 @@ class TestEnvVarConfig(unittest.TestCase): def testSyslogClientUdpInit(self): """SyslogClient creates UDP handler""" from parsedmarc.syslog import SyslogClient + client = SyslogClient("localhost", 514, protocol="udp") self.assertEqual(client.server_name, "localhost") self.assertEqual(client.server_port, 514) @@ -4279,12 +4371,14 @@ class TestEnvVarConfig(unittest.TestCase): def testSyslogClientInvalidProtocol(self): """SyslogClient with invalid protocol raises ValueError""" from parsedmarc.syslog import SyslogClient + with self.assertRaises(ValueError): SyslogClient("localhost", 514, protocol="invalid") def testSyslogBackwardCompatAlias(self): """SyslogClient forensic alias points to failure method""" from parsedmarc.syslog import SyslogClient + self.assertIs( SyslogClient.save_forensic_report_to_syslog, # type: ignore[attr-defined] SyslogClient.save_failure_report_to_syslog, @@ -4293,6 +4387,7 @@ class TestEnvVarConfig(unittest.TestCase): def testLogAnalyticsConfig(self): """LogAnalyticsConfig stores all fields""" from parsedmarc.loganalytics import LogAnalyticsConfig + config = LogAnalyticsConfig( client_id="cid", client_secret="csec", @@ -4315,6 +4410,7 @@ class TestEnvVarConfig(unittest.TestCase): def testLogAnalyticsClientValidationError(self): """LogAnalyticsClient raises on missing required config""" from parsedmarc.loganalytics import LogAnalyticsClient, LogAnalyticsException + with self.assertRaises(LogAnalyticsException): LogAnalyticsClient( client_id="", @@ -4329,18 +4425,34 @@ class TestEnvVarConfig(unittest.TestCase): def testSmtpTlsCsvRows(self): """parsed_smtp_tls_reports_to_csv_rows produces correct rows""" - report_json = json.dumps({ - "organization-name": "Org", - "date-range": {"start-datetime": "2024-01-01T00:00:00Z", "end-datetime": "2024-01-02T00:00:00Z"}, - "contact-info": "a@b.com", - "report-id": "r1", - "policies": [{ - "policy": {"policy-type": "sts", "policy-domain": "example.com", - "policy-string": ["v: STSv1"], "mx-host-pattern": ["*.example.com"]}, - "summary": {"total-successful-session-count": 10, "total-failure-session-count": 1}, - "failure-details": [{"result-type": "cert-expired", "failed-session-count": 1}], - }], - }) + report_json = json.dumps( + { + "organization-name": "Org", + "date-range": { + "start-datetime": "2024-01-01T00:00:00Z", + "end-datetime": "2024-01-02T00:00:00Z", + }, + "contact-info": "a@b.com", + "report-id": "r1", + "policies": [ + { + "policy": { + "policy-type": "sts", + "policy-domain": "example.com", + "policy-string": ["v: STSv1"], + "mx-host-pattern": ["*.example.com"], + }, + "summary": { + "total-successful-session-count": 10, + "total-failure-session-count": 1, + }, + "failure-details": [ + {"result-type": "cert-expired", "failed-session-count": 1} + ], + } + ], + } + ) parsed = parsedmarc.parse_smtp_tls_report_json(report_json) rows = parsedmarc.parsed_smtp_tls_reports_to_csv_rows(parsed) self.assertTrue(len(rows) >= 2) @@ -4351,7 +4463,8 @@ class TestEnvVarConfig(unittest.TestCase): """parsed_aggregate_reports_to_csv_rows handles list of reports""" result = parsedmarc.parse_report_file( "samples/aggregate/dmarcbis-draft-sample.xml", - always_use_local_files=True, offline=True, + always_use_local_files=True, + offline=True, ) report = cast(AggregateReport, result["report"]) # Pass as a list @@ -4365,10 +4478,18 @@ class TestEnvVarConfig(unittest.TestCase): def testExceptionHierarchy(self): """Exception class hierarchy is correct""" self.assertTrue(issubclass(parsedmarc.ParserError, RuntimeError)) - self.assertTrue(issubclass(parsedmarc.InvalidDMARCReport, parsedmarc.ParserError)) - self.assertTrue(issubclass(parsedmarc.InvalidAggregateReport, parsedmarc.InvalidDMARCReport)) - self.assertTrue(issubclass(parsedmarc.InvalidFailureReport, parsedmarc.InvalidDMARCReport)) - self.assertTrue(issubclass(parsedmarc.InvalidSMTPTLSReport, parsedmarc.ParserError)) + self.assertTrue( + issubclass(parsedmarc.InvalidDMARCReport, parsedmarc.ParserError) + ) + self.assertTrue( + issubclass(parsedmarc.InvalidAggregateReport, parsedmarc.InvalidDMARCReport) + ) + self.assertTrue( + issubclass(parsedmarc.InvalidFailureReport, parsedmarc.InvalidDMARCReport) + ) + self.assertTrue( + issubclass(parsedmarc.InvalidSMTPTLSReport, parsedmarc.ParserError) + ) self.assertIs(parsedmarc.InvalidForensicReport, parsedmarc.InvalidFailureReport) def testAggregateReportNormalization(self): @@ -4412,6 +4533,7 @@ class TestEnvVarConfig(unittest.TestCase): def testGelfBackwardCompatAlias(self): """GelfClient forensic alias points to failure method""" from parsedmarc.gelf import GelfClient + self.assertIs( GelfClient.save_forensic_report_to_gelf, # type: ignore[attr-defined] GelfClient.save_failure_report_to_gelf, @@ -4420,6 +4542,7 @@ class TestEnvVarConfig(unittest.TestCase): def testS3BackwardCompatAlias(self): """S3Client forensic alias points to failure method""" from parsedmarc.s3 import S3Client + self.assertIs( S3Client.save_forensic_report_to_s3, # type: ignore[attr-defined] S3Client.save_failure_report_to_s3, @@ -4428,6 +4551,7 @@ class TestEnvVarConfig(unittest.TestCase): def testKafkaBackwardCompatAlias(self): """KafkaClient forensic alias points to failure method""" from parsedmarc.kafkaclient import KafkaClient + self.assertIs( KafkaClient.save_forensic_reports_to_kafka, # type: ignore[attr-defined] KafkaClient.save_failure_reports_to_kafka, @@ -4455,7 +4579,9 @@ class TestEnvVarConfig(unittest.TestCase): with open(sample_path, "rb") as f: data = f.read() report = parsedmarc.parse_aggregate_report_file( - data, offline=True, always_use_local_files=True, + data, + offline=True, + always_use_local_files=True, ) self.assertEqual(report["report_metadata"]["org_name"], "Sample Reporter") self.assertEqual(report["policy_published"]["domain"], "example.com") @@ -4470,9 +4596,12 @@ class TestEnvVarConfig(unittest.TestCase): continue print("Testing {0}: ".format(sample_path), end="") with self.subTest(sample=sample_path): - parsed_report = cast(AggregateReport, parsedmarc.parse_report_file( - sample_path, always_use_local_files=True, offline=OFFLINE_MODE - )["report"]) + parsed_report = cast( + AggregateReport, + parsedmarc.parse_report_file( + sample_path, always_use_local_files=True, offline=OFFLINE_MODE + )["report"], + ) parsedmarc.parsed_aggregate_reports_to_csv(parsed_report) print("Passed!") @@ -4492,9 +4621,12 @@ class TestEnvVarConfig(unittest.TestCase): for sample_path in sample_paths: print("Testing CSV for {0}: ".format(sample_path), end="") with self.subTest(sample=sample_path): - parsed_report = cast(FailureReport, parsedmarc.parse_report_file( - sample_path, offline=OFFLINE_MODE - )["report"]) + parsed_report = cast( + FailureReport, + parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)[ + "report" + ], + ) csv_output = parsedmarc.parsed_failure_reports_to_csv(parsed_report) self.assertIsNotNone(csv_output) self.assertIn(",", csv_output)