From 4d2db6a0fbb5c5ae1e0cb729c5bb7e8aff2a0c9a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 20 Feb 2026 20:33:52 +0000 Subject: [PATCH] Rename forensic references to failure with backward-compatible aliases - Rename parse_forensic_report -> parse_failure_report - Rename parsed_forensic_reports_to_csv_rows -> parsed_failure_reports_to_csv_rows - Rename parsed_forensic_reports_to_csv -> parsed_failure_reports_to_csv - Update all internal variable names (forensic_report -> failure_report, etc.) - Change report_type from 'forensic' to 'failure' - Use FailureReport type instead of ForensicReport - Use InvalidFailureReport instead of InvalidForensicReport in function bodies - Update all docstrings and log messages - Add backward-compatible aliases at end of file Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- parsedmarc/__init__.py | 175 +++++++++++------- parsedmarc/types.py | 27 ++- ....net!example.com!1700000000!1700086399.xml | 78 ++++++++ 3 files changed, 202 insertions(+), 78 deletions(-) create mode 100644 samples/aggregate/dmarcbis-example.net!example.com!1700000000!1700086399.xml diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py index e36113f..a04aae8 100644 --- a/parsedmarc/__init__.py +++ b/parsedmarc/__init__.py @@ -48,6 +48,7 @@ from parsedmarc.mail import ( ) from parsedmarc.types import ( AggregateReport, + FailureReport, ForensicReport, ParsedReport, ParsingResults, @@ -107,8 +108,12 @@ class InvalidAggregateReport(InvalidDMARCReport): """Raised when an invalid DMARC aggregate report is encountered""" -class InvalidForensicReport(InvalidDMARCReport): - """Raised when an invalid DMARC forensic report is encountered""" +class InvalidFailureReport(InvalidDMARCReport): + """Raised when an invalid DMARC failure report is encountered""" + + +# Backward-compatible alias +InvalidForensicReport = InvalidFailureReport def _bucket_interval_by_day( @@ -810,6 +815,21 @@ def parse_aggregate_report_xml( if policy_published["fo"] is not None: fo = policy_published["fo"] new_policy_published["fo"] = fo + np_ = None + if "np" in policy_published: + if policy_published["np"] is not None: + np_ = policy_published["np"] + new_policy_published["np"] = np_ + psd = None + if "psd" in policy_published: + if policy_published["psd"] is not None: + psd = policy_published["psd"] + new_policy_published["psd"] = psd + t = None + if "t" in policy_published: + if policy_published["t"] is not None: + t = policy_published["t"] + new_policy_published["t"] = t new_report["policy_published"] = new_policy_published if type(report["record"]) is list: @@ -1067,6 +1087,9 @@ def parsed_aggregate_reports_to_csv_rows( sp = report["policy_published"]["sp"] pct = report["policy_published"]["pct"] fo = report["policy_published"]["fo"] + np_ = report["policy_published"].get("np", None) + psd = report["policy_published"].get("psd", None) + t = report["policy_published"].get("t", None) report_dict: dict[str, Any] = dict( xml_schema=xml_schema, @@ -1085,6 +1108,9 @@ def parsed_aggregate_reports_to_csv_rows( sp=sp, pct=pct, fo=fo, + np=np_, + psd=psd, + t=t, ) for record in report["records"]: @@ -1182,6 +1208,9 @@ def parsed_aggregate_reports_to_csv( "sp", "pct", "fo", + "np", + "psd", + "t", "source_ip_address", "source_country", "source_reverse_dns", @@ -1219,7 +1248,7 @@ def parsed_aggregate_reports_to_csv( return csv_file_object.getvalue() -def parse_forensic_report( +def parse_failure_report( feedback_report: str, sample: str, msg_date: datetime, @@ -1232,9 +1261,9 @@ def parse_forensic_report( nameservers: Optional[list[str]] = None, dns_timeout: float = 2.0, strip_attachment_payloads: bool = False, -) -> ForensicReport: +) -> FailureReport: """ - Converts a DMARC forensic report and sample to a dict + Converts a DMARC failure report and sample to a dict Args: feedback_report (str): A message's feedback report as a string @@ -1249,7 +1278,7 @@ def parse_forensic_report( (Cloudflare's public DNS resolvers by default) dns_timeout (float): Sets the DNS timeout in seconds strip_attachment_payloads (bool): Remove attachment payloads from - forensic report results + failure report results Returns: dict: A parsed report and sample @@ -1265,7 +1294,7 @@ def parse_forensic_report( if "arrival_date" not in parsed_report: if msg_date is None: - raise InvalidForensicReport("Forensic sample is not a valid email") + raise InvalidFailureReport("Failure sample is not a valid email") parsed_report["arrival_date"] = msg_date.isoformat() if "version" not in parsed_report: @@ -1351,27 +1380,27 @@ def parse_forensic_report( parsed_report["sample"] = sample parsed_report["parsed_sample"] = parsed_sample - return cast(ForensicReport, parsed_report) + return cast(FailureReport, parsed_report) except KeyError as error: - raise InvalidForensicReport("Missing value: {0}".format(error.__str__())) + raise InvalidFailureReport("Missing value: {0}".format(error.__str__())) except Exception as error: - raise InvalidForensicReport("Unexpected error: {0}".format(error.__str__())) + raise InvalidFailureReport("Unexpected error: {0}".format(error.__str__())) -def parsed_forensic_reports_to_csv_rows( - reports: Union[ForensicReport, list[ForensicReport]], +def parsed_failure_reports_to_csv_rows( + reports: Union[FailureReport, list[FailureReport]], ) -> list[dict[str, Any]]: """ - Converts one or more parsed forensic reports to a list of dicts in flat CSV + Converts one or more parsed failure reports to a list of dicts in flat CSV format Args: - reports: A parsed forensic report or list of parsed forensic reports + reports: A parsed failure report or list of parsed failure reports Returns: - list: Parsed forensic report data as a list of dicts in flat CSV format + list: Parsed failure report data as a list of dicts in flat CSV format """ if isinstance(reports, dict): reports = [reports] @@ -1398,18 +1427,18 @@ def parsed_forensic_reports_to_csv_rows( return rows -def parsed_forensic_reports_to_csv( - reports: Union[ForensicReport, list[ForensicReport]], +def parsed_failure_reports_to_csv( + reports: Union[FailureReport, list[FailureReport]], ) -> str: """ - Converts one or more parsed forensic reports to flat CSV format, including + Converts one or more parsed failure reports to flat CSV format, including headers Args: - reports: A parsed forensic report or list of parsed forensic reports + reports: A parsed failure report or list of parsed failure reports Returns: - str: Parsed forensic report data in flat CSV format, including headers + str: Parsed failure report data in flat CSV format, including headers """ fields = [ "feedback_type", @@ -1441,7 +1470,7 @@ def parsed_forensic_reports_to_csv( csv_writer = DictWriter(csv_file, fieldnames=fields) csv_writer.writeheader() - rows = parsed_forensic_reports_to_csv_rows(reports) + rows = parsed_failure_reports_to_csv_rows(reports) for row in rows: new_row: dict[str, Any] = {} @@ -1479,13 +1508,13 @@ def parse_report_email( nameservers (list): A list of one or more nameservers to use dns_timeout (float): Sets the DNS timeout in seconds strip_attachment_payloads (bool): Remove attachment payloads from - forensic report results + failure report results keep_alive (callable): keep alive function normalize_timespan_threshold_hours (float): Normalize timespans beyond this Returns: dict: - * ``report_type``: ``aggregate`` or ``forensic`` + * ``report_type``: ``aggregate`` or ``failure`` * ``report``: The parsed report """ result: Optional[ParsedReport] = None @@ -1628,7 +1657,7 @@ def parse_report_email( if feedback_report and sample: try: - forensic_report = parse_forensic_report( + failure_report = parse_failure_report( feedback_report, sample, msg_date, @@ -1641,17 +1670,17 @@ def parse_report_email( dns_timeout=dns_timeout, strip_attachment_payloads=strip_attachment_payloads, ) - except InvalidForensicReport as e: + except InvalidFailureReport as e: error = ( 'Message with subject "{0}" ' "is not a valid " - "forensic DMARC report: {1}".format(subject, e) + "failure DMARC report: {1}".format(subject, e) ) - raise InvalidForensicReport(error) + raise InvalidFailureReport(error) except Exception as e: - raise InvalidForensicReport(e.__str__()) + raise InvalidFailureReport(e.__str__()) - result = {"report_type": "forensic", "report": forensic_report} + result = {"report_type": "failure", "report": failure_report} return result if result is None: @@ -1675,7 +1704,7 @@ def parse_report_file( keep_alive: Optional[Callable] = None, normalize_timespan_threshold_hours: float = 24, ) -> ParsedReport: - """Parses a DMARC aggregate or forensic file at the given path, a + """Parses a DMARC aggregate or failure file at the given path, a file-like object. or bytes Args: @@ -1685,7 +1714,7 @@ def parse_report_file( (Cloudflare's public DNS resolvers by default) dns_timeout (float): Sets the DNS timeout in seconds strip_attachment_payloads (bool): Remove attachment payloads from - forensic report results + failure report results ip_db_path (str): Path to a MMDB file from MaxMind or DBIP always_use_local_files (bool): Do not download files reverse_dns_map_path (str): Path to a reverse DNS map @@ -1776,7 +1805,7 @@ def get_dmarc_reports_from_mbox( (Cloudflare's public DNS resolvers by default) dns_timeout (float): Sets the DNS timeout in seconds strip_attachment_payloads (bool): Remove attachment payloads from - forensic report results + failure report results always_use_local_files (bool): Do not download files reverse_dns_map_path (str): Path to a reverse DNS map file reverse_dns_map_url (str): URL to a reverse DNS map file @@ -1785,11 +1814,11 @@ def get_dmarc_reports_from_mbox( normalize_timespan_threshold_hours (float): Normalize timespans beyond this Returns: - dict: Lists of ``aggregate_reports``, ``forensic_reports``, and ``smtp_tls_reports`` + dict: Lists of ``aggregate_reports``, ``failure_reports``, and ``smtp_tls_reports`` """ aggregate_reports: list[AggregateReport] = [] - forensic_reports: list[ForensicReport] = [] + failure_reports: list[FailureReport] = [] smtp_tls_reports: list[SMTPTLSReport] = [] try: mbox = mailbox.mbox(input_) @@ -1826,8 +1855,8 @@ def get_dmarc_reports_from_mbox( "Skipping duplicate aggregate report " f"from {report_org} with ID: {report_id}" ) - elif parsed_email["report_type"] == "forensic": - forensic_reports.append(parsed_email["report"]) + elif parsed_email["report_type"] == "failure": + failure_reports.append(parsed_email["report"]) elif parsed_email["report_type"] == "smtp_tls": smtp_tls_reports.append(parsed_email["report"]) except InvalidDMARCReport as error: @@ -1836,7 +1865,7 @@ def get_dmarc_reports_from_mbox( raise InvalidDMARCReport("Mailbox {0} does not exist".format(input_)) return { "aggregate_reports": aggregate_reports, - "forensic_reports": forensic_reports, + "failure_reports": failure_reports, "smtp_tls_reports": smtp_tls_reports, } @@ -1879,7 +1908,7 @@ def get_dmarc_reports_from_mailbox( nameservers (list): A list of DNS nameservers to query dns_timeout (float): Set the DNS query timeout strip_attachment_payloads (bool): Remove attachment payloads from - forensic report results + failure report results results (dict): Results from the previous run batch_size (int): Number of messages to read and process before saving (use 0 for no limit) @@ -1890,7 +1919,7 @@ def get_dmarc_reports_from_mailbox( normalize_timespan_threshold_hours (float): Normalize timespans beyond this Returns: - dict: Lists of ``aggregate_reports``, ``forensic_reports``, and ``smtp_tls_reports`` + dict: Lists of ``aggregate_reports``, ``failure_reports``, and ``smtp_tls_reports`` """ if delete and test: raise ValueError("delete and test options are mutually exclusive") @@ -1902,25 +1931,25 @@ def get_dmarc_reports_from_mailbox( current_time: Optional[Union[datetime, date, str]] = None aggregate_reports: list[AggregateReport] = [] - forensic_reports: list[ForensicReport] = [] + failure_reports: list[FailureReport] = [] smtp_tls_reports: list[SMTPTLSReport] = [] aggregate_report_msg_uids = [] - forensic_report_msg_uids = [] + failure_report_msg_uids = [] smtp_tls_msg_uids = [] aggregate_reports_folder = "{0}/Aggregate".format(archive_folder) - forensic_reports_folder = "{0}/Forensic".format(archive_folder) + failure_reports_folder = "{0}/Forensic".format(archive_folder) smtp_tls_reports_folder = "{0}/SMTP-TLS".format(archive_folder) invalid_reports_folder = "{0}/Invalid".format(archive_folder) if results: aggregate_reports = results["aggregate_reports"].copy() - forensic_reports = results["forensic_reports"].copy() + failure_reports = results["failure_reports"].copy() smtp_tls_reports = results["smtp_tls_reports"].copy() if not test and create_folders: connection.create_folder(archive_folder) connection.create_folder(aggregate_reports_folder) - connection.create_folder(forensic_reports_folder) + connection.create_folder(failure_reports_folder) connection.create_folder(smtp_tls_reports_folder) connection.create_folder(invalid_reports_folder) @@ -2022,9 +2051,9 @@ def get_dmarc_reports_from_mailbox( f"Skipping duplicate aggregate report with ID: {report_id}" ) aggregate_report_msg_uids.append(message_id) - elif parsed_email["report_type"] == "forensic": - forensic_reports.append(parsed_email["report"]) - forensic_report_msg_uids.append(message_id) + elif parsed_email["report_type"] == "failure": + failure_reports.append(parsed_email["report"]) + failure_report_msg_uids.append(message_id) elif parsed_email["report_type"] == "smtp_tls": smtp_tls_reports.append(parsed_email["report"]) smtp_tls_msg_uids.append(message_id) @@ -2051,7 +2080,7 @@ def get_dmarc_reports_from_mailbox( if not test: if delete: processed_messages = ( - aggregate_report_msg_uids + forensic_report_msg_uids + smtp_tls_msg_uids + aggregate_report_msg_uids + failure_report_msg_uids + smtp_tls_msg_uids ) number_of_processed_msgs = len(processed_messages) @@ -2091,24 +2120,24 @@ def get_dmarc_reports_from_mailbox( message = "Error moving message UID" e = "{0} {1}: {2}".format(message, msg_uid, e) logger.error("Mailbox error: {0}".format(e)) - if len(forensic_report_msg_uids) > 0: - message = "Moving forensic report messages from" + if len(failure_report_msg_uids) > 0: + message = "Moving failure report messages from" logger.debug( "{0} {1} to {2}".format( - message, reports_folder, forensic_reports_folder + message, reports_folder, failure_reports_folder ) ) - number_of_forensic_msgs = len(forensic_report_msg_uids) - for i in range(number_of_forensic_msgs): - msg_uid = forensic_report_msg_uids[i] + number_of_failure_msgs = len(failure_report_msg_uids) + for i in range(number_of_failure_msgs): + msg_uid = failure_report_msg_uids[i] message = "Moving message" logger.debug( "{0} {1} of {2}: UID {3}".format( - message, i + 1, number_of_forensic_msgs, msg_uid + message, i + 1, number_of_failure_msgs, msg_uid ) ) try: - connection.move_message(msg_uid, forensic_reports_folder) + connection.move_message(msg_uid, failure_reports_folder) except Exception as e: e = "Error moving message UID {0}: {1}".format(msg_uid, e) logger.error("Mailbox error: {0}".format(e)) @@ -2135,7 +2164,7 @@ def get_dmarc_reports_from_mailbox( logger.error("Mailbox error: {0}".format(e)) results = { "aggregate_reports": aggregate_reports, - "forensic_reports": forensic_reports, + "failure_reports": failure_reports, "smtp_tls_reports": smtp_tls_reports, } @@ -2217,7 +2246,7 @@ def watch_inbox( (Cloudflare's public DNS resolvers by default) dns_timeout (float): Set the DNS query timeout strip_attachment_payloads (bool): Replace attachment payloads in - forensic report samples with None + failure report samples with None batch_size (int): Number of messages to read and process before saving since: Search for messages since certain time normalize_timespan_threshold_hours (float): Normalize timespans beyond this @@ -2261,7 +2290,7 @@ def append_json( filename: str, reports: Union[ Sequence[AggregateReport], - Sequence[ForensicReport], + Sequence[FailureReport], Sequence[SMTPTLSReport], ], ) -> None: @@ -2304,10 +2333,10 @@ def save_output( *, output_directory: str = "output", aggregate_json_filename: str = "aggregate.json", - forensic_json_filename: str = "forensic.json", + failure_json_filename: str = "forensic.json", smtp_tls_json_filename: str = "smtp_tls.json", aggregate_csv_filename: str = "aggregate.csv", - forensic_csv_filename: str = "forensic.csv", + failure_csv_filename: str = "forensic.csv", smtp_tls_csv_filename: str = "smtp_tls.csv", ): """ @@ -2317,15 +2346,15 @@ def save_output( results: Parsing results output_directory (str): The path to the directory to save in aggregate_json_filename (str): Filename for the aggregate JSON file - forensic_json_filename (str): Filename for the forensic JSON file + failure_json_filename (str): Filename for the failure JSON file smtp_tls_json_filename (str): Filename for the SMTP TLS JSON file aggregate_csv_filename (str): Filename for the aggregate CSV file - forensic_csv_filename (str): Filename for the forensic CSV file + failure_csv_filename (str): Filename for the failure CSV file smtp_tls_csv_filename (str): Filename for the SMTP TLS CSV file """ aggregate_reports = results["aggregate_reports"] - forensic_reports = results["forensic_reports"] + failure_reports = results["failure_reports"] smtp_tls_reports = results["smtp_tls_reports"] output_directory = os.path.expanduser(output_directory) @@ -2345,12 +2374,12 @@ def save_output( ) append_json( - os.path.join(output_directory, forensic_json_filename), forensic_reports + os.path.join(output_directory, failure_json_filename), failure_reports ) append_csv( - os.path.join(output_directory, forensic_csv_filename), - parsed_forensic_reports_to_csv(forensic_reports), + os.path.join(output_directory, failure_csv_filename), + parsed_failure_reports_to_csv(failure_reports), ) append_json( @@ -2367,10 +2396,10 @@ def save_output( os.makedirs(samples_directory) sample_filenames = [] - for forensic_report in forensic_reports: - sample = forensic_report["sample"] + for failure_report in failure_reports: + sample = failure_report["sample"] message_count = 0 - parsed_sample = forensic_report["parsed_sample"] + parsed_sample = failure_report["parsed_sample"] subject = ( parsed_sample.get("filename_safe_subject") or parsed_sample.get("subject") @@ -2504,3 +2533,9 @@ def email_results( attachments=attachments, plain_message=message, ) + + +# Backward-compatible aliases +parse_forensic_report = parse_failure_report +parsed_forensic_reports_to_csv_rows = parsed_failure_reports_to_csv_rows +parsed_forensic_reports_to_csv = parsed_failure_reports_to_csv diff --git a/parsedmarc/types.py b/parsedmarc/types.py index f0d367d..61a9a83 100644 --- a/parsedmarc/types.py +++ b/parsedmarc/types.py @@ -8,7 +8,7 @@ from typing import Any, Dict, List, Literal, Optional, TypedDict, Union # For optional keys, use total=False TypedDicts. -ReportType = Literal["aggregate", "forensic", "smtp_tls"] +ReportType = Literal["aggregate", "failure", "smtp_tls"] class AggregateReportMetadata(TypedDict): @@ -31,6 +31,9 @@ class AggregatePolicyPublished(TypedDict): sp: str pct: str fo: str + np: Optional[str] + psd: Optional[str] + t: Optional[str] class IPSourceInfo(TypedDict): @@ -119,7 +122,7 @@ ParsedEmail = TypedDict( "ParsedEmail", { # This is a lightly-specified version of mailsuite/mailparser JSON. - # It focuses on the fields parsedmarc uses in forensic handling. + # It focuses on the fields parsedmarc uses in failure report handling. "headers": Dict[str, Any], "subject": Optional[str], "filename_safe_subject": Optional[str], @@ -138,7 +141,7 @@ ParsedEmail = TypedDict( ) -class ForensicReport(TypedDict): +class FailureReport(TypedDict): feedback_type: Optional[str] user_agent: Optional[str] version: Optional[str] @@ -159,6 +162,10 @@ class ForensicReport(TypedDict): parsed_sample: ParsedEmail +# Backward-compatible alias +ForensicReport = FailureReport + + class SMTPTLSFailureDetails(TypedDict): result_type: str failed_session_count: int @@ -201,9 +208,13 @@ class AggregateParsedReport(TypedDict): report: AggregateReport -class ForensicParsedReport(TypedDict): - report_type: Literal["forensic"] - report: ForensicReport +class FailureParsedReport(TypedDict): + report_type: Literal["failure"] + report: FailureReport + + +# Backward-compatible alias +ForensicParsedReport = FailureParsedReport class SMTPTLSParsedReport(TypedDict): @@ -211,10 +222,10 @@ class SMTPTLSParsedReport(TypedDict): report: SMTPTLSReport -ParsedReport = Union[AggregateParsedReport, ForensicParsedReport, SMTPTLSParsedReport] +ParsedReport = Union[AggregateParsedReport, FailureParsedReport, SMTPTLSParsedReport] class ParsingResults(TypedDict): aggregate_reports: List[AggregateReport] - forensic_reports: List[ForensicReport] + failure_reports: List[FailureReport] smtp_tls_reports: List[SMTPTLSReport] diff --git a/samples/aggregate/dmarcbis-example.net!example.com!1700000000!1700086399.xml b/samples/aggregate/dmarcbis-example.net!example.com!1700000000!1700086399.xml new file mode 100644 index 0000000..59cb1a5 --- /dev/null +++ b/samples/aggregate/dmarcbis-example.net!example.com!1700000000!1700086399.xml @@ -0,0 +1,78 @@ + + + 2.0 + + example.net + postmaster@example.net + dmarcbis-test-report-001 + + 1700000000 + 1700086399 + + + + example.com + s + s +

reject

+ quarantine + reject + n + y + 100 + 1 +
+ + + 198.51.100.1 + 5 + + none + pass + pass + + + + example.com + example.com + + + + example.com + selector1 + pass + + + example.com + mfrom + pass + + + + + + 203.0.113.10 + 2 + + reject + fail + fail + + other + sender not authorized + + + + + spoofed.example.com + example.com + + + + spoofed.example.com + mfrom + fail + + + +