diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py index 7bcc7a1..8748ce6 100644 --- a/parsedmarc/__init__.py +++ b/parsedmarc/__init__.py @@ -354,8 +354,6 @@ def _parse_report_record( } if "disposition" in policy_evaluated: new_policy_evaluated["disposition"] = policy_evaluated["disposition"] - if new_policy_evaluated["disposition"].strip().lower() == "pass": - new_policy_evaluated["disposition"] = "none" if "dkim" in policy_evaluated: new_policy_evaluated["dkim"] = policy_evaluated["dkim"] if "spf" in policy_evaluated: @@ -826,16 +824,30 @@ def parse_aggregate_report_xml( if "np" in policy_published: if policy_published["np"] is not None: np_ = policy_published["np"] + if np_ not in ("none", "quarantine", "reject"): + logger.warning( + "Invalid np value: {0}".format(np_) + ) new_policy_published["np"] = np_ testing = None if "testing" in policy_published: if policy_published["testing"] is not None: testing = policy_published["testing"] + if testing not in ("n", "y"): + logger.warning( + "Invalid testing value: {0}".format(testing) + ) new_policy_published["testing"] = testing discovery_method = None if "discovery_method" in policy_published: if policy_published["discovery_method"] is not None: discovery_method = policy_published["discovery_method"] + if discovery_method not in ("psl", "treewalk"): + logger.warning( + "Invalid discovery_method value: {0}".format( + discovery_method + ) + ) new_policy_published["discovery_method"] = discovery_method new_report["policy_published"] = new_policy_published diff --git a/tests.py b/tests.py index 1f29852..e2b78f0 100755 --- a/tests.py +++ b/tests.py @@ -190,10 +190,10 @@ class Test(unittest.TestCase): with self.assertRaises(parsedmarc.ParserError): parsedmarc.parse_report_file("samples/empty.xml", offline=OFFLINE_MODE) - def testForensicSamples(self): - """Test sample forensic/ruf/failure DMARC reports""" + def testFailureSamples(self): + """Test sample failure/ruf DMARC reports""" print() - sample_paths = glob("samples/forensic/*.eml") + sample_paths = glob("samples/failure/*.eml") for sample_path in sample_paths: print("Testing {0}: ".format(sample_path), end="") with open(sample_path) as sample_file: @@ -201,12 +201,162 @@ class Test(unittest.TestCase): email_result = parsedmarc.parse_report_email( sample_content, offline=OFFLINE_MODE ) - assert email_result["report_type"] == "forensic" - result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE) - assert result["report_type"] == "forensic" - parsedmarc.parsed_forensic_reports_to_csv(result["report"]) + assert email_result["report_type"] == "failure" + result = parsedmarc.parse_report_file( + sample_path, offline=OFFLINE_MODE + ) + assert result["report_type"] == "failure" + parsedmarc.parsed_failure_reports_to_csv(result["report"]) print("Passed!") + def testFailureReportBackwardCompat(self): + """Test that old forensic function aliases still work""" + self.assertIs( + parsedmarc.parse_forensic_report, + parsedmarc.parse_failure_report, + ) + self.assertIs( + parsedmarc.parsed_forensic_reports_to_csv, + parsedmarc.parsed_failure_reports_to_csv, + ) + self.assertIs( + parsedmarc.parsed_forensic_reports_to_csv_rows, + parsedmarc.parsed_failure_reports_to_csv_rows, + ) + self.assertIs( + parsedmarc.InvalidForensicReport, + parsedmarc.InvalidFailureReport, + ) + + def testDMARCbisDraftSample(self): + """Test parsing the sample report from the DMARCbis aggregate draft""" + print() + sample_path = ( + "samples/aggregate/dmarcbis-draft-sample.xml" + ) + print("Testing {0}: ".format(sample_path), end="") + result = parsedmarc.parse_report_file( + sample_path, always_use_local_files=True, offline=True + ) + report = result["report"] + + # Verify report_type + self.assertEqual(result["report_type"], "aggregate") + + # Verify xml_schema + self.assertEqual(report["xml_schema"], "1.0") + + # Verify report_metadata + metadata = report["report_metadata"] + self.assertEqual(metadata["org_name"], "Sample Reporter") + self.assertEqual( + metadata["org_email"], "report_sender@example-reporter.com" + ) + self.assertEqual( + metadata["org_extra_contact_info"], "..." + ) + self.assertEqual( + metadata["report_id"], "3v98abbp8ya9n3va8yr8oa3ya" + ) + self.assertEqual( + metadata["generator"], + "Example DMARC Aggregate Reporter v1.2", + ) + + # Verify DMARCbis policy_published fields + pp = report["policy_published"] + self.assertEqual(pp["domain"], "example.com") + self.assertEqual(pp["p"], "quarantine") + self.assertEqual(pp["sp"], "none") + self.assertEqual(pp["np"], "none") + self.assertEqual(pp["testing"], "n") + self.assertEqual(pp["discovery_method"], "treewalk") + # adkim/aspf/pct/fo default when not in XML + self.assertEqual(pp["adkim"], "r") + self.assertEqual(pp["aspf"], "r") + self.assertEqual(pp["pct"], "100") + self.assertEqual(pp["fo"], "0") + + # Verify record + self.assertEqual(len(report["records"]), 1) + rec = report["records"][0] + self.assertEqual(rec["source"]["ip_address"], "192.0.2.123") + self.assertEqual(rec["count"], 123) + self.assertEqual( + rec["policy_evaluated"]["disposition"], "pass" + ) + self.assertEqual(rec["policy_evaluated"]["dkim"], "pass") + self.assertEqual(rec["policy_evaluated"]["spf"], "fail") + + # Verify DKIM auth result with human_result + self.assertEqual(len(rec["auth_results"]["dkim"]), 1) + dkim = rec["auth_results"]["dkim"][0] + self.assertEqual(dkim["domain"], "example.com") + self.assertEqual(dkim["selector"], "abc123") + self.assertEqual(dkim["result"], "pass") + self.assertIsNone(dkim["human_result"]) + + # Verify SPF auth result with human_result + self.assertEqual(len(rec["auth_results"]["spf"]), 1) + spf = rec["auth_results"]["spf"][0] + self.assertEqual(spf["domain"], "example.com") + self.assertEqual(spf["result"], "fail") + self.assertIsNone(spf["human_result"]) + + # Verify CSV output includes new fields + csv = parsedmarc.parsed_aggregate_reports_to_csv(report) + header = csv.split("\n")[0] + self.assertIn("np", header.split(",")) + self.assertIn("testing", header.split(",")) + self.assertIn("discovery_method", header.split(",")) + print("Passed!") + + def testDMARCbisFieldsWithRFC7489(self): + """Test that RFC 7489 reports have None for DMARCbis-only fields""" + print() + sample_path = ( + "samples/aggregate/" + "example.net!example.com!1529366400!1529452799.xml" + ) + print("Testing {0}: ".format(sample_path), end="") + result = parsedmarc.parse_report_file( + sample_path, always_use_local_files=True, offline=True + ) + report = result["report"] + pp = report["policy_published"] + + # RFC 7489 fields present + self.assertEqual(pp["pct"], "100") + self.assertEqual(pp["fo"], "0") + + # DMARCbis fields absent (None) + self.assertIsNone(pp["np"]) + self.assertIsNone(pp["testing"]) + self.assertIsNone(pp["discovery_method"]) + + # generator absent (None) + self.assertIsNone(report["report_metadata"]["generator"]) + print("Passed!") + + def testDMARCbisWithExplicitFields(self): + """Test DMARCbis report with explicit testing and discovery_method""" + print() + sample_path = ( + "samples/aggregate/" + "dmarcbis-example.net!example.com!1700000000!1700086399.xml" + ) + print("Testing {0}: ".format(sample_path), end="") + result = parsedmarc.parse_report_file( + sample_path, always_use_local_files=True, offline=True + ) + report = result["report"] + pp = report["policy_published"] + + self.assertEqual(pp["np"], "reject") + self.assertEqual(pp["testing"], "y") + self.assertEqual(pp["discovery_method"], "treewalk") + print("Passed!") + def testSmtpTlsSamples(self): """Test sample SMTP TLS reports""" print()