Add comprehensive TypedDicts to minimize Any usage in public APIs

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2025-12-17 22:28:19 +00:00
parent febbb107c4
commit 2174f23eb5

View File

@@ -117,12 +117,18 @@ class AlignmentInfo(TypedDict):
dmarc: bool
class PolicyOverrideReason(TypedDict, total=False):
"""DMARC policy override reason"""
type: NotRequired[str]
comment: NotRequired[Optional[str]]
class PolicyEvaluated(TypedDict):
"""DMARC policy evaluation result"""
disposition: str
dkim: str
spf: str
policy_override_reasons: list[dict[str, Any]]
policy_override_reasons: list[PolicyOverrideReason]
class DKIMAuthResult(TypedDict):
@@ -154,7 +160,7 @@ class Identifiers(TypedDict):
class ParsedReportRecord(TypedDict):
"""A parsed DMARC aggregate report record"""
source: dict[str, Any]
source: IPAddressInfo
count: int
alignment: AlignmentInfo
policy_evaluated: PolicyEvaluated
@@ -162,6 +168,13 @@ class ParsedReportRecord(TypedDict):
auth_results: AuthResults
class ParsedReportRecordWithMetadata(ParsedReportRecord, total=False):
"""A parsed DMARC report record with normalization metadata"""
normalized_timespan: bool
interval_begin: NotRequired[str]
interval_end: NotRequired[str]
class ReportMetadata(TypedDict, total=False):
"""DMARC report metadata"""
org_name: str
@@ -204,12 +217,56 @@ class SMTPTLSFailureDetails(TypedDict):
failure_reason_code: NotRequired[Optional[str]]
class SMTPTLSPolicy(TypedDict):
class SMTPTLSPolicy(TypedDict, total=False):
"""SMTP TLS policy information"""
policy_type: str
policy_string: list[str]
policy_domain: str
mx_host_pattern: list[str]
policy_strings: NotRequired[list[str]]
mx_host_patterns: NotRequired[list[str]]
successful_session_count: int
failed_session_count: int
failure_details: NotRequired[list[SMTPTLSFailureDetails]]
class ParsedSMTPTLSReport(TypedDict):
"""A complete parsed SMTP TLS report"""
organization_name: str
begin_date: str
end_date: str
contact_info: str
report_id: str
policies: list[SMTPTLSPolicy]
class ParsedForensicReport(TypedDict, total=False):
"""A parsed DMARC forensic report"""
feedback_type: str
user_agent: NotRequired[Optional[str]]
version: NotRequired[Optional[str]]
original_envelope_id: NotRequired[Optional[str]]
original_mail_from: NotRequired[Optional[str]]
original_rcpt_to: NotRequired[Optional[str]]
arrival_date: str
arrival_date_utc: str
subject: NotRequired[str]
message_id: str
authentication_results: str
delivery_result: str
auth_failure: list[str]
reported_domain: str
arrival_date_utc: str
source: IPAddressInfo
authentication_mechanisms: list[str]
dkim_domain: NotRequired[Optional[str]]
sample_headers_only: bool
sample: NotRequired[str]
parsed_sample: NotRequired[dict[str, Any]]
class ReportTypeWrapper(TypedDict):
"""Wrapper for report type identification"""
report_type: str
report: Union[ParsedAggregateReport, ParsedForensicReport, ParsedSMTPTLSReport]
def _bucket_interval_by_day(
@@ -555,7 +612,7 @@ def _parse_report_record(
return new_record
def _parse_smtp_tls_failure_details(failure_details: dict[str, Any]) -> dict[str, Any]:
def _parse_smtp_tls_failure_details(failure_details: dict[str, Any]) -> SMTPTLSFailureDetails:
try:
new_failure_details: dict[str, Any] = {
"result_type": failure_details["result-type"],
@@ -591,7 +648,7 @@ def _parse_smtp_tls_failure_details(failure_details: dict[str, Any]) -> dict[str
raise InvalidSMTPTLSReport(str(e))
def _parse_smtp_tls_report_policy(policy: dict[str, Any]) -> dict[str, Any]:
def _parse_smtp_tls_report_policy(policy: dict[str, Any]) -> SMTPTLSPolicy:
policy_types = ["tlsa", "sts", "no-policy-found"]
try:
policy_domain = policy["policy"]["policy-domain"]
@@ -631,7 +688,7 @@ def _parse_smtp_tls_report_policy(policy: dict[str, Any]) -> dict[str, Any]:
raise InvalidSMTPTLSReport(str(e))
def parse_smtp_tls_report_json(report: str) -> dict[str, Any]:
def parse_smtp_tls_report_json(report: str) -> ParsedSMTPTLSReport:
"""Parses and validates an SMTP TLS report"""
required_fields = [
"organization-name",
@@ -671,7 +728,7 @@ def parse_smtp_tls_report_json(report: str) -> dict[str, Any]:
def parsed_smtp_tls_reports_to_csv_rows(
reports: Union[dict[str, Any], list[dict[str, Any]]],
reports: Union[ParsedSMTPTLSReport, list[ParsedSMTPTLSReport]],
) -> list[dict[str, Any]]:
"""Converts one or more parsed SMTP TLS reports into a list of single
layer dict objects suitable for use in a CSV"""
@@ -707,7 +764,7 @@ def parsed_smtp_tls_reports_to_csv_rows(
return rows
def parsed_smtp_tls_reports_to_csv(reports: dict[str, Any]) -> str:
def parsed_smtp_tls_reports_to_csv(reports: ParsedSMTPTLSReport) -> str:
"""
Converts one or more parsed SMTP TLS reports to flat CSV format, including
headers
@@ -764,7 +821,7 @@ def parse_aggregate_report_xml(
timeout: Optional[float] = 2.0,
keep_alive: Optional[Callable] = None,
normalize_timespan_threshold_hours: float = 24.0,
) -> dict[str, Any]:
) -> ParsedAggregateReport:
"""Parses a DMARC XML report string and returns a consistent dict
Args:
@@ -1042,7 +1099,7 @@ def parse_aggregate_report_file(
dns_timeout: Optional[float] = 2.0,
keep_alive: Optional[Callable] = None,
normalize_timespan_threshold_hours: Optional[float] = 24.0,
) -> dict[str, Any]:
) -> ParsedAggregateReport:
"""Parses a file at the given path, a file-like object. or bytes as an
aggregate DMARC report
@@ -1083,7 +1140,7 @@ def parse_aggregate_report_file(
def parsed_aggregate_reports_to_csv_rows(
reports: list[dict[str, Any]],
reports: list[ParsedAggregateReport],
) -> list[dict[str, Any]]:
"""
Converts one or more parsed aggregate reports to list of dicts in flat CSV
@@ -1208,7 +1265,7 @@ def parsed_aggregate_reports_to_csv_rows(
return rows
def parsed_aggregate_reports_to_csv(reports: list[dict[str, Any]]) -> str:
def parsed_aggregate_reports_to_csv(reports: list[ParsedAggregateReport]) -> str:
"""
Converts one or more parsed aggregate reports to flat CSV format, including
headers
@@ -1287,7 +1344,7 @@ def parse_forensic_report(
nameservers: Optional[list[str]] = None,
dns_timeout: Optional[float] = 2.0,
strip_attachment_payloads: Optional[bool] = False,
) -> dict[str, Any]:
) -> ParsedForensicReport:
"""
Converts a DMARC forensic report and sample to a ``dict``
@@ -1415,7 +1472,7 @@ def parse_forensic_report(
raise InvalidForensicReport("Unexpected error: {0}".format(error.__str__()))
def parsed_forensic_reports_to_csv_rows(reports: list[dict[str, Any]]):
def parsed_forensic_reports_to_csv_rows(reports: list[ParsedForensicReport]):
"""
Converts one or more parsed forensic reports to a list of dicts in flat CSV
format
@@ -1451,7 +1508,7 @@ def parsed_forensic_reports_to_csv_rows(reports: list[dict[str, Any]]):
return rows
def parsed_forensic_reports_to_csv(reports: list[dict[str, Any]]) -> str:
def parsed_forensic_reports_to_csv(reports: list[ParsedForensicReport]) -> str:
"""
Converts one or more parsed forensic reports to flat CSV format, including
headers
@@ -1718,7 +1775,7 @@ def parse_report_file(
offline: Optional[bool] = False,
keep_alive: Optional[Callable] = None,
normalize_timespan_threshold_hours: Optional[float] = 24,
) -> dict[str, Any]:
) -> ReportTypeWrapper:
"""Parses a DMARC aggregate or forensic file at the given path, a
file-like object. or bytes