Compare commits

...

3 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
a4ac5accd2 Add TypedDict definitions to replace dict[str, Any] for better typing
- Added comprehensive TypedDict definitions for all major data structures
- IPAddressInfo, EmailAddress, ReverseDNSService in utils.py
- AggregateReport, ForensicReport, SMTPTLSReport and related types in __init__.py
- Updated function signatures throughout codebase to use TypedDict types
- Reduced dict[str, Any] usage from 50+ to 11 (remaining are legitimate generic cases)
- All tests pass successfully

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2025-12-19 21:29:14 +00:00
copilot-swe-agent[bot]
639f8f674b Replace dict() and OrderedDict() with literal dictionaries for better typing
- Converted all dict() calls to {} literals
- Replaced OrderedDict() with {} literals (Python 3.7+ preserves insertion order)
- Updated all OrderedDict type hints to dict type hints
- Removed unused OrderedDict imports from all files
- All tests pass successfully
- Code passes ruff checks

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2025-12-19 21:04:56 +00:00
copilot-swe-agent[bot]
380df4df7c Initial plan 2025-12-19 20:49:40 +00:00
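
The core pattern behind the first commit: at runtime the parsed reports stay ordinary dicts, but each structure gets a `TypedDict` so static checkers can verify key names and value types. A minimal sketch of the idea (the class mirrors the `DKIMResult` defined in the diff below; the `describe` helper is illustrative only, not part of the change):

```python
from typing import TypedDict


class DKIMResult(TypedDict, total=False):
    """DKIM authentication result; total=False marks every key as optional."""

    domain: str
    selector: str
    result: str


def describe(result: DKIMResult) -> str:
    # Static checkers verify key names and value types; .get() covers optional keys.
    return f"{result['domain']}: {result.get('result', 'unknown')}"


# At runtime a TypedDict value is just a plain dict, so existing callers keep working.
print(describe({"domain": "example.com", "result": "pass"}))
```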
11 changed files with 360 additions and 167 deletions
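
The second commit relies on the fact that plain dicts preserve insertion order on Python 3.7 and later, so `{}` literals are a drop-in replacement for `OrderedDict()` in this codebase and are simpler to annotate. A standalone illustration, not taken from the diff:

```python
from collections import OrderedDict

ordered = OrderedDict([("disposition", "none"), ("dkim", "fail"), ("spf", "fail")])
plain = {"disposition": "none", "dkim": "fail", "spf": "fail"}

# Both keep insertion order, and an OrderedDict compares equal to a plain dict
# with the same items, so downstream JSON/CSV output is unchanged.
assert list(ordered) == list(plain) == ["disposition", "dkim", "spf"]
assert ordered == plain
```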

View File

@@ -4,7 +4,7 @@
from __future__ import annotations
from typing import Dict, List, Any, Union, Optional, IO, Callable
from typing import Dict, List, Any, Union, Optional, IO, Callable, TypedDict
import binascii
import email
@@ -19,7 +19,6 @@ import xml.parsers.expat as expat
import zipfile
import zlib
from base64 import b64decode
from collections import OrderedDict
from csv import DictWriter
from datetime import datetime, timedelta, timezone, tzinfo
from io import BytesIO, StringIO
@@ -39,12 +38,177 @@ from parsedmarc.mail import (
)
from parsedmarc.constants import __version__
from parsedmarc.utils import get_base_domain, get_ip_address_info
from parsedmarc.utils import get_base_domain, get_ip_address_info, IPAddressInfo
from parsedmarc.utils import is_outlook_msg, convert_outlook_msg
from parsedmarc.utils import parse_email
from parsedmarc.utils import timestamp_to_human, human_timestamp_to_datetime
# TypedDict definitions for DMARC report structures
class PolicyOverrideReason(TypedDict, total=False):
"""Reason for DMARC policy override"""
type: str
comment: Optional[str]
class PolicyEvaluated(TypedDict):
"""DMARC policy evaluation result"""
disposition: str
dkim: str
spf: str
policy_override_reasons: list[PolicyOverrideReason]
class Alignment(TypedDict):
"""DMARC alignment information"""
spf: bool
dkim: bool
dmarc: bool
class DKIMResult(TypedDict, total=False):
"""DKIM authentication result"""
domain: str
selector: str
result: str
class SPFResult(TypedDict, total=False):
"""SPF authentication result"""
domain: str
scope: str
result: str
class AuthResults(TypedDict):
"""Authentication results"""
dkim: list[DKIMResult]
spf: list[SPFResult]
class DMARCIdentifiers(TypedDict):
"""DMARC identifiers"""
header_from: str
envelope_from: str
envelope_to: Optional[str]
class DMARCRecord(TypedDict):
"""Parsed DMARC aggregate record"""
source: IPAddressInfo
count: int
alignment: Alignment
policy_evaluated: PolicyEvaluated
identifiers: DMARCIdentifiers
auth_results: AuthResults
class PublishedPolicy(TypedDict):
"""Published DMARC policy"""
domain: str
adkim: str
aspf: str
p: str
sp: str
pct: str
fo: str
class ReportMetadata(TypedDict, total=False):
"""DMARC report metadata"""
org_name: str
org_email: str
org_extra_contact_info: Optional[str]
report_id: str
begin_date: str
end_date: str
errors: list[str]
timespan_requires_normalization: bool
original_timespan_seconds: int
class AggregateReport(TypedDict):
"""Parsed DMARC aggregate report"""
xml_schema: str
report_metadata: ReportMetadata
policy_published: PublishedPolicy
records: list[DMARCRecord]
class SMTPTLSFailureDetails(TypedDict, total=False):
"""SMTP TLS failure details"""
result_type: str
failed_session_count: int
sending_mta_ip: Optional[str]
receiving_ip: Optional[str]
receiving_mx_hostname: Optional[str]
receiving_mx_helo: Optional[str]
additional_info_uri: Optional[str]
failure_reason_code: Optional[str]
class SMTPTLSPolicy(TypedDict, total=False):
"""SMTP TLS policy"""
policy_domain: str
policy_type: str
policy_strings: Optional[list[str]]
mx_host_patterns: Optional[list[str]]
successful_session_count: int
total_successful_session_count: int
total_failure_session_count: int
failure_details: list[SMTPTLSFailureDetails]
class SMTPTLSReport(TypedDict):
"""Parsed SMTP TLS report"""
organization_name: str
begin_date: str
end_date: str
contact_info: str
report_id: str
policies: list[SMTPTLSPolicy]
class ForensicReport(TypedDict, total=False):
"""Parsed DMARC forensic report"""
feedback_type: str
user_agent: Optional[str]
version: Optional[str]
original_envelope_id: Optional[str]
original_mail_from: Optional[str]
original_rcpt_to: Optional[str]
arrival_date: str
arrival_date_utc: str
subject: Optional[str]
message_id: Optional[str]
authentication_results: Optional[str]
dkim_domain: Optional[str]
source_ip_address: Optional[str]
source_country: Optional[str]
source_reverse_dns: Optional[str]
source_base_domain: Optional[str]
delivery_result: Optional[str]
auth_failure: Optional[list[str]]
reported_domain: Optional[str]
sample: Optional[str]
parsed_sample: Optional[dict]
sample_headers_only: bool
class ParsedReport(TypedDict):
"""Container for parsed report with type"""
report_type: str
report: Union[AggregateReport, ForensicReport, SMTPTLSReport]
class ParseResults(TypedDict):
"""Results from parsing multiple reports"""
aggregate_reports: list[AggregateReport]
forensic_reports: list[ForensicReport]
smtp_tls_reports: list[SMTPTLSReport]
logger.debug("parsedmarc v{0}".format(__version__))
feedback_report_regex = re.compile(r"^([\w\-]+): (.+)$", re.MULTILINE)
@@ -70,7 +234,7 @@ EMAIL_SAMPLE_CONTENT_TYPES = (
IP_ADDRESS_CACHE = ExpiringDict(max_len=10000, max_age_seconds=14400)
SEEN_AGGREGATE_REPORT_IDS = ExpiringDict(max_len=100000000, max_age_seconds=3600)
REVERSE_DNS_MAP = dict()
REVERSE_DNS_MAP = {}
class ParserError(RuntimeError):
@@ -231,8 +395,8 @@ def _bucket_interval_by_day(
def _append_parsed_record(
parsed_record: OrderedDict[str, Any],
records: List[OrderedDict[str, Any]],
parsed_record: DMARCRecord,
records: list[DMARCRecord],
begin_dt: datetime,
end_dt: datetime,
normalize: bool,
@@ -275,7 +439,7 @@ def _append_parsed_record(
def _parse_report_record(
record: OrderedDict,
record: dict,
*,
ip_db_path: Optional[str] = None,
always_use_local_files: Optional[bool] = False,
@@ -284,13 +448,13 @@ def _parse_report_record(
offline: Optional[bool] = False,
nameservers: Optional[list[str]] = None,
dns_timeout: Optional[float] = 2.0,
) -> OrderedDict[str, Any]:
) -> DMARCRecord:
"""
Converts a record from a DMARC aggregate report into a more consistent
format
Args:
record (OrderedDict): The record to convert
record (dict): The record to convert
always_use_local_files (bool): Do not download files
reverse_dns_map_path (str): Path to a reverse DNS map file
reverse_dns_map_url (str): URL to a reverse DNS map file
@@ -304,7 +468,7 @@ def _parse_report_record(
DMARCRecord: The converted record
"""
record = record.copy()
new_record = OrderedDict()
new_record = {}
if record["row"]["source_ip"] is None:
raise ValueError("Source IP address is empty")
new_record_source = get_ip_address_info(
@@ -322,14 +486,12 @@ def _parse_report_record(
new_record["source"] = new_record_source
new_record["count"] = int(record["row"]["count"])
policy_evaluated = record["row"]["policy_evaluated"].copy()
new_policy_evaluated = OrderedDict(
[
("disposition", "none"),
("dkim", "fail"),
("spf", "fail"),
("policy_override_reasons", []),
]
)
new_policy_evaluated = {
"disposition": "none",
"dkim": "fail",
"spf": "fail",
"policy_override_reasons": [],
}
if "disposition" in policy_evaluated:
new_policy_evaluated["disposition"] = policy_evaluated["disposition"]
if new_policy_evaluated["disposition"].strip().lower() == "pass":
@@ -348,10 +510,11 @@ def _parse_report_record(
and policy_evaluated["dkim"].lower() == "pass"
)
dmarc_aligned = spf_aligned or dkim_aligned
new_record["alignment"] = dict()
new_record["alignment"]["spf"] = spf_aligned
new_record["alignment"]["dkim"] = dkim_aligned
new_record["alignment"]["dmarc"] = dmarc_aligned
new_record["alignment"] = {
"spf": spf_aligned,
"dkim": dkim_aligned,
"dmarc": dmarc_aligned,
}
if "reason" in policy_evaluated:
if type(policy_evaluated["reason"]) is list:
reasons = policy_evaluated["reason"]
@@ -366,7 +529,7 @@ def _parse_report_record(
new_record["identifiers"] = record["identities"].copy()
else:
new_record["identifiers"] = record["identifiers"].copy()
new_record["auth_results"] = OrderedDict([("dkim", []), ("spf", [])])
new_record["auth_results"] = {"dkim": [], "spf": []}
if type(new_record["identifiers"]["header_from"]) is str:
lowered_from = new_record["identifiers"]["header_from"].lower()
else:
@@ -385,7 +548,7 @@ def _parse_report_record(
auth_results["dkim"] = [auth_results["dkim"]]
for result in auth_results["dkim"]:
if "domain" in result and result["domain"] is not None:
new_result = OrderedDict([("domain", result["domain"])])
new_result = {"domain": result["domain"]}
if "selector" in result and result["selector"] is not None:
new_result["selector"] = result["selector"]
else:
@@ -400,7 +563,7 @@ def _parse_report_record(
auth_results["spf"] = [auth_results["spf"]]
for result in auth_results["spf"]:
if "domain" in result and result["domain"] is not None:
new_result = OrderedDict([("domain", result["domain"])])
new_result = {"domain": result["domain"]}
if "scope" in result and result["scope"] is not None:
new_result["scope"] = result["scope"]
else:
@@ -440,10 +603,10 @@ def _parse_report_record(
def _parse_smtp_tls_failure_details(failure_details: dict[str, Any]):
try:
new_failure_details = OrderedDict(
result_type=failure_details["result-type"],
failed_session_count=failure_details["failed-session-count"],
)
new_failure_details = {
"result_type": failure_details["result-type"],
"failed_session_count": failure_details["failed-session-count"],
}
if "sending-mta-ip" in failure_details:
new_failure_details["sending_mta_ip"] = failure_details["sending-mta-ip"]
@@ -474,7 +637,7 @@ def _parse_smtp_tls_failure_details(failure_details: dict[str, Any]):
raise InvalidSMTPTLSReport(str(e))
def _parse_smtp_tls_report_policy(policy: OrderedDict[str, Any]):
def _parse_smtp_tls_report_policy(policy: dict[str, Any]):
policy_types = ["tlsa", "sts", "no-policy-found"]
try:
policy_domain = policy["policy"]["policy-domain"]
@@ -482,7 +645,10 @@ def _parse_smtp_tls_report_policy(policy: OrderedDict[str, Any]):
failure_details = []
if policy_type not in policy_types:
raise InvalidSMTPTLSReport(f"Invalid policy type {policy_type}")
new_policy = OrderedDict(policy_domain=policy_domain, policy_type=policy_type)
new_policy = {
"policy_domain": policy_domain,
"policy_type": policy_type,
}
if "policy-string" in policy["policy"]:
if isinstance(policy["policy"]["policy-string"], list):
if len(policy["policy"]["policy-string"]) > 0:
@@ -511,7 +677,7 @@ def _parse_smtp_tls_report_policy(policy: OrderedDict[str, Any]):
raise InvalidSMTPTLSReport(str(e))
def parse_smtp_tls_report_json(report: str):
def parse_smtp_tls_report_json(report: str) -> SMTPTLSReport:
"""Parses and validates an SMTP TLS report"""
required_fields = [
"organization-name",
@@ -533,14 +699,14 @@ def parse_smtp_tls_report_json(report: str):
for policy in report_dict["policies"]:
policies.append(_parse_smtp_tls_report_policy(policy))
new_report = OrderedDict(
organization_name=report_dict["organization-name"],
begin_date=report_dict["date-range"]["start-datetime"],
end_date=report_dict["date-range"]["end-datetime"],
contact_info=report_dict["contact-info"],
report_id=report_dict["report-id"],
policies=policies,
)
new_report = {
"organization_name": report_dict["organization-name"],
"begin_date": report_dict["date-range"]["start-datetime"],
"end_date": report_dict["date-range"]["end-datetime"],
"contact_info": report_dict["contact-info"],
"report_id": report_dict["report-id"],
"policies": policies,
}
return new_report
@@ -551,21 +717,21 @@ def parse_smtp_tls_report_json(report: str):
def parsed_smtp_tls_reports_to_csv_rows(
reports: Union[OrderedDict[str, Any], List[OrderedDict[str, Any]]],
reports: Union[SMTPTLSReport, list[SMTPTLSReport]],
):
"""Converts one oor more parsed SMTP TLS reports into a list of single
layer OrderedDict objects suitable for use in a CSV"""
if type(reports) is OrderedDict:
if type(reports) is dict:
reports = [reports]
rows = []
for report in reports:
common_fields = OrderedDict(
organization_name=report["organization_name"],
begin_date=report["begin_date"],
end_date=report["end_date"],
report_id=report["report_id"],
)
common_fields = {
"organization_name": report["organization_name"],
"begin_date": report["begin_date"],
"end_date": report["end_date"],
"report_id": report["report_id"],
}
record = common_fields.copy()
for policy in report["policies"]:
if "policy_strings" in policy:
@@ -587,7 +753,7 @@ def parsed_smtp_tls_reports_to_csv_rows(
return rows
def parsed_smtp_tls_reports_to_csv(reports: OrderedDict[str, Any]) -> str:
def parsed_smtp_tls_reports_to_csv(reports: SMTPTLSReport) -> str:
"""
Converts one or more parsed SMTP TLS reports to flat CSV format, including
headers
@@ -644,7 +810,7 @@ def parse_aggregate_report_xml(
timeout: Optional[float] = 2.0,
keep_alive: Optional[Callable] = None,
normalize_timespan_threshold_hours: float = 24.0,
) -> OrderedDict[str, Any]:
) -> AggregateReport:
"""Parses a DMARC XML report string and returns a consistent OrderedDict
Args:
@@ -693,8 +859,8 @@ def parse_aggregate_report_xml(
schema = "draft"
if "version" in report:
schema = report["version"]
new_report = OrderedDict([("xml_schema", schema)])
new_report_metadata = OrderedDict()
new_report = {"xml_schema": schema}
new_report_metadata = {}
if report_metadata["org_name"] is None:
if report_metadata["email"] is not None:
report_metadata["org_name"] = report_metadata["email"].split("@")[-1]
@@ -755,7 +921,7 @@ def parse_aggregate_report_xml(
policy_published = report["policy_published"]
if type(policy_published) is list:
policy_published = policy_published[0]
new_policy_published = OrderedDict()
new_policy_published = {}
new_policy_published["domain"] = policy_published["domain"]
adkim = "r"
if "adkim" in policy_published:
@@ -922,7 +1088,7 @@ def parse_aggregate_report_file(
dns_timeout: Optional[float] = 2.0,
keep_alive: Optional[Callable] = None,
normalize_timespan_threshold_hours: Optional[float] = 24.0,
) -> OrderedDict[str, any]:
) -> AggregateReport:
"""Parses a file at the given path, a file-like object. or bytes as an
aggregate DMARC report
@@ -963,7 +1129,7 @@ def parse_aggregate_report_file(
def parsed_aggregate_reports_to_csv_rows(
reports: list[OrderedDict[str, Any]],
reports: list[AggregateReport],
) -> list[dict[str, Any]]:
"""
Converts one or more parsed aggregate reports to a list of dicts in flat CSV
@@ -980,7 +1146,7 @@ def parsed_aggregate_reports_to_csv_rows(
def to_str(obj):
return str(obj).lower()
if type(reports) is OrderedDict:
if type(reports) is dict:
reports = [reports]
rows = []
@@ -1088,7 +1254,7 @@ def parsed_aggregate_reports_to_csv_rows(
return rows
def parsed_aggregate_reports_to_csv(reports: list[OrderedDict[str, Any]]) -> str:
def parsed_aggregate_reports_to_csv(reports: list[AggregateReport]) -> str:
"""
Converts one or more parsed aggregate reports to flat CSV format, including
headers
@@ -1167,7 +1333,7 @@ def parse_forensic_report(
nameservers: Optional[list[str]] = None,
dns_timeout: Optional[float] = 2.0,
strip_attachment_payloads: Optional[bool] = False,
) -> OrderedDict[str, Any]:
) -> ForensicReport:
"""
Converts a DMARC forensic report and sample to a ``ForensicReport``
@@ -1192,7 +1358,7 @@ def parse_forensic_report(
delivery_results = ["delivered", "spam", "policy", "reject", "other"]
try:
parsed_report = OrderedDict()
parsed_report = {}
report_values = feedback_report_regex.findall(feedback_report)
for report_value in report_values:
key = report_value[0].lower().replace("-", "_")
@@ -1295,7 +1461,7 @@ def parse_forensic_report(
raise InvalidForensicReport("Unexpected error: {0}".format(error.__str__()))
def parsed_forensic_reports_to_csv_rows(reports: list[OrderedDict[str, Any]]):
def parsed_forensic_reports_to_csv_rows(reports: list[ForensicReport]):
"""
Converts one or more parsed forensic reports to a list of dicts in flat CSV
format
@@ -1306,7 +1472,7 @@ def parsed_forensic_reports_to_csv_rows(reports: list[OrderedDict[str, Any]]):
Returns:
list: Parsed forensic report data as a list of dicts in flat CSV format
"""
if type(reports) is OrderedDict:
if type(reports) is dict:
reports = [reports]
rows = []
@@ -1331,7 +1497,7 @@ def parsed_forensic_reports_to_csv_rows(reports: list[OrderedDict[str, Any]]):
return rows
def parsed_forensic_reports_to_csv(reports: list[dict[str, Any]]) -> str:
def parsed_forensic_reports_to_csv(reports: list[ForensicReport]) -> str:
"""
Converts one or more parsed forensic reports to flat CSV format, including
headers
@@ -1396,7 +1562,7 @@ def parse_report_email(
strip_attachment_payloads: Optional[bool] = False,
keep_alive: Optional[callable] = None,
normalize_timespan_threshold_hours: Optional[float] = 24.0,
) -> OrderedDict[str, Any]:
) -> ParsedReport:
"""
Parses a DMARC report from an email
@@ -1472,21 +1638,23 @@ def parse_report_email(
if not payload.strip().startswith("{"):
payload = str(b64decode(payload))
smtp_tls_report = parse_smtp_tls_report_json(payload)
return OrderedDict(
[("report_type", "smtp_tls"), ("report", smtp_tls_report)]
)
return {
"report_type": "smtp_tls",
"report": smtp_tls_report,
}
elif content_type == "application/tlsrpt+gzip":
payload = extract_report(payload)
smtp_tls_report = parse_smtp_tls_report_json(payload)
return OrderedDict(
[("report_type", "smtp_tls"), ("report", smtp_tls_report)]
)
return {
"report_type": "smtp_tls",
"report": smtp_tls_report,
}
elif content_type == "text/plain":
if "A message claiming to be from you has failed" in payload:
try:
parts = payload.split("detected.", 1)
field_matches = text_report_regex.findall(parts[0])
fields = dict()
fields = {}
for match in field_matches:
field_name = match[0].lower().replace(" ", "-")
fields[field_name] = match[1].strip()
@@ -1511,9 +1679,10 @@ def parse_report_email(
payload = payload.decode("utf-8", errors="replace")
if payload.strip().startswith("{"):
result = parse_smtp_tls_report_json(payload)
result = OrderedDict(
[("report_type", "smtp_tls"), ("report", smtp_tls_report)]
)
result = {
"report_type": "smtp_tls",
"report": smtp_tls_report,
}
elif payload.strip().startswith("<"):
aggregate_report = parse_aggregate_report_xml(
payload,
@@ -1527,9 +1696,10 @@ def parse_report_email(
keep_alive=keep_alive,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
)
result = OrderedDict(
[("report_type", "aggregate"), ("report", aggregate_report)]
)
result = {
"report_type": "aggregate",
"report": aggregate_report,
}
return result
@@ -1573,7 +1743,10 @@ def parse_report_email(
except Exception as e:
raise InvalidForensicReport(e.__str__())
result = OrderedDict([("report_type", "forensic"), ("report", forensic_report)])
result = {
"report_type": "forensic",
"report": forensic_report,
}
return result
if result is None:
@@ -1594,7 +1767,7 @@ def parse_report_file(
offline: Optional[bool] = False,
keep_alive: Optional[Callable] = None,
normalize_timespan_threshold_hours: Optional[float] = 24,
) -> OrderedDict[str, Any]:
) -> ParsedReport:
"""Parses a DMARC aggregate or forensic file at the given path, a
file-like object, or bytes
@@ -1640,11 +1813,17 @@ def parse_report_file(
keep_alive=keep_alive,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
)
results = OrderedDict([("report_type", "aggregate"), ("report", report)])
results = {
"report_type": "aggregate",
"report": report,
}
except InvalidAggregateReport:
try:
report = parse_smtp_tls_report_json(content)
results = OrderedDict([("report_type", "smtp_tls"), ("report", report)])
results = {
"report_type": "smtp_tls",
"report": report,
}
except InvalidSMTPTLSReport:
try:
sa = strip_attachment_payloads
@@ -1678,7 +1857,7 @@ def get_dmarc_reports_from_mbox(
reverse_dns_map_url: Optional[str] = None,
offline: Optional[bool] = False,
normalize_timespan_threshold_hours: Optional[float] = 24.0,
) -> OrderedDict[str, OrderedDict[str, Any]]:
) -> ParseResults:
"""Parses a mailbox in mbox format containing e-mails with attached
DMARC reports
@@ -1746,13 +1925,11 @@ def get_dmarc_reports_from_mbox(
logger.warning(error.__str__())
except mailbox.NoSuchMailboxError:
raise InvalidDMARCReport("Mailbox {0} does not exist".format(input_))
return OrderedDict(
[
("aggregate_reports", aggregate_reports),
("forensic_reports", forensic_reports),
("smtp_tls_reports", smtp_tls_reports),
]
)
return {
"aggregate_reports": aggregate_reports,
"forensic_reports": forensic_reports,
"smtp_tls_reports": smtp_tls_reports,
}
def get_dmarc_reports_from_mailbox(
@@ -1770,12 +1947,12 @@ def get_dmarc_reports_from_mailbox(
nameservers: Optional[list[str]] = None,
dns_timeout: Optional[float] = 6.0,
strip_attachment_payloads: Optional[bool] = False,
results: Optional[OrderedDict[str, Any]] = None,
results: Optional[dict[str, Any]] = None,
batch_size: Optional[int] = 10,
since: Optional[datetime] = None,
create_folders: Optional[bool] = True,
normalize_timespan_threshold_hours: Optional[float] = 24,
) -> OrderedDict[str, OrderedDict[str, Any]]:
) -> ParseResults:
"""
Fetches and parses DMARC reports from a mailbox
@@ -2038,13 +2215,11 @@ def get_dmarc_reports_from_mailbox(
except Exception as e:
e = "Error moving message UID {0}: {1}".format(msg_uid, e)
logger.error("Mailbox error: {0}".format(e))
results = OrderedDict(
[
("aggregate_reports", aggregate_reports),
("forensic_reports", forensic_reports),
("smtp_tls_reports", smtp_tls_reports),
]
)
results = {
"aggregate_reports": aggregate_reports,
"forensic_reports": forensic_reports,
"smtp_tls_reports": smtp_tls_reports,
}
if current_time:
total_messages = len(
@@ -2185,7 +2360,7 @@ def append_csv(filename, csv):
def save_output(
results: OrderedDict[str, Any],
results: dict[str, Any],
*,
output_directory: Optional[str] = "output",
aggregate_json_filename: Optional[str] = "aggregate.json",
@@ -2271,7 +2446,7 @@ def save_output(
sample_file.write(sample)
def get_report_zip(results: OrderedDict[str, Any]) -> bytes:
def get_report_zip(results: ParseResults) -> bytes:
"""
Creates a zip file of parsed report output
@@ -2317,7 +2492,7 @@ def get_report_zip(results: OrderedDict[str, Any]) -> bytes:
def email_results(
results: OrderedDict,
results: dict,
*,
host: str,
mail_from: str,

View File

@@ -10,7 +10,6 @@ from glob import glob
import logging
import math
import yaml
from collections import OrderedDict
import json
from ssl import CERT_NONE, create_default_context
from multiprocessing import Pipe, Process
@@ -1634,13 +1633,11 @@ def _main():
logger.exception("Mailbox Error")
exit(1)
results = OrderedDict(
[
("aggregate_reports", aggregate_reports),
("forensic_reports", forensic_reports),
("smtp_tls_reports", smtp_tls_reports),
]
)
results = {
"aggregate_reports": aggregate_reports,
"forensic_reports": forensic_reports,
"smtp_tls_reports": smtp_tls_reports,
}
process_reports(results)

View File

@@ -4,7 +4,6 @@ from __future__ import annotations
from typing import Optional, Union, Any
from collections import OrderedDict
from elasticsearch_dsl.search import Q
from elasticsearch_dsl import (
@@ -25,7 +24,7 @@ from elasticsearch.helpers import reindex
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
from parsedmarc import InvalidForensicReport
from parsedmarc import InvalidForensicReport, AggregateReport, ForensicReport, SMTPTLSReport
class ElasticsearchError(Exception):
@@ -377,7 +376,7 @@ def migrate_indexes(
def save_aggregate_report_to_elasticsearch(
aggregate_report: OrderedDict[str, Any],
aggregate_report: AggregateReport,
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
@@ -539,7 +538,7 @@ def save_aggregate_report_to_elasticsearch(
def save_forensic_report_to_elasticsearch(
forensic_report: OrderedDict[str, Any],
forensic_report: ForensicReport,
index_suffix: Optional[Any] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
@@ -570,7 +569,7 @@ def save_forensic_report_to_elasticsearch(
sample_date = forensic_report["parsed_sample"]["date"]
sample_date = human_timestamp_to_datetime(sample_date)
original_headers = forensic_report["parsed_sample"]["headers"]
headers = OrderedDict()
headers = {}
for original_header in original_headers:
headers[original_header.lower()] = original_headers[original_header]
@@ -597,7 +596,7 @@ def save_forensic_report_to_elasticsearch(
else:
headers["from"] = " <".join(headers["from"]) + ">"
from_ = dict()
from_ = {}
from_["sample.headers.from"] = headers["from"]
from_query = Q(dict(match_phrase=from_))
q = q & from_query
@@ -609,7 +608,7 @@ def save_forensic_report_to_elasticsearch(
else:
headers["to"] = " <".join(headers["to"]) + ">"
to_ = dict()
to_ = {}
to_["sample.headers.to"] = headers["to"]
to_query = Q(dict(match_phrase=to_))
q = q & to_query
@@ -706,7 +705,7 @@ def save_forensic_report_to_elasticsearch(
def save_smtp_tls_report_to_elasticsearch(
report: OrderedDict[str, Any],
report: SMTPTLSReport,
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,

View File

@@ -3,12 +3,12 @@
from __future__ import annotations
from typing import Any
from parsedmarc import AggregateReport, ForensicReport, SMTPTLSReport, ParseResults
import logging
import logging.handlers
import json
import threading
from collections import OrderedDict
from parsedmarc import (
parsed_aggregate_reports_to_csv_rows,
@@ -54,7 +54,7 @@ class GelfClient(object):
self.logger.addHandler(self.handler)
def save_aggregate_report_to_gelf(
self, aggregate_reports: list[OrderedDict[str, Any]]
self, aggregate_reports: list[AggregateReport]
):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows:
@@ -64,13 +64,13 @@ class GelfClient(object):
log_context_data.parsedmarc = None
def save_forensic_report_to_gelf(
self, forensic_reports: list[OrderedDict[str, Any]]
self, forensic_reports: list[ForensicReport]
):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows:
self.logger.info(json.dumps(row))
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: OrderedDict[str, Any]):
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: SMTPTLSReport):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
for row in rows:
self.logger.info(json.dumps(row))

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
from typing import Any, Optional, Union
from parsedmarc import AggregateReport, ForensicReport, SMTPTLSReport, ParseResults
from ssl import SSLContext
import json
@@ -10,7 +11,6 @@ from ssl import create_default_context
from kafka import KafkaProducer
from kafka.errors import NoBrokersAvailable, UnknownTopicOrPartitionError
from collections import OrderedDict
from parsedmarc.utils import human_timestamp_to_datetime
from parsedmarc import __version__
@@ -66,7 +66,7 @@ class KafkaClient(object):
raise KafkaError("No Kafka brokers available")
@staticmethod
def strip_metadata(report: OrderedDict[str, Any]):
def strip_metadata(report: dict[str, Any]):
"""
Duplicates org_name, org_email and report_id into JSON root
and removes report_metadata key to bring it more inline
@@ -80,7 +80,7 @@ class KafkaClient(object):
return report
@staticmethod
def generate_date_range(report: OrderedDict[str, Any]):
def generate_date_range(report: dict[str, Any]):
"""
Creates a date_range timestamp with format YYYY-MM-DD-T-HH:MM:SS
based on begin and end dates for easier parsing in Kibana.
@@ -99,7 +99,7 @@ class KafkaClient(object):
def save_aggregate_reports_to_kafka(
self,
aggregate_reports: Union[OrderedDict[str, Any], list[OrderedDict[str, Any]]],
aggregate_reports: Union[AggregateReport, list[AggregateReport]],
aggregate_topic: str,
):
"""
@@ -146,7 +146,7 @@ class KafkaClient(object):
def save_forensic_reports_to_kafka(
self,
forensic_reports: Union[OrderedDict[str, Any], list[OrderedDict[str, Any]]],
forensic_reports: Union[ForensicReport, list[ForensicReport]],
forensic_topic: str,
):
"""
@@ -180,7 +180,7 @@ class KafkaClient(object):
def save_smtp_tls_reports_to_kafka(
self,
smtp_tls_reports: Union[list[OrderedDict[str, Any]], OrderedDict[str, Any]],
smtp_tls_reports: Union[list[SMTPTLSReport], SMTPTLSReport],
smtp_tls_topic: str,
):
"""

View File

@@ -3,7 +3,7 @@
from __future__ import annotations
from typing import Any
from collections import OrderedDict
from parsedmarc import AggregateReport, ForensicReport, SMTPTLSReport, ParseResults
from parsedmarc.log import logger
from azure.core.exceptions import HttpResponseError
@@ -133,7 +133,7 @@ class LogAnalyticsClient(object):
def publish_results(
self,
results: OrderedDict[str, OrderedDict[str, Any]],
results: ParseResults,
save_aggregate: bool,
save_forensic: bool,
save_smtp_tls: bool,

View File

@@ -4,7 +4,6 @@ from __future__ import annotations
from typing import Optional, Union, Any
from collections import OrderedDict
from opensearchpy import (
Q,
@@ -25,7 +24,7 @@ from opensearchpy.helpers import reindex
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
from parsedmarc import InvalidForensicReport
from parsedmarc import InvalidForensicReport, AggregateReport, ForensicReport, SMTPTLSReport
class OpenSearchError(Exception):
@@ -377,7 +376,7 @@ def migrate_indexes(
def save_aggregate_report_to_opensearch(
aggregate_report: OrderedDict[str, Any],
aggregate_report: AggregateReport,
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
@@ -539,7 +538,7 @@ def save_aggregate_report_to_opensearch(
def save_forensic_report_to_opensearch(
forensic_report: OrderedDict[str, Any],
forensic_report: ForensicReport,
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
@@ -570,7 +569,7 @@ def save_forensic_report_to_opensearch(
sample_date = forensic_report["parsed_sample"]["date"]
sample_date = human_timestamp_to_datetime(sample_date)
original_headers = forensic_report["parsed_sample"]["headers"]
headers = OrderedDict()
headers = {}
for original_header in original_headers:
headers[original_header.lower()] = original_headers[original_header]
@@ -597,7 +596,7 @@ def save_forensic_report_to_opensearch(
else:
headers["from"] = " <".join(headers["from"]) + ">"
from_ = dict()
from_ = {}
from_["sample.headers.from"] = headers["from"]
from_query = Q(dict(match_phrase=from_))
q = q & from_query
@@ -609,7 +608,7 @@ def save_forensic_report_to_opensearch(
else:
headers["to"] = " <".join(headers["to"]) + ">"
to_ = dict()
to_ = {}
to_["sample.headers.to"] = headers["to"]
to_query = Q(dict(match_phrase=to_))
q = q & to_query
@@ -706,7 +705,7 @@ def save_forensic_report_to_opensearch(
def save_smtp_tls_report_to_opensearch(
report: OrderedDict[str, Any],
report: SMTPTLSReport,
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,

View File

@@ -3,11 +3,11 @@
from __future__ import annotations
from typing import Any
from parsedmarc import AggregateReport, ForensicReport, SMTPTLSReport, ParseResults
import json
import boto3
from collections import OrderedDict
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
@@ -55,16 +55,16 @@ class S3Client(object):
)
self.bucket = self.s3.Bucket(self.bucket_name) # type: ignore
def save_aggregate_report_to_s3(self, report: OrderedDict[str, Any]):
def save_aggregate_report_to_s3(self, report: Union[AggregateReport, ForensicReport, SMTPTLSReport]):
self.save_report_to_s3(report, "aggregate")
def save_forensic_report_to_s3(self, report: OrderedDict[str, Any]):
def save_forensic_report_to_s3(self, report: Union[AggregateReport, ForensicReport, SMTPTLSReport]):
self.save_report_to_s3(report, "forensic")
def save_smtp_tls_report_to_s3(self, report: OrderedDict[str, Any]):
def save_smtp_tls_report_to_s3(self, report: Union[AggregateReport, ForensicReport, SMTPTLSReport]):
self.save_report_to_s3(report, "smtp_tls")
def save_report_to_s3(self, report: OrderedDict[str, Any], report_type: str):
def save_report_to_s3(self, report: Union[AggregateReport, ForensicReport, SMTPTLSReport], report_type: str):
if report_type == "smtp_tls":
report_date = report["begin_date"]
report_id = report["report_id"]

View File

@@ -3,8 +3,8 @@
from __future__ import annotations
from typing import Any, Union
from parsedmarc import AggregateReport, ForensicReport, SMTPTLSReport, ParseResults
from collections import OrderedDict
from urllib.parse import urlparse
import socket
@@ -73,7 +73,7 @@ class HECClient(object):
def save_aggregate_reports_to_splunk(
self,
aggregate_reports: Union[list[OrderedDict[str, Any]], OrderedDict[str, Any]],
aggregate_reports: Union[list[AggregateReport], AggregateReport],
):
"""
Saves aggregate DMARC reports to Splunk
@@ -94,7 +94,7 @@ class HECClient(object):
json_str = ""
for report in aggregate_reports:
for record in report["records"]:
new_report: dict[str, Union[str, int, float, dict]] = dict()
new_report: dict[str, Union[str, int, float, dict]] = {}
for metadata in report["report_metadata"]:
new_report[metadata] = report["report_metadata"][metadata]
new_report["interval_begin"] = record["interval_begin"]
@@ -139,7 +139,7 @@ class HECClient(object):
def save_forensic_reports_to_splunk(
self,
forensic_reports: Union[list[OrderedDict[str, Any]], OrderedDict[str, Any]],
forensic_reports: Union[list[ForensicReport], ForensicReport],
):
"""
Saves forensic DMARC reports to Splunk
@@ -175,7 +175,7 @@ class HECClient(object):
raise SplunkError(response["text"])
def save_smtp_tls_reports_to_splunk(
self, reports: Union[list[OrderedDict[str, Any]], OrderedDict[str, Any]]
self, reports: Union[list[SMTPTLSReport], SMTPTLSReport]
):
"""
Saves SMTP TLS reports to Splunk

View File

@@ -7,8 +7,8 @@ import logging
import logging.handlers
from typing import Any
from parsedmarc import AggregateReport, ForensicReport, SMTPTLSReport, ParseResults
from collections import OrderedDict
import json
@@ -37,21 +37,21 @@ class SyslogClient(object):
self.logger.addHandler(log_handler)
def save_aggregate_report_to_syslog(
self, aggregate_reports: list[OrderedDict[str, Any]]
self, aggregate_reports: list[AggregateReport]
):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows:
self.logger.info(json.dumps(row))
def save_forensic_report_to_syslog(
self, forensic_reports: list[OrderedDict[str, Any]]
self, forensic_reports: list[ForensicReport]
):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows:
self.logger.info(json.dumps(row))
def save_smtp_tls_report_to_syslog(
self, smtp_tls_reports: list[OrderedDict[str, Any]]
self, smtp_tls_reports: list[SMTPTLSReport]
):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
for row in rows:

View File

@@ -4,14 +4,13 @@
from __future__ import annotations
from typing import Optional, Union
from typing import Optional, Union, TypedDict
import logging
import os
from datetime import datetime
from datetime import timezone
from datetime import timedelta
from collections import OrderedDict
from expiringdict import ExpiringDict
import tempfile
import subprocess
@@ -46,6 +45,32 @@ import parsedmarc.resources.dbip
import parsedmarc.resources.maps
from parsedmarc.constants import USER_AGENT
# TypedDict definitions for better typing
class IPAddressInfo(TypedDict, total=False):
"""Information about an IP address"""
ip_address: str
country: Optional[str]
reverse_dns: Optional[str]
base_domain: Optional[str]
name: Optional[str]
type: Optional[str]
class EmailAddress(TypedDict, total=False):
"""Parsed email address information"""
display_name: Optional[str]
address: str
local: Optional[str]
domain: Optional[str]
class ReverseDNSService(TypedDict):
"""Reverse DNS service information"""
name: str
type: Optional[str]
parenthesis_regex = re.compile(r"\s*\(.*\)\s*")
null_file = open(os.devnull, "w")
@@ -342,7 +367,7 @@ def get_service_from_reverse_dns_base_domain(
url: Optional[bool] = None,
offline: Optional[bool] = False,
reverse_dns_map: Optional[bool] = None,
) -> str:
) -> ReverseDNSService:
"""
Returns the service name of a given base domain name from reverse DNS.
@@ -363,7 +388,7 @@ def get_service_from_reverse_dns_base_domain(
reader = csv.DictReader(_csv_file)
for row in reader:
key = row["base_reverse_dns"].lower().strip()
reverse_dns_map[key] = dict(name=row["name"], type=row["type"])
reverse_dns_map[key] = {"name": row["name"], "type": row["type"]}
base_domain = base_domain.lower().strip()
if url is None:
@@ -373,7 +398,7 @@ def get_service_from_reverse_dns_base_domain(
"resources/maps/base_reverse_dns_map.csv"
)
if reverse_dns_map is None:
reverse_dns_map = dict()
reverse_dns_map = {}
csv_file = io.StringIO()
if not (offline or always_use_local_file) and len(reverse_dns_map) == 0:
@@ -405,7 +430,7 @@ def get_service_from_reverse_dns_base_domain(
try:
service = reverse_dns_map[base_domain]
except KeyError:
service = dict(name=base_domain, type=None)
service = {"name": base_domain, "type": None}
return service
@@ -422,7 +447,7 @@ def get_ip_address_info(
offline: Optional[bool] = False,
nameservers: Optional[list[str]] = None,
timeout: Optional[float] = 2.0,
) -> OrderedDict[str, str]:
) -> IPAddressInfo:
"""
Returns reverse DNS and country information for the given IP address
@@ -449,7 +474,7 @@ def get_ip_address_info(
if info:
logger.debug(f"IP address {ip_address} was found in cache")
return info
info = OrderedDict()
info = {}
info["ip_address"] = ip_address
if offline:
reverse_dns = None
@@ -487,7 +512,7 @@ def get_ip_address_info(
return info
def parse_email_address(original_address: str) -> OrderedDict[str, str]:
def parse_email_address(original_address: str) -> EmailAddress:
if original_address[0] == "":
display_name = None
else:
@@ -500,14 +525,12 @@ def parse_email_address(original_address: str) -> OrderedDict[str, str]:
local = address_parts[0].lower()
domain = address_parts[-1].lower()
return OrderedDict(
[
("display_name", display_name),
("address", address),
("local", local),
("domain", domain),
]
)
return {
"display_name": display_name,
"address": address,
"local": local,
"domain": domain,
}
def get_filename_safe_string(string: str) -> str: