diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py index b65d10d..6784ef6 100644 --- a/parsedmarc/__init__.py +++ b/parsedmarc/__init__.py @@ -4,20 +4,6 @@ from __future__ import annotations -from typing import ( - Dict, - List, - Any, - Union, - Optional, - IO, - Callable, - BinaryIO, - Protocol, - runtime_checkable, - cast, -) - import binascii import email import email.utils @@ -31,31 +17,46 @@ import xml.parsers.expat as expat import zipfile import zlib from base64 import b64decode -from collections import OrderedDict from csv import DictWriter from datetime import datetime, timedelta, timezone, tzinfo from io import BytesIO, StringIO +from typing import ( + IO, + Any, + BinaryIO, + Callable, + Dict, + List, + Optional, + Protocol, + Union, + cast, + runtime_checkable, +) +import lxml.etree as etree import mailparser import xmltodict from expiringdict import ExpiringDict -import lxml.etree as etree from mailsuite.smtp import send_email +from parsedmarc.constants import __version__ from parsedmarc.log import logger from parsedmarc.mail import ( - MailboxConnection, - IMAPConnection, - MSGraphConnection, GmailConnection, + IMAPConnection, + MailboxConnection, + MSGraphConnection, +) +from parsedmarc.utils import ( + convert_outlook_msg, + get_base_domain, + get_ip_address_info, + human_timestamp_to_datetime, + is_outlook_msg, + parse_email, + timestamp_to_human, ) - -from parsedmarc.constants import __version__ -from parsedmarc.utils import get_base_domain, get_ip_address_info -from parsedmarc.utils import is_outlook_msg, convert_outlook_msg -from parsedmarc.utils import parse_email -from parsedmarc.utils import timestamp_to_human, human_timestamp_to_datetime - logger.debug("parsedmarc v{0}".format(__version__)) @@ -243,8 +244,8 @@ def _bucket_interval_by_day( def _append_parsed_record( - parsed_record: OrderedDict[str, Any], - records: List[OrderedDict[str, Any]], + parsed_record: dict[str, Any], + records: list[dict[str, Any]], begin_dt: datetime, end_dt: datetime, normalize: bool, @@ -287,7 +288,7 @@ def _append_parsed_record( def _parse_report_record( - record: OrderedDict, + record: dict[str, Any], *, ip_db_path: Optional[str] = None, always_use_local_files: Optional[bool] = False, @@ -296,13 +297,13 @@ def _parse_report_record( offline: Optional[bool] = False, nameservers: Optional[list[str]] = None, dns_timeout: Optional[float] = 2.0, -) -> OrderedDict[str, Any]: +) -> dict[str, Any]: """ Converts a record from a DMARC aggregate report into a more consistent format Args: - record (OrderedDict): The record to convert + record (dict): The record to convert always_use_local_files (bool): Do not download files reverse_dns_map_path (str): Path to a reverse DNS map file reverse_dns_map_url (str): URL to a reverse DNS map file @@ -313,10 +314,10 @@ def _parse_report_record( dns_timeout (float): Sets the DNS timeout in seconds Returns: - OrderedDict: The converted record + dict: The converted record """ record = record.copy() - new_record = OrderedDict() + new_record: dict[str, Any] = {} if record["row"]["source_ip"] is None: raise ValueError("Source IP address is empty") new_record_source = get_ip_address_info( @@ -334,14 +335,12 @@ def _parse_report_record( new_record["source"] = new_record_source new_record["count"] = int(record["row"]["count"]) policy_evaluated = record["row"]["policy_evaluated"].copy() - new_policy_evaluated = OrderedDict( - [ - ("disposition", "none"), - ("dkim", "fail"), - ("spf", "fail"), - ("policy_override_reasons", []), - ] - ) + new_policy_evaluated: dict[str, Any] = { + "disposition": "none", + "dkim": "fail", + "spf": "fail", + "policy_override_reasons": [], + } if "disposition" in policy_evaluated: new_policy_evaluated["disposition"] = policy_evaluated["disposition"] if new_policy_evaluated["disposition"].strip().lower() == "pass": @@ -378,7 +377,7 @@ def _parse_report_record( new_record["identifiers"] = record["identities"].copy() else: new_record["identifiers"] = record["identifiers"].copy() - new_record["auth_results"] = OrderedDict([("dkim", []), ("spf", [])]) + new_record["auth_results"] = {"dkim": [], "spf": []} if type(new_record["identifiers"]["header_from"]) is str: lowered_from = new_record["identifiers"]["header_from"].lower() else: @@ -397,7 +396,7 @@ def _parse_report_record( auth_results["dkim"] = [auth_results["dkim"]] for result in auth_results["dkim"]: if "domain" in result and result["domain"] is not None: - new_result = OrderedDict([("domain", result["domain"])]) + new_result: dict[str, Any] = {"domain": result["domain"]} if "selector" in result and result["selector"] is not None: new_result["selector"] = result["selector"] else: @@ -412,7 +411,7 @@ def _parse_report_record( auth_results["spf"] = [auth_results["spf"]] for result in auth_results["spf"]: if "domain" in result and result["domain"] is not None: - new_result = OrderedDict([("domain", result["domain"])]) + new_result: dict[str, Any] = {"domain": result["domain"]} if "scope" in result and result["scope"] is not None: new_result["scope"] = result["scope"] else: @@ -452,10 +451,10 @@ def _parse_report_record( def _parse_smtp_tls_failure_details(failure_details: dict[str, Any]): try: - new_failure_details = OrderedDict( - result_type=failure_details["result-type"], - failed_session_count=failure_details["failed-session-count"], - ) + new_failure_details: dict[str, Any] = { + "result_type": failure_details["result-type"], + "failed_session_count": failure_details["failed-session-count"], + } if "sending-mta-ip" in failure_details: new_failure_details["sending_mta_ip"] = failure_details["sending-mta-ip"] @@ -486,7 +485,7 @@ def _parse_smtp_tls_failure_details(failure_details: dict[str, Any]): raise InvalidSMTPTLSReport(str(e)) -def _parse_smtp_tls_report_policy(policy: OrderedDict[str, Any]): +def _parse_smtp_tls_report_policy(policy: dict[str, Any]): policy_types = ["tlsa", "sts", "no-policy-found"] try: policy_domain = policy["policy"]["policy-domain"] @@ -494,7 +493,10 @@ def _parse_smtp_tls_report_policy(policy: OrderedDict[str, Any]): failure_details = [] if policy_type not in policy_types: raise InvalidSMTPTLSReport(f"Invalid policy type {policy_type}") - new_policy = OrderedDict(policy_domain=policy_domain, policy_type=policy_type) + new_policy: dict[str, Any] = { + "policy_domain": policy_domain, + "policy_type": policy_type, + } if "policy-string" in policy["policy"]: if isinstance(policy["policy"]["policy-string"], list): if len(policy["policy"]["policy-string"]) > 0: @@ -523,7 +525,7 @@ def _parse_smtp_tls_report_policy(policy: OrderedDict[str, Any]): raise InvalidSMTPTLSReport(str(e)) -def parse_smtp_tls_report_json(report: str): +def parse_smtp_tls_report_json(report: Union[str, bytes]): """Parses and validates an SMTP TLS report""" required_fields = [ "organization-name", @@ -534,6 +536,9 @@ def parse_smtp_tls_report_json(report: str): ] try: + if isinstance(report, bytes): + report = report.decode("utf-8", errors="replace") + policies = [] report_dict = json.loads(report) for required_field in required_fields: @@ -545,39 +550,39 @@ def parse_smtp_tls_report_json(report: str): for policy in report_dict["policies"]: policies.append(_parse_smtp_tls_report_policy(policy)) - new_report = OrderedDict( - organization_name=report_dict["organization-name"], - begin_date=report_dict["date-range"]["start-datetime"], - end_date=report_dict["date-range"]["end-datetime"], - contact_info=report_dict["contact-info"], - report_id=report_dict["report-id"], - policies=policies, - ) + new_report: dict[str, Any] = { + "organization_name": report_dict["organization-name"], + "begin_date": report_dict["date-range"]["start-datetime"], + "end_date": report_dict["date-range"]["end-datetime"], + "contact_info": report_dict["contact-info"], + "report_id": report_dict["report-id"], + "policies": policies, + } return new_report except KeyError as e: - InvalidSMTPTLSReport(f"Missing required field: {e}") + raise InvalidSMTPTLSReport(f"Missing required field: {e}") except Exception as e: raise InvalidSMTPTLSReport(str(e)) def parsed_smtp_tls_reports_to_csv_rows( - reports: Union[OrderedDict[str, Any], List[OrderedDict[str, Any]]], + reports: Union[dict[str, Any], list[dict[str, Any]]], ): """Converts one oor more parsed SMTP TLS reports into a list of single - layer OrderedDict objects suitable for use in a CSV""" - if type(reports) is OrderedDict: + layer dict objects suitable for use in a CSV""" + if isinstance(reports, dict): reports = [reports] rows = [] for report in reports: - common_fields = OrderedDict( - organization_name=report["organization_name"], - begin_date=report["begin_date"], - end_date=report["end_date"], - report_id=report["report_id"], - ) + common_fields = { + "organization_name": report["organization_name"], + "begin_date": report["begin_date"], + "end_date": report["end_date"], + "report_id": report["report_id"], + } record = common_fields.copy() for policy in report["policies"]: if "policy_strings" in policy: @@ -599,7 +604,7 @@ def parsed_smtp_tls_reports_to_csv_rows( return rows -def parsed_smtp_tls_reports_to_csv(reports: OrderedDict[str, Any]) -> str: +def parsed_smtp_tls_reports_to_csv(reports: dict[str, Any]) -> str: """ Converts one or more parsed SMTP TLS reports to flat CSV format, including headers @@ -656,8 +661,8 @@ def parse_aggregate_report_xml( timeout: Optional[float] = 2.0, keep_alive: Optional[Callable] = None, normalize_timespan_threshold_hours: float = 24.0, -) -> OrderedDict[str, Any]: - """Parses a DMARC XML report string and returns a consistent OrderedDict +) -> dict[str, Any]: + """Parses a DMARC XML report string and returns a consistent dict Args: xml (str): A string of DMARC aggregate report XML @@ -673,7 +678,7 @@ def parse_aggregate_report_xml( normalize_timespan_threshold_hours (float): Normalize timespans beyond this Returns: - OrderedDict: The parsed aggregate DMARC report + dict: The parsed aggregate DMARC report """ errors = [] # Parse XML and recover from errors @@ -705,8 +710,8 @@ def parse_aggregate_report_xml( schema = "draft" if "version" in report: schema = report["version"] - new_report = OrderedDict([("xml_schema", schema)]) - new_report_metadata = OrderedDict() + new_report: dict[str, Any] = {"xml_schema": schema} + new_report_metadata: dict[str, Any] = {} if report_metadata["org_name"] is None: if report_metadata["email"] is not None: report_metadata["org_name"] = report_metadata["email"].split("@")[-1] @@ -767,7 +772,7 @@ def parse_aggregate_report_xml( policy_published = report["policy_published"] if type(policy_published) is list: policy_published = policy_published[0] - new_policy_published = OrderedDict() + new_policy_published: dict[str, Any] = {} new_policy_published["domain"] = policy_published["domain"] adkim = "r" if "adkim" in policy_published: @@ -943,7 +948,7 @@ def parse_aggregate_report_file( dns_timeout: Optional[float] = 2.0, keep_alive: Optional[Callable] = None, normalize_timespan_threshold_hours: Optional[float] = 24.0, -) -> OrderedDict[str, any]: +) -> dict[str, Any]: """Parses a file at the given path, a file-like object. or bytes as an aggregate DMARC report @@ -961,7 +966,7 @@ def parse_aggregate_report_file( normalize_timespan_threshold_hours (float): Normalize timespans beyond this Returns: - OrderedDict: The parsed DMARC aggregate report + dict: The parsed DMARC aggregate report """ try: @@ -984,7 +989,7 @@ def parse_aggregate_report_file( def parsed_aggregate_reports_to_csv_rows( - reports: list[OrderedDict[str, Any]], + reports: Union[dict[str, Any], list[dict[str, Any]]], ) -> list[dict[str, Any]]: """ Converts one or more parsed aggregate reports to list of dicts in flat CSV @@ -1001,7 +1006,7 @@ def parsed_aggregate_reports_to_csv_rows( def to_str(obj): return str(obj).lower() - if type(reports) is OrderedDict: + if isinstance(reports, dict): reports = [reports] rows = [] @@ -1109,7 +1114,9 @@ def parsed_aggregate_reports_to_csv_rows( return rows -def parsed_aggregate_reports_to_csv(reports: list[OrderedDict[str, Any]]) -> str: +def parsed_aggregate_reports_to_csv( + reports: Union[dict[str, Any], list[dict[str, Any]]], +) -> str: """ Converts one or more parsed aggregate reports to flat CSV format, including headers @@ -1188,9 +1195,9 @@ def parse_forensic_report( nameservers: Optional[list[str]] = None, dns_timeout: Optional[float] = 2.0, strip_attachment_payloads: Optional[bool] = False, -) -> OrderedDict[str, Any]: +) -> dict[str, Any]: """ - Converts a DMARC forensic report and sample to a ``OrderedDict`` + Converts a DMARC forensic report and sample to a dict Args: feedback_report (str): A message's feedback report as a string @@ -1208,12 +1215,12 @@ def parse_forensic_report( forensic report results Returns: - OrderedDict: A parsed report and sample + dict: A parsed report and sample """ delivery_results = ["delivered", "spam", "policy", "reject", "other"] try: - parsed_report = OrderedDict() + parsed_report: dict[str, Any] = {} report_values = feedback_report_regex.findall(feedback_report) for report_value in report_values: key = report_value[0].lower().replace("-", "_") @@ -1316,7 +1323,9 @@ def parse_forensic_report( raise InvalidForensicReport("Unexpected error: {0}".format(error.__str__())) -def parsed_forensic_reports_to_csv_rows(reports: list[OrderedDict[str, Any]]): +def parsed_forensic_reports_to_csv_rows( + reports: Union[dict[str, Any], list[dict[str, Any]]], +): """ Converts one or more parsed forensic reports to a list of dicts in flat CSV format @@ -1327,7 +1336,7 @@ def parsed_forensic_reports_to_csv_rows(reports: list[OrderedDict[str, Any]]): Returns: list: Parsed forensic report data as a list of dicts in flat CSV format """ - if type(reports) is OrderedDict: + if isinstance(reports, dict): reports = [reports] rows = [] @@ -1412,12 +1421,12 @@ def parse_report_email( always_use_local_files: Optional[bool] = False, reverse_dns_map_path: Optional[str] = None, reverse_dns_map_url: Optional[str] = None, - nameservers: list[str] = None, + nameservers: Optional[list[str]] = None, dns_timeout: Optional[float] = 2.0, strip_attachment_payloads: Optional[bool] = False, - keep_alive: Optional[callable] = None, - normalize_timespan_threshold_hours: Optional[float] = 24.0, -) -> OrderedDict[str, Any]: + keep_alive: Optional[Callable] = None, + normalize_timespan_threshold_hours: float = 24.0, +) -> dict[str, Any]: """ Parses a DMARC report from an email @@ -1436,22 +1445,23 @@ def parse_report_email( normalize_timespan_threshold_hours (float): Normalize timespans beyond this Returns: - OrderedDict: + dict: * ``report_type``: ``aggregate`` or ``forensic`` * ``report``: The parsed report """ result = None + msg_date: datetime = datetime.now(timezone.utc) try: - if is_outlook_msg(input_): + if isinstance(input_, bytes) and is_outlook_msg(input_): input_ = convert_outlook_msg(input_) - if type(input_) is bytes: + if isinstance(input_, bytes): input_ = input_.decode(encoding="utf8", errors="replace") msg = mailparser.parse_from_string(input_) msg_headers = json.loads(msg.headers_json) - date = email.utils.format_datetime(datetime.now(timezone.utc)) if "Date" in msg_headers: - date = human_timestamp_to_datetime(msg_headers["Date"]) + msg_date = human_timestamp_to_datetime(msg_headers["Date"]) + date = email.utils.format_datetime(msg_date) msg = email.message_from_string(input_) except Exception as e: @@ -1493,15 +1503,11 @@ def parse_report_email( if not payload.strip().startswith("{"): payload = str(b64decode(payload)) smtp_tls_report = parse_smtp_tls_report_json(payload) - return OrderedDict( - [("report_type", "smtp_tls"), ("report", smtp_tls_report)] - ) + return {"report_type": "smtp_tls", "report": smtp_tls_report} elif content_type == "application/tlsrpt+gzip": payload = extract_report(payload) smtp_tls_report = parse_smtp_tls_report_json(payload) - return OrderedDict( - [("report_type", "smtp_tls"), ("report", smtp_tls_report)] - ) + return {"report_type": "smtp_tls", "report": smtp_tls_report} elif content_type == "text/plain": if "A message claiming to be from you has failed" in payload: try: @@ -1531,10 +1537,9 @@ def parse_report_email( if isinstance(payload, bytes): payload = payload.decode("utf-8", errors="replace") if payload.strip().startswith("{"): - result = parse_smtp_tls_report_json(payload) - result = OrderedDict( - [("report_type", "smtp_tls"), ("report", smtp_tls_report)] - ) + smtp_tls_report = parse_smtp_tls_report_json(payload) + result = {"report_type": "smtp_tls", "report": smtp_tls_report} + return result elif payload.strip().startswith("<"): aggregate_report = parse_aggregate_report_xml( payload, @@ -1548,9 +1553,7 @@ def parse_report_email( keep_alive=keep_alive, normalize_timespan_threshold_hours=normalize_timespan_threshold_hours, ) - result = OrderedDict( - [("report_type", "aggregate"), ("report", aggregate_report)] - ) + result = {"report_type": "aggregate", "report": aggregate_report} return result @@ -1574,7 +1577,7 @@ def parse_report_email( forensic_report = parse_forensic_report( feedback_report, sample, - date, + msg_date, offline=offline, ip_db_path=ip_db_path, always_use_local_files=always_use_local_files, @@ -1594,7 +1597,7 @@ def parse_report_email( except Exception as e: raise InvalidForensicReport(e.__str__()) - result = OrderedDict([("report_type", "forensic"), ("report", forensic_report)]) + result = {"report_type": "forensic", "report": forensic_report} return result if result is None: @@ -1615,7 +1618,7 @@ def parse_report_file( offline: Optional[bool] = False, keep_alive: Optional[Callable] = None, normalize_timespan_threshold_hours: Optional[float] = 24, -) -> OrderedDict[str, Any]: +) -> dict[str, Any]: """Parses a DMARC aggregate or forensic file at the given path, a file-like object. or bytes @@ -1634,7 +1637,7 @@ def parse_report_file( keep_alive (callable): Keep alive function Returns: - OrderedDict: The parsed DMARC report + dict: The parsed DMARC report """ if type(input_) is str: logger.debug("Parsing {0}".format(input_)) @@ -1648,6 +1651,8 @@ def parse_report_file( file_object.close() if content.startswith(MAGIC_ZIP) or content.startswith(MAGIC_GZIP): content = extract_report(content) + + results: Optional[dict[str, Any]] = None try: report = parse_aggregate_report_file( content, @@ -1661,11 +1666,11 @@ def parse_report_file( keep_alive=keep_alive, normalize_timespan_threshold_hours=normalize_timespan_threshold_hours, ) - results = OrderedDict([("report_type", "aggregate"), ("report", report)]) + results = {"report_type": "aggregate", "report": report} except InvalidAggregateReport: try: report = parse_smtp_tls_report_json(content) - results = OrderedDict([("report_type", "smtp_tls"), ("report", report)]) + results = {"report_type": "smtp_tls", "report": report} except InvalidSMTPTLSReport: try: sa = strip_attachment_payloads @@ -1684,6 +1689,9 @@ def parse_report_file( ) except InvalidDMARCReport: raise ParserError("Not a valid report") + + if results is None: + raise ParserError("Not a valid report") return results @@ -1699,7 +1707,7 @@ def get_dmarc_reports_from_mbox( reverse_dns_map_url: Optional[str] = None, offline: Optional[bool] = False, normalize_timespan_threshold_hours: Optional[float] = 24.0, -) -> OrderedDict[str, OrderedDict[str, Any]]: +) -> dict[str, list[dict[str, Any]]]: """Parses a mailbox in mbox format containing e-mails with attached DMARC reports @@ -1718,7 +1726,7 @@ def get_dmarc_reports_from_mbox( normalize_timespan_threshold_hours (float): Normalize timespans beyond this Returns: - OrderedDict: Lists of ``aggregate_reports``, ``forensic_reports``, and ``smtp_tls_reports`` + dict: Lists of ``aggregate_reports``, ``forensic_reports``, and ``smtp_tls_reports`` """ aggregate_reports = [] @@ -1767,13 +1775,11 @@ def get_dmarc_reports_from_mbox( logger.warning(error.__str__()) except mailbox.NoSuchMailboxError: raise InvalidDMARCReport("Mailbox {0} does not exist".format(input_)) - return OrderedDict( - [ - ("aggregate_reports", aggregate_reports), - ("forensic_reports", forensic_reports), - ("smtp_tls_reports", smtp_tls_reports), - ] - ) + return { + "aggregate_reports": aggregate_reports, + "forensic_reports": forensic_reports, + "smtp_tls_reports": smtp_tls_reports, + } def get_dmarc_reports_from_mailbox( @@ -1791,12 +1797,12 @@ def get_dmarc_reports_from_mailbox( nameservers: Optional[list[str]] = None, dns_timeout: Optional[float] = 6.0, strip_attachment_payloads: Optional[bool] = False, - results: Optional[OrderedDict[str, Any]] = None, + results: Optional[dict[str, Any]] = None, batch_size: Optional[int] = 10, since: Optional[datetime] = None, create_folders: Optional[bool] = True, normalize_timespan_threshold_hours: Optional[float] = 24, -) -> OrderedDict[str, OrderedDict[str, Any]]: +) -> dict[str, list[dict[str, Any]]]: """ Fetches and parses DMARC reports from a mailbox @@ -1825,7 +1831,7 @@ def get_dmarc_reports_from_mailbox( normalize_timespan_threshold_hours (float): Normalize timespans beyond this Returns: - OrderedDict: Lists of ``aggregate_reports``, ``forensic_reports``, and ``smtp_tls_reports`` + dict: Lists of ``aggregate_reports``, ``forensic_reports``, and ``smtp_tls_reports`` """ if delete and test: raise ValueError("delete and test options are mutually exclusive") @@ -2059,13 +2065,11 @@ def get_dmarc_reports_from_mailbox( except Exception as e: e = "Error moving message UID {0}: {1}".format(msg_uid, e) logger.error("Mailbox error: {0}".format(e)) - results = OrderedDict( - [ - ("aggregate_reports", aggregate_reports), - ("forensic_reports", forensic_reports), - ("smtp_tls_reports", smtp_tls_reports), - ] - ) + results = { + "aggregate_reports": aggregate_reports, + "forensic_reports": forensic_reports, + "smtp_tls_reports": smtp_tls_reports, + } if current_time: total_messages = len( @@ -2206,7 +2210,7 @@ def append_csv(filename, csv): def save_output( - results: OrderedDict[str, Any], + results: dict[str, Any], *, output_directory: Optional[str] = "output", aggregate_json_filename: Optional[str] = "aggregate.json", @@ -2220,7 +2224,7 @@ def save_output( Save report data in the given directory Args: - results (OrderedDict): Parsing results + results (dict): Parsing results output_directory (str): The path to the directory to save in aggregate_json_filename (str): Filename for the aggregate JSON file forensic_json_filename (str): Filename for the forensic JSON file @@ -2292,12 +2296,12 @@ def save_output( sample_file.write(sample) -def get_report_zip(results: OrderedDict[str, Any]) -> bytes: +def get_report_zip(results: dict[str, Any]) -> bytes: """ Creates a zip file of parsed report output Args: - results (OrderedDict): The parsed results + results (dict): The parsed results Returns: bytes: zip file bytes @@ -2338,7 +2342,7 @@ def get_report_zip(results: OrderedDict[str, Any]) -> bytes: def email_results( - results: OrderedDict, + results: dict[str, Any], *, host: str, mail_from: str, @@ -2358,7 +2362,7 @@ def email_results( Emails parsing results as a zip file Args: - results (OrderedDict): Parsing results + results (dict): Parsing results host (str): Mail server hostname or IP address mail_from: The value of the message from header mail_to (list): A list of addresses to mail to diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index 83c97b2..c7d20d3 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -3,54 +3,55 @@ """A CLI for parsing DMARC reports""" -from argparse import Namespace, ArgumentParser -import os -from configparser import ConfigParser -from glob import glob +import http.client +import json import logging import math -import yaml -from collections import OrderedDict -import json -from ssl import CERT_NONE, create_default_context -from multiprocessing import Pipe, Process +import os import sys -import http.client +from argparse import ArgumentParser, Namespace +from configparser import ConfigParser +from glob import glob +from multiprocessing import Pipe, Process +from ssl import CERT_NONE, create_default_context + +import yaml from tqdm import tqdm from parsedmarc import ( - get_dmarc_reports_from_mailbox, - watch_inbox, - parse_report_file, - get_dmarc_reports_from_mbox, - elastic, - opensearch, - kafkaclient, - splunk, - save_output, - email_results, + SEEN_AGGREGATE_REPORT_IDS, + InvalidDMARCReport, ParserError, __version__, - InvalidDMARCReport, - s3, - syslog, - loganalytics, + elastic, + email_results, gelf, + get_dmarc_reports_from_mailbox, + get_dmarc_reports_from_mbox, + kafkaclient, + loganalytics, + opensearch, + parse_report_file, + s3, + save_output, + splunk, + syslog, + watch_inbox, webhook, ) +from parsedmarc.log import logger from parsedmarc.mail import ( - IMAPConnection, - MSGraphConnection, GmailConnection, + IMAPConnection, MaildirConnection, + MSGraphConnection, ) from parsedmarc.mail.graph import AuthMethod +from parsedmarc.utils import get_base_domain, get_reverse_dns, is_mbox -from parsedmarc.log import logger -from parsedmarc.utils import is_mbox, get_reverse_dns, get_base_domain -from parsedmarc import SEEN_AGGREGATE_REPORT_IDS - -http.client._MAXHEADERS = 200 # pylint:disable=protected-access +# Increase the max header limit for very large emails. `_MAXHEADERS` is a +# private stdlib attribute and may not exist in type stubs. +setattr(http.client, "_MAXHEADERS", 200) formatter = logging.Formatter( fmt="%(levelname)8s:%(filename)s:%(lineno)d:%(message)s", @@ -105,6 +106,7 @@ def _main(): """Called when the module is executed""" def get_index_prefix(report): + domain = None if index_prefix_domain_map is None: return None if "policy_published" in report: @@ -1634,13 +1636,11 @@ def _main(): logger.exception("Mailbox Error") exit(1) - results = OrderedDict( - [ - ("aggregate_reports", aggregate_reports), - ("forensic_reports", forensic_reports), - ("smtp_tls_reports", smtp_tls_reports), - ] - ) + results = { + "aggregate_reports": aggregate_reports, + "forensic_reports": forensic_reports, + "smtp_tls_reports": smtp_tls_reports, + } process_reports(results) diff --git a/parsedmarc/elastic.py b/parsedmarc/elastic.py index a3249e2..7da745a 100644 --- a/parsedmarc/elastic.py +++ b/parsedmarc/elastic.py @@ -2,30 +2,28 @@ from __future__ import annotations -from typing import Optional, Union, Any +from typing import Any, Optional, Union -from collections import OrderedDict - -from elasticsearch_dsl.search import Q +from elasticsearch.helpers import reindex from elasticsearch_dsl import ( - connections, - Object, + Boolean, + Date, Document, Index, - Nested, InnerDoc, Integer, - Text, - Boolean, Ip, - Date, + Nested, + Object, Search, + Text, + connections, ) -from elasticsearch.helpers import reindex +from elasticsearch_dsl.search import Q +from parsedmarc import InvalidForensicReport from parsedmarc.log import logger from parsedmarc.utils import human_timestamp_to_datetime -from parsedmarc import InvalidForensicReport class ElasticsearchError(Exception): @@ -377,7 +375,7 @@ def migrate_indexes( def save_aggregate_report_to_elasticsearch( - aggregate_report: OrderedDict[str, Any], + aggregate_report: dict[str, Any], index_suffix: Optional[str] = None, index_prefix: Optional[str] = None, monthly_indexes: Optional[bool] = False, @@ -388,7 +386,7 @@ def save_aggregate_report_to_elasticsearch( Saves a parsed DMARC aggregate report to Elasticsearch Args: - aggregate_report (OrderedDict): A parsed forensic report + aggregate_report (dict): A parsed forensic report index_suffix (str): The suffix of the name of the index to save to index_prefix (str): The prefix of the name of the index to save to monthly_indexes (bool): Use monthly indexes instead of daily indexes @@ -539,7 +537,7 @@ def save_aggregate_report_to_elasticsearch( def save_forensic_report_to_elasticsearch( - forensic_report: OrderedDict[str, Any], + forensic_report: dict[str, Any], index_suffix: Optional[Any] = None, index_prefix: Optional[str] = None, monthly_indexes: Optional[bool] = False, @@ -550,7 +548,7 @@ def save_forensic_report_to_elasticsearch( Saves a parsed DMARC forensic report to Elasticsearch Args: - forensic_report (OrderedDict): A parsed forensic report + forensic_report (dict): A parsed forensic report index_suffix (str): The suffix of the name of the index to save to index_prefix (str): The prefix of the name of the index to save to monthly_indexes (bool): Use monthly indexes instead of daily @@ -570,7 +568,7 @@ def save_forensic_report_to_elasticsearch( sample_date = forensic_report["parsed_sample"]["date"] sample_date = human_timestamp_to_datetime(sample_date) original_headers = forensic_report["parsed_sample"]["headers"] - headers = OrderedDict() + headers: dict[str, Any] = {} for original_header in original_headers: headers[original_header.lower()] = original_headers[original_header] @@ -706,7 +704,7 @@ def save_forensic_report_to_elasticsearch( def save_smtp_tls_report_to_elasticsearch( - report: OrderedDict[str, Any], + report: dict[str, Any], index_suffix: Optional[str] = None, index_prefix: Optional[str] = None, monthly_indexes: Optional[bool] = False, @@ -717,7 +715,7 @@ def save_smtp_tls_report_to_elasticsearch( Saves a parsed SMTP TLS report to Elasticsearch Args: - report (OrderedDict): A parsed SMTP TLS report + report (dict): A parsed SMTP TLS report index_suffix (str): The suffix of the name of the index to save to index_prefix (str): The prefix of the name of the index to save to monthly_indexes (bool): Use monthly indexes instead of daily indexes diff --git a/parsedmarc/gelf.py b/parsedmarc/gelf.py index e3b5b77..61b949a 100644 --- a/parsedmarc/gelf.py +++ b/parsedmarc/gelf.py @@ -2,21 +2,19 @@ from __future__ import annotations -from typing import Any - +import json import logging import logging.handlers -import json import threading -from collections import OrderedDict +from typing import Any + +from pygelf import GelfTcpHandler, GelfTlsHandler, GelfUdpHandler from parsedmarc import ( parsed_aggregate_reports_to_csv_rows, parsed_forensic_reports_to_csv_rows, parsed_smtp_tls_reports_to_csv_rows, ) -from pygelf import GelfTcpHandler, GelfUdpHandler, GelfTlsHandler - log_context_data = threading.local() @@ -53,9 +51,7 @@ class GelfClient(object): ) self.logger.addHandler(self.handler) - def save_aggregate_report_to_gelf( - self, aggregate_reports: list[OrderedDict[str, Any]] - ): + def save_aggregate_report_to_gelf(self, aggregate_reports: list[dict[str, Any]]): rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports) for row in rows: log_context_data.parsedmarc = row @@ -63,14 +59,12 @@ class GelfClient(object): log_context_data.parsedmarc = None - def save_forensic_report_to_gelf( - self, forensic_reports: list[OrderedDict[str, Any]] - ): + def save_forensic_report_to_gelf(self, forensic_reports: list[dict[str, Any]]): rows = parsed_forensic_reports_to_csv_rows(forensic_reports) for row in rows: self.logger.info(json.dumps(row)) - def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: OrderedDict[str, Any]): + def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: dict[str, Any]): rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports) for row in rows: self.logger.info(json.dumps(row)) diff --git a/parsedmarc/kafkaclient.py b/parsedmarc/kafkaclient.py index 9e9c2ce..5263846 100644 --- a/parsedmarc/kafkaclient.py +++ b/parsedmarc/kafkaclient.py @@ -2,19 +2,16 @@ from __future__ import annotations -from typing import Any, Optional, Union -from ssl import SSLContext - import json -from ssl import create_default_context +from ssl import SSLContext, create_default_context +from typing import Any, Optional, Union from kafka import KafkaProducer from kafka.errors import NoBrokersAvailable, UnknownTopicOrPartitionError -from collections import OrderedDict -from parsedmarc.utils import human_timestamp_to_datetime from parsedmarc import __version__ from parsedmarc.log import logger +from parsedmarc.utils import human_timestamp_to_datetime class KafkaError(RuntimeError): @@ -66,7 +63,7 @@ class KafkaClient(object): raise KafkaError("No Kafka brokers available") @staticmethod - def strip_metadata(report: OrderedDict[str, Any]): + def strip_metadata(report: dict[str, Any]): """ Duplicates org_name, org_email and report_id into JSON root and removes report_metadata key to bring it more inline @@ -80,7 +77,7 @@ class KafkaClient(object): return report @staticmethod - def generate_date_range(report: OrderedDict[str, Any]): + def generate_date_range(report: dict[str, Any]): """ Creates a date_range timestamp with format YYYY-MM-DD-T-HH:MM:SS based on begin and end dates for easier parsing in Kibana. @@ -99,7 +96,7 @@ class KafkaClient(object): def save_aggregate_reports_to_kafka( self, - aggregate_reports: Union[OrderedDict[str, Any], list[OrderedDict[str, Any]]], + aggregate_reports: Union[dict[str, Any], list[dict[str, Any]]], aggregate_topic: str, ): """ @@ -111,9 +108,7 @@ class KafkaClient(object): aggregate_topic (str): The name of the Kafka topic """ - if isinstance(aggregate_reports, dict) or isinstance( - aggregate_reports, OrderedDict - ): + if isinstance(aggregate_reports, dict): aggregate_reports = [aggregate_reports] if len(aggregate_reports) < 1: @@ -146,7 +141,7 @@ class KafkaClient(object): def save_forensic_reports_to_kafka( self, - forensic_reports: Union[OrderedDict[str, Any], list[OrderedDict[str, Any]]], + forensic_reports: Union[dict[str, Any], list[dict[str, Any]]], forensic_topic: str, ): """ @@ -180,7 +175,7 @@ class KafkaClient(object): def save_smtp_tls_reports_to_kafka( self, - smtp_tls_reports: Union[list[OrderedDict[str, Any]], OrderedDict[str, Any]], + smtp_tls_reports: Union[list[dict[str, Any]], dict[str, Any]], smtp_tls_topic: str, ): """ diff --git a/parsedmarc/loganalytics.py b/parsedmarc/loganalytics.py index 1c5daa2..10a941b 100644 --- a/parsedmarc/loganalytics.py +++ b/parsedmarc/loganalytics.py @@ -3,13 +3,13 @@ from __future__ import annotations from typing import Any -from collections import OrderedDict -from parsedmarc.log import logger from azure.core.exceptions import HttpResponseError from azure.identity import ClientSecretCredential from azure.monitor.ingestion import LogsIngestionClient +from parsedmarc.log import logger + class LogAnalyticsException(Exception): """Raised when an Elasticsearch error occurs""" @@ -133,7 +133,7 @@ class LogAnalyticsClient(object): def publish_results( self, - results: OrderedDict[str, OrderedDict[str, Any]], + results: dict[str, Any], save_aggregate: bool, save_forensic: bool, save_smtp_tls: bool, diff --git a/parsedmarc/opensearch.py b/parsedmarc/opensearch.py index 16cc3cf..c21a613 100644 --- a/parsedmarc/opensearch.py +++ b/parsedmarc/opensearch.py @@ -2,30 +2,28 @@ from __future__ import annotations -from typing import Optional, Union, Any - -from collections import OrderedDict +from typing import Any, Optional, Union from opensearchpy import ( - Q, - connections, - Object, + Boolean, + Date, Document, Index, - Nested, InnerDoc, Integer, - Text, - Boolean, Ip, - Date, + Nested, + Object, + Q, Search, + Text, + connections, ) from opensearchpy.helpers import reindex +from parsedmarc import InvalidForensicReport from parsedmarc.log import logger from parsedmarc.utils import human_timestamp_to_datetime -from parsedmarc import InvalidForensicReport class OpenSearchError(Exception): @@ -377,7 +375,7 @@ def migrate_indexes( def save_aggregate_report_to_opensearch( - aggregate_report: OrderedDict[str, Any], + aggregate_report: dict[str, Any], index_suffix: Optional[str] = None, index_prefix: Optional[str] = None, monthly_indexes: Optional[bool] = False, @@ -388,7 +386,7 @@ def save_aggregate_report_to_opensearch( Saves a parsed DMARC aggregate report to OpenSearch Args: - aggregate_report (OrderedDict): A parsed forensic report + aggregate_report (dict): A parsed forensic report index_suffix (str): The suffix of the name of the index to save to index_prefix (str): The prefix of the name of the index to save to monthly_indexes (bool): Use monthly indexes instead of daily indexes @@ -441,6 +439,8 @@ def save_aggregate_report_to_opensearch( ) if len(existing) > 0: + begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ") + end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ") raise AlreadySaved( "An aggregate report ID {0} from {1} about {2} " "with a date range of {3} UTC to {4} UTC already " @@ -539,7 +539,7 @@ def save_aggregate_report_to_opensearch( def save_forensic_report_to_opensearch( - forensic_report: OrderedDict[str, Any], + forensic_report: dict[str, Any], index_suffix: Optional[str] = None, index_prefix: Optional[str] = None, monthly_indexes: Optional[bool] = False, @@ -550,7 +550,7 @@ def save_forensic_report_to_opensearch( Saves a parsed DMARC forensic report to OpenSearch Args: - forensic_report (OrderedDict): A parsed forensic report + forensic_report (dict): A parsed forensic report index_suffix (str): The suffix of the name of the index to save to index_prefix (str): The prefix of the name of the index to save to monthly_indexes (bool): Use monthly indexes instead of daily @@ -570,7 +570,7 @@ def save_forensic_report_to_opensearch( sample_date = forensic_report["parsed_sample"]["date"] sample_date = human_timestamp_to_datetime(sample_date) original_headers = forensic_report["parsed_sample"]["headers"] - headers = OrderedDict() + headers: dict[str, Any] = {} for original_header in original_headers: headers[original_header.lower()] = original_headers[original_header] @@ -706,7 +706,7 @@ def save_forensic_report_to_opensearch( def save_smtp_tls_report_to_opensearch( - report: OrderedDict[str, Any], + report: dict[str, Any], index_suffix: Optional[str] = None, index_prefix: Optional[str] = None, monthly_indexes: Optional[bool] = False, @@ -717,7 +717,7 @@ def save_smtp_tls_report_to_opensearch( Saves a parsed SMTP TLS report to OpenSearch Args: - report (OrderedDict): A parsed SMTP TLS report + report (dict): A parsed SMTP TLS report index_suffix (str): The suffix of the name of the index to save to index_prefix (str): The prefix of the name of the index to save to monthly_indexes (bool): Use monthly indexes instead of daily indexes diff --git a/parsedmarc/s3.py b/parsedmarc/s3.py index 3fb0a5a..d6778fa 100644 --- a/parsedmarc/s3.py +++ b/parsedmarc/s3.py @@ -2,13 +2,11 @@ from __future__ import annotations +import json from typing import Any -import json import boto3 -from collections import OrderedDict - from parsedmarc.log import logger from parsedmarc.utils import human_timestamp_to_datetime @@ -55,16 +53,16 @@ class S3Client(object): ) self.bucket = self.s3.Bucket(self.bucket_name) # type: ignore - def save_aggregate_report_to_s3(self, report: OrderedDict[str, Any]): + def save_aggregate_report_to_s3(self, report: dict[str, Any]): self.save_report_to_s3(report, "aggregate") - def save_forensic_report_to_s3(self, report: OrderedDict[str, Any]): + def save_forensic_report_to_s3(self, report: dict[str, Any]): self.save_report_to_s3(report, "forensic") - def save_smtp_tls_report_to_s3(self, report: OrderedDict[str, Any]): + def save_smtp_tls_report_to_s3(self, report: dict[str, Any]): self.save_report_to_s3(report, "smtp_tls") - def save_report_to_s3(self, report: OrderedDict[str, Any], report_type: str): + def save_report_to_s3(self, report: dict[str, Any], report_type: str): if report_type == "smtp_tls": report_date = report["begin_date"] report_id = report["report_id"] diff --git a/parsedmarc/splunk.py b/parsedmarc/splunk.py index 0390e44..28f7c0f 100644 --- a/parsedmarc/splunk.py +++ b/parsedmarc/splunk.py @@ -2,16 +2,13 @@ from __future__ import annotations -from typing import Any, Union - -from collections import OrderedDict - -from urllib.parse import urlparse -import socket import json +import socket +from typing import Any, Union +from urllib.parse import urlparse -import urllib3 import requests +import urllib3 from parsedmarc.constants import USER_AGENT from parsedmarc.log import logger @@ -73,7 +70,7 @@ class HECClient(object): def save_aggregate_reports_to_splunk( self, - aggregate_reports: Union[list[OrderedDict[str, Any]], OrderedDict[str, Any]], + aggregate_reports: Union[list[dict[str, Any]], dict[str, Any]], ): """ Saves aggregate DMARC reports to Splunk @@ -139,7 +136,7 @@ class HECClient(object): def save_forensic_reports_to_splunk( self, - forensic_reports: Union[list[OrderedDict[str, Any]], OrderedDict[str, Any]], + forensic_reports: Union[list[dict[str, Any]], dict[str, Any]], ): """ Saves forensic DMARC reports to Splunk @@ -175,7 +172,7 @@ class HECClient(object): raise SplunkError(response["text"]) def save_smtp_tls_reports_to_splunk( - self, reports: Union[list[OrderedDict[str, Any]], OrderedDict[str, Any]] + self, reports: Union[list[dict[str, Any]], dict[str, Any]] ): """ Saves aggregate DMARC reports to Splunk diff --git a/parsedmarc/syslog.py b/parsedmarc/syslog.py index 7502c0c..c08e348 100644 --- a/parsedmarc/syslog.py +++ b/parsedmarc/syslog.py @@ -3,15 +3,11 @@ from __future__ import annotations +import json import logging import logging.handlers - from typing import Any -from collections import OrderedDict - -import json - from parsedmarc import ( parsed_aggregate_reports_to_csv_rows, parsed_forensic_reports_to_csv_rows, @@ -36,23 +32,17 @@ class SyslogClient(object): log_handler = logging.handlers.SysLogHandler(address=(server_name, server_port)) self.logger.addHandler(log_handler) - def save_aggregate_report_to_syslog( - self, aggregate_reports: list[OrderedDict[str, Any]] - ): + def save_aggregate_report_to_syslog(self, aggregate_reports: list[dict[str, Any]]): rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports) for row in rows: self.logger.info(json.dumps(row)) - def save_forensic_report_to_syslog( - self, forensic_reports: list[OrderedDict[str, Any]] - ): + def save_forensic_report_to_syslog(self, forensic_reports: list[dict[str, Any]]): rows = parsed_forensic_reports_to_csv_rows(forensic_reports) for row in rows: self.logger.info(json.dumps(row)) - def save_smtp_tls_report_to_syslog( - self, smtp_tls_reports: list[OrderedDict[str, Any]] - ): + def save_smtp_tls_report_to_syslog(self, smtp_tls_reports: list[dict[str, Any]]): rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports) for row in rows: self.logger.info(json.dumps(row)) diff --git a/parsedmarc/utils.py b/parsedmarc/utils.py index 6acf9ef..a39b154 100644 --- a/parsedmarc/utils.py +++ b/parsedmarc/utils.py @@ -4,26 +4,23 @@ from __future__ import annotations +import base64 +import csv +import hashlib +import io +import json +import logging +import mailbox +import os +import re +import shutil +import subprocess +import tempfile +from datetime import datetime, timedelta, timezone from typing import Optional, Union -import logging -import os -from datetime import datetime -from datetime import timezone -from datetime import timedelta -from collections import OrderedDict -from expiringdict import ExpiringDict -import tempfile -import subprocess -import shutil import mailparser -import json -import hashlib -import base64 -import mailbox -import re -import csv -import io +from expiringdict import ExpiringDict try: from importlib.resources import files @@ -32,19 +29,19 @@ except ImportError: from importlib.resources import files -from dateutil.parser import parse as parse_date -import dns.reversename -import dns.resolver import dns.exception +import dns.resolver +import dns.reversename import geoip2.database import geoip2.errors import publicsuffixlist import requests +from dateutil.parser import parse as parse_date -from parsedmarc.log import logger import parsedmarc.resources.dbip import parsedmarc.resources.maps from parsedmarc.constants import USER_AGENT +from parsedmarc.log import logger parenthesis_regex = re.compile(r"\s*\(.*\)\s*") @@ -422,7 +419,7 @@ def get_ip_address_info( offline: Optional[bool] = False, nameservers: Optional[list[str]] = None, timeout: Optional[float] = 2.0, -) -> OrderedDict[str, str]: +) -> dict[str, str]: """ Returns reverse DNS and country information for the given IP address @@ -440,7 +437,7 @@ def get_ip_address_info( timeout (float): Sets the DNS timeout in seconds Returns: - OrderedDict: ``ip_address``, ``reverse_dns``, ``country`` + dict: ``ip_address``, ``reverse_dns``, ``country`` """ ip_address = ip_address.lower() @@ -449,7 +446,7 @@ def get_ip_address_info( if info: logger.debug(f"IP address {ip_address} was found in cache") return info - info = OrderedDict() + info: dict[str, str] = {} info["ip_address"] = ip_address if offline: reverse_dns = None @@ -487,7 +484,7 @@ def get_ip_address_info( return info -def parse_email_address(original_address: str) -> OrderedDict[str, str]: +def parse_email_address(original_address: str) -> dict[str, str]: if original_address[0] == "": display_name = None else: @@ -500,14 +497,12 @@ def parse_email_address(original_address: str) -> OrderedDict[str, str]: local = address_parts[0].lower() domain = address_parts[-1].lower() - return OrderedDict( - [ - ("display_name", display_name), - ("address", address), - ("local", local), - ("domain", domain), - ] - ) + return { + "display_name": display_name, + "address": address, + "local": local, + "domain": domain, + } def get_filename_safe_string(string: str) -> str: