mirror of
https://github.com/domainaware/parsedmarc.git
synced 2026-03-26 08:22:45 +00:00
9.3.1
Elasticsearch and OpenSearch now verify SSL certificates by default when `ssl = True`, even without a `cert_path` - Added `skip_certificate_verification` option to the `elasticsearch` and `opensearch` configuration sections for consistency with `splunk_hec` - Splunk HEC `skip_certificate_verification` now works correctly with self-signed certificates - SMTP TLS reports no longer fail when saving to multiple output targets (e.g. Elasticsearch and OpenSearch) due to in-place mutation of the report dict - Output client initialization errors now identify which module failed (e.g. "OpenSearch: ConnectionError..." instead of generic "Output client error") - Enhanced error handling for output client initialization
This commit is contained in:
366
tests.py
366
tests.py
@@ -184,7 +184,9 @@ class Test(unittest.TestCase):
|
||||
sample_path, always_use_local_files=True, offline=OFFLINE_MODE
|
||||
)
|
||||
assert result["report_type"] == "aggregate"
|
||||
parsedmarc.parsed_aggregate_reports_to_csv(cast(AggregateReport, result["report"]))
|
||||
parsedmarc.parsed_aggregate_reports_to_csv(
|
||||
cast(AggregateReport, result["report"])
|
||||
)
|
||||
print("Passed!")
|
||||
|
||||
def testEmptySample(self):
|
||||
@@ -205,11 +207,11 @@ class Test(unittest.TestCase):
|
||||
sample_content, offline=OFFLINE_MODE
|
||||
)
|
||||
assert email_result["report_type"] == "failure"
|
||||
result = parsedmarc.parse_report_file(
|
||||
sample_path, offline=OFFLINE_MODE
|
||||
)
|
||||
result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)
|
||||
assert result["report_type"] == "failure"
|
||||
parsedmarc.parsed_failure_reports_to_csv(cast(FailureReport, result["report"]))
|
||||
parsedmarc.parsed_failure_reports_to_csv(
|
||||
cast(FailureReport, result["report"])
|
||||
)
|
||||
print("Passed!")
|
||||
|
||||
def testFailureReportBackwardCompat(self):
|
||||
@@ -234,9 +236,7 @@ class Test(unittest.TestCase):
|
||||
def testDMARCbisDraftSample(self):
|
||||
"""Test parsing the sample report from the DMARCbis aggregate draft"""
|
||||
print()
|
||||
sample_path = (
|
||||
"samples/aggregate/dmarcbis-draft-sample.xml"
|
||||
)
|
||||
sample_path = "samples/aggregate/dmarcbis-draft-sample.xml"
|
||||
print("Testing {0}: ".format(sample_path), end="")
|
||||
result = parsedmarc.parse_report_file(
|
||||
sample_path, always_use_local_files=True, offline=True
|
||||
@@ -252,15 +252,9 @@ class Test(unittest.TestCase):
|
||||
# Verify report_metadata
|
||||
metadata = report["report_metadata"]
|
||||
self.assertEqual(metadata["org_name"], "Sample Reporter")
|
||||
self.assertEqual(
|
||||
metadata["org_email"], "report_sender@example-reporter.com"
|
||||
)
|
||||
self.assertEqual(
|
||||
metadata["org_extra_contact_info"], "..."
|
||||
)
|
||||
self.assertEqual(
|
||||
metadata["report_id"], "3v98abbp8ya9n3va8yr8oa3ya"
|
||||
)
|
||||
self.assertEqual(metadata["org_email"], "report_sender@example-reporter.com")
|
||||
self.assertEqual(metadata["org_extra_contact_info"], "...")
|
||||
self.assertEqual(metadata["report_id"], "3v98abbp8ya9n3va8yr8oa3ya")
|
||||
self.assertEqual(
|
||||
metadata["generator"],
|
||||
"Example DMARC Aggregate Reporter v1.2",
|
||||
@@ -286,9 +280,7 @@ class Test(unittest.TestCase):
|
||||
rec = report["records"][0]
|
||||
self.assertEqual(rec["source"]["ip_address"], "192.0.2.123")
|
||||
self.assertEqual(rec["count"], 123)
|
||||
self.assertEqual(
|
||||
rec["policy_evaluated"]["disposition"], "pass"
|
||||
)
|
||||
self.assertEqual(rec["policy_evaluated"]["disposition"], "pass")
|
||||
self.assertEqual(rec["policy_evaluated"]["dkim"], "pass")
|
||||
self.assertEqual(rec["policy_evaluated"]["spf"], "fail")
|
||||
|
||||
@@ -319,8 +311,7 @@ class Test(unittest.TestCase):
|
||||
"""Test that RFC 7489 reports have None for DMARCbis-only fields"""
|
||||
print()
|
||||
sample_path = (
|
||||
"samples/aggregate/"
|
||||
"example.net!example.com!1529366400!1529452799.xml"
|
||||
"samples/aggregate/example.net!example.com!1529366400!1529452799.xml"
|
||||
)
|
||||
print("Testing {0}: ".format(sample_path), end="")
|
||||
result = parsedmarc.parse_report_file(
|
||||
@@ -370,11 +361,11 @@ class Test(unittest.TestCase):
|
||||
continue
|
||||
print("Testing {0}: ".format(sample_path), end="")
|
||||
with self.subTest(sample=sample_path):
|
||||
result = parsedmarc.parse_report_file(
|
||||
sample_path, offline=OFFLINE_MODE
|
||||
)
|
||||
result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)
|
||||
assert result["report_type"] == "smtp_tls"
|
||||
parsedmarc.parsed_smtp_tls_reports_to_csv(cast(SMTPTLSReport, result["report"]))
|
||||
parsedmarc.parsed_smtp_tls_reports_to_csv(
|
||||
cast(SMTPTLSReport, result["report"])
|
||||
)
|
||||
print("Passed!")
|
||||
|
||||
def testOpenSearchSigV4RequiresRegion(self):
|
||||
@@ -3314,7 +3305,11 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
"row": {
|
||||
"source_ip": None,
|
||||
"count": "1",
|
||||
"policy_evaluated": {"disposition": "none", "dkim": "pass", "spf": "pass"},
|
||||
"policy_evaluated": {
|
||||
"disposition": "none",
|
||||
"dkim": "pass",
|
||||
"spf": "pass",
|
||||
},
|
||||
},
|
||||
"identifiers": {"header_from": "example.com"},
|
||||
"auth_results": {"dkim": [], "spf": []},
|
||||
@@ -3328,7 +3323,11 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
"row": {
|
||||
"source_ip": "192.0.2.1",
|
||||
"count": "5",
|
||||
"policy_evaluated": {"disposition": "none", "dkim": "pass", "spf": "fail"},
|
||||
"policy_evaluated": {
|
||||
"disposition": "none",
|
||||
"dkim": "pass",
|
||||
"spf": "fail",
|
||||
},
|
||||
},
|
||||
"identifiers": {"header_from": "example.com"},
|
||||
"auth_results": {},
|
||||
@@ -3390,9 +3389,16 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
"row": {
|
||||
"source_ip": "192.0.2.1",
|
||||
"count": "1",
|
||||
"policy_evaluated": {"disposition": "none", "dkim": "pass", "spf": "pass"},
|
||||
"policy_evaluated": {
|
||||
"disposition": "none",
|
||||
"dkim": "pass",
|
||||
"spf": "pass",
|
||||
},
|
||||
},
|
||||
"identities": {
|
||||
"header_from": "Example.COM",
|
||||
"envelope_from": "example.com",
|
||||
},
|
||||
"identities": {"header_from": "Example.COM", "envelope_from": "example.com"},
|
||||
"auth_results": {"dkim": [], "spf": []},
|
||||
}
|
||||
result = parsedmarc._parse_report_record(record, offline=True)
|
||||
@@ -3405,7 +3411,11 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
"row": {
|
||||
"source_ip": "192.0.2.1",
|
||||
"count": "1",
|
||||
"policy_evaluated": {"disposition": "none", "dkim": "fail", "spf": "fail"},
|
||||
"policy_evaluated": {
|
||||
"disposition": "none",
|
||||
"dkim": "fail",
|
||||
"spf": "fail",
|
||||
},
|
||||
},
|
||||
"identifiers": {"header_from": "example.com"},
|
||||
"auth_results": {
|
||||
@@ -3425,7 +3435,11 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
"row": {
|
||||
"source_ip": "192.0.2.1",
|
||||
"count": "1",
|
||||
"policy_evaluated": {"disposition": "none", "dkim": "fail", "spf": "fail"},
|
||||
"policy_evaluated": {
|
||||
"disposition": "none",
|
||||
"dkim": "fail",
|
||||
"spf": "fail",
|
||||
},
|
||||
},
|
||||
"identifiers": {"header_from": "example.com"},
|
||||
"auth_results": {
|
||||
@@ -3445,19 +3459,37 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
"row": {
|
||||
"source_ip": "192.0.2.1",
|
||||
"count": "1",
|
||||
"policy_evaluated": {"disposition": "none", "dkim": "pass", "spf": "pass"},
|
||||
"policy_evaluated": {
|
||||
"disposition": "none",
|
||||
"dkim": "pass",
|
||||
"spf": "pass",
|
||||
},
|
||||
},
|
||||
"identifiers": {"header_from": "example.com"},
|
||||
"auth_results": {
|
||||
"dkim": [{"domain": "example.com", "selector": "s1",
|
||||
"result": "pass", "human_result": "good key"}],
|
||||
"spf": [{"domain": "example.com", "scope": "mfrom",
|
||||
"result": "pass", "human_result": "sender valid"}],
|
||||
"dkim": [
|
||||
{
|
||||
"domain": "example.com",
|
||||
"selector": "s1",
|
||||
"result": "pass",
|
||||
"human_result": "good key",
|
||||
}
|
||||
],
|
||||
"spf": [
|
||||
{
|
||||
"domain": "example.com",
|
||||
"scope": "mfrom",
|
||||
"result": "pass",
|
||||
"human_result": "sender valid",
|
||||
}
|
||||
],
|
||||
},
|
||||
}
|
||||
result = parsedmarc._parse_report_record(record, offline=True)
|
||||
self.assertEqual(result["auth_results"]["dkim"][0]["human_result"], "good key")
|
||||
self.assertEqual(result["auth_results"]["spf"][0]["human_result"], "sender valid")
|
||||
self.assertEqual(
|
||||
result["auth_results"]["spf"][0]["human_result"], "sender valid"
|
||||
)
|
||||
|
||||
def testParseReportRecordEnvelopeFromFallback(self):
|
||||
"""envelope_from falls back to last SPF domain when missing"""
|
||||
@@ -3465,12 +3497,18 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
"row": {
|
||||
"source_ip": "192.0.2.1",
|
||||
"count": "1",
|
||||
"policy_evaluated": {"disposition": "none", "dkim": "pass", "spf": "pass"},
|
||||
"policy_evaluated": {
|
||||
"disposition": "none",
|
||||
"dkim": "pass",
|
||||
"spf": "pass",
|
||||
},
|
||||
},
|
||||
"identifiers": {"header_from": "example.com"},
|
||||
"auth_results": {
|
||||
"dkim": [],
|
||||
"spf": [{"domain": "Bounce.Example.COM", "scope": "mfrom", "result": "pass"}],
|
||||
"spf": [
|
||||
{"domain": "Bounce.Example.COM", "scope": "mfrom", "result": "pass"}
|
||||
],
|
||||
},
|
||||
}
|
||||
result = parsedmarc._parse_report_record(record, offline=True)
|
||||
@@ -3482,7 +3520,11 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
"row": {
|
||||
"source_ip": "192.0.2.1",
|
||||
"count": "1",
|
||||
"policy_evaluated": {"disposition": "none", "dkim": "pass", "spf": "pass"},
|
||||
"policy_evaluated": {
|
||||
"disposition": "none",
|
||||
"dkim": "pass",
|
||||
"spf": "pass",
|
||||
},
|
||||
},
|
||||
"identifiers": {
|
||||
"header_from": "example.com",
|
||||
@@ -3490,7 +3532,9 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
},
|
||||
"auth_results": {
|
||||
"dkim": [],
|
||||
"spf": [{"domain": "SPF.Example.COM", "scope": "mfrom", "result": "pass"}],
|
||||
"spf": [
|
||||
{"domain": "SPF.Example.COM", "scope": "mfrom", "result": "pass"}
|
||||
],
|
||||
},
|
||||
}
|
||||
result = parsedmarc._parse_report_record(record, offline=True)
|
||||
@@ -3502,7 +3546,11 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
"row": {
|
||||
"source_ip": "192.0.2.1",
|
||||
"count": "1",
|
||||
"policy_evaluated": {"disposition": "none", "dkim": "pass", "spf": "pass"},
|
||||
"policy_evaluated": {
|
||||
"disposition": "none",
|
||||
"dkim": "pass",
|
||||
"spf": "pass",
|
||||
},
|
||||
},
|
||||
"identifiers": {
|
||||
"header_from": "example.com",
|
||||
@@ -3520,7 +3568,11 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
"row": {
|
||||
"source_ip": "192.0.2.1",
|
||||
"count": "1",
|
||||
"policy_evaluated": {"disposition": "none", "dkim": "pass", "spf": "fail"},
|
||||
"policy_evaluated": {
|
||||
"disposition": "none",
|
||||
"dkim": "pass",
|
||||
"spf": "fail",
|
||||
},
|
||||
},
|
||||
"identifiers": {"header_from": "example.com"},
|
||||
"auth_results": {"dkim": [], "spf": []},
|
||||
@@ -3645,7 +3697,9 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
}
|
||||
result = parsedmarc._parse_smtp_tls_report_policy(policy)
|
||||
self.assertEqual(len(result["failure_details"]), 1)
|
||||
self.assertEqual(result["failure_details"][0]["result_type"], "certificate-expired")
|
||||
self.assertEqual(
|
||||
result["failure_details"][0]["result_type"], "certificate-expired"
|
||||
)
|
||||
|
||||
def testParseSmtpTlsReportPolicyMissingField(self):
|
||||
"""Missing required policy field raises InvalidSMTPTLSReport"""
|
||||
@@ -3657,27 +3711,29 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
# ============================================================
|
||||
def testParseSmtpTlsReportJsonValid(self):
|
||||
"""Valid SMTP TLS JSON report parses correctly"""
|
||||
report = json.dumps({
|
||||
"organization-name": "Example Corp",
|
||||
"date-range": {
|
||||
"start-datetime": "2024-01-01T00:00:00Z",
|
||||
"end-datetime": "2024-01-02T00:00:00Z",
|
||||
},
|
||||
"contact-info": "admin@example.com",
|
||||
"report-id": "report-123",
|
||||
"policies": [
|
||||
{
|
||||
"policy": {
|
||||
"policy-type": "sts",
|
||||
"policy-domain": "example.com",
|
||||
},
|
||||
"summary": {
|
||||
"total-successful-session-count": 50,
|
||||
"total-failure-session-count": 0,
|
||||
},
|
||||
}
|
||||
],
|
||||
})
|
||||
report = json.dumps(
|
||||
{
|
||||
"organization-name": "Example Corp",
|
||||
"date-range": {
|
||||
"start-datetime": "2024-01-01T00:00:00Z",
|
||||
"end-datetime": "2024-01-02T00:00:00Z",
|
||||
},
|
||||
"contact-info": "admin@example.com",
|
||||
"report-id": "report-123",
|
||||
"policies": [
|
||||
{
|
||||
"policy": {
|
||||
"policy-type": "sts",
|
||||
"policy-domain": "example.com",
|
||||
},
|
||||
"summary": {
|
||||
"total-successful-session-count": 50,
|
||||
"total-failure-session-count": 0,
|
||||
},
|
||||
}
|
||||
],
|
||||
}
|
||||
)
|
||||
result = parsedmarc.parse_smtp_tls_report_json(report)
|
||||
self.assertEqual(result["organization_name"], "Example Corp")
|
||||
self.assertEqual(result["report_id"], "report-123")
|
||||
@@ -3685,16 +3741,26 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
|
||||
def testParseSmtpTlsReportJsonBytes(self):
|
||||
"""SMTP TLS report as bytes parses correctly"""
|
||||
report = json.dumps({
|
||||
"organization-name": "Org",
|
||||
"date-range": {"start-datetime": "2024-01-01", "end-datetime": "2024-01-02"},
|
||||
"contact-info": "a@b.com",
|
||||
"report-id": "r1",
|
||||
"policies": [{
|
||||
"policy": {"policy-type": "tlsa", "policy-domain": "a.com"},
|
||||
"summary": {"total-successful-session-count": 1, "total-failure-session-count": 0},
|
||||
}],
|
||||
}).encode("utf-8")
|
||||
report = json.dumps(
|
||||
{
|
||||
"organization-name": "Org",
|
||||
"date-range": {
|
||||
"start-datetime": "2024-01-01",
|
||||
"end-datetime": "2024-01-02",
|
||||
},
|
||||
"contact-info": "a@b.com",
|
||||
"report-id": "r1",
|
||||
"policies": [
|
||||
{
|
||||
"policy": {"policy-type": "tlsa", "policy-domain": "a.com"},
|
||||
"summary": {
|
||||
"total-successful-session-count": 1,
|
||||
"total-failure-session-count": 0,
|
||||
},
|
||||
}
|
||||
],
|
||||
}
|
||||
).encode("utf-8")
|
||||
result = parsedmarc.parse_smtp_tls_report_json(report)
|
||||
self.assertEqual(result["organization_name"], "Org")
|
||||
|
||||
@@ -3706,13 +3772,18 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
|
||||
def testParseSmtpTlsReportJsonPoliciesNotList(self):
|
||||
"""Non-list policies raises InvalidSMTPTLSReport"""
|
||||
report = json.dumps({
|
||||
"organization-name": "Org",
|
||||
"date-range": {"start-datetime": "2024-01-01", "end-datetime": "2024-01-02"},
|
||||
"contact-info": "a@b.com",
|
||||
"report-id": "r1",
|
||||
"policies": "not-a-list",
|
||||
})
|
||||
report = json.dumps(
|
||||
{
|
||||
"organization-name": "Org",
|
||||
"date-range": {
|
||||
"start-datetime": "2024-01-01",
|
||||
"end-datetime": "2024-01-02",
|
||||
},
|
||||
"contact-info": "a@b.com",
|
||||
"report-id": "r1",
|
||||
"policies": "not-a-list",
|
||||
}
|
||||
)
|
||||
with self.assertRaises(parsedmarc.InvalidSMTPTLSReport):
|
||||
parsedmarc.parse_smtp_tls_report_json(report)
|
||||
|
||||
@@ -3789,7 +3860,9 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
</record>
|
||||
</feedback>"""
|
||||
report = parsedmarc.parse_aggregate_report_xml(xml, offline=True)
|
||||
self.assertEqual(report["records"][0]["policy_evaluated"]["disposition"], "pass")
|
||||
self.assertEqual(
|
||||
report["records"][0]["policy_evaluated"]["disposition"], "pass"
|
||||
)
|
||||
|
||||
def testAggregateReportMultipleRecords(self):
|
||||
"""Reports with multiple records are all parsed"""
|
||||
@@ -3839,7 +3912,8 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
"""CSV rows include np, testing, discovery_method columns"""
|
||||
result = parsedmarc.parse_report_file(
|
||||
"samples/aggregate/dmarcbis-draft-sample.xml",
|
||||
always_use_local_files=True, offline=True,
|
||||
always_use_local_files=True,
|
||||
offline=True,
|
||||
)
|
||||
report = cast(AggregateReport, result["report"])
|
||||
rows = parsedmarc.parsed_aggregate_reports_to_csv_rows(report)
|
||||
@@ -4047,9 +4121,7 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
|
||||
def testHumanTimestampToDatetimeNegativeZero(self):
|
||||
"""-0000 timezone is handled"""
|
||||
dt = parsedmarc.utils.human_timestamp_to_datetime(
|
||||
"2024-01-01 00:00:00 -0000"
|
||||
)
|
||||
dt = parsedmarc.utils.human_timestamp_to_datetime("2024-01-01 00:00:00 -0000")
|
||||
self.assertEqual(dt.year, 2024)
|
||||
|
||||
def testHumanTimestampToUnixTimestamp(self):
|
||||
@@ -4099,14 +4171,19 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
def testGetIpAddressInfoCache(self):
|
||||
"""get_ip_address_info uses cache on second call"""
|
||||
from expiringdict import ExpiringDict
|
||||
|
||||
cache = ExpiringDict(max_len=100, max_age_seconds=60)
|
||||
with patch("parsedmarc.utils.get_reverse_dns", return_value="dns.google"):
|
||||
info1 = parsedmarc.utils.get_ip_address_info(
|
||||
"8.8.8.8", offline=False, cache=cache,
|
||||
"8.8.8.8",
|
||||
offline=False,
|
||||
cache=cache,
|
||||
always_use_local_files=True,
|
||||
)
|
||||
self.assertIn("8.8.8.8", cache)
|
||||
info2 = parsedmarc.utils.get_ip_address_info("8.8.8.8", offline=False, cache=cache)
|
||||
info2 = parsedmarc.utils.get_ip_address_info(
|
||||
"8.8.8.8", offline=False, cache=cache
|
||||
)
|
||||
self.assertEqual(info1["ip_address"], info2["ip_address"])
|
||||
self.assertEqual(info2["reverse_dns"], "dns.google")
|
||||
|
||||
@@ -4173,6 +4250,7 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
def testWebhookClientInit(self):
|
||||
"""WebhookClient initializes with correct attributes"""
|
||||
from parsedmarc.webhook import WebhookClient
|
||||
|
||||
client = WebhookClient(
|
||||
aggregate_url="http://agg.example.com",
|
||||
failure_url="http://fail.example.com",
|
||||
@@ -4186,18 +4264,26 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
def testWebhookClientSaveMethods(self):
|
||||
"""WebhookClient save methods call _send_to_webhook"""
|
||||
from parsedmarc.webhook import WebhookClient
|
||||
|
||||
client = WebhookClient("http://a", "http://f", "http://t")
|
||||
client.session = MagicMock()
|
||||
client.save_aggregate_report_to_webhook('{"test": 1}')
|
||||
client.session.post.assert_called_with("http://a", data='{"test": 1}', timeout=60)
|
||||
client.session.post.assert_called_with(
|
||||
"http://a", data='{"test": 1}', timeout=60
|
||||
)
|
||||
client.save_failure_report_to_webhook('{"fail": 1}')
|
||||
client.session.post.assert_called_with("http://f", data='{"fail": 1}', timeout=60)
|
||||
client.session.post.assert_called_with(
|
||||
"http://f", data='{"fail": 1}', timeout=60
|
||||
)
|
||||
client.save_smtp_tls_report_to_webhook('{"tls": 1}')
|
||||
client.session.post.assert_called_with("http://t", data='{"tls": 1}', timeout=60)
|
||||
client.session.post.assert_called_with(
|
||||
"http://t", data='{"tls": 1}', timeout=60
|
||||
)
|
||||
|
||||
def testWebhookBackwardCompatAlias(self):
|
||||
"""WebhookClient forensic alias points to failure method"""
|
||||
from parsedmarc.webhook import WebhookClient
|
||||
|
||||
self.assertIs(
|
||||
WebhookClient.save_forensic_report_to_webhook, # type: ignore[attr-defined]
|
||||
WebhookClient.save_failure_report_to_webhook,
|
||||
@@ -4206,6 +4292,7 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
def testKafkaStripMetadata(self):
|
||||
"""KafkaClient.strip_metadata extracts metadata to root"""
|
||||
from parsedmarc.kafkaclient import KafkaClient
|
||||
|
||||
report = {
|
||||
"report_metadata": {
|
||||
"org_name": "TestOrg",
|
||||
@@ -4225,6 +4312,7 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
def testKafkaGenerateDateRange(self):
|
||||
"""KafkaClient.generate_date_range generates date range list"""
|
||||
from parsedmarc.kafkaclient import KafkaClient
|
||||
|
||||
report = {
|
||||
"report_metadata": {
|
||||
"begin_date": "2024-01-01 00:00:00",
|
||||
@@ -4239,6 +4327,7 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
def testSplunkHECClientInit(self):
|
||||
"""HECClient initializes with correct URL and headers"""
|
||||
from parsedmarc.splunk import HECClient
|
||||
|
||||
client = HECClient(
|
||||
url="https://splunk.example.com:8088",
|
||||
access_token="my-token",
|
||||
@@ -4253,6 +4342,7 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
def testSplunkHECClientStripTokenPrefix(self):
|
||||
"""HECClient strips 'Splunk ' prefix from token"""
|
||||
from parsedmarc.splunk import HECClient
|
||||
|
||||
client = HECClient(
|
||||
url="https://splunk.example.com",
|
||||
access_token="Splunk my-token",
|
||||
@@ -4263,6 +4353,7 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
def testSplunkBackwardCompatAlias(self):
|
||||
"""HECClient forensic alias points to failure method"""
|
||||
from parsedmarc.splunk import HECClient
|
||||
|
||||
self.assertIs(
|
||||
HECClient.save_forensic_reports_to_splunk, # type: ignore[attr-defined]
|
||||
HECClient.save_failure_reports_to_splunk,
|
||||
@@ -4271,6 +4362,7 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
def testSyslogClientUdpInit(self):
|
||||
"""SyslogClient creates UDP handler"""
|
||||
from parsedmarc.syslog import SyslogClient
|
||||
|
||||
client = SyslogClient("localhost", 514, protocol="udp")
|
||||
self.assertEqual(client.server_name, "localhost")
|
||||
self.assertEqual(client.server_port, 514)
|
||||
@@ -4279,12 +4371,14 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
def testSyslogClientInvalidProtocol(self):
|
||||
"""SyslogClient with invalid protocol raises ValueError"""
|
||||
from parsedmarc.syslog import SyslogClient
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
SyslogClient("localhost", 514, protocol="invalid")
|
||||
|
||||
def testSyslogBackwardCompatAlias(self):
|
||||
"""SyslogClient forensic alias points to failure method"""
|
||||
from parsedmarc.syslog import SyslogClient
|
||||
|
||||
self.assertIs(
|
||||
SyslogClient.save_forensic_report_to_syslog, # type: ignore[attr-defined]
|
||||
SyslogClient.save_failure_report_to_syslog,
|
||||
@@ -4293,6 +4387,7 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
def testLogAnalyticsConfig(self):
|
||||
"""LogAnalyticsConfig stores all fields"""
|
||||
from parsedmarc.loganalytics import LogAnalyticsConfig
|
||||
|
||||
config = LogAnalyticsConfig(
|
||||
client_id="cid",
|
||||
client_secret="csec",
|
||||
@@ -4315,6 +4410,7 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
def testLogAnalyticsClientValidationError(self):
|
||||
"""LogAnalyticsClient raises on missing required config"""
|
||||
from parsedmarc.loganalytics import LogAnalyticsClient, LogAnalyticsException
|
||||
|
||||
with self.assertRaises(LogAnalyticsException):
|
||||
LogAnalyticsClient(
|
||||
client_id="",
|
||||
@@ -4329,18 +4425,34 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
|
||||
def testSmtpTlsCsvRows(self):
|
||||
"""parsed_smtp_tls_reports_to_csv_rows produces correct rows"""
|
||||
report_json = json.dumps({
|
||||
"organization-name": "Org",
|
||||
"date-range": {"start-datetime": "2024-01-01T00:00:00Z", "end-datetime": "2024-01-02T00:00:00Z"},
|
||||
"contact-info": "a@b.com",
|
||||
"report-id": "r1",
|
||||
"policies": [{
|
||||
"policy": {"policy-type": "sts", "policy-domain": "example.com",
|
||||
"policy-string": ["v: STSv1"], "mx-host-pattern": ["*.example.com"]},
|
||||
"summary": {"total-successful-session-count": 10, "total-failure-session-count": 1},
|
||||
"failure-details": [{"result-type": "cert-expired", "failed-session-count": 1}],
|
||||
}],
|
||||
})
|
||||
report_json = json.dumps(
|
||||
{
|
||||
"organization-name": "Org",
|
||||
"date-range": {
|
||||
"start-datetime": "2024-01-01T00:00:00Z",
|
||||
"end-datetime": "2024-01-02T00:00:00Z",
|
||||
},
|
||||
"contact-info": "a@b.com",
|
||||
"report-id": "r1",
|
||||
"policies": [
|
||||
{
|
||||
"policy": {
|
||||
"policy-type": "sts",
|
||||
"policy-domain": "example.com",
|
||||
"policy-string": ["v: STSv1"],
|
||||
"mx-host-pattern": ["*.example.com"],
|
||||
},
|
||||
"summary": {
|
||||
"total-successful-session-count": 10,
|
||||
"total-failure-session-count": 1,
|
||||
},
|
||||
"failure-details": [
|
||||
{"result-type": "cert-expired", "failed-session-count": 1}
|
||||
],
|
||||
}
|
||||
],
|
||||
}
|
||||
)
|
||||
parsed = parsedmarc.parse_smtp_tls_report_json(report_json)
|
||||
rows = parsedmarc.parsed_smtp_tls_reports_to_csv_rows(parsed)
|
||||
self.assertTrue(len(rows) >= 2)
|
||||
@@ -4351,7 +4463,8 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
"""parsed_aggregate_reports_to_csv_rows handles list of reports"""
|
||||
result = parsedmarc.parse_report_file(
|
||||
"samples/aggregate/dmarcbis-draft-sample.xml",
|
||||
always_use_local_files=True, offline=True,
|
||||
always_use_local_files=True,
|
||||
offline=True,
|
||||
)
|
||||
report = cast(AggregateReport, result["report"])
|
||||
# Pass as a list
|
||||
@@ -4365,10 +4478,18 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
def testExceptionHierarchy(self):
|
||||
"""Exception class hierarchy is correct"""
|
||||
self.assertTrue(issubclass(parsedmarc.ParserError, RuntimeError))
|
||||
self.assertTrue(issubclass(parsedmarc.InvalidDMARCReport, parsedmarc.ParserError))
|
||||
self.assertTrue(issubclass(parsedmarc.InvalidAggregateReport, parsedmarc.InvalidDMARCReport))
|
||||
self.assertTrue(issubclass(parsedmarc.InvalidFailureReport, parsedmarc.InvalidDMARCReport))
|
||||
self.assertTrue(issubclass(parsedmarc.InvalidSMTPTLSReport, parsedmarc.ParserError))
|
||||
self.assertTrue(
|
||||
issubclass(parsedmarc.InvalidDMARCReport, parsedmarc.ParserError)
|
||||
)
|
||||
self.assertTrue(
|
||||
issubclass(parsedmarc.InvalidAggregateReport, parsedmarc.InvalidDMARCReport)
|
||||
)
|
||||
self.assertTrue(
|
||||
issubclass(parsedmarc.InvalidFailureReport, parsedmarc.InvalidDMARCReport)
|
||||
)
|
||||
self.assertTrue(
|
||||
issubclass(parsedmarc.InvalidSMTPTLSReport, parsedmarc.ParserError)
|
||||
)
|
||||
self.assertIs(parsedmarc.InvalidForensicReport, parsedmarc.InvalidFailureReport)
|
||||
|
||||
def testAggregateReportNormalization(self):
|
||||
@@ -4412,6 +4533,7 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
def testGelfBackwardCompatAlias(self):
|
||||
"""GelfClient forensic alias points to failure method"""
|
||||
from parsedmarc.gelf import GelfClient
|
||||
|
||||
self.assertIs(
|
||||
GelfClient.save_forensic_report_to_gelf, # type: ignore[attr-defined]
|
||||
GelfClient.save_failure_report_to_gelf,
|
||||
@@ -4420,6 +4542,7 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
def testS3BackwardCompatAlias(self):
|
||||
"""S3Client forensic alias points to failure method"""
|
||||
from parsedmarc.s3 import S3Client
|
||||
|
||||
self.assertIs(
|
||||
S3Client.save_forensic_report_to_s3, # type: ignore[attr-defined]
|
||||
S3Client.save_failure_report_to_s3,
|
||||
@@ -4428,6 +4551,7 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
def testKafkaBackwardCompatAlias(self):
|
||||
"""KafkaClient forensic alias points to failure method"""
|
||||
from parsedmarc.kafkaclient import KafkaClient
|
||||
|
||||
self.assertIs(
|
||||
KafkaClient.save_forensic_reports_to_kafka, # type: ignore[attr-defined]
|
||||
KafkaClient.save_failure_reports_to_kafka,
|
||||
@@ -4455,7 +4579,9 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
with open(sample_path, "rb") as f:
|
||||
data = f.read()
|
||||
report = parsedmarc.parse_aggregate_report_file(
|
||||
data, offline=True, always_use_local_files=True,
|
||||
data,
|
||||
offline=True,
|
||||
always_use_local_files=True,
|
||||
)
|
||||
self.assertEqual(report["report_metadata"]["org_name"], "Sample Reporter")
|
||||
self.assertEqual(report["policy_published"]["domain"], "example.com")
|
||||
@@ -4470,9 +4596,12 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
continue
|
||||
print("Testing {0}: ".format(sample_path), end="")
|
||||
with self.subTest(sample=sample_path):
|
||||
parsed_report = cast(AggregateReport, parsedmarc.parse_report_file(
|
||||
sample_path, always_use_local_files=True, offline=OFFLINE_MODE
|
||||
)["report"])
|
||||
parsed_report = cast(
|
||||
AggregateReport,
|
||||
parsedmarc.parse_report_file(
|
||||
sample_path, always_use_local_files=True, offline=OFFLINE_MODE
|
||||
)["report"],
|
||||
)
|
||||
parsedmarc.parsed_aggregate_reports_to_csv(parsed_report)
|
||||
print("Passed!")
|
||||
|
||||
@@ -4492,9 +4621,12 @@ class TestEnvVarConfig(unittest.TestCase):
|
||||
for sample_path in sample_paths:
|
||||
print("Testing CSV for {0}: ".format(sample_path), end="")
|
||||
with self.subTest(sample=sample_path):
|
||||
parsed_report = cast(FailureReport, parsedmarc.parse_report_file(
|
||||
sample_path, offline=OFFLINE_MODE
|
||||
)["report"])
|
||||
parsed_report = cast(
|
||||
FailureReport,
|
||||
parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)[
|
||||
"report"
|
||||
],
|
||||
)
|
||||
csv_output = parsedmarc.parsed_failure_reports_to_csv(parsed_report)
|
||||
self.assertIsNotNone(csv_output)
|
||||
self.assertIn(",", csv_output)
|
||||
|
||||
Reference in New Issue
Block a user