Use literal dicts instead of ordered dicts and other code cleanup

Sean Whalen
2025-12-24 15:04:10 -05:00
parent b5773c6b4a
commit bb8f4002bf
11 changed files with 280 additions and 309 deletions
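The refactor is safe because plain dicts have guaranteed insertion-order preservation since Python 3.7, and because OrderedDict is a dict subclass, the new isinstance(..., dict) checks still accept OrderedDict values from older callers. A minimal standalone sketch (not from the codebase) illustrating both points:

from collections import OrderedDict

# Plain dicts preserve insertion order (a language guarantee since 3.7),
# so JSON/CSV output built from literal dicts keeps a stable key order.
record = {"disposition": "none", "dkim": "fail", "spf": "fail"}
assert list(record) == ["disposition", "dkim", "spf"]

# OrderedDict subclasses dict, so isinstance(x, dict) matches both,
# while the old `type(x) is OrderedDict` checks rejected plain dicts.
legacy = OrderedDict(disposition="none")
assert isinstance(legacy, dict)
assert type(legacy) is not dict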

View File

@@ -4,20 +4,6 @@
from __future__ import annotations
from typing import (
Dict,
List,
Any,
Union,
Optional,
IO,
Callable,
BinaryIO,
Protocol,
runtime_checkable,
cast,
)
import binascii
import email
import email.utils
@@ -31,31 +17,46 @@ import xml.parsers.expat as expat
import zipfile
import zlib
from base64 import b64decode
from collections import OrderedDict
from csv import DictWriter
from datetime import datetime, timedelta, timezone, tzinfo
from io import BytesIO, StringIO
from typing import (
IO,
Any,
BinaryIO,
Callable,
Dict,
List,
Optional,
Protocol,
Union,
cast,
runtime_checkable,
)
import lxml.etree as etree
import mailparser
import xmltodict
from expiringdict import ExpiringDict
import lxml.etree as etree
from mailsuite.smtp import send_email
from parsedmarc.constants import __version__
from parsedmarc.log import logger
from parsedmarc.mail import (
MailboxConnection,
IMAPConnection,
MSGraphConnection,
GmailConnection,
IMAPConnection,
MailboxConnection,
MSGraphConnection,
)
from parsedmarc.utils import (
convert_outlook_msg,
get_base_domain,
get_ip_address_info,
human_timestamp_to_datetime,
is_outlook_msg,
parse_email,
timestamp_to_human,
)
from parsedmarc.constants import __version__
from parsedmarc.utils import get_base_domain, get_ip_address_info
from parsedmarc.utils import is_outlook_msg, convert_outlook_msg
from parsedmarc.utils import parse_email
from parsedmarc.utils import timestamp_to_human, human_timestamp_to_datetime
logger.debug("parsedmarc v{0}".format(__version__))
@@ -243,8 +244,8 @@ def _bucket_interval_by_day(
def _append_parsed_record(
parsed_record: OrderedDict[str, Any],
records: List[OrderedDict[str, Any]],
parsed_record: dict[str, Any],
records: list[dict[str, Any]],
begin_dt: datetime,
end_dt: datetime,
normalize: bool,
@@ -287,7 +288,7 @@ def _append_parsed_record(
def _parse_report_record(
record: OrderedDict,
record: dict[str, Any],
*,
ip_db_path: Optional[str] = None,
always_use_local_files: Optional[bool] = False,
@@ -296,13 +297,13 @@ def _parse_report_record(
offline: Optional[bool] = False,
nameservers: Optional[list[str]] = None,
dns_timeout: Optional[float] = 2.0,
) -> OrderedDict[str, Any]:
) -> dict[str, Any]:
"""
Converts a record from a DMARC aggregate report into a more consistent
format
Args:
record (OrderedDict): The record to convert
record (dict): The record to convert
always_use_local_files (bool): Do not download files
reverse_dns_map_path (str): Path to a reverse DNS map file
reverse_dns_map_url (str): URL to a reverse DNS map file
@@ -313,10 +314,10 @@ def _parse_report_record(
dns_timeout (float): Sets the DNS timeout in seconds
Returns:
OrderedDict: The converted record
dict: The converted record
"""
record = record.copy()
new_record = OrderedDict()
new_record: dict[str, Any] = {}
if record["row"]["source_ip"] is None:
raise ValueError("Source IP address is empty")
new_record_source = get_ip_address_info(
@@ -334,14 +335,12 @@ def _parse_report_record(
new_record["source"] = new_record_source
new_record["count"] = int(record["row"]["count"])
policy_evaluated = record["row"]["policy_evaluated"].copy()
new_policy_evaluated = OrderedDict(
[
("disposition", "none"),
("dkim", "fail"),
("spf", "fail"),
("policy_override_reasons", []),
]
)
new_policy_evaluated: dict[str, Any] = {
"disposition": "none",
"dkim": "fail",
"spf": "fail",
"policy_override_reasons": [],
}
if "disposition" in policy_evaluated:
new_policy_evaluated["disposition"] = policy_evaluated["disposition"]
if new_policy_evaluated["disposition"].strip().lower() == "pass":
@@ -378,7 +377,7 @@ def _parse_report_record(
new_record["identifiers"] = record["identities"].copy()
else:
new_record["identifiers"] = record["identifiers"].copy()
new_record["auth_results"] = OrderedDict([("dkim", []), ("spf", [])])
new_record["auth_results"] = {"dkim": [], "spf": []}
if type(new_record["identifiers"]["header_from"]) is str:
lowered_from = new_record["identifiers"]["header_from"].lower()
else:
@@ -397,7 +396,7 @@ def _parse_report_record(
auth_results["dkim"] = [auth_results["dkim"]]
for result in auth_results["dkim"]:
if "domain" in result and result["domain"] is not None:
new_result = OrderedDict([("domain", result["domain"])])
new_result: dict[str, Any] = {"domain": result["domain"]}
if "selector" in result and result["selector"] is not None:
new_result["selector"] = result["selector"]
else:
@@ -412,7 +411,7 @@ def _parse_report_record(
auth_results["spf"] = [auth_results["spf"]]
for result in auth_results["spf"]:
if "domain" in result and result["domain"] is not None:
new_result = OrderedDict([("domain", result["domain"])])
new_result: dict[str, Any] = {"domain": result["domain"]}
if "scope" in result and result["scope"] is not None:
new_result["scope"] = result["scope"]
else:
@@ -452,10 +451,10 @@ def _parse_report_record(
def _parse_smtp_tls_failure_details(failure_details: dict[str, Any]):
try:
new_failure_details = OrderedDict(
result_type=failure_details["result-type"],
failed_session_count=failure_details["failed-session-count"],
)
new_failure_details: dict[str, Any] = {
"result_type": failure_details["result-type"],
"failed_session_count": failure_details["failed-session-count"],
}
if "sending-mta-ip" in failure_details:
new_failure_details["sending_mta_ip"] = failure_details["sending-mta-ip"]
@@ -486,7 +485,7 @@ def _parse_smtp_tls_failure_details(failure_details: dict[str, Any]):
raise InvalidSMTPTLSReport(str(e))
def _parse_smtp_tls_report_policy(policy: OrderedDict[str, Any]):
def _parse_smtp_tls_report_policy(policy: dict[str, Any]):
policy_types = ["tlsa", "sts", "no-policy-found"]
try:
policy_domain = policy["policy"]["policy-domain"]
@@ -494,7 +493,10 @@ def _parse_smtp_tls_report_policy(policy: OrderedDict[str, Any]):
failure_details = []
if policy_type not in policy_types:
raise InvalidSMTPTLSReport(f"Invalid policy type {policy_type}")
new_policy = OrderedDict(policy_domain=policy_domain, policy_type=policy_type)
new_policy: dict[str, Any] = {
"policy_domain": policy_domain,
"policy_type": policy_type,
}
if "policy-string" in policy["policy"]:
if isinstance(policy["policy"]["policy-string"], list):
if len(policy["policy"]["policy-string"]) > 0:
@@ -523,7 +525,7 @@ def _parse_smtp_tls_report_policy(policy: OrderedDict[str, Any]):
raise InvalidSMTPTLSReport(str(e))
def parse_smtp_tls_report_json(report: str):
def parse_smtp_tls_report_json(report: Union[str, bytes]):
"""Parses and validates an SMTP TLS report"""
required_fields = [
"organization-name",
@@ -534,6 +536,9 @@ def parse_smtp_tls_report_json(report: str):
]
try:
if isinstance(report, bytes):
report = report.decode("utf-8", errors="replace")
policies = []
report_dict = json.loads(report)
for required_field in required_fields:
@@ -545,39 +550,39 @@ def parse_smtp_tls_report_json(report: str):
for policy in report_dict["policies"]:
policies.append(_parse_smtp_tls_report_policy(policy))
new_report = OrderedDict(
organization_name=report_dict["organization-name"],
begin_date=report_dict["date-range"]["start-datetime"],
end_date=report_dict["date-range"]["end-datetime"],
contact_info=report_dict["contact-info"],
report_id=report_dict["report-id"],
policies=policies,
)
new_report: dict[str, Any] = {
"organization_name": report_dict["organization-name"],
"begin_date": report_dict["date-range"]["start-datetime"],
"end_date": report_dict["date-range"]["end-datetime"],
"contact_info": report_dict["contact-info"],
"report_id": report_dict["report-id"],
"policies": policies,
}
return new_report
except KeyError as e:
InvalidSMTPTLSReport(f"Missing required field: {e}")
raise InvalidSMTPTLSReport(f"Missing required field: {e}")
except Exception as e:
raise InvalidSMTPTLSReport(str(e))
def parsed_smtp_tls_reports_to_csv_rows(
reports: Union[OrderedDict[str, Any], List[OrderedDict[str, Any]]],
reports: Union[dict[str, Any], list[dict[str, Any]]],
):
Converts one or more parsed SMTP TLS reports into a list of single
layer OrderedDict objects suitable for use in a CSV"""
if type(reports) is OrderedDict:
layer dict objects suitable for use in a CSV"""
if isinstance(reports, dict):
reports = [reports]
rows = []
for report in reports:
common_fields = OrderedDict(
organization_name=report["organization_name"],
begin_date=report["begin_date"],
end_date=report["end_date"],
report_id=report["report_id"],
)
common_fields = {
"organization_name": report["organization_name"],
"begin_date": report["begin_date"],
"end_date": report["end_date"],
"report_id": report["report_id"],
}
record = common_fields.copy()
for policy in report["policies"]:
if "policy_strings" in policy:
@@ -599,7 +604,7 @@ def parsed_smtp_tls_reports_to_csv_rows(
return rows
def parsed_smtp_tls_reports_to_csv(reports: OrderedDict[str, Any]) -> str:
def parsed_smtp_tls_reports_to_csv(reports: dict[str, Any]) -> str:
"""
Converts one or more parsed SMTP TLS reports to flat CSV format, including
headers
@@ -656,8 +661,8 @@ def parse_aggregate_report_xml(
timeout: Optional[float] = 2.0,
keep_alive: Optional[Callable] = None,
normalize_timespan_threshold_hours: float = 24.0,
) -> OrderedDict[str, Any]:
"""Parses a DMARC XML report string and returns a consistent OrderedDict
) -> dict[str, Any]:
"""Parses a DMARC XML report string and returns a consistent dict
Args:
xml (str): A string of DMARC aggregate report XML
@@ -673,7 +678,7 @@ def parse_aggregate_report_xml(
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
Returns:
OrderedDict: The parsed aggregate DMARC report
dict: The parsed aggregate DMARC report
"""
errors = []
# Parse XML and recover from errors
@@ -705,8 +710,8 @@ def parse_aggregate_report_xml(
schema = "draft"
if "version" in report:
schema = report["version"]
new_report = OrderedDict([("xml_schema", schema)])
new_report_metadata = OrderedDict()
new_report: dict[str, Any] = {"xml_schema": schema}
new_report_metadata: dict[str, Any] = {}
if report_metadata["org_name"] is None:
if report_metadata["email"] is not None:
report_metadata["org_name"] = report_metadata["email"].split("@")[-1]
@@ -767,7 +772,7 @@ def parse_aggregate_report_xml(
policy_published = report["policy_published"]
if type(policy_published) is list:
policy_published = policy_published[0]
new_policy_published = OrderedDict()
new_policy_published: dict[str, Any] = {}
new_policy_published["domain"] = policy_published["domain"]
adkim = "r"
if "adkim" in policy_published:
@@ -943,7 +948,7 @@ def parse_aggregate_report_file(
dns_timeout: Optional[float] = 2.0,
keep_alive: Optional[Callable] = None,
normalize_timespan_threshold_hours: Optional[float] = 24.0,
) -> OrderedDict[str, any]:
) -> dict[str, Any]:
"""Parses a file at the given path, a file-like object. or bytes as an
aggregate DMARC report
@@ -961,7 +966,7 @@ def parse_aggregate_report_file(
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
Returns:
OrderedDict: The parsed DMARC aggregate report
dict: The parsed DMARC aggregate report
"""
try:
@@ -984,7 +989,7 @@ def parse_aggregate_report_file(
def parsed_aggregate_reports_to_csv_rows(
reports: list[OrderedDict[str, Any]],
reports: Union[dict[str, Any], list[dict[str, Any]]],
) -> list[dict[str, Any]]:
"""
Converts one or more parsed aggregate reports to a list of dicts in flat CSV
@@ -1001,7 +1006,7 @@ def parsed_aggregate_reports_to_csv_rows(
def to_str(obj):
return str(obj).lower()
if type(reports) is OrderedDict:
if isinstance(reports, dict):
reports = [reports]
rows = []
@@ -1109,7 +1114,9 @@ def parsed_aggregate_reports_to_csv_rows(
return rows
def parsed_aggregate_reports_to_csv(reports: list[OrderedDict[str, Any]]) -> str:
def parsed_aggregate_reports_to_csv(
reports: Union[dict[str, Any], list[dict[str, Any]]],
) -> str:
"""
Converts one or more parsed aggregate reports to flat CSV format, including
headers
@@ -1188,9 +1195,9 @@ def parse_forensic_report(
nameservers: Optional[list[str]] = None,
dns_timeout: Optional[float] = 2.0,
strip_attachment_payloads: Optional[bool] = False,
) -> OrderedDict[str, Any]:
) -> dict[str, Any]:
"""
Converts a DMARC forensic report and sample to a ``OrderedDict``
Converts a DMARC forensic report and sample to a dict
Args:
feedback_report (str): A message's feedback report as a string
@@ -1208,12 +1215,12 @@ def parse_forensic_report(
forensic report results
Returns:
OrderedDict: A parsed report and sample
dict: A parsed report and sample
"""
delivery_results = ["delivered", "spam", "policy", "reject", "other"]
try:
parsed_report = OrderedDict()
parsed_report: dict[str, Any] = {}
report_values = feedback_report_regex.findall(feedback_report)
for report_value in report_values:
key = report_value[0].lower().replace("-", "_")
@@ -1316,7 +1323,9 @@ def parse_forensic_report(
raise InvalidForensicReport("Unexpected error: {0}".format(error.__str__()))
def parsed_forensic_reports_to_csv_rows(reports: list[OrderedDict[str, Any]]):
def parsed_forensic_reports_to_csv_rows(
reports: Union[dict[str, Any], list[dict[str, Any]]],
):
"""
Converts one or more parsed forensic reports to a list of dicts in flat CSV
format
@@ -1327,7 +1336,7 @@ def parsed_forensic_reports_to_csv_rows(reports: list[OrderedDict[str, Any]]):
Returns:
list: Parsed forensic report data as a list of dicts in flat CSV format
"""
if type(reports) is OrderedDict:
if isinstance(reports, dict):
reports = [reports]
rows = []
@@ -1412,12 +1421,12 @@ def parse_report_email(
always_use_local_files: Optional[bool] = False,
reverse_dns_map_path: Optional[str] = None,
reverse_dns_map_url: Optional[str] = None,
nameservers: list[str] = None,
nameservers: Optional[list[str]] = None,
dns_timeout: Optional[float] = 2.0,
strip_attachment_payloads: Optional[bool] = False,
keep_alive: Optional[callable] = None,
normalize_timespan_threshold_hours: Optional[float] = 24.0,
) -> OrderedDict[str, Any]:
keep_alive: Optional[Callable] = None,
normalize_timespan_threshold_hours: float = 24.0,
) -> dict[str, Any]:
"""
Parses a DMARC report from an email
@@ -1436,22 +1445,23 @@ def parse_report_email(
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
Returns:
OrderedDict:
dict:
* ``report_type``: ``aggregate``, ``forensic``, or ``smtp_tls``
* ``report``: The parsed report
"""
result = None
msg_date: datetime = datetime.now(timezone.utc)
try:
if is_outlook_msg(input_):
if isinstance(input_, bytes) and is_outlook_msg(input_):
input_ = convert_outlook_msg(input_)
if type(input_) is bytes:
if isinstance(input_, bytes):
input_ = input_.decode(encoding="utf8", errors="replace")
msg = mailparser.parse_from_string(input_)
msg_headers = json.loads(msg.headers_json)
date = email.utils.format_datetime(datetime.now(timezone.utc))
if "Date" in msg_headers:
date = human_timestamp_to_datetime(msg_headers["Date"])
msg_date = human_timestamp_to_datetime(msg_headers["Date"])
date = email.utils.format_datetime(msg_date)
msg = email.message_from_string(input_)
except Exception as e:
@@ -1493,15 +1503,11 @@ def parse_report_email(
if not payload.strip().startswith("{"):
payload = str(b64decode(payload))
smtp_tls_report = parse_smtp_tls_report_json(payload)
return OrderedDict(
[("report_type", "smtp_tls"), ("report", smtp_tls_report)]
)
return {"report_type": "smtp_tls", "report": smtp_tls_report}
elif content_type == "application/tlsrpt+gzip":
payload = extract_report(payload)
smtp_tls_report = parse_smtp_tls_report_json(payload)
return OrderedDict(
[("report_type", "smtp_tls"), ("report", smtp_tls_report)]
)
return {"report_type": "smtp_tls", "report": smtp_tls_report}
elif content_type == "text/plain":
if "A message claiming to be from you has failed" in payload:
try:
@@ -1531,10 +1537,9 @@ def parse_report_email(
if isinstance(payload, bytes):
payload = payload.decode("utf-8", errors="replace")
if payload.strip().startswith("{"):
result = parse_smtp_tls_report_json(payload)
result = OrderedDict(
[("report_type", "smtp_tls"), ("report", smtp_tls_report)]
)
smtp_tls_report = parse_smtp_tls_report_json(payload)
result = {"report_type": "smtp_tls", "report": smtp_tls_report}
return result
elif payload.strip().startswith("<"):
aggregate_report = parse_aggregate_report_xml(
payload,
@@ -1548,9 +1553,7 @@ def parse_report_email(
keep_alive=keep_alive,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
)
result = OrderedDict(
[("report_type", "aggregate"), ("report", aggregate_report)]
)
result = {"report_type": "aggregate", "report": aggregate_report}
return result
@@ -1574,7 +1577,7 @@ def parse_report_email(
forensic_report = parse_forensic_report(
feedback_report,
sample,
date,
msg_date,
offline=offline,
ip_db_path=ip_db_path,
always_use_local_files=always_use_local_files,
@@ -1594,7 +1597,7 @@ def parse_report_email(
except Exception as e:
raise InvalidForensicReport(e.__str__())
result = OrderedDict([("report_type", "forensic"), ("report", forensic_report)])
result = {"report_type": "forensic", "report": forensic_report}
return result
if result is None:
@@ -1615,7 +1618,7 @@ def parse_report_file(
offline: Optional[bool] = False,
keep_alive: Optional[Callable] = None,
normalize_timespan_threshold_hours: Optional[float] = 24,
) -> OrderedDict[str, Any]:
) -> dict[str, Any]:
"""Parses a DMARC aggregate or forensic file at the given path, a
file-like object, or bytes
@@ -1634,7 +1637,7 @@ def parse_report_file(
keep_alive (callable): Keep alive function
Returns:
OrderedDict: The parsed DMARC report
dict: The parsed DMARC report
"""
if type(input_) is str:
logger.debug("Parsing {0}".format(input_))
@@ -1648,6 +1651,8 @@ def parse_report_file(
file_object.close()
if content.startswith(MAGIC_ZIP) or content.startswith(MAGIC_GZIP):
content = extract_report(content)
results: Optional[dict[str, Any]] = None
try:
report = parse_aggregate_report_file(
content,
@@ -1661,11 +1666,11 @@ def parse_report_file(
keep_alive=keep_alive,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
)
results = OrderedDict([("report_type", "aggregate"), ("report", report)])
results = {"report_type": "aggregate", "report": report}
except InvalidAggregateReport:
try:
report = parse_smtp_tls_report_json(content)
results = OrderedDict([("report_type", "smtp_tls"), ("report", report)])
results = {"report_type": "smtp_tls", "report": report}
except InvalidSMTPTLSReport:
try:
sa = strip_attachment_payloads
@@ -1684,6 +1689,9 @@ def parse_report_file(
)
except InvalidDMARCReport:
raise ParserError("Not a valid report")
if results is None:
raise ParserError("Not a valid report")
return results
@@ -1699,7 +1707,7 @@ def get_dmarc_reports_from_mbox(
reverse_dns_map_url: Optional[str] = None,
offline: Optional[bool] = False,
normalize_timespan_threshold_hours: Optional[float] = 24.0,
) -> OrderedDict[str, OrderedDict[str, Any]]:
) -> dict[str, list[dict[str, Any]]]:
"""Parses a mailbox in mbox format containing e-mails with attached
DMARC reports
@@ -1718,7 +1726,7 @@ def get_dmarc_reports_from_mbox(
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
Returns:
OrderedDict: Lists of ``aggregate_reports``, ``forensic_reports``, and ``smtp_tls_reports``
dict: Lists of ``aggregate_reports``, ``forensic_reports``, and ``smtp_tls_reports``
"""
aggregate_reports = []
@@ -1767,13 +1775,11 @@ def get_dmarc_reports_from_mbox(
logger.warning(error.__str__())
except mailbox.NoSuchMailboxError:
raise InvalidDMARCReport("Mailbox {0} does not exist".format(input_))
return OrderedDict(
[
("aggregate_reports", aggregate_reports),
("forensic_reports", forensic_reports),
("smtp_tls_reports", smtp_tls_reports),
]
)
return {
"aggregate_reports": aggregate_reports,
"forensic_reports": forensic_reports,
"smtp_tls_reports": smtp_tls_reports,
}
def get_dmarc_reports_from_mailbox(
@@ -1791,12 +1797,12 @@ def get_dmarc_reports_from_mailbox(
nameservers: Optional[list[str]] = None,
dns_timeout: Optional[float] = 6.0,
strip_attachment_payloads: Optional[bool] = False,
results: Optional[OrderedDict[str, Any]] = None,
results: Optional[dict[str, Any]] = None,
batch_size: Optional[int] = 10,
since: Optional[datetime] = None,
create_folders: Optional[bool] = True,
normalize_timespan_threshold_hours: Optional[float] = 24,
) -> OrderedDict[str, OrderedDict[str, Any]]:
) -> dict[str, list[dict[str, Any]]]:
"""
Fetches and parses DMARC reports from a mailbox
@@ -1825,7 +1831,7 @@ def get_dmarc_reports_from_mailbox(
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
Returns:
OrderedDict: Lists of ``aggregate_reports``, ``forensic_reports``, and ``smtp_tls_reports``
dict: Lists of ``aggregate_reports``, ``forensic_reports``, and ``smtp_tls_reports``
"""
if delete and test:
raise ValueError("delete and test options are mutually exclusive")
@@ -2059,13 +2065,11 @@ def get_dmarc_reports_from_mailbox(
except Exception as e:
e = "Error moving message UID {0}: {1}".format(msg_uid, e)
logger.error("Mailbox error: {0}".format(e))
results = OrderedDict(
[
("aggregate_reports", aggregate_reports),
("forensic_reports", forensic_reports),
("smtp_tls_reports", smtp_tls_reports),
]
)
results = {
"aggregate_reports": aggregate_reports,
"forensic_reports": forensic_reports,
"smtp_tls_reports": smtp_tls_reports,
}
if current_time:
total_messages = len(
@@ -2206,7 +2210,7 @@ def append_csv(filename, csv):
def save_output(
results: OrderedDict[str, Any],
results: dict[str, Any],
*,
output_directory: Optional[str] = "output",
aggregate_json_filename: Optional[str] = "aggregate.json",
@@ -2220,7 +2224,7 @@ def save_output(
Save report data in the given directory
Args:
results (OrderedDict): Parsing results
results (dict): Parsing results
output_directory (str): The path to the directory to save in
aggregate_json_filename (str): Filename for the aggregate JSON file
forensic_json_filename (str): Filename for the forensic JSON file
@@ -2292,12 +2296,12 @@ def save_output(
sample_file.write(sample)
def get_report_zip(results: OrderedDict[str, Any]) -> bytes:
def get_report_zip(results: dict[str, Any]) -> bytes:
"""
Creates a zip file of parsed report output
Args:
results (OrderedDict): The parsed results
results (dict): The parsed results
Returns:
bytes: zip file bytes
@@ -2338,7 +2342,7 @@ def get_report_zip(results: OrderedDict[str, Any]) -> bytes:
def email_results(
results: OrderedDict,
results: dict[str, Any],
*,
host: str,
mail_from: str,
@@ -2358,7 +2362,7 @@ def email_results(
Emails parsing results as a zip file
Args:
results (OrderedDict): Parsing results
results (dict): Parsing results
host (str): Mail server hostname or IP address
mail_from: The value of the message from header
mail_to (list): A list of addresses to mail to
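Beyond the dict cleanup, this file's hunks carry two behavioral fixes: parse_smtp_tls_report_json now accepts bytes and decodes them before json.loads, and its KeyError handler now actually raises the InvalidSMTPTLSReport it previously only constructed. A minimal sketch of that pattern, using a hypothetical ReportError stand-in:

import json

class ReportError(ValueError):
    """Hypothetical stand-in for InvalidSMTPTLSReport."""

def parse_json_report(report):
    # Accept str or bytes; decode defensively before json.loads.
    if isinstance(report, bytes):
        report = report.decode("utf-8", errors="replace")
    try:
        return json.loads(report)["report-id"]
    except KeyError as e:
        # Constructing an exception without `raise` is a silent no-op;
        # the commit adds the missing `raise`.
        raise ReportError(f"Missing required field: {e}") from e

assert parse_json_report(b'{"report-id": "abc"}') == "abc"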

View File

@@ -3,54 +3,55 @@
"""A CLI for parsing DMARC reports"""
from argparse import Namespace, ArgumentParser
import os
from configparser import ConfigParser
from glob import glob
import http.client
import json
import logging
import math
import yaml
from collections import OrderedDict
import json
from ssl import CERT_NONE, create_default_context
from multiprocessing import Pipe, Process
import os
import sys
import http.client
from argparse import ArgumentParser, Namespace
from configparser import ConfigParser
from glob import glob
from multiprocessing import Pipe, Process
from ssl import CERT_NONE, create_default_context
import yaml
from tqdm import tqdm
from parsedmarc import (
get_dmarc_reports_from_mailbox,
watch_inbox,
parse_report_file,
get_dmarc_reports_from_mbox,
elastic,
opensearch,
kafkaclient,
splunk,
save_output,
email_results,
SEEN_AGGREGATE_REPORT_IDS,
InvalidDMARCReport,
ParserError,
__version__,
InvalidDMARCReport,
s3,
syslog,
loganalytics,
elastic,
email_results,
gelf,
get_dmarc_reports_from_mailbox,
get_dmarc_reports_from_mbox,
kafkaclient,
loganalytics,
opensearch,
parse_report_file,
s3,
save_output,
splunk,
syslog,
watch_inbox,
webhook,
)
from parsedmarc.log import logger
from parsedmarc.mail import (
IMAPConnection,
MSGraphConnection,
GmailConnection,
IMAPConnection,
MaildirConnection,
MSGraphConnection,
)
from parsedmarc.mail.graph import AuthMethod
from parsedmarc.utils import get_base_domain, get_reverse_dns, is_mbox
from parsedmarc.log import logger
from parsedmarc.utils import is_mbox, get_reverse_dns, get_base_domain
from parsedmarc import SEEN_AGGREGATE_REPORT_IDS
http.client._MAXHEADERS = 200 # pylint:disable=protected-access
# Increase the max header limit for very large emails. `_MAXHEADERS` is a
# private stdlib attribute and may not exist in type stubs.
setattr(http.client, "_MAXHEADERS", 200)
formatter = logging.Formatter(
fmt="%(levelname)8s:%(filename)s:%(lineno)d:%(message)s",
@@ -105,6 +106,7 @@ def _main():
"""Called when the module is executed"""
def get_index_prefix(report):
domain = None
if index_prefix_domain_map is None:
return None
if "policy_published" in report:
@@ -1634,13 +1636,11 @@ def _main():
logger.exception("Mailbox Error")
exit(1)
results = OrderedDict(
[
("aggregate_reports", aggregate_reports),
("forensic_reports", forensic_reports),
("smtp_tls_reports", smtp_tls_reports),
]
)
results = {
"aggregate_reports": aggregate_reports,
"forensic_reports": forensic_reports,
"smtp_tls_reports": smtp_tls_reports,
}
process_reports(results)

View File

@@ -2,30 +2,28 @@
from __future__ import annotations
from typing import Optional, Union, Any
from typing import Any, Optional, Union
from collections import OrderedDict
from elasticsearch_dsl.search import Q
from elasticsearch.helpers import reindex
from elasticsearch_dsl import (
connections,
Object,
Boolean,
Date,
Document,
Index,
Nested,
InnerDoc,
Integer,
Text,
Boolean,
Ip,
Date,
Nested,
Object,
Search,
Text,
connections,
)
from elasticsearch.helpers import reindex
from elasticsearch_dsl.search import Q
from parsedmarc import InvalidForensicReport
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
from parsedmarc import InvalidForensicReport
class ElasticsearchError(Exception):
@@ -377,7 +375,7 @@ def migrate_indexes(
def save_aggregate_report_to_elasticsearch(
aggregate_report: OrderedDict[str, Any],
aggregate_report: dict[str, Any],
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
@@ -388,7 +386,7 @@ def save_aggregate_report_to_elasticsearch(
Saves a parsed DMARC aggregate report to Elasticsearch
Args:
aggregate_report (OrderedDict): A parsed forensic report
aggregate_report (dict): A parsed aggregate report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily indexes
@@ -539,7 +537,7 @@ def save_aggregate_report_to_elasticsearch(
def save_forensic_report_to_elasticsearch(
forensic_report: OrderedDict[str, Any],
forensic_report: dict[str, Any],
index_suffix: Optional[Any] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
@@ -550,7 +548,7 @@ def save_forensic_report_to_elasticsearch(
Saves a parsed DMARC forensic report to Elasticsearch
Args:
forensic_report (OrderedDict): A parsed forensic report
forensic_report (dict): A parsed forensic report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily
@@ -570,7 +568,7 @@ def save_forensic_report_to_elasticsearch(
sample_date = forensic_report["parsed_sample"]["date"]
sample_date = human_timestamp_to_datetime(sample_date)
original_headers = forensic_report["parsed_sample"]["headers"]
headers = OrderedDict()
headers: dict[str, Any] = {}
for original_header in original_headers:
headers[original_header.lower()] = original_headers[original_header]
@@ -706,7 +704,7 @@ def save_forensic_report_to_elasticsearch(
def save_smtp_tls_report_to_elasticsearch(
report: OrderedDict[str, Any],
report: dict[str, Any],
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
@@ -717,7 +715,7 @@ def save_smtp_tls_report_to_elasticsearch(
Saves a parsed SMTP TLS report to Elasticsearch
Args:
report (OrderedDict): A parsed SMTP TLS report
report (dict): A parsed SMTP TLS report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily indexes
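The header-normalization loop in this file's forensic-report hunk (headers[original_header.lower()] = ...) is equivalent to a dict comprehension; a standalone sketch of the same transformation:

original_headers = {"Subject": "test", "From": "alice@example.com"}
# Lowercase every header name while keeping values unchanged.
headers = {k.lower(): v for k, v in original_headers.items()}
assert headers == {"subject": "test", "from": "alice@example.com"}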

View File

@@ -2,21 +2,19 @@
from __future__ import annotations
from typing import Any
import json
import logging
import logging.handlers
import json
import threading
from collections import OrderedDict
from typing import Any
from pygelf import GelfTcpHandler, GelfTlsHandler, GelfUdpHandler
from parsedmarc import (
parsed_aggregate_reports_to_csv_rows,
parsed_forensic_reports_to_csv_rows,
parsed_smtp_tls_reports_to_csv_rows,
)
from pygelf import GelfTcpHandler, GelfUdpHandler, GelfTlsHandler
log_context_data = threading.local()
@@ -53,9 +51,7 @@ class GelfClient(object):
)
self.logger.addHandler(self.handler)
def save_aggregate_report_to_gelf(
self, aggregate_reports: list[OrderedDict[str, Any]]
):
def save_aggregate_report_to_gelf(self, aggregate_reports: list[dict[str, Any]]):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows:
log_context_data.parsedmarc = row
@@ -63,14 +59,12 @@ class GelfClient(object):
log_context_data.parsedmarc = None
def save_forensic_report_to_gelf(
self, forensic_reports: list[OrderedDict[str, Any]]
):
def save_forensic_report_to_gelf(self, forensic_reports: list[dict[str, Any]]):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows:
self.logger.info(json.dumps(row))
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: OrderedDict[str, Any]):
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: dict[str, Any]):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
for row in rows:
self.logger.info(json.dumps(row))

View File

@@ -2,19 +2,16 @@
from __future__ import annotations
from typing import Any, Optional, Union
from ssl import SSLContext
import json
from ssl import create_default_context
from ssl import SSLContext, create_default_context
from typing import Any, Optional, Union
from kafka import KafkaProducer
from kafka.errors import NoBrokersAvailable, UnknownTopicOrPartitionError
from collections import OrderedDict
from parsedmarc.utils import human_timestamp_to_datetime
from parsedmarc import __version__
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
class KafkaError(RuntimeError):
@@ -66,7 +63,7 @@ class KafkaClient(object):
raise KafkaError("No Kafka brokers available")
@staticmethod
def strip_metadata(report: OrderedDict[str, Any]):
def strip_metadata(report: dict[str, Any]):
"""
Duplicates org_name, org_email and report_id into JSON root
and removes report_metadata key to bring it more inline
@@ -80,7 +77,7 @@ class KafkaClient(object):
return report
@staticmethod
def generate_date_range(report: OrderedDict[str, Any]):
def generate_date_range(report: dict[str, Any]):
"""
Creates a date_range timestamp with format YYYY-MM-DD-T-HH:MM:SS
based on begin and end dates for easier parsing in Kibana.
@@ -99,7 +96,7 @@ class KafkaClient(object):
def save_aggregate_reports_to_kafka(
self,
aggregate_reports: Union[OrderedDict[str, Any], list[OrderedDict[str, Any]]],
aggregate_reports: Union[dict[str, Any], list[dict[str, Any]]],
aggregate_topic: str,
):
"""
@@ -111,9 +108,7 @@ class KafkaClient(object):
aggregate_topic (str): The name of the Kafka topic
"""
if isinstance(aggregate_reports, dict) or isinstance(
aggregate_reports, OrderedDict
):
if isinstance(aggregate_reports, dict):
aggregate_reports = [aggregate_reports]
if len(aggregate_reports) < 1:
@@ -146,7 +141,7 @@ class KafkaClient(object):
def save_forensic_reports_to_kafka(
self,
forensic_reports: Union[OrderedDict[str, Any], list[OrderedDict[str, Any]]],
forensic_reports: Union[dict[str, Any], list[dict[str, Any]]],
forensic_topic: str,
):
"""
@@ -180,7 +175,7 @@ class KafkaClient(object):
def save_smtp_tls_reports_to_kafka(
self,
smtp_tls_reports: Union[list[OrderedDict[str, Any]], OrderedDict[str, Any]],
smtp_tls_reports: Union[list[dict[str, Any]], dict[str, Any]],
smtp_tls_topic: str,
):
"""

View File

@@ -3,13 +3,13 @@
from __future__ import annotations
from typing import Any
from collections import OrderedDict
from parsedmarc.log import logger
from azure.core.exceptions import HttpResponseError
from azure.identity import ClientSecretCredential
from azure.monitor.ingestion import LogsIngestionClient
from parsedmarc.log import logger
class LogAnalyticsException(Exception):
"""Raised when an Elasticsearch error occurs"""
@@ -133,7 +133,7 @@ class LogAnalyticsClient(object):
def publish_results(
self,
results: OrderedDict[str, OrderedDict[str, Any]],
results: dict[str, Any],
save_aggregate: bool,
save_forensic: bool,
save_smtp_tls: bool,

View File

@@ -2,30 +2,28 @@
from __future__ import annotations
from typing import Optional, Union, Any
from collections import OrderedDict
from typing import Any, Optional, Union
from opensearchpy import (
Q,
connections,
Object,
Boolean,
Date,
Document,
Index,
Nested,
InnerDoc,
Integer,
Text,
Boolean,
Ip,
Date,
Nested,
Object,
Q,
Search,
Text,
connections,
)
from opensearchpy.helpers import reindex
from parsedmarc import InvalidForensicReport
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
from parsedmarc import InvalidForensicReport
class OpenSearchError(Exception):
@@ -377,7 +375,7 @@ def migrate_indexes(
def save_aggregate_report_to_opensearch(
aggregate_report: OrderedDict[str, Any],
aggregate_report: dict[str, Any],
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
@@ -388,7 +386,7 @@ def save_aggregate_report_to_opensearch(
Saves a parsed DMARC aggregate report to OpenSearch
Args:
aggregate_report (OrderedDict): A parsed forensic report
aggregate_report (dict): A parsed aggregate report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily indexes
@@ -441,6 +439,8 @@ def save_aggregate_report_to_opensearch(
)
if len(existing) > 0:
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
raise AlreadySaved(
"An aggregate report ID {0} from {1} about {2} "
"with a date range of {3} UTC to {4} UTC already "
@@ -539,7 +539,7 @@ def save_aggregate_report_to_opensearch(
def save_forensic_report_to_opensearch(
forensic_report: OrderedDict[str, Any],
forensic_report: dict[str, Any],
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
@@ -550,7 +550,7 @@ def save_forensic_report_to_opensearch(
Saves a parsed DMARC forensic report to OpenSearch
Args:
forensic_report (OrderedDict): A parsed forensic report
forensic_report (dict): A parsed forensic report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily
@@ -570,7 +570,7 @@ def save_forensic_report_to_opensearch(
sample_date = forensic_report["parsed_sample"]["date"]
sample_date = human_timestamp_to_datetime(sample_date)
original_headers = forensic_report["parsed_sample"]["headers"]
headers = OrderedDict()
headers: dict[str, Any] = {}
for original_header in original_headers:
headers[original_header.lower()] = original_headers[original_header]
@@ -706,7 +706,7 @@ def save_forensic_report_to_opensearch(
def save_smtp_tls_report_to_opensearch(
report: OrderedDict[str, Any],
report: dict[str, Any],
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
@@ -717,7 +717,7 @@ def save_smtp_tls_report_to_opensearch(
Saves a parsed SMTP TLS report to OpenSearch
Args:
report (OrderedDict): A parsed SMTP TLS report
report (dict): A parsed SMTP TLS report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily indexes

View File

@@ -2,13 +2,11 @@
from __future__ import annotations
import json
from typing import Any
import json
import boto3
from collections import OrderedDict
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
@@ -55,16 +53,16 @@ class S3Client(object):
)
self.bucket = self.s3.Bucket(self.bucket_name) # type: ignore
def save_aggregate_report_to_s3(self, report: OrderedDict[str, Any]):
def save_aggregate_report_to_s3(self, report: dict[str, Any]):
self.save_report_to_s3(report, "aggregate")
def save_forensic_report_to_s3(self, report: OrderedDict[str, Any]):
def save_forensic_report_to_s3(self, report: dict[str, Any]):
self.save_report_to_s3(report, "forensic")
def save_smtp_tls_report_to_s3(self, report: OrderedDict[str, Any]):
def save_smtp_tls_report_to_s3(self, report: dict[str, Any]):
self.save_report_to_s3(report, "smtp_tls")
def save_report_to_s3(self, report: OrderedDict[str, Any], report_type: str):
def save_report_to_s3(self, report: dict[str, Any], report_type: str):
if report_type == "smtp_tls":
report_date = report["begin_date"]
report_id = report["report_id"]

View File

@@ -2,16 +2,13 @@
from __future__ import annotations
from typing import Any, Union
from collections import OrderedDict
from urllib.parse import urlparse
import socket
import json
import socket
from typing import Any, Union
from urllib.parse import urlparse
import urllib3
import requests
import urllib3
from parsedmarc.constants import USER_AGENT
from parsedmarc.log import logger
@@ -73,7 +70,7 @@ class HECClient(object):
def save_aggregate_reports_to_splunk(
self,
aggregate_reports: Union[list[OrderedDict[str, Any]], OrderedDict[str, Any]],
aggregate_reports: Union[list[dict[str, Any]], dict[str, Any]],
):
"""
Saves aggregate DMARC reports to Splunk
@@ -139,7 +136,7 @@ class HECClient(object):
def save_forensic_reports_to_splunk(
self,
forensic_reports: Union[list[OrderedDict[str, Any]], OrderedDict[str, Any]],
forensic_reports: Union[list[dict[str, Any]], dict[str, Any]],
):
"""
Saves forensic DMARC reports to Splunk
@@ -175,7 +172,7 @@ class HECClient(object):
raise SplunkError(response["text"])
def save_smtp_tls_reports_to_splunk(
self, reports: Union[list[OrderedDict[str, Any]], OrderedDict[str, Any]]
self, reports: Union[list[dict[str, Any]], dict[str, Any]]
):
"""
Saves SMTP TLS reports to Splunk

View File

@@ -3,15 +3,11 @@
from __future__ import annotations
import json
import logging
import logging.handlers
from typing import Any
from collections import OrderedDict
import json
from parsedmarc import (
parsed_aggregate_reports_to_csv_rows,
parsed_forensic_reports_to_csv_rows,
@@ -36,23 +32,17 @@ class SyslogClient(object):
log_handler = logging.handlers.SysLogHandler(address=(server_name, server_port))
self.logger.addHandler(log_handler)
def save_aggregate_report_to_syslog(
self, aggregate_reports: list[OrderedDict[str, Any]]
):
def save_aggregate_report_to_syslog(self, aggregate_reports: list[dict[str, Any]]):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows:
self.logger.info(json.dumps(row))
def save_forensic_report_to_syslog(
self, forensic_reports: list[OrderedDict[str, Any]]
):
def save_forensic_report_to_syslog(self, forensic_reports: list[dict[str, Any]]):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows:
self.logger.info(json.dumps(row))
def save_smtp_tls_report_to_syslog(
self, smtp_tls_reports: list[OrderedDict[str, Any]]
):
def save_smtp_tls_report_to_syslog(self, smtp_tls_reports: list[dict[str, Any]]):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
for row in rows:
self.logger.info(json.dumps(row))

View File

@@ -4,26 +4,23 @@
from __future__ import annotations
import base64
import csv
import hashlib
import io
import json
import logging
import mailbox
import os
import re
import shutil
import subprocess
import tempfile
from datetime import datetime, timedelta, timezone
from typing import Optional, Union
import logging
import os
from datetime import datetime
from datetime import timezone
from datetime import timedelta
from collections import OrderedDict
from expiringdict import ExpiringDict
import tempfile
import subprocess
import shutil
import mailparser
import json
import hashlib
import base64
import mailbox
import re
import csv
import io
from expiringdict import ExpiringDict
try:
from importlib.resources import files
@@ -32,19 +29,19 @@ except ImportError:
from importlib.resources import files
from dateutil.parser import parse as parse_date
import dns.reversename
import dns.resolver
import dns.exception
import dns.resolver
import dns.reversename
import geoip2.database
import geoip2.errors
import publicsuffixlist
import requests
from dateutil.parser import parse as parse_date
from parsedmarc.log import logger
import parsedmarc.resources.dbip
import parsedmarc.resources.maps
from parsedmarc.constants import USER_AGENT
from parsedmarc.log import logger
parenthesis_regex = re.compile(r"\s*\(.*\)\s*")
@@ -422,7 +419,7 @@ def get_ip_address_info(
offline: Optional[bool] = False,
nameservers: Optional[list[str]] = None,
timeout: Optional[float] = 2.0,
) -> OrderedDict[str, str]:
) -> dict[str, str]:
"""
Returns reverse DNS and country information for the given IP address
@@ -440,7 +437,7 @@ def get_ip_address_info(
timeout (float): Sets the DNS timeout in seconds
Returns:
OrderedDict: ``ip_address``, ``reverse_dns``, ``country``
dict: ``ip_address``, ``reverse_dns``, ``country``
"""
ip_address = ip_address.lower()
@@ -449,7 +446,7 @@ def get_ip_address_info(
if info:
logger.debug(f"IP address {ip_address} was found in cache")
return info
info = OrderedDict()
info: dict[str, str] = {}
info["ip_address"] = ip_address
if offline:
reverse_dns = None
@@ -487,7 +484,7 @@ def get_ip_address_info(
return info
def parse_email_address(original_address: str) -> OrderedDict[str, str]:
def parse_email_address(original_address: str) -> dict[str, str]:
if original_address[0] == "":
display_name = None
else:
@@ -500,14 +497,12 @@ def parse_email_address(original_address: str) -> OrderedDict[str, str]:
local = address_parts[0].lower()
domain = address_parts[-1].lower()
return OrderedDict(
[
("display_name", display_name),
("address", address),
("local", local),
("domain", domain),
]
)
return {
"display_name": display_name,
"address": address,
"local": local,
"domain": domain,
}
def get_filename_safe_string(string: str) -> str:
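For context on parse_email_address's return shape, the indexing above (original_address[0], address_parts[-1]) suggests it consumes the (display_name, address) pair produced by email.utils.parseaddr; a runnable sketch of the same split (an inference, not the project's code):

import email.utils

display_name, address = email.utils.parseaddr("Alice Example <alice@example.com>")
local, _, domain = address.partition("@")
result = {
    "display_name": display_name or None,  # empty display name -> None
    "address": address,
    "local": local.lower(),
    "domain": domain.lower(),
}
assert result == {
    "display_name": "Alice Example",
    "address": "alice@example.com",
    "local": "alice",
    "domain": "example.com",
}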