Rename forensic references to failure with backward-compatible aliases

- Rename parse_forensic_report -> parse_failure_report
- Rename parsed_forensic_reports_to_csv_rows -> parsed_failure_reports_to_csv_rows
- Rename parsed_forensic_reports_to_csv -> parsed_failure_reports_to_csv
- Update all internal variable names (forensic_report -> failure_report, etc.)
- Change report_type from 'forensic' to 'failure'
- Use FailureReport type instead of ForensicReport
- Use InvalidFailureReport instead of InvalidForensicReport in function bodies
- Update all docstrings and log messages
- Add backward-compatible aliases at end of file

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
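Below is a minimal sketch, not part of the commit itself, of the compatibility the aliases are meant to preserve. It assumes the first diffed module is parsedmarc's top-level package, so both the old and new names remain importable from parsedmarc:

    # Sketch only: old call sites keep working because the aliases are plain
    # name bindings to the renamed objects.
    from parsedmarc import (
        InvalidFailureReport,
        InvalidForensicReport,
        parse_failure_report,
        parse_forensic_report,
    )

    # Old and new names resolve to the same function and the same exception class.
    assert parse_forensic_report is parse_failure_report
    assert InvalidForensicReport is InvalidFailureReport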
Authored by copilot-swe-agent[bot] on 2026-02-20 20:33:52 +00:00; committed by Sean Whalen
parent b9fb4dbd59
commit 4d2db6a0fb
3 changed files with 202 additions and 78 deletions


@@ -48,6 +48,7 @@ from parsedmarc.mail import (
)
from parsedmarc.types import (
AggregateReport,
FailureReport,
ForensicReport,
ParsedReport,
ParsingResults,
@@ -107,8 +108,12 @@ class InvalidAggregateReport(InvalidDMARCReport):
"""Raised when an invalid DMARC aggregate report is encountered"""
class InvalidForensicReport(InvalidDMARCReport):
"""Raised when an invalid DMARC forensic report is encountered"""
class InvalidFailureReport(InvalidDMARCReport):
"""Raised when an invalid DMARC failure report is encountered"""
# Backward-compatible alias
InvalidForensicReport = InvalidFailureReport
def _bucket_interval_by_day(
@@ -810,6 +815,21 @@ def parse_aggregate_report_xml(
if policy_published["fo"] is not None:
fo = policy_published["fo"]
new_policy_published["fo"] = fo
np_ = None
if "np" in policy_published:
if policy_published["np"] is not None:
np_ = policy_published["np"]
new_policy_published["np"] = np_
psd = None
if "psd" in policy_published:
if policy_published["psd"] is not None:
psd = policy_published["psd"]
new_policy_published["psd"] = psd
t = None
if "t" in policy_published:
if policy_published["t"] is not None:
t = policy_published["t"]
new_policy_published["t"] = t
new_report["policy_published"] = new_policy_published
if type(report["record"]) is list:
@@ -1067,6 +1087,9 @@ def parsed_aggregate_reports_to_csv_rows(
sp = report["policy_published"]["sp"]
pct = report["policy_published"]["pct"]
fo = report["policy_published"]["fo"]
np_ = report["policy_published"].get("np", None)
psd = report["policy_published"].get("psd", None)
t = report["policy_published"].get("t", None)
report_dict: dict[str, Any] = dict(
xml_schema=xml_schema,
@@ -1085,6 +1108,9 @@ def parsed_aggregate_reports_to_csv_rows(
sp=sp,
pct=pct,
fo=fo,
np=np_,
psd=psd,
t=t,
)
for record in report["records"]:
@@ -1182,6 +1208,9 @@ def parsed_aggregate_reports_to_csv(
"sp",
"pct",
"fo",
"np",
"psd",
"t",
"source_ip_address",
"source_country",
"source_reverse_dns",
@@ -1219,7 +1248,7 @@ def parsed_aggregate_reports_to_csv(
return csv_file_object.getvalue()
def parse_forensic_report(
def parse_failure_report(
feedback_report: str,
sample: str,
msg_date: datetime,
@@ -1232,9 +1261,9 @@ def parse_forensic_report(
nameservers: Optional[list[str]] = None,
dns_timeout: float = 2.0,
strip_attachment_payloads: bool = False,
) -> ForensicReport:
) -> FailureReport:
"""
Converts a DMARC forensic report and sample to a dict
Converts a DMARC failure report and sample to a dict
Args:
feedback_report (str): A message's feedback report as a string
@@ -1249,7 +1278,7 @@ def parse_forensic_report(
(Cloudflare's public DNS resolvers by default)
dns_timeout (float): Sets the DNS timeout in seconds
strip_attachment_payloads (bool): Remove attachment payloads from
forensic report results
failure report results
Returns:
dict: A parsed report and sample
@@ -1265,7 +1294,7 @@ def parse_forensic_report(
if "arrival_date" not in parsed_report:
if msg_date is None:
raise InvalidForensicReport("Forensic sample is not a valid email")
raise InvalidFailureReport("Failure sample is not a valid email")
parsed_report["arrival_date"] = msg_date.isoformat()
if "version" not in parsed_report:
@@ -1351,27 +1380,27 @@ def parse_forensic_report(
parsed_report["sample"] = sample
parsed_report["parsed_sample"] = parsed_sample
return cast(ForensicReport, parsed_report)
return cast(FailureReport, parsed_report)
except KeyError as error:
raise InvalidForensicReport("Missing value: {0}".format(error.__str__()))
raise InvalidFailureReport("Missing value: {0}".format(error.__str__()))
except Exception as error:
raise InvalidForensicReport("Unexpected error: {0}".format(error.__str__()))
raise InvalidFailureReport("Unexpected error: {0}".format(error.__str__()))
def parsed_forensic_reports_to_csv_rows(
reports: Union[ForensicReport, list[ForensicReport]],
def parsed_failure_reports_to_csv_rows(
reports: Union[FailureReport, list[FailureReport]],
) -> list[dict[str, Any]]:
"""
Converts one or more parsed forensic reports to a list of dicts in flat CSV
Converts one or more parsed failure reports to a list of dicts in flat CSV
format
Args:
reports: A parsed forensic report or list of parsed forensic reports
reports: A parsed failure report or list of parsed failure reports
Returns:
list: Parsed forensic report data as a list of dicts in flat CSV format
list: Parsed failure report data as a list of dicts in flat CSV format
"""
if isinstance(reports, dict):
reports = [reports]
@@ -1398,18 +1427,18 @@ def parsed_forensic_reports_to_csv_rows(
return rows
def parsed_forensic_reports_to_csv(
reports: Union[ForensicReport, list[ForensicReport]],
def parsed_failure_reports_to_csv(
reports: Union[FailureReport, list[FailureReport]],
) -> str:
"""
Converts one or more parsed forensic reports to flat CSV format, including
Converts one or more parsed failure reports to flat CSV format, including
headers
Args:
reports: A parsed forensic report or list of parsed forensic reports
reports: A parsed failure report or list of parsed failure reports
Returns:
str: Parsed forensic report data in flat CSV format, including headers
str: Parsed failure report data in flat CSV format, including headers
"""
fields = [
"feedback_type",
@@ -1441,7 +1470,7 @@ def parsed_forensic_reports_to_csv(
csv_writer = DictWriter(csv_file, fieldnames=fields)
csv_writer.writeheader()
rows = parsed_forensic_reports_to_csv_rows(reports)
rows = parsed_failure_reports_to_csv_rows(reports)
for row in rows:
new_row: dict[str, Any] = {}
@@ -1479,13 +1508,13 @@ def parse_report_email(
nameservers (list): A list of one or more nameservers to use
dns_timeout (float): Sets the DNS timeout in seconds
strip_attachment_payloads (bool): Remove attachment payloads from
forensic report results
failure report results
keep_alive (callable): keep alive function
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
Returns:
dict:
* ``report_type``: ``aggregate`` or ``forensic``
* ``report_type``: ``aggregate`` or ``failure``
* ``report``: The parsed report
"""
result: Optional[ParsedReport] = None
@@ -1628,7 +1657,7 @@ def parse_report_email(
if feedback_report and sample:
try:
forensic_report = parse_forensic_report(
failure_report = parse_failure_report(
feedback_report,
sample,
msg_date,
@@ -1641,17 +1670,17 @@ def parse_report_email(
dns_timeout=dns_timeout,
strip_attachment_payloads=strip_attachment_payloads,
)
except InvalidForensicReport as e:
except InvalidFailureReport as e:
error = (
'Message with subject "{0}" '
"is not a valid "
"forensic DMARC report: {1}".format(subject, e)
"failure DMARC report: {1}".format(subject, e)
)
raise InvalidForensicReport(error)
raise InvalidFailureReport(error)
except Exception as e:
raise InvalidForensicReport(e.__str__())
raise InvalidFailureReport(e.__str__())
result = {"report_type": "forensic", "report": forensic_report}
result = {"report_type": "failure", "report": failure_report}
return result
if result is None:
@@ -1675,7 +1704,7 @@ def parse_report_file(
keep_alive: Optional[Callable] = None,
normalize_timespan_threshold_hours: float = 24,
) -> ParsedReport:
"""Parses a DMARC aggregate or forensic file at the given path, a
"""Parses a DMARC aggregate or failure file at the given path, a
file-like object, or bytes
Args:
@@ -1685,7 +1714,7 @@ def parse_report_file(
(Cloudflare's public DNS resolvers by default)
dns_timeout (float): Sets the DNS timeout in seconds
strip_attachment_payloads (bool): Remove attachment payloads from
forensic report results
failure report results
ip_db_path (str): Path to a MMDB file from MaxMind or DBIP
always_use_local_files (bool): Do not download files
reverse_dns_map_path (str): Path to a reverse DNS map
@@ -1776,7 +1805,7 @@ def get_dmarc_reports_from_mbox(
(Cloudflare's public DNS resolvers by default)
dns_timeout (float): Sets the DNS timeout in seconds
strip_attachment_payloads (bool): Remove attachment payloads from
forensic report results
failure report results
always_use_local_files (bool): Do not download files
reverse_dns_map_path (str): Path to a reverse DNS map file
reverse_dns_map_url (str): URL to a reverse DNS map file
@@ -1785,11 +1814,11 @@ def get_dmarc_reports_from_mbox(
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
Returns:
dict: Lists of ``aggregate_reports``, ``forensic_reports``, and ``smtp_tls_reports``
dict: Lists of ``aggregate_reports``, ``failure_reports``, and ``smtp_tls_reports``
"""
aggregate_reports: list[AggregateReport] = []
forensic_reports: list[ForensicReport] = []
failure_reports: list[FailureReport] = []
smtp_tls_reports: list[SMTPTLSReport] = []
try:
mbox = mailbox.mbox(input_)
@@ -1826,8 +1855,8 @@ def get_dmarc_reports_from_mbox(
"Skipping duplicate aggregate report "
f"from {report_org} with ID: {report_id}"
)
elif parsed_email["report_type"] == "forensic":
forensic_reports.append(parsed_email["report"])
elif parsed_email["report_type"] == "failure":
failure_reports.append(parsed_email["report"])
elif parsed_email["report_type"] == "smtp_tls":
smtp_tls_reports.append(parsed_email["report"])
except InvalidDMARCReport as error:
@@ -1836,7 +1865,7 @@ def get_dmarc_reports_from_mbox(
raise InvalidDMARCReport("Mailbox {0} does not exist".format(input_))
return {
"aggregate_reports": aggregate_reports,
"forensic_reports": forensic_reports,
"failure_reports": failure_reports,
"smtp_tls_reports": smtp_tls_reports,
}
@@ -1879,7 +1908,7 @@ def get_dmarc_reports_from_mailbox(
nameservers (list): A list of DNS nameservers to query
dns_timeout (float): Set the DNS query timeout
strip_attachment_payloads (bool): Remove attachment payloads from
forensic report results
failure report results
results (dict): Results from the previous run
batch_size (int): Number of messages to read and process before saving
(use 0 for no limit)
@@ -1890,7 +1919,7 @@ def get_dmarc_reports_from_mailbox(
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
Returns:
dict: Lists of ``aggregate_reports``, ``forensic_reports``, and ``smtp_tls_reports``
dict: Lists of ``aggregate_reports``, ``failure_reports``, and ``smtp_tls_reports``
"""
if delete and test:
raise ValueError("delete and test options are mutually exclusive")
@@ -1902,25 +1931,25 @@ def get_dmarc_reports_from_mailbox(
current_time: Optional[Union[datetime, date, str]] = None
aggregate_reports: list[AggregateReport] = []
forensic_reports: list[ForensicReport] = []
failure_reports: list[FailureReport] = []
smtp_tls_reports: list[SMTPTLSReport] = []
aggregate_report_msg_uids = []
forensic_report_msg_uids = []
failure_report_msg_uids = []
smtp_tls_msg_uids = []
aggregate_reports_folder = "{0}/Aggregate".format(archive_folder)
forensic_reports_folder = "{0}/Forensic".format(archive_folder)
failure_reports_folder = "{0}/Forensic".format(archive_folder)
smtp_tls_reports_folder = "{0}/SMTP-TLS".format(archive_folder)
invalid_reports_folder = "{0}/Invalid".format(archive_folder)
if results:
aggregate_reports = results["aggregate_reports"].copy()
forensic_reports = results["forensic_reports"].copy()
failure_reports = results["failure_reports"].copy()
smtp_tls_reports = results["smtp_tls_reports"].copy()
if not test and create_folders:
connection.create_folder(archive_folder)
connection.create_folder(aggregate_reports_folder)
connection.create_folder(forensic_reports_folder)
connection.create_folder(failure_reports_folder)
connection.create_folder(smtp_tls_reports_folder)
connection.create_folder(invalid_reports_folder)
@@ -2022,9 +2051,9 @@ def get_dmarc_reports_from_mailbox(
f"Skipping duplicate aggregate report with ID: {report_id}"
)
aggregate_report_msg_uids.append(message_id)
elif parsed_email["report_type"] == "forensic":
forensic_reports.append(parsed_email["report"])
forensic_report_msg_uids.append(message_id)
elif parsed_email["report_type"] == "failure":
failure_reports.append(parsed_email["report"])
failure_report_msg_uids.append(message_id)
elif parsed_email["report_type"] == "smtp_tls":
smtp_tls_reports.append(parsed_email["report"])
smtp_tls_msg_uids.append(message_id)
@@ -2051,7 +2080,7 @@ def get_dmarc_reports_from_mailbox(
if not test:
if delete:
processed_messages = (
aggregate_report_msg_uids + forensic_report_msg_uids + smtp_tls_msg_uids
aggregate_report_msg_uids + failure_report_msg_uids + smtp_tls_msg_uids
)
number_of_processed_msgs = len(processed_messages)
@@ -2091,24 +2120,24 @@ def get_dmarc_reports_from_mailbox(
message = "Error moving message UID"
e = "{0} {1}: {2}".format(message, msg_uid, e)
logger.error("Mailbox error: {0}".format(e))
if len(forensic_report_msg_uids) > 0:
message = "Moving forensic report messages from"
if len(failure_report_msg_uids) > 0:
message = "Moving failure report messages from"
logger.debug(
"{0} {1} to {2}".format(
message, reports_folder, forensic_reports_folder
message, reports_folder, failure_reports_folder
)
)
number_of_forensic_msgs = len(forensic_report_msg_uids)
for i in range(number_of_forensic_msgs):
msg_uid = forensic_report_msg_uids[i]
number_of_failure_msgs = len(failure_report_msg_uids)
for i in range(number_of_failure_msgs):
msg_uid = failure_report_msg_uids[i]
message = "Moving message"
logger.debug(
"{0} {1} of {2}: UID {3}".format(
message, i + 1, number_of_forensic_msgs, msg_uid
message, i + 1, number_of_failure_msgs, msg_uid
)
)
try:
connection.move_message(msg_uid, forensic_reports_folder)
connection.move_message(msg_uid, failure_reports_folder)
except Exception as e:
e = "Error moving message UID {0}: {1}".format(msg_uid, e)
logger.error("Mailbox error: {0}".format(e))
@@ -2135,7 +2164,7 @@ def get_dmarc_reports_from_mailbox(
logger.error("Mailbox error: {0}".format(e))
results = {
"aggregate_reports": aggregate_reports,
"forensic_reports": forensic_reports,
"failure_reports": failure_reports,
"smtp_tls_reports": smtp_tls_reports,
}
@@ -2217,7 +2246,7 @@ def watch_inbox(
(Cloudflare's public DNS resolvers by default)
dns_timeout (float): Set the DNS query timeout
strip_attachment_payloads (bool): Replace attachment payloads in
forensic report samples with None
failure report samples with None
batch_size (int): Number of messages to read and process before saving
since: Search for messages since certain time
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
@@ -2261,7 +2290,7 @@ def append_json(
filename: str,
reports: Union[
Sequence[AggregateReport],
Sequence[ForensicReport],
Sequence[FailureReport],
Sequence[SMTPTLSReport],
],
) -> None:
@@ -2304,10 +2333,10 @@ def save_output(
*,
output_directory: str = "output",
aggregate_json_filename: str = "aggregate.json",
forensic_json_filename: str = "forensic.json",
failure_json_filename: str = "forensic.json",
smtp_tls_json_filename: str = "smtp_tls.json",
aggregate_csv_filename: str = "aggregate.csv",
forensic_csv_filename: str = "forensic.csv",
failure_csv_filename: str = "forensic.csv",
smtp_tls_csv_filename: str = "smtp_tls.csv",
):
"""
@@ -2317,15 +2346,15 @@ def save_output(
results: Parsing results
output_directory (str): The path to the directory to save in
aggregate_json_filename (str): Filename for the aggregate JSON file
forensic_json_filename (str): Filename for the forensic JSON file
failure_json_filename (str): Filename for the failure JSON file
smtp_tls_json_filename (str): Filename for the SMTP TLS JSON file
aggregate_csv_filename (str): Filename for the aggregate CSV file
forensic_csv_filename (str): Filename for the forensic CSV file
failure_csv_filename (str): Filename for the failure CSV file
smtp_tls_csv_filename (str): Filename for the SMTP TLS CSV file
"""
aggregate_reports = results["aggregate_reports"]
forensic_reports = results["forensic_reports"]
failure_reports = results["failure_reports"]
smtp_tls_reports = results["smtp_tls_reports"]
output_directory = os.path.expanduser(output_directory)
@@ -2345,12 +2374,12 @@ def save_output(
)
append_json(
os.path.join(output_directory, forensic_json_filename), forensic_reports
os.path.join(output_directory, failure_json_filename), failure_reports
)
append_csv(
os.path.join(output_directory, forensic_csv_filename),
parsed_forensic_reports_to_csv(forensic_reports),
os.path.join(output_directory, failure_csv_filename),
parsed_failure_reports_to_csv(failure_reports),
)
append_json(
@@ -2367,10 +2396,10 @@ def save_output(
os.makedirs(samples_directory)
sample_filenames = []
for forensic_report in forensic_reports:
sample = forensic_report["sample"]
for failure_report in failure_reports:
sample = failure_report["sample"]
message_count = 0
parsed_sample = forensic_report["parsed_sample"]
parsed_sample = failure_report["parsed_sample"]
subject = (
parsed_sample.get("filename_safe_subject")
or parsed_sample.get("subject")
@@ -2504,3 +2533,9 @@ def email_results(
attachments=attachments,
plain_message=message,
)
# Backward-compatible aliases
parse_forensic_report = parse_failure_report
parsed_forensic_reports_to_csv_rows = parsed_failure_reports_to_csv_rows
parsed_forensic_reports_to_csv = parsed_failure_reports_to_csv
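
A short usage sketch, not from the commit, of how the renamed public helpers fit together after this change; the .eml path is a hypothetical placeholder, and the keyword names follow the current parsedmarc API:

    # Hypothetical example: parse a single DMARC failure (forensic) report email
    # and emit it as CSV using the renamed helpers.
    from parsedmarc import parse_report_email, parsed_failure_reports_to_csv

    with open("failure-report.eml", "rb") as f:   # placeholder sample message
        parsed = parse_report_email(f.read(), offline=True)

    if parsed["report_type"] == "failure":        # was "forensic" before this commit
        print(parsed_failure_reports_to_csv([parsed["report"]]))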


@@ -8,7 +8,7 @@ from typing import Any, Dict, List, Literal, Optional, TypedDict, Union
# For optional keys, use total=False TypedDicts.
ReportType = Literal["aggregate", "forensic", "smtp_tls"]
ReportType = Literal["aggregate", "failure", "smtp_tls"]
class AggregateReportMetadata(TypedDict):
@@ -31,6 +31,9 @@ class AggregatePolicyPublished(TypedDict):
sp: str
pct: str
fo: str
np: Optional[str]
psd: Optional[str]
t: Optional[str]
class IPSourceInfo(TypedDict):
@@ -119,7 +122,7 @@ ParsedEmail = TypedDict(
"ParsedEmail",
{
# This is a lightly-specified version of mailsuite/mailparser JSON.
# It focuses on the fields parsedmarc uses in forensic handling.
# It focuses on the fields parsedmarc uses in failure report handling.
"headers": Dict[str, Any],
"subject": Optional[str],
"filename_safe_subject": Optional[str],
@@ -138,7 +141,7 @@ ParsedEmail = TypedDict(
)
class ForensicReport(TypedDict):
class FailureReport(TypedDict):
feedback_type: Optional[str]
user_agent: Optional[str]
version: Optional[str]
@@ -159,6 +162,10 @@ class ForensicReport(TypedDict):
parsed_sample: ParsedEmail
# Backward-compatible alias
ForensicReport = FailureReport
class SMTPTLSFailureDetails(TypedDict):
result_type: str
failed_session_count: int
@@ -201,9 +208,13 @@ class AggregateParsedReport(TypedDict):
report: AggregateReport
class ForensicParsedReport(TypedDict):
report_type: Literal["forensic"]
report: ForensicReport
class FailureParsedReport(TypedDict):
report_type: Literal["failure"]
report: FailureReport
# Backward-compatible alias
ForensicParsedReport = FailureParsedReport
class SMTPTLSParsedReport(TypedDict):
@@ -211,10 +222,10 @@ class SMTPTLSParsedReport(TypedDict):
report: SMTPTLSReport
ParsedReport = Union[AggregateParsedReport, ForensicParsedReport, SMTPTLSParsedReport]
ParsedReport = Union[AggregateParsedReport, FailureParsedReport, SMTPTLSParsedReport]
class ParsingResults(TypedDict):
aggregate_reports: List[AggregateReport]
forensic_reports: List[ForensicReport]
failure_reports: List[FailureReport]
smtp_tls_reports: List[SMTPTLSReport]
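
A brief, hypothetical sketch of how downstream type annotations can migrate; handle_report is an invented name used only for illustration:

    # Sketch only: FailureReport is the new TypedDict name, and ForensicReport
    # stays bound to the same class, so either annotation describes the same shape.
    from parsedmarc.types import FailureReport, ForensicReport

    def handle_report(report: FailureReport) -> None:
        # TypedDicts are ordinary dicts at runtime; keys mirror the class body.
        print(report.get("feedback_type"))

    assert ForensicReport is FailureReport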


@@ -0,0 +1,78 @@
<?xml version="1.0"?>
<feedback>
<version>2.0</version>
<report_metadata>
<org_name>example.net</org_name>
<email>postmaster@example.net</email>
<report_id>dmarcbis-test-report-001</report_id>
<date_range>
<begin>1700000000</begin>
<end>1700086399</end>
</date_range>
</report_metadata>
<policy_published>
<domain>example.com</domain>
<adkim>s</adkim>
<aspf>s</aspf>
<p>reject</p>
<sp>quarantine</sp>
<np>reject</np>
<psd>n</psd>
<t>y</t>
<pct>100</pct>
<fo>1</fo>
</policy_published>
<record>
<row>
<source_ip>198.51.100.1</source_ip>
<count>5</count>
<policy_evaluated>
<disposition>none</disposition>
<dkim>pass</dkim>
<spf>pass</spf>
</policy_evaluated>
</row>
<identifiers>
<envelope_from>example.com</envelope_from>
<header_from>example.com</header_from>
</identifiers>
<auth_results>
<dkim>
<domain>example.com</domain>
<selector>selector1</selector>
<result>pass</result>
</dkim>
<spf>
<domain>example.com</domain>
<scope>mfrom</scope>
<result>pass</result>
</spf>
</auth_results>
</record>
<record>
<row>
<source_ip>203.0.113.10</source_ip>
<count>2</count>
<policy_evaluated>
<disposition>reject</disposition>
<dkim>fail</dkim>
<spf>fail</spf>
<reason>
<type>other</type>
<comment>sender not authorized</comment>
</reason>
</policy_evaluated>
</row>
<identifiers>
<envelope_from>spoofed.example.com</envelope_from>
<header_from>example.com</header_from>
</identifiers>
<auth_results>
<spf>
<domain>spoofed.example.com</domain>
<scope>mfrom</scope>
<result>fail</result>
</spf>
</auth_results>
</record>
</feedback>
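
A hedged sketch of how this new sample exercises the DMARCbis policy fields handled above; the sample's path within the repository is not shown in this view, so dmarcbis_sample.xml is a placeholder name, and the offline keyword is assumed from the current parsedmarc API:

    # Sketch only: parse the DMARCbis-style sample and read the np/psd/t values
    # that parse_aggregate_report_xml now carries into policy_published.
    from parsedmarc import parse_aggregate_report_xml

    with open("dmarcbis_sample.xml") as f:        # placeholder path
        report = parse_aggregate_report_xml(f.read(), offline=True)

    policy = report["policy_published"]
    print(policy["np"], policy["psd"], policy["t"])  # expected: reject n y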