diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py index ea00f5f..9f5fe6a 100644 --- a/parsedmarc/__init__.py +++ b/parsedmarc/__init__.py @@ -117,12 +117,18 @@ class AlignmentInfo(TypedDict): dmarc: bool +class PolicyOverrideReason(TypedDict, total=False): + """DMARC policy override reason""" + type: NotRequired[str] + comment: NotRequired[Optional[str]] + + class PolicyEvaluated(TypedDict): """DMARC policy evaluation result""" disposition: str dkim: str spf: str - policy_override_reasons: list[dict[str, Any]] + policy_override_reasons: list[PolicyOverrideReason] class DKIMAuthResult(TypedDict): @@ -154,7 +160,7 @@ class Identifiers(TypedDict): class ParsedReportRecord(TypedDict): """A parsed DMARC aggregate report record""" - source: dict[str, Any] + source: IPAddressInfo count: int alignment: AlignmentInfo policy_evaluated: PolicyEvaluated @@ -162,6 +168,13 @@ class ParsedReportRecord(TypedDict): auth_results: AuthResults +class ParsedReportRecordWithMetadata(ParsedReportRecord, total=False): + """A parsed DMARC report record with normalization metadata""" + normalized_timespan: bool + interval_begin: NotRequired[str] + interval_end: NotRequired[str] + + class ReportMetadata(TypedDict, total=False): """DMARC report metadata""" org_name: str @@ -204,12 +217,56 @@ class SMTPTLSFailureDetails(TypedDict): failure_reason_code: NotRequired[Optional[str]] -class SMTPTLSPolicy(TypedDict): +class SMTPTLSPolicy(TypedDict, total=False): """SMTP TLS policy information""" policy_type: str - policy_string: list[str] policy_domain: str - mx_host_pattern: list[str] + policy_strings: NotRequired[list[str]] + mx_host_patterns: NotRequired[list[str]] + successful_session_count: int + failed_session_count: int + failure_details: NotRequired[list[SMTPTLSFailureDetails]] + + +class ParsedSMTPTLSReport(TypedDict): + """A complete parsed SMTP TLS report""" + organization_name: str + begin_date: str + end_date: str + contact_info: str + report_id: str + policies: list[SMTPTLSPolicy] + + +class ParsedForensicReport(TypedDict, total=False): + """A parsed DMARC forensic report""" + feedback_type: str + user_agent: NotRequired[Optional[str]] + version: NotRequired[Optional[str]] + original_envelope_id: NotRequired[Optional[str]] + original_mail_from: NotRequired[Optional[str]] + original_rcpt_to: NotRequired[Optional[str]] + arrival_date: str + arrival_date_utc: str + subject: NotRequired[str] + message_id: str + authentication_results: str + delivery_result: str + auth_failure: list[str] + reported_domain: str + arrival_date_utc: str + source: IPAddressInfo + authentication_mechanisms: list[str] + dkim_domain: NotRequired[Optional[str]] + sample_headers_only: bool + sample: NotRequired[str] + parsed_sample: NotRequired[dict[str, Any]] + + +class ReportTypeWrapper(TypedDict): + """Wrapper for report type identification""" + report_type: str + report: Union[ParsedAggregateReport, ParsedForensicReport, ParsedSMTPTLSReport] def _bucket_interval_by_day( @@ -555,7 +612,7 @@ def _parse_report_record( return new_record -def _parse_smtp_tls_failure_details(failure_details: dict[str, Any]) -> dict[str, Any]: +def _parse_smtp_tls_failure_details(failure_details: dict[str, Any]) -> SMTPTLSFailureDetails: try: new_failure_details: dict[str, Any] = { "result_type": failure_details["result-type"], @@ -591,7 +648,7 @@ def _parse_smtp_tls_failure_details(failure_details: dict[str, Any]) -> dict[str raise InvalidSMTPTLSReport(str(e)) -def _parse_smtp_tls_report_policy(policy: dict[str, Any]) -> dict[str, Any]: +def _parse_smtp_tls_report_policy(policy: dict[str, Any]) -> SMTPTLSPolicy: policy_types = ["tlsa", "sts", "no-policy-found"] try: policy_domain = policy["policy"]["policy-domain"] @@ -631,7 +688,7 @@ def _parse_smtp_tls_report_policy(policy: dict[str, Any]) -> dict[str, Any]: raise InvalidSMTPTLSReport(str(e)) -def parse_smtp_tls_report_json(report: str) -> dict[str, Any]: +def parse_smtp_tls_report_json(report: str) -> ParsedSMTPTLSReport: """Parses and validates an SMTP TLS report""" required_fields = [ "organization-name", @@ -671,7 +728,7 @@ def parse_smtp_tls_report_json(report: str) -> dict[str, Any]: def parsed_smtp_tls_reports_to_csv_rows( - reports: Union[dict[str, Any], list[dict[str, Any]]], + reports: Union[ParsedSMTPTLSReport, list[ParsedSMTPTLSReport]], ) -> list[dict[str, Any]]: """Converts one or more parsed SMTP TLS reports into a list of single layer dict objects suitable for use in a CSV""" @@ -707,7 +764,7 @@ def parsed_smtp_tls_reports_to_csv_rows( return rows -def parsed_smtp_tls_reports_to_csv(reports: dict[str, Any]) -> str: +def parsed_smtp_tls_reports_to_csv(reports: ParsedSMTPTLSReport) -> str: """ Converts one or more parsed SMTP TLS reports to flat CSV format, including headers @@ -764,7 +821,7 @@ def parse_aggregate_report_xml( timeout: Optional[float] = 2.0, keep_alive: Optional[Callable] = None, normalize_timespan_threshold_hours: float = 24.0, -) -> dict[str, Any]: +) -> ParsedAggregateReport: """Parses a DMARC XML report string and returns a consistent dict Args: @@ -1042,7 +1099,7 @@ def parse_aggregate_report_file( dns_timeout: Optional[float] = 2.0, keep_alive: Optional[Callable] = None, normalize_timespan_threshold_hours: Optional[float] = 24.0, -) -> dict[str, Any]: +) -> ParsedAggregateReport: """Parses a file at the given path, a file-like object. or bytes as an aggregate DMARC report @@ -1083,7 +1140,7 @@ def parse_aggregate_report_file( def parsed_aggregate_reports_to_csv_rows( - reports: list[dict[str, Any]], + reports: list[ParsedAggregateReport], ) -> list[dict[str, Any]]: """ Converts one or more parsed aggregate reports to list of dicts in flat CSV @@ -1208,7 +1265,7 @@ def parsed_aggregate_reports_to_csv_rows( return rows -def parsed_aggregate_reports_to_csv(reports: list[dict[str, Any]]) -> str: +def parsed_aggregate_reports_to_csv(reports: list[ParsedAggregateReport]) -> str: """ Converts one or more parsed aggregate reports to flat CSV format, including headers @@ -1287,7 +1344,7 @@ def parse_forensic_report( nameservers: Optional[list[str]] = None, dns_timeout: Optional[float] = 2.0, strip_attachment_payloads: Optional[bool] = False, -) -> dict[str, Any]: +) -> ParsedForensicReport: """ Converts a DMARC forensic report and sample to a ``dict`` @@ -1415,7 +1472,7 @@ def parse_forensic_report( raise InvalidForensicReport("Unexpected error: {0}".format(error.__str__())) -def parsed_forensic_reports_to_csv_rows(reports: list[dict[str, Any]]): +def parsed_forensic_reports_to_csv_rows(reports: list[ParsedForensicReport]): """ Converts one or more parsed forensic reports to a list of dicts in flat CSV format @@ -1451,7 +1508,7 @@ def parsed_forensic_reports_to_csv_rows(reports: list[dict[str, Any]]): return rows -def parsed_forensic_reports_to_csv(reports: list[dict[str, Any]]) -> str: +def parsed_forensic_reports_to_csv(reports: list[ParsedForensicReport]) -> str: """ Converts one or more parsed forensic reports to flat CSV format, including headers @@ -1718,7 +1775,7 @@ def parse_report_file( offline: Optional[bool] = False, keep_alive: Optional[Callable] = None, normalize_timespan_threshold_hours: Optional[float] = 24, -) -> dict[str, Any]: +) -> ReportTypeWrapper: """Parses a DMARC aggregate or forensic file at the given path, a file-like object. or bytes