Align DMARCbis fields with actual XSD schema: testing, discovery_method, generator, human_result; handle namespaced XML

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2026-02-20 20:40:37 +00:00
committed by Sean Whalen
parent 39f2884acc
commit ac996a8edf
4 changed files with 81 additions and 21 deletions

View File

@@ -74,6 +74,7 @@ text_report_regex = re.compile(r"\s*([a-zA-Z\s]+):\s(.+)", re.MULTILINE)
MAGIC_ZIP = b"\x50\x4b\x03\x04"
MAGIC_GZIP = b"\x1f\x8b"
MAGIC_XML = b"\x3c\x3f\x78\x6d\x6c\x20"
MAGIC_XML_TAG = b"\x3c" # '<' - XML starting with an element tag (no declaration)
MAGIC_JSON = b"\7b"
EMAIL_SAMPLE_CONTENT_TYPES = (
@@ -415,6 +416,7 @@ def _parse_report_record(
new_result["result"] = result["result"]
else:
new_result["result"] = "none"
new_result["human_result"] = result.get("human_result", None)
new_record["auth_results"]["dkim"].append(new_result)
if not isinstance(auth_results["spf"], list):
@@ -430,6 +432,7 @@ def _parse_report_record(
new_result["result"] = result["result"]
else:
new_result["result"] = "none"
new_result["human_result"] = result.get("human_result", None)
new_record["auth_results"]["spf"].append(new_result)
if "envelope_from" not in new_record["identifiers"]:
@@ -782,6 +785,10 @@ def parse_aggregate_report_xml(
else:
errors = report["report_metadata"]["error"]
new_report_metadata["errors"] = errors
generator = None
if "generator" in report_metadata:
generator = report_metadata["generator"]
new_report_metadata["generator"] = generator
new_report["report_metadata"] = new_report_metadata
records = []
policy_published = report["policy_published"]
@@ -820,16 +827,16 @@ def parse_aggregate_report_xml(
if policy_published["np"] is not None:
np_ = policy_published["np"]
new_policy_published["np"] = np_
psd = None
if "psd" in policy_published:
if policy_published["psd"] is not None:
psd = policy_published["psd"]
new_policy_published["psd"] = psd
t = None
if "t" in policy_published:
if policy_published["t"] is not None:
t = policy_published["t"]
new_policy_published["t"] = t
testing = None
if "testing" in policy_published:
if policy_published["testing"] is not None:
testing = policy_published["testing"]
new_policy_published["testing"] = testing
discovery_method = None
if "discovery_method" in policy_published:
if policy_published["discovery_method"] is not None:
discovery_method = policy_published["discovery_method"]
new_policy_published["discovery_method"] = discovery_method
new_report["policy_published"] = new_policy_published
if type(report["record"]) is list:
@@ -962,6 +969,7 @@ def extract_report(content: Union[bytes, str, BinaryIO]) -> str:
)
elif (
header[: len(MAGIC_XML)] == MAGIC_XML
or header[: len(MAGIC_XML_TAG)] == MAGIC_XML_TAG
or header[: len(MAGIC_JSON)] == MAGIC_JSON
):
report = file_object.read().decode(errors="ignore")
@@ -1088,8 +1096,10 @@ def parsed_aggregate_reports_to_csv_rows(
pct = report["policy_published"]["pct"]
fo = report["policy_published"]["fo"]
np_ = report["policy_published"].get("np", None)
psd = report["policy_published"].get("psd", None)
t = report["policy_published"].get("t", None)
testing = report["policy_published"].get("testing", None)
discovery_method = report["policy_published"].get(
"discovery_method", None
)
report_dict: dict[str, Any] = dict(
xml_schema=xml_schema,
@@ -1109,8 +1119,8 @@ def parsed_aggregate_reports_to_csv_rows(
pct=pct,
fo=fo,
np=np_,
psd=psd,
t=t,
testing=testing,
discovery_method=discovery_method,
)
for record in report["records"]:
@@ -1209,8 +1219,8 @@ def parsed_aggregate_reports_to_csv(
"pct",
"fo",
"np",
"psd",
"t",
"testing",
"discovery_method",
"source_ip_address",
"source_country",
"source_reverse_dns",

View File

@@ -21,6 +21,7 @@ class AggregateReportMetadata(TypedDict):
timespan_requires_normalization: bool
original_timespan_seconds: int
errors: List[str]
generator: Optional[str]
class AggregatePolicyPublished(TypedDict):
@@ -32,8 +33,8 @@ class AggregatePolicyPublished(TypedDict):
pct: str
fo: str
np: Optional[str]
psd: Optional[str]
t: Optional[str]
testing: Optional[str]
discovery_method: Optional[str]
class IPSourceInfo(TypedDict):
@@ -66,12 +67,14 @@ class AggregateAuthResultDKIM(TypedDict):
domain: str
result: str
selector: str
human_result: Optional[str]
class AggregateAuthResultSPF(TypedDict):
domain: str
result: str
scope: str
human_result: Optional[str]
class AggregateAuthResults(TypedDict):

View File

@@ -0,0 +1,48 @@
<feedback xmlns="urn:ietf:params:xml:ns:dmarc-2.0">
<version>1.0</version>
<report_metadata>
<org_name>Sample Reporter</org_name>
<email>report_sender@example-reporter.com</email>
<extra_contact_info>...</extra_contact_info>
<report_id>3v98abbp8ya9n3va8yr8oa3ya</report_id>
<date_range>
<begin>302832000</begin>
<end>302918399</end>
</date_range>
<generator>Example DMARC Aggregate Reporter v1.2</generator>
</report_metadata>
<policy_published>
<domain>example.com</domain>
<p>quarantine</p>
<sp>none</sp>
<np>none</np>
<testing>n</testing>
<discovery_method>treewalk</discovery_method>
</policy_published>
<record>
<row>
<source_ip>192.0.2.123</source_ip>
<count>123</count>
<policy_evaluated>
<disposition>pass</disposition>
<dkim>pass</dkim>
<spf>fail</spf>
</policy_evaluated>
</row>
<identifiers>
<envelope_from>example.com</envelope_from>
<header_from>example.com</header_from>
</identifiers>
<auth_results>
<dkim>
<domain>example.com</domain>
<result>pass</result>
<selector>abc123</selector>
</dkim>
<spf>
<domain>example.com</domain>
<result>fail</result>
</spf>
</auth_results>
</record>
</feedback>

View File

@@ -17,9 +17,8 @@
<p>reject</p>
<sp>quarantine</sp>
<np>reject</np>
<psd>n</psd>
<t>y</t>
<pct>100</pct>
<testing>y</testing>
<discovery_method>treewalk</discovery_method>
<fo>1</fo>
</policy_published>
<record>