diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py
index 9eb4595..39dd8b9 100644
--- a/parsedmarc/__init__.py
+++ b/parsedmarc/__init__.py
@@ -4,7 +4,7 @@
 from __future__ import annotations

-from typing import Dict, List, Any, Union, Optional, IO, Callable
+from typing import Dict, List, Any, Union, Optional, IO, Callable, TypedDict

 import binascii
 import email
@@ -38,12 +38,177 @@ from parsedmarc.mail import (
 )

 from parsedmarc.constants import __version__
-from parsedmarc.utils import get_base_domain, get_ip_address_info
+from parsedmarc.utils import get_base_domain, get_ip_address_info, IPAddressInfo
 from parsedmarc.utils import is_outlook_msg, convert_outlook_msg
 from parsedmarc.utils import parse_email
 from parsedmarc.utils import timestamp_to_human, human_timestamp_to_datetime

+
+# TypedDict definitions for DMARC report structures
+class PolicyOverrideReason(TypedDict, total=False):
+    """Reason for DMARC policy override"""
+    type: str
+    comment: Optional[str]
+
+
+class PolicyEvaluated(TypedDict):
+    """DMARC policy evaluation result"""
+    disposition: str
+    dkim: str
+    spf: str
+    policy_override_reasons: list[PolicyOverrideReason]
+
+
+class Alignment(TypedDict):
+    """DMARC alignment information"""
+    spf: bool
+    dkim: bool
+    dmarc: bool
+
+
+class DKIMResult(TypedDict, total=False):
+    """DKIM authentication result"""
+    domain: str
+    selector: str
+    result: str
+
+
+class SPFResult(TypedDict, total=False):
+    """SPF authentication result"""
+    domain: str
+    scope: str
+    result: str
+
+
+class AuthResults(TypedDict):
+    """Authentication results"""
+    dkim: list[DKIMResult]
+    spf: list[SPFResult]
+
+
+class DMARCIdentifiers(TypedDict):
+    """DMARC identifiers"""
+    header_from: str
+    envelope_from: str
+    envelope_to: Optional[str]
+
+
+class DMARCRecord(TypedDict):
+    """Parsed DMARC aggregate record"""
+    source: IPAddressInfo
+    count: int
+    alignment: Alignment
+    policy_evaluated: PolicyEvaluated
+    identifiers: DMARCIdentifiers
+    auth_results: AuthResults
+
+
+class PublishedPolicy(TypedDict):
+    """Published DMARC policy"""
+    domain: str
+    adkim: str
+    aspf: str
+    p: str
+    sp: str
+    pct: str
+    fo: str
+
+
+class ReportMetadata(TypedDict, total=False):
+    """DMARC report metadata"""
+    org_name: str
+    org_email: str
+    org_extra_contact_info: Optional[str]
+    report_id: str
+    begin_date: str
+    end_date: str
+    errors: list[str]
+    timespan_requires_normalization: bool
+    original_timespan_seconds: int
+
+
+class AggregateReport(TypedDict):
+    """Parsed DMARC aggregate report"""
+    xml_schema: str
+    report_metadata: ReportMetadata
+    policy_published: PublishedPolicy
+    records: list[DMARCRecord]
+
+
+class SMTPTLSFailureDetails(TypedDict, total=False):
+    """SMTP TLS failure details"""
+    result_type: str
+    failed_session_count: int
+    sending_mta_ip: Optional[str]
+    receiving_ip: Optional[str]
+    receiving_mx_hostname: Optional[str]
+    receiving_mx_helo: Optional[str]
+    additional_info_uri: Optional[str]
+    failure_reason_code: Optional[str]
+
+
+class SMTPTLSPolicy(TypedDict, total=False):
+    """SMTP TLS policy"""
+    policy_domain: str
+    policy_type: str
+    policy_strings: Optional[list[str]]
+    mx_host_patterns: Optional[list[str]]
+    successful_session_count: int
+    total_successful_session_count: int
+    total_failure_session_count: int
+    failure_details: list[SMTPTLSFailureDetails]
+
+
+class SMTPTLSReport(TypedDict):
+    """Parsed SMTP TLS report"""
+    organization_name: str
+    begin_date: str
+    end_date: str
+    contact_info: str
+    report_id: str
+    policies: list[SMTPTLSPolicy]
+
+
+class ForensicReport(TypedDict, total=False):
+    """Parsed DMARC forensic report"""
+    feedback_type: str
+    user_agent: Optional[str]
+    version: Optional[str]
+    original_envelope_id: Optional[str]
+    original_mail_from: Optional[str]
+    original_rcpt_to: Optional[str]
+    arrival_date: str
+    arrival_date_utc: str
+    subject: Optional[str]
+    message_id: Optional[str]
+    authentication_results: Optional[str]
+    dkim_domain: Optional[str]
+    source_ip_address: Optional[str]
+    source_country: Optional[str]
+    source_reverse_dns: Optional[str]
+    source_base_domain: Optional[str]
+    delivery_result: Optional[str]
+    auth_failure: Optional[list[str]]
+    reported_domain: Optional[str]
+    sample: Optional[str]
+    parsed_sample: Optional[dict]
+    sample_headers_only: bool
+
+
+class ParsedReport(TypedDict):
+    """Container for parsed report with type"""
+    report_type: str
+    report: Union[AggregateReport, ForensicReport, SMTPTLSReport]
+
+
+class ParseResults(TypedDict):
+    """Results from parsing multiple reports"""
+    aggregate_reports: list[AggregateReport]
+    forensic_reports: list[ForensicReport]
+    smtp_tls_reports: list[SMTPTLSReport]
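+
+# Illustrative sketch (not part of the parsedmarc API): with these
+# TypedDicts a type checker such as mypy can validate report access,
+# for example:
+#
+#     def count_messages(report: AggregateReport) -> int:
+#         return sum(record["count"] for record in report["records"])
+#
+#     def org_names(results: ParseResults) -> list[str]:
+#         return [r["report_metadata"]["org_name"]
+#                 for r in results["aggregate_reports"]]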
+
+
 logger.debug("parsedmarc v{0}".format(__version__))

 feedback_report_regex = re.compile(r"^([\w\-]+): (.+)$", re.MULTILINE)
@@ -230,8 +395,8 @@ def _bucket_interval_by_day(


 def _append_parsed_record(
-    parsed_record: dict[str, Any],
-    records: list[dict[str, Any]],
+    parsed_record: DMARCRecord,
+    records: list[DMARCRecord],
     begin_dt: datetime,
     end_dt: datetime,
     normalize: bool,
@@ -283,7 +448,7 @@ def _parse_report_record(
     offline: Optional[bool] = False,
     nameservers: Optional[list[str]] = None,
     dns_timeout: Optional[float] = 2.0,
-) -> dict[str, Any]:
+) -> DMARCRecord:
     """
     Converts a record from a DMARC aggregate report into a more consistent
     format
@@ -512,7 +677,7 @@ def _parse_smtp_tls_report_policy(policy: dict[str, Any]):
         raise InvalidSMTPTLSReport(str(e))


-def parse_smtp_tls_report_json(report: str):
+def parse_smtp_tls_report_json(report: str) -> SMTPTLSReport:
     """Parses and validates an SMTP TLS report"""
     required_fields = [
         "organization-name",
@@ -552,7 +717,7 @@ def parse_smtp_tls_report_json(report: str):


 def parsed_smtp_tls_reports_to_csv_rows(
-    reports: Union[dict[str, Any], List[dict[str, Any]]],
+    reports: Union[SMTPTLSReport, list[SMTPTLSReport]],
 ):
     """Converts one or more parsed SMTP TLS reports into a list of single
     layer OrderedDict objects suitable for use in a CSV"""
@@ -588,7 +753,7 @@ def parsed_smtp_tls_reports_to_csv_rows(
     return rows


-def parsed_smtp_tls_reports_to_csv(reports: dict[str, Any]) -> str:
+def parsed_smtp_tls_reports_to_csv(reports: SMTPTLSReport) -> str:
     """
     Converts one or more parsed SMTP TLS reports to flat CSV format,
     including headers
@@ -645,7 +810,7 @@ def parse_aggregate_report_xml(
     timeout: Optional[float] = 2.0,
     keep_alive: Optional[Callable] = None,
     normalize_timespan_threshold_hours: float = 24.0,
-) -> dict[str, Any]:
+) -> AggregateReport:
     """Parses a DMARC XML report string and returns a consistent OrderedDict

     Args:
@@ -923,7 +1088,7 @@ def parse_aggregate_report_file(
     dns_timeout: Optional[float] = 2.0,
     keep_alive: Optional[Callable] = None,
     normalize_timespan_threshold_hours: Optional[float] = 24.0,
-) -> dict[str, Any]:
+) -> AggregateReport:
     """Parses a file at the given path, a file-like object,
     or bytes as an aggregate DMARC report
@@ -964,7 +1129,7 @@ def parse_aggregate_report_file(


 def parsed_aggregate_reports_to_csv_rows(
-    reports: list[dict[str, Any]],
+    reports: list[AggregateReport],
 ) -> list[dict[str, Any]]:
     """
     Converts one or more parsed aggregate reports to list of dicts in flat
     CSV
@@ -1089,7 +1254,7 @@ def parsed_aggregate_reports_to_csv_rows(
     return rows


-def parsed_aggregate_reports_to_csv(reports: list[dict[str, Any]]) -> str:
+def parsed_aggregate_reports_to_csv(reports: list[AggregateReport]) -> str:
     """
     Converts one or more parsed aggregate reports to flat CSV format,
     including headers
@@ -1168,7 +1333,7 @@ def parse_forensic_report(
     nameservers: Optional[list[str]] = None,
     dns_timeout: Optional[float] = 2.0,
     strip_attachment_payloads: Optional[bool] = False,
-) -> dict[str, Any]:
+) -> ForensicReport:
     """
     Converts a DMARC forensic report and sample to an ``OrderedDict``
@@ -1296,7 +1461,7 @@ def parse_forensic_report(
         raise InvalidForensicReport("Unexpected error: {0}".format(error.__str__()))


-def parsed_forensic_reports_to_csv_rows(reports: list[dict[str, Any]]):
+def parsed_forensic_reports_to_csv_rows(reports: list[ForensicReport]):
     """
     Converts one or more parsed forensic reports to a list of dicts in flat
     CSV format
@@ -1332,7 +1497,7 @@ def parsed_forensic_reports_to_csv_rows(reports: list[dict[str, Any]]):
     return rows


-def parsed_forensic_reports_to_csv(reports: list[dict[str, Any]]) -> str:
+def parsed_forensic_reports_to_csv(reports: list[ForensicReport]) -> str:
     """
     Converts one or more parsed forensic reports to flat CSV format,
     including headers
@@ -1397,7 +1562,7 @@ def parse_report_email(
     strip_attachment_payloads: Optional[bool] = False,
     keep_alive: Optional[callable] = None,
     normalize_timespan_threshold_hours: Optional[float] = 24.0,
-) -> dict[str, Any]:
+) -> ParsedReport:
     """
     Parses a DMARC report from an email
@@ -1602,7 +1767,7 @@ def parse_report_file(
     offline: Optional[bool] = False,
     keep_alive: Optional[Callable] = None,
     normalize_timespan_threshold_hours: Optional[float] = 24,
-) -> dict[str, Any]:
+) -> ParsedReport:
     """Parses a DMARC aggregate or forensic file at the given path,
     a file-like object, or bytes
@@ -1692,7 +1857,7 @@ def get_dmarc_reports_from_mbox(
     reverse_dns_map_url: Optional[str] = None,
     offline: Optional[bool] = False,
     normalize_timespan_threshold_hours: Optional[float] = 24.0,
-) -> dict[str, dict[str, Any]]:
+) -> ParseResults:
     """Parses a mailbox in mbox format containing e-mails with attached
     DMARC reports
@@ -1787,7 +1952,7 @@ def get_dmarc_reports_from_mailbox(
     since: Optional[datetime] = None,
     create_folders: Optional[bool] = True,
     normalize_timespan_threshold_hours: Optional[float] = 24,
-) -> dict[str, dict[str, Any]]:
+) -> ParseResults:
     """
     Fetches and parses DMARC reports from a mailbox
@@ -2281,7 +2446,7 @@ def save_output(
             sample_file.write(sample)


-def get_report_zip(results: dict[str, Any]) -> bytes:
+def get_report_zip(results: ParseResults) -> bytes:
     """
     Creates a zip file of parsed report output
diff --git a/parsedmarc/elastic.py b/parsedmarc/elastic.py
index c627157..4db5ffc 100644
--- a/parsedmarc/elastic.py
+++ b/parsedmarc/elastic.py
@@ -24,7 +24,7 @@ from elasticsearch.helpers import reindex

 from parsedmarc.log import logger
 from parsedmarc.utils import human_timestamp_to_datetime
-from parsedmarc import InvalidForensicReport
+from parsedmarc import InvalidForensicReport, AggregateReport, ForensicReport, SMTPTLSReport


 class ElasticsearchError(Exception):
@@ -376,7 +376,7 @@ def migrate_indexes(


 def save_aggregate_report_to_elasticsearch(
-    aggregate_report: dict[str, Any],
+    aggregate_report: AggregateReport,
     index_suffix: Optional[str] = None,
     index_prefix: Optional[str] = None,
     monthly_indexes: Optional[bool] = False,
@@ -538,7 +538,7 @@ def save_aggregate_report_to_elasticsearch(


 def save_forensic_report_to_elasticsearch(
-    forensic_report: dict[str, Any],
+    forensic_report: ForensicReport,
     index_suffix: Optional[Any] = None,
     index_prefix: Optional[str] = None,
     monthly_indexes: Optional[bool] = False,
@@ -705,7 +705,7 @@ def save_forensic_report_to_elasticsearch(


 def save_smtp_tls_report_to_elasticsearch(
-    report: dict[str, Any],
+    report: SMTPTLSReport,
     index_suffix: Optional[str] = None,
     index_prefix: Optional[str] = None,
     monthly_indexes: Optional[bool] = False,
diff --git a/parsedmarc/gelf.py b/parsedmarc/gelf.py
index d581aa4..ece545b 100644
--- a/parsedmarc/gelf.py
+++ b/parsedmarc/gelf.py
@@ -3,6 +3,7 @@ from __future__ import annotations

 from typing import Any
+from parsedmarc import AggregateReport, ForensicReport, SMTPTLSReport

 import logging
 import logging.handlers
@@ -53,7 +54,7 @@ class GelfClient(object):
         self.logger.addHandler(self.handler)

     def save_aggregate_report_to_gelf(
-        self, aggregate_reports: list[dict[str, Any]]
+        self, aggregate_reports: list[AggregateReport]
     ):
         rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
         for row in rows:
@@ -63,13 +64,13 @@ class GelfClient(object):
             log_context_data.parsedmarc = None

     def save_forensic_report_to_gelf(
-        self, forensic_reports: list[dict[str, Any]]
+        self, forensic_reports: list[ForensicReport]
     ):
         rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
         for row in rows:
             self.logger.info(json.dumps(row))

-    def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: dict[str, Any]):
+    def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: SMTPTLSReport):
         rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
         for row in rows:
             self.logger.info(json.dumps(row))
diff --git a/parsedmarc/kafkaclient.py b/parsedmarc/kafkaclient.py
index 269aa97..1f92265 100644
--- a/parsedmarc/kafkaclient.py
+++ b/parsedmarc/kafkaclient.py
@@ -3,6 +3,7 @@ from __future__ import annotations

 from typing import Any, Optional, Union
+from parsedmarc import AggregateReport, ForensicReport, SMTPTLSReport
 from ssl import SSLContext

 import json
@@ -98,7 +99,7 @@ class KafkaClient(object):

     def save_aggregate_reports_to_kafka(
         self,
-        aggregate_reports: Union[dict[str, Any], list[dict[str, Any]]],
+        aggregate_reports: Union[AggregateReport, list[AggregateReport]],
         aggregate_topic: str,
     ):
         """
@@ -145,7 +146,7 @@ class KafkaClient(object):

     def save_forensic_reports_to_kafka(
         self,
-        forensic_reports: Union[dict[str, Any], list[dict[str, Any]]],
+        forensic_reports: Union[ForensicReport, list[ForensicReport]],
         forensic_topic: str,
     ):
         """
@@ -179,7 +180,7 @@ class KafkaClient(object):

     def save_smtp_tls_reports_to_kafka(
         self,
-        smtp_tls_reports: Union[list[dict[str, Any]], dict[str, Any]],
+        smtp_tls_reports: Union[list[SMTPTLSReport], SMTPTLSReport],
         smtp_tls_topic: str,
     ):
         """
diff --git a/parsedmarc/loganalytics.py b/parsedmarc/loganalytics.py
index b322dca..3f3b9c0 100644
--- a/parsedmarc/loganalytics.py
+++ b/parsedmarc/loganalytics.py
@@ -3,6 +3,7 @@ from __future__ import annotations

 from typing import Any
+from parsedmarc import ParseResults

 from parsedmarc.log import logger
 from azure.core.exceptions import HttpResponseError
@@ -132,7 +133,7 @@ class LogAnalyticsClient(object):

     def publish_results(
         self,
-        results: dict[str, dict[str, Any]],
+        results: ParseResults,
         save_aggregate: bool,
         save_forensic: bool,
         save_smtp_tls: bool,
diff --git a/parsedmarc/opensearch.py b/parsedmarc/opensearch.py
index 5ecf7ae..1e0a639 100644
--- a/parsedmarc/opensearch.py
+++ b/parsedmarc/opensearch.py
@@ -24,7 +24,7 @@ from opensearchpy.helpers import reindex

 from parsedmarc.log import logger
 from parsedmarc.utils import human_timestamp_to_datetime
-from parsedmarc import InvalidForensicReport
+from parsedmarc import InvalidForensicReport, AggregateReport, ForensicReport, SMTPTLSReport


 class OpenSearchError(Exception):
@@ -376,7 +376,7 @@ def migrate_indexes(


 def save_aggregate_report_to_opensearch(
-    aggregate_report: dict[str, Any],
+    aggregate_report: AggregateReport,
     index_suffix: Optional[str] = None,
     index_prefix: Optional[str] = None,
     monthly_indexes: Optional[bool] = False,
@@ -538,7 +538,7 @@ def save_aggregate_report_to_opensearch(


 def save_forensic_report_to_opensearch(
-    forensic_report: dict[str, Any],
+    forensic_report: ForensicReport,
     index_suffix: Optional[str] = None,
     index_prefix: Optional[str] = None,
     monthly_indexes: Optional[bool] = False,
diff --git a/parsedmarc/s3.py b/parsedmarc/s3.py
index 65c8a02..6d3a4ec 100644
--- a/parsedmarc/s3.py
+++ b/parsedmarc/s3.py
@@ -3,6 +3,7 @@ from __future__ import annotations

-from typing import Any
+from typing import Any, Union
+from parsedmarc import AggregateReport, ForensicReport, SMTPTLSReport

 import json
 import boto3
@@ -54,16 +55,16 @@ class S3Client(object):
         )
         self.bucket = self.s3.Bucket(self.bucket_name)  # type: ignore

-    def save_aggregate_report_to_s3(self, report: dict[str, Any]):
+    def save_aggregate_report_to_s3(self, report: AggregateReport):
         self.save_report_to_s3(report, "aggregate")

-    def save_forensic_report_to_s3(self, report: dict[str, Any]):
+    def save_forensic_report_to_s3(self, report: ForensicReport):
         self.save_report_to_s3(report, "forensic")

-    def save_smtp_tls_report_to_s3(self, report: dict[str, Any]):
+    def save_smtp_tls_report_to_s3(self, report: SMTPTLSReport):
         self.save_report_to_s3(report, "smtp_tls")

-    def save_report_to_s3(self, report: dict[str, Any], report_type: str):
+    def save_report_to_s3(self, report: Union[AggregateReport, ForensicReport, SMTPTLSReport], report_type: str):
         if report_type == "smtp_tls":
             report_date = report["begin_date"]
             report_id = report["report_id"]
diff --git a/parsedmarc/splunk.py b/parsedmarc/splunk.py
index 44fcd6f..6b58dd4 100644
--- a/parsedmarc/splunk.py
+++ b/parsedmarc/splunk.py
@@ -3,6 +3,7 @@ from __future__ import annotations

 from typing import Any, Union
+from parsedmarc import AggregateReport, ForensicReport, SMTPTLSReport

 from urllib.parse import urlparse
@@ -72,7 +73,7 @@ class HECClient(object):

     def save_aggregate_reports_to_splunk(
         self,
-        aggregate_reports: Union[list[dict[str, Any]], dict[str, Any]],
+        aggregate_reports: Union[list[AggregateReport], AggregateReport],
     ):
         """
         Saves aggregate DMARC reports to Splunk
@@ -138,7 +139,7 @@ class HECClient(object):

     def save_forensic_reports_to_splunk(
         self,
-        forensic_reports: Union[list[dict[str, Any]], dict[str, Any]],
+        forensic_reports: Union[list[ForensicReport], ForensicReport],
     ):
         """
         Saves forensic DMARC reports to Splunk
@@ -174,7 +175,7 @@ class HECClient(object):
         raise SplunkError(response["text"])

     def save_smtp_tls_reports_to_splunk(
-        self, reports: Union[list[dict[str, Any]], dict[str, Any]]
+        self, reports: Union[list[SMTPTLSReport], SMTPTLSReport]
     ):
         """
         Saves aggregate DMARC reports to Splunk
diff --git a/parsedmarc/syslog.py b/parsedmarc/syslog.py
index 87dbb2f..4ea183b 100644
--- a/parsedmarc/syslog.py
+++ b/parsedmarc/syslog.py
@@ -7,6 +7,7 @@
 import logging
 import logging.handlers
 from typing import Any
+from parsedmarc import AggregateReport, ForensicReport, SMTPTLSReport

 import json
@@ -36,21 +37,21 @@ class SyslogClient(object):
         self.logger.addHandler(log_handler)

     def save_aggregate_report_to_syslog(
-        self, aggregate_reports: list[dict[str, Any]]
+        self, aggregate_reports: list[AggregateReport]
     ):
         rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
         for row in rows:
             self.logger.info(json.dumps(row))

     def save_forensic_report_to_syslog(
-        self, forensic_reports: list[dict[str, Any]]
+        self, forensic_reports: list[ForensicReport]
     ):
         rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
         for row in rows:
             self.logger.info(json.dumps(row))

     def save_smtp_tls_report_to_syslog(
-        self, smtp_tls_reports: list[dict[str, Any]]
+        self, smtp_tls_reports: list[SMTPTLSReport]
     ):
         rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
         for row in rows:
diff --git a/parsedmarc/utils.py b/parsedmarc/utils.py
index 50202ae..6c689b3 100644
--- a/parsedmarc/utils.py
+++ b/parsedmarc/utils.py
@@ -4,7 +4,7 @@

 from __future__ import annotations

-from typing import Optional, Union
+from typing import Optional, Union, TypedDict

 import logging
 import os
@@ -45,6 +45,32 @@ import parsedmarc.resources.dbip
 import parsedmarc.resources.maps
 from parsedmarc.constants import USER_AGENT

+
+# TypedDict definitions for better typing
+class IPAddressInfo(TypedDict, total=False):
+    """Information about an IP address"""
+    ip_address: str
+    country: Optional[str]
+    reverse_dns: Optional[str]
+    base_domain: Optional[str]
+    name: Optional[str]
+    type: Optional[str]
+
+
+class EmailAddress(TypedDict, total=False):
+    """Parsed email address information"""
+    display_name: Optional[str]
+    address: str
+    local: Optional[str]
+    domain: Optional[str]
+
+
+class ReverseDNSService(TypedDict):
+    """Reverse DNS service information"""
+    name: str
+    type: Optional[str]
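+
+# Illustrative sketch (hypothetical caller, not part of this module): the
+# result of get_ip_address_info() can now be annotated, for example:
+#
+#     info: IPAddressInfo = get_ip_address_info("203.0.113.7", offline=True)
+#     country = info.get("country")  # total=False: keys may be missing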
+
+
 parenthesis_regex = re.compile(r"\s*\(.*\)\s*")

 null_file = open(os.devnull, "w")
@@ -341,7 +367,7 @@ def get_service_from_reverse_dns_base_domain(
     url: Optional[bool] = None,
     offline: Optional[bool] = False,
     reverse_dns_map: Optional[bool] = None,
-) -> str:
+) -> ReverseDNSService:
     """
     Returns the service name of a given base domain name from reverse DNS.
@@ -421,7 +447,7 @@ def get_ip_address_info(
     offline: Optional[bool] = False,
     nameservers: Optional[list[str]] = None,
     timeout: Optional[float] = 2.0,
-) -> dict[str, str]:
+) -> IPAddressInfo:
     """
     Returns reverse DNS and country information for the given IP address
@@ -486,7 +512,7 @@ def get_ip_address_info(

     return info


-def parse_email_address(original_address: str) -> dict[str, str]:
+def parse_email_address(original_address: str) -> EmailAddress:
     if original_address[0] == "":
         display_name = None
     else:
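
Reviewer note: a minimal end-to-end sketch of how the new types compose.
The file name and the summarize() helper are hypothetical, for illustration
only; parse_aggregate_report_file() and AggregateReport come from this
changeset:

    from parsedmarc import parse_aggregate_report_file, AggregateReport

    def summarize(report: AggregateReport) -> str:
        # Sum message counts across all records in the aggregate report
        meta = report["report_metadata"]
        total = sum(record["count"] for record in report["records"])
        return "{0}: {1} messages".format(meta["org_name"], total)

    report = parse_aggregate_report_file("aggregate_report.xml")
    print(summarize(report))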