Compare commits


6 Commits

Author | SHA1 | Message | Date
copilot-swe-agent[bot] | 2174f23eb5 | Add comprehensive TypedDicts to minimize Any usage in public APIs (Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>) | 2025-12-17 22:28:19 +00:00
copilot-swe-agent[bot] | febbb107c4 | Fix Python 3.9 compatibility: replace pipe union syntax with Union/Optional (Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>) | 2025-12-17 22:18:57 +00:00
copilot-swe-agent[bot] | 9a64b494e7 | Fix code review issues: incomplete isinstance and variable name mismatch (Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>) | 2025-12-17 21:45:21 +00:00
copilot-swe-agent[bot] | e93209c766 | Fix function signatures and improve type annotations (Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>) | 2025-12-17 21:42:25 +00:00
copilot-swe-agent[bot] | d1c22466be | Replace OrderedDict with dict and add TypedDict definitions (Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>) | 2025-12-17 21:36:57 +00:00
copilot-swe-agent[bot] | 3d1b2522d3 | Initial plan | 2025-12-17 21:19:30 +00:00
11 changed files with 333 additions and 354 deletions
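For context on the febbb107c4 and d1c22466be changes, here is a minimal illustrative sketch (a hypothetical function, not taken from the diff) of the style the branch standardizes on: typing.Union/Optional instead of PEP 604 pipe unions, and plain dict instead of OrderedDict.

from typing import IO, Optional, Union

def load_report(source: Union[str, bytes, IO], encoding: Optional[str] = None) -> dict:
    """Hypothetical example. On Python 3.9, unions written as `str | None` are
    only safe inside deferred annotations, so Union/Optional keep the module
    importable everywhere; and because modern dicts preserve insertion order,
    a plain dict can replace OrderedDict without changing behavior."""
    if isinstance(source, bytes):
        source = source.decode(encoding or "utf-8", errors="replace")
    return {"source": source}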

View File

@@ -4,7 +4,7 @@
from __future__ import annotations
from typing import Dict, List, Any, Union, Optional, IO, Callable, TypedDict
from typing import Any, Union, Optional, IO, Callable, TypedDict, NotRequired
import binascii
import email
@@ -38,177 +38,12 @@ from parsedmarc.mail import (
)
from parsedmarc.constants import __version__
from parsedmarc.utils import get_base_domain, get_ip_address_info, IPAddressInfo
from parsedmarc.utils import get_base_domain, get_ip_address_info
from parsedmarc.utils import is_outlook_msg, convert_outlook_msg
from parsedmarc.utils import parse_email
from parsedmarc.utils import timestamp_to_human, human_timestamp_to_datetime
# TypedDict definitions for DMARC report structures
class PolicyOverrideReason(TypedDict, total=False):
"""Reason for DMARC policy override"""
type: str
comment: Optional[str]
class PolicyEvaluated(TypedDict):
"""DMARC policy evaluation result"""
disposition: str
dkim: str
spf: str
policy_override_reasons: list[PolicyOverrideReason]
class Alignment(TypedDict):
"""DMARC alignment information"""
spf: bool
dkim: bool
dmarc: bool
class DKIMResult(TypedDict, total=False):
"""DKIM authentication result"""
domain: str
selector: str
result: str
class SPFResult(TypedDict, total=False):
"""SPF authentication result"""
domain: str
scope: str
result: str
class AuthResults(TypedDict):
"""Authentication results"""
dkim: list[DKIMResult]
spf: list[SPFResult]
class DMARCIdentifiers(TypedDict):
"""DMARC identifiers"""
header_from: str
envelope_from: str
envelope_to: Optional[str]
class DMARCRecord(TypedDict):
"""Parsed DMARC aggregate record"""
source: IPAddressInfo
count: int
alignment: Alignment
policy_evaluated: PolicyEvaluated
identifiers: DMARCIdentifiers
auth_results: AuthResults
class PublishedPolicy(TypedDict):
"""Published DMARC policy"""
domain: str
adkim: str
aspf: str
p: str
sp: str
pct: str
fo: str
class ReportMetadata(TypedDict, total=False):
"""DMARC report metadata"""
org_name: str
org_email: str
org_extra_contact_info: Optional[str]
report_id: str
begin_date: str
end_date: str
errors: list[str]
timespan_requires_normalization: bool
original_timespan_seconds: int
class AggregateReport(TypedDict):
"""Parsed DMARC aggregate report"""
xml_schema: str
report_metadata: ReportMetadata
policy_published: PublishedPolicy
records: list[DMARCRecord]
class SMTPTLSFailureDetails(TypedDict, total=False):
"""SMTP TLS failure details"""
result_type: str
failed_session_count: int
sending_mta_ip: Optional[str]
receiving_ip: Optional[str]
receiving_mx_hostname: Optional[str]
receiving_mx_helo: Optional[str]
additional_info_uri: Optional[str]
failure_reason_code: Optional[str]
class SMTPTLSPolicy(TypedDict, total=False):
"""SMTP TLS policy"""
policy_domain: str
policy_type: str
policy_strings: Optional[list[str]]
mx_host_patterns: Optional[list[str]]
successful_session_count: int
total_successful_session_count: int
total_failure_session_count: int
failure_details: list[SMTPTLSFailureDetails]
class SMTPTLSReport(TypedDict):
"""Parsed SMTP TLS report"""
organization_name: str
begin_date: str
end_date: str
contact_info: str
report_id: str
policies: list[SMTPTLSPolicy]
class ForensicReport(TypedDict, total=False):
"""Parsed DMARC forensic report"""
feedback_type: str
user_agent: Optional[str]
version: Optional[str]
original_envelope_id: Optional[str]
original_mail_from: Optional[str]
original_rcpt_to: Optional[str]
arrival_date: str
arrival_date_utc: str
subject: Optional[str]
message_id: Optional[str]
authentication_results: Optional[str]
dkim_domain: Optional[str]
source_ip_address: Optional[str]
source_country: Optional[str]
source_reverse_dns: Optional[str]
source_base_domain: Optional[str]
delivery_result: Optional[str]
auth_failure: Optional[list[str]]
reported_domain: Optional[str]
arrival_date_utc: str
sample: Optional[str]
parsed_sample: Optional[dict]
sample_headers_only: bool
class ParsedReport(TypedDict):
"""Container for parsed report with type"""
report_type: str
report: Union[AggregateReport, ForensicReport, SMTPTLSReport]
class ParseResults(TypedDict):
"""Results from parsing multiple reports"""
aggregate_reports: list[AggregateReport]
forensic_reports: list[ForensicReport]
smtp_tls_reports: list[SMTPTLSReport]
logger.debug("parsedmarc v{0}".format(__version__))
feedback_report_regex = re.compile(r"^([\w\-]+): (.+)$", re.MULTILINE)
@@ -234,7 +69,7 @@ EMAIL_SAMPLE_CONTENT_TYPES = (
IP_ADDRESS_CACHE = ExpiringDict(max_len=10000, max_age_seconds=14400)
SEEN_AGGREGATE_REPORT_IDS = ExpiringDict(max_len=100000000, max_age_seconds=3600)
REVERSE_DNS_MAP = {}
REVERSE_DNS_MAP = dict()
class ParserError(RuntimeError):
@@ -257,11 +92,188 @@ class InvalidForensicReport(InvalidDMARCReport):
"""Raised when an invalid DMARC forensic report is encountered"""
# TypedDict definitions for structured data
class DateIntervalBucket(TypedDict):
"""Represents a time bucket for interval normalization"""
begin: datetime
end: datetime
count: int
class IPAddressInfo(TypedDict, total=False):
"""Information about an IP address"""
ip_address: str
country: NotRequired[Optional[str]]
reverse_dns: NotRequired[Optional[str]]
base_domain: NotRequired[Optional[str]]
type: NotRequired[Optional[str]]
name: NotRequired[Optional[str]]
class AlignmentInfo(TypedDict):
"""DMARC alignment information"""
spf: bool
dkim: bool
dmarc: bool
class PolicyOverrideReason(TypedDict, total=False):
"""DMARC policy override reason"""
type: NotRequired[str]
comment: NotRequired[Optional[str]]
class PolicyEvaluated(TypedDict):
"""DMARC policy evaluation result"""
disposition: str
dkim: str
spf: str
policy_override_reasons: list[PolicyOverrideReason]
class DKIMAuthResult(TypedDict):
"""DKIM authentication result"""
domain: str
selector: str
result: str
class SPFAuthResult(TypedDict):
"""SPF authentication result"""
domain: str
scope: str
result: str
class AuthResults(TypedDict):
"""Authentication results for DKIM and SPF"""
dkim: list[DKIMAuthResult]
spf: list[SPFAuthResult]
class Identifiers(TypedDict):
"""Message identifiers"""
header_from: str
envelope_from: Optional[str]
envelope_to: Optional[str]
class ParsedReportRecord(TypedDict):
"""A parsed DMARC aggregate report record"""
source: IPAddressInfo
count: int
alignment: AlignmentInfo
policy_evaluated: PolicyEvaluated
identifiers: Identifiers
auth_results: AuthResults
class ParsedReportRecordWithMetadata(ParsedReportRecord, total=False):
"""A parsed DMARC report record with normalization metadata"""
normalized_timespan: bool
interval_begin: NotRequired[str]
interval_end: NotRequired[str]
class ReportMetadata(TypedDict, total=False):
"""DMARC report metadata"""
org_name: str
org_email: NotRequired[Optional[str]]
org_extra_contact_info: NotRequired[Optional[str]]
report_id: str
begin_date: str
end_date: str
errors: NotRequired[list[str]]
class PolicyPublished(TypedDict, total=False):
"""DMARC policy as published in DNS"""
domain: str
adkim: NotRequired[str]
aspf: NotRequired[str]
p: str
sp: NotRequired[str]
pct: NotRequired[str]
fo: NotRequired[str]
class ParsedAggregateReport(TypedDict):
"""A complete parsed DMARC aggregate report"""
xml_schema: str
report_metadata: ReportMetadata
policy_published: PolicyPublished
records: list[ParsedReportRecord]
class SMTPTLSFailureDetails(TypedDict):
"""SMTP TLS failure details"""
result_type: str
sending_mta_ip: NotRequired[Optional[str]]
receiving_mx_hostname: NotRequired[Optional[str]]
receiving_mx_helo: NotRequired[Optional[str]]
receiving_ip: NotRequired[Optional[str]]
failed_session_count: int
additional_information: NotRequired[Optional[str]]
failure_reason_code: NotRequired[Optional[str]]
class SMTPTLSPolicy(TypedDict, total=False):
"""SMTP TLS policy information"""
policy_type: str
policy_domain: str
policy_strings: NotRequired[list[str]]
mx_host_patterns: NotRequired[list[str]]
successful_session_count: int
failed_session_count: int
failure_details: NotRequired[list[SMTPTLSFailureDetails]]
class ParsedSMTPTLSReport(TypedDict):
"""A complete parsed SMTP TLS report"""
organization_name: str
begin_date: str
end_date: str
contact_info: str
report_id: str
policies: list[SMTPTLSPolicy]
class ParsedForensicReport(TypedDict, total=False):
"""A parsed DMARC forensic report"""
feedback_type: str
user_agent: NotRequired[Optional[str]]
version: NotRequired[Optional[str]]
original_envelope_id: NotRequired[Optional[str]]
original_mail_from: NotRequired[Optional[str]]
original_rcpt_to: NotRequired[Optional[str]]
arrival_date: str
arrival_date_utc: str
subject: NotRequired[str]
message_id: str
authentication_results: str
delivery_result: str
auth_failure: list[str]
reported_domain: str
source: IPAddressInfo
authentication_mechanisms: list[str]
dkim_domain: NotRequired[Optional[str]]
sample_headers_only: bool
sample: NotRequired[str]
parsed_sample: NotRequired[dict[str, Any]]
class ReportTypeWrapper(TypedDict):
"""Wrapper for report type identification"""
report_type: str
report: Union[ParsedAggregateReport, ParsedForensicReport, ParsedSMTPTLSReport]
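As an illustration of what these definitions buy (a sketch that assumes the names above are importable from parsedmarc; the values are fabricated), a static type checker can now verify the shape of a record instead of treating it as Any:

from parsedmarc import ParsedReportRecord  # assumes the TypedDict above is exported

record: ParsedReportRecord = {
    "source": {"ip_address": "192.0.2.1", "country": "US"},
    "count": 3,
    "alignment": {"spf": True, "dkim": False, "dmarc": True},
    "policy_evaluated": {
        "disposition": "none",
        "dkim": "fail",
        "spf": "pass",
        "policy_override_reasons": [],
    },
    "identifiers": {
        "header_from": "example.com",
        "envelope_from": "example.com",
        "envelope_to": None,
    },
    "auth_results": {"dkim": [], "spf": []},
}
# mypy or pyright will now reject typos such as record["cuont"] or a str where
# an int is expected, which dict[str, Any] could not catch.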
def _bucket_interval_by_day(
begin: datetime,
end: datetime,
total_count: int,
) -> List[Dict[str, Any]]:
) -> list[DateIntervalBucket]:
"""
Split the interval [begin, end) into daily buckets and distribute
`total_count` proportionally across those buckets.
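The proportional distribution described here is rounded with the "largest remainder" strategy shown in the hunks below; a standalone, simplified sketch of that rounding (not the function's exact code) looks like this:

def distribute_largest_remainder(total_count: int, weights: list[float]) -> list[int]:
    """Split total_count across buckets proportionally to weights so that the
    rounded integer results still sum to exactly total_count."""
    total_weight = sum(weights)
    exact = [w / total_weight * total_count for w in weights]
    floors = [int(x) for x in exact]
    remainder = total_count - sum(floors)
    # Hand the leftover units to the buckets with the largest fractional parts.
    by_fraction = sorted(range(len(exact)), key=lambda i: exact[i] - floors[i], reverse=True)
    for i in by_fraction[:remainder]:
        floors[i] += 1
    return floors

# distribute_largest_remainder(10, [1.0, 1.0, 1.0]) == [4, 3, 3]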
@@ -323,7 +335,7 @@ def _bucket_interval_by_day(
if day_cursor > begin:
day_cursor -= timedelta(days=1)
day_buckets: List[Dict[str, Any]] = []
day_buckets: list[dict[str, Any]] = []
while day_cursor < end:
day_start = day_cursor
@@ -355,12 +367,12 @@ def _bucket_interval_by_day(
# Then apply a "largest remainder" rounding strategy to ensure the sum
# equals exactly total_count.
exact_values: List[float] = [
exact_values: list[float] = [
(b["seconds"] / interval_seconds) * total_count for b in day_buckets
]
floor_values: List[int] = [int(x) for x in exact_values]
fractional_parts: List[float] = [x - int(x) for x in exact_values]
floor_values: list[int] = [int(x) for x in exact_values]
fractional_parts: list[float] = [x - int(x) for x in exact_values]
# How many counts do we still need to distribute after flooring?
remainder = total_count - sum(floor_values)
@@ -380,7 +392,7 @@ def _bucket_interval_by_day(
final_counts[idx] += 1
# --- Step 3: Build the final per-day result list -------------------------
results: List[Dict[str, Any]] = []
results: list[DateIntervalBucket] = []
for bucket, count in zip(day_buckets, final_counts):
if count > 0:
results.append(
@@ -395,8 +407,8 @@ def _bucket_interval_by_day(
def _append_parsed_record(
parsed_record: DMARCRecord,
records: list[DMARCRecord],
parsed_record: dict[str, Any],
records: list[dict[str, Any]],
begin_dt: datetime,
end_dt: datetime,
normalize: bool,
@@ -439,7 +451,7 @@ def _append_parsed_record(
def _parse_report_record(
record: dict,
record: dict[str, Any],
*,
ip_db_path: Optional[str] = None,
always_use_local_files: Optional[bool] = False,
@@ -448,7 +460,7 @@ def _parse_report_record(
offline: Optional[bool] = False,
nameservers: Optional[list[str]] = None,
dns_timeout: Optional[float] = 2.0,
) -> DMARCRecord:
) -> dict[str, Any]:
"""
Converts a record from a DMARC aggregate report into a more consistent
format
@@ -465,10 +477,10 @@ def _parse_report_record(
dns_timeout (float): Sets the DNS timeout in seconds
Returns:
OrderedDict: The converted record
dict: The converted record
"""
record = record.copy()
new_record = {}
new_record: dict[str, Any] = {}
if record["row"]["source_ip"] is None:
raise ValueError("Source IP address is empty")
new_record_source = get_ip_address_info(
@@ -486,7 +498,7 @@ def _parse_report_record(
new_record["source"] = new_record_source
new_record["count"] = int(record["row"]["count"])
policy_evaluated = record["row"]["policy_evaluated"].copy()
new_policy_evaluated = {
new_policy_evaluated: dict[str, Any] = {
"disposition": "none",
"dkim": "fail",
"spf": "fail",
@@ -510,11 +522,10 @@ def _parse_report_record(
and policy_evaluated["dkim"].lower() == "pass"
)
dmarc_aligned = spf_aligned or dkim_aligned
new_record["alignment"] = {
"spf": spf_aligned,
"dkim": dkim_aligned,
"dmarc": dmarc_aligned,
}
new_record["alignment"] = dict()
new_record["alignment"]["spf"] = spf_aligned
new_record["alignment"]["dkim"] = dkim_aligned
new_record["alignment"]["dmarc"] = dmarc_aligned
if "reason" in policy_evaluated:
if type(policy_evaluated["reason"]) is list:
reasons = policy_evaluated["reason"]
@@ -548,7 +559,7 @@ def _parse_report_record(
auth_results["dkim"] = [auth_results["dkim"]]
for result in auth_results["dkim"]:
if "domain" in result and result["domain"] is not None:
new_result = {"domain": result["domain"]}
new_result: dict[str, str] = {"domain": result["domain"]}
if "selector" in result and result["selector"] is not None:
new_result["selector"] = result["selector"]
else:
@@ -563,16 +574,16 @@ def _parse_report_record(
auth_results["spf"] = [auth_results["spf"]]
for result in auth_results["spf"]:
if "domain" in result and result["domain"] is not None:
new_result = {"domain": result["domain"]}
new_spf_result: dict[str, str] = {"domain": result["domain"]}
if "scope" in result and result["scope"] is not None:
new_result["scope"] = result["scope"]
new_spf_result["scope"] = result["scope"]
else:
new_result["scope"] = "mfrom"
new_spf_result["scope"] = "mfrom"
if "result" in result and result["result"] is not None:
new_result["result"] = result["result"]
new_spf_result["result"] = result["result"]
else:
new_result["result"] = "none"
new_record["auth_results"]["spf"].append(new_result)
new_spf_result["result"] = "none"
new_record["auth_results"]["spf"].append(new_spf_result)
if "envelope_from" not in new_record["identifiers"]:
envelope_from = None
@@ -601,9 +612,9 @@ def _parse_report_record(
return new_record
def _parse_smtp_tls_failure_details(failure_details: dict[str, Any]):
def _parse_smtp_tls_failure_details(failure_details: dict[str, Any]) -> SMTPTLSFailureDetails:
try:
new_failure_details = {
new_failure_details: dict[str, Any] = {
"result_type": failure_details["result-type"],
"failed_session_count": failure_details["failed-session-count"],
}
@@ -637,7 +648,7 @@ def _parse_smtp_tls_failure_details(failure_details: dict[str, Any]):
raise InvalidSMTPTLSReport(str(e))
def _parse_smtp_tls_report_policy(policy: dict[str, Any]):
def _parse_smtp_tls_report_policy(policy: dict[str, Any]) -> SMTPTLSPolicy:
policy_types = ["tlsa", "sts", "no-policy-found"]
try:
policy_domain = policy["policy"]["policy-domain"]
@@ -645,7 +656,7 @@ def _parse_smtp_tls_report_policy(policy: dict[str, Any]):
failure_details = []
if policy_type not in policy_types:
raise InvalidSMTPTLSReport(f"Invalid policy type {policy_type}")
new_policy = {
new_policy: dict[str, Any] = {
"policy_domain": policy_domain,
"policy_type": policy_type,
}
@@ -677,7 +688,7 @@ def _parse_smtp_tls_report_policy(policy: dict[str, Any]):
raise InvalidSMTPTLSReport(str(e))
def parse_smtp_tls_report_json(report: str) -> SMTPTLSReport:
def parse_smtp_tls_report_json(report: str) -> ParsedSMTPTLSReport:
"""Parses and validates an SMTP TLS report"""
required_fields = [
"organization-name",
@@ -699,7 +710,7 @@ def parse_smtp_tls_report_json(report: str) -> SMTPTLSReport:
for policy in report_dict["policies"]:
policies.append(_parse_smtp_tls_report_policy(policy))
new_report = {
new_report: dict[str, Any] = {
"organization_name": report_dict["organization-name"],
"begin_date": report_dict["date-range"]["start-datetime"],
"end_date": report_dict["date-range"]["end-datetime"],
@@ -711,22 +722,22 @@ def parse_smtp_tls_report_json(report: str) -> SMTPTLSReport:
return new_report
except KeyError as e:
InvalidSMTPTLSReport(f"Missing required field: {e}")
raise InvalidSMTPTLSReport(f"Missing required field: {e}")
except Exception as e:
raise InvalidSMTPTLSReport(str(e))
def parsed_smtp_tls_reports_to_csv_rows(
reports: Union[SMTPTLSReport, list[SMTPTLSReport]],
):
"""Converts one oor more parsed SMTP TLS reports into a list of single
layer OrderedDict objects suitable for use in a CSV"""
reports: Union[ParsedSMTPTLSReport, list[ParsedSMTPTLSReport]],
) -> list[dict[str, Any]]:
"""Converts one or more parsed SMTP TLS reports into a list of single
layer dict objects suitable for use in a CSV"""
if type(reports) is dict:
reports = [reports]
rows = []
for report in reports:
common_fields = {
common_fields: dict[str, Any] = {
"organization_name": report["organization_name"],
"begin_date": report["begin_date"],
"end_date": report["end_date"],
@@ -753,7 +764,7 @@ def parsed_smtp_tls_reports_to_csv_rows(
return rows
def parsed_smtp_tls_reports_to_csv(reports: SMTPTLSReport) -> str:
def parsed_smtp_tls_reports_to_csv(reports: ParsedSMTPTLSReport) -> str:
"""
Converts one or more parsed SMTP TLS reports to flat CSV format, including
headers
@@ -810,8 +821,8 @@ def parse_aggregate_report_xml(
timeout: Optional[float] = 2.0,
keep_alive: Optional[Callable] = None,
normalize_timespan_threshold_hours: float = 24.0,
) -> AggregateReport:
"""Parses a DMARC XML report string and returns a consistent OrderedDict
) -> ParsedAggregateReport:
"""Parses a DMARC XML report string and returns a consistent dict
Args:
xml (str): A string of DMARC aggregate report XML
@@ -827,7 +838,7 @@ def parse_aggregate_report_xml(
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
Returns:
OrderedDict: The parsed aggregate DMARC report
dict: The parsed aggregate DMARC report
"""
errors = []
# Parse XML and recover from errors
@@ -859,8 +870,8 @@ def parse_aggregate_report_xml(
schema = "draft"
if "version" in report:
schema = report["version"]
new_report = {"xml_schema": schema}
new_report_metadata = {}
new_report: dict[str, Any] = {"xml_schema": schema}
new_report_metadata: dict[str, Any] = {}
if report_metadata["org_name"] is None:
if report_metadata["email"] is not None:
report_metadata["org_name"] = report_metadata["email"].split("@")[-1]
@@ -921,7 +932,7 @@ def parse_aggregate_report_xml(
policy_published = report["policy_published"]
if type(policy_published) is list:
policy_published = policy_published[0]
new_policy_published = {}
new_policy_published: dict[str, Any] = {}
new_policy_published["domain"] = policy_published["domain"]
adkim = "r"
if "adkim" in policy_published:
@@ -1026,14 +1037,14 @@ def extract_report(content: Union[bytes, str, IO[Any]]) -> str:
str: The extracted text
"""
file_object = None
file_object: Union[BytesIO, IO[Any]]
try:
if isinstance(content, str):
try:
file_object = BytesIO(b64decode(content))
except binascii.Error:
return content
elif type(content) is bytes:
elif isinstance(content, bytes):
file_object = BytesIO(content)
else:
file_object = content
@@ -1088,12 +1099,12 @@ def parse_aggregate_report_file(
dns_timeout: Optional[float] = 2.0,
keep_alive: Optional[Callable] = None,
normalize_timespan_threshold_hours: Optional[float] = 24.0,
) -> AggregateReport:
) -> ParsedAggregateReport:
"""Parses a file at the given path, a file-like object. or bytes as an
aggregate DMARC report
Args:
_input (str | bytes | IO): A path to a file, a file like object, or bytes
_input (Union[str, bytes, IO]): A path to a file, a file like object, or bytes
offline (bool): Do not query online for geolocation or DNS
always_use_local_files (bool): Do not download files
reverse_dns_map_path (str): Path to a reverse DNS map file
@@ -1106,7 +1117,7 @@ def parse_aggregate_report_file(
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
Returns:
OrderedDict: The parsed DMARC aggregate report
dict: The parsed DMARC aggregate report
"""
try:
@@ -1129,7 +1140,7 @@ def parse_aggregate_report_file(
def parsed_aggregate_reports_to_csv_rows(
reports: list[AggregateReport],
reports: list[ParsedAggregateReport],
) -> list[dict[str, Any]]:
"""
Converts one or more parsed aggregate reports to list of dicts in flat CSV
@@ -1254,7 +1265,7 @@ def parsed_aggregate_reports_to_csv_rows(
return rows
def parsed_aggregate_reports_to_csv(reports: list[AggregateReport]) -> str:
def parsed_aggregate_reports_to_csv(reports: list[ParsedAggregateReport]) -> str:
"""
Converts one or more parsed aggregate reports to flat CSV format, including
headers
@@ -1333,9 +1344,9 @@ def parse_forensic_report(
nameservers: Optional[list[str]] = None,
dns_timeout: Optional[float] = 2.0,
strip_attachment_payloads: Optional[bool] = False,
) -> ForensicReport:
) -> ParsedForensicReport:
"""
Converts a DMARC forensic report and sample to a ``OrderedDict``
Converts a DMARC forensic report and sample to a ``dict``
Args:
feedback_report (str): A message's feedback report as a string
@@ -1353,12 +1364,12 @@ def parse_forensic_report(
forensic report results
Returns:
OrderedDict: A parsed report and sample
dict: A parsed report and sample
"""
delivery_results = ["delivered", "spam", "policy", "reject", "other"]
try:
parsed_report = {}
parsed_report: dict[str, Any] = {}
report_values = feedback_report_regex.findall(feedback_report)
for report_value in report_values:
key = report_value[0].lower().replace("-", "_")
@@ -1461,7 +1472,7 @@ def parse_forensic_report(
raise InvalidForensicReport("Unexpected error: {0}".format(error.__str__()))
def parsed_forensic_reports_to_csv_rows(reports: list[AggregateReport]):
def parsed_forensic_reports_to_csv_rows(reports: list[ParsedForensicReport]):
"""
Converts one or more parsed forensic reports to a list of dicts in flat CSV
format
@@ -1497,7 +1508,7 @@ def parsed_forensic_reports_to_csv_rows(reports: list[AggregateReport]):
return rows
def parsed_forensic_reports_to_csv(reports: list[AggregateReport]) -> str:
def parsed_forensic_reports_to_csv(reports: list[ParsedForensicReport]) -> str:
"""
Converts one or more parsed forensic reports to flat CSV format, including
headers
@@ -1562,7 +1573,7 @@ def parse_report_email(
strip_attachment_payloads: Optional[bool] = False,
keep_alive: Optional[callable] = None,
normalize_timespan_threshold_hours: Optional[float] = 24.0,
) -> ParsedReport:
) -> dict[str, Any]:
"""
Parses a DMARC report from an email
@@ -1581,7 +1592,7 @@ def parse_report_email(
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
Returns:
OrderedDict:
dict:
* ``report_type``: ``aggregate`` or ``forensic``
* ``report``: The parsed report
"""
@@ -1654,7 +1665,7 @@ def parse_report_email(
try:
parts = payload.split("detected.", 1)
field_matches = text_report_regex.findall(parts[0])
fields = {}
fields = dict()
for match in field_matches:
field_name = match[0].lower().replace(" ", "-")
fields[field_name] = match[1].strip()
@@ -1678,7 +1689,7 @@ def parse_report_email(
if isinstance(payload, bytes):
payload = payload.decode("utf-8", errors="replace")
if payload.strip().startswith("{"):
result = parse_smtp_tls_report_json(payload)
smtp_tls_report = parse_smtp_tls_report_json(payload)
result = {
"report_type": "smtp_tls",
"report": smtp_tls_report,
@@ -1743,10 +1754,7 @@ def parse_report_email(
except Exception as e:
raise InvalidForensicReport(e.__str__())
result = {
"report_type": "forensic",
"report": forensic_report,
}
result = {"report_type": "forensic", "report": forensic_report}
return result
if result is None:
@@ -1767,12 +1775,12 @@ def parse_report_file(
offline: Optional[bool] = False,
keep_alive: Optional[Callable] = None,
normalize_timespan_threshold_hours: Optional[float] = 24,
) -> ParsedReport:
) -> ReportTypeWrapper:
"""Parses a DMARC aggregate or forensic file at the given path, a
file-like object. or bytes
Args:
input_ (str | bytes | IO): A path to a file, a file like object, or bytes
input_ (Union[str, bytes, IO]): A path to a file, a file like object, or bytes
nameservers (list): A list of one or more nameservers to use
(Cloudflare's public DNS resolvers by default)
dns_timeout (float): Sets the DNS timeout in seconds
@@ -1786,7 +1794,7 @@ def parse_report_file(
keep_alive (callable): Keep alive function
Returns:
OrderedDict: The parsed DMARC report
dict: The parsed DMARC report
"""
if type(input_) is str:
logger.debug("Parsing {0}".format(input_))
@@ -1813,17 +1821,11 @@ def parse_report_file(
keep_alive=keep_alive,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
)
results = {
"report_type": "aggregate",
"report": report,
}
results = {"report_type": "aggregate", "report": report}
except InvalidAggregateReport:
try:
report = parse_smtp_tls_report_json(content)
results = {
"report_type": "smtp_tls",
"report": report,
}
results = {"report_type": "smtp_tls", "report": report}
except InvalidSMTPTLSReport:
try:
sa = strip_attachment_payloads
@@ -1857,7 +1859,7 @@ def get_dmarc_reports_from_mbox(
reverse_dns_map_url: Optional[str] = None,
offline: Optional[bool] = False,
normalize_timespan_threshold_hours: Optional[float] = 24.0,
) -> ParseResults:
) -> dict[str, dict[str, Any]]:
"""Parses a mailbox in mbox format containing e-mails with attached
DMARC reports
@@ -1876,7 +1878,7 @@ def get_dmarc_reports_from_mbox(
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
Returns:
OrderedDict: Lists of ``aggregate_reports``, ``forensic_reports``, and ``smtp_tls_reports``
dict: Lists of ``aggregate_reports``, ``forensic_reports``, and ``smtp_tls_reports``
"""
aggregate_reports = []
@@ -1952,7 +1954,7 @@ def get_dmarc_reports_from_mailbox(
since: Optional[datetime] = None,
create_folders: Optional[bool] = True,
normalize_timespan_threshold_hours: Optional[float] = 24,
) -> ParseResults:
) -> dict[str, dict[str, Any]]:
"""
Fetches and parses DMARC reports from a mailbox
@@ -1981,7 +1983,7 @@ def get_dmarc_reports_from_mailbox(
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
Returns:
OrderedDict: Lists of ``aggregate_reports``, ``forensic_reports``, and ``smtp_tls_reports``
dict: Lists of ``aggregate_reports``, ``forensic_reports``, and ``smtp_tls_reports``
"""
if delete and test:
raise ValueError("delete and test options are mutually exclusive")
@@ -2215,7 +2217,7 @@ def get_dmarc_reports_from_mailbox(
except Exception as e:
e = "Error moving message UID {0}: {1}".format(msg_uid, e)
logger.error("Mailbox error: {0}".format(e))
results = {
results: dict[str, Any] = {
"aggregate_reports": aggregate_reports,
"forensic_reports": forensic_reports,
"smtp_tls_reports": smtp_tls_reports,
@@ -2374,7 +2376,7 @@ def save_output(
Save report data in the given directory
Args:
results (OrderedDict): Parsing results
results (dict): Parsing results
output_directory (str): The path to the directory to save in
aggregate_json_filename (str): Filename for the aggregate JSON file
forensic_json_filename (str): Filename for the forensic JSON file
@@ -2446,12 +2448,12 @@ def save_output(
sample_file.write(sample)
def get_report_zip(results: ParseResults) -> bytes:
def get_report_zip(results: dict[str, Any]) -> bytes:
"""
Creates a zip file of parsed report output
Args:
results (OrderedDict): The parsed results
results (dict): The parsed results
Returns:
bytes: zip file bytes
@@ -2492,7 +2494,7 @@ def get_report_zip(results: ParseResults) -> bytes:
def email_results(
results: dict,
results: dict[str, Any],
*,
host: str,
mail_from: str,
@@ -2512,7 +2514,7 @@ def email_results(
Emails parsing results as a zip file
Args:
results (OrderedDict): Parsing results
results (dict): Parsing results
host (str): Mail server hostname or IP address
mail_from: The value of the message from header
mail_to (list): A list of addresses to mail to

View File

@@ -1633,11 +1633,13 @@ def _main():
logger.exception("Mailbox Error")
exit(1)
results = {
"aggregate_reports": aggregate_reports,
"forensic_reports": forensic_reports,
"smtp_tls_reports": smtp_tls_reports,
}
results = dict(
[
("aggregate_reports", aggregate_reports),
("forensic_reports", forensic_reports),
("smtp_tls_reports", smtp_tls_reports),
]
)
process_reports(results)

View File

@@ -24,7 +24,7 @@ from elasticsearch.helpers import reindex
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
from parsedmarc import InvalidForensicReport, AggregateReport, ForensicReport, SMTPTLSReport
from parsedmarc import InvalidForensicReport
class ElasticsearchError(Exception):
@@ -278,7 +278,7 @@ def set_hosts(
Sets the Elasticsearch hosts to use
Args:
hosts (str | list[str]): A single hostname or URL, or list of hostnames or URLs
hosts (Union[str, list[str]]): A single hostname or URL, or list of hostnames or URLs
use_ssl (bool): Use an HTTPS connection to the server
ssl_cert_path (str): Path to the certificate chain
username (str): The username to use for authentication
@@ -376,7 +376,7 @@ def migrate_indexes(
def save_aggregate_report_to_elasticsearch(
aggregate_report: SMTPTLSReport,
aggregate_report: dict[str, Any],
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
@@ -387,7 +387,7 @@ def save_aggregate_report_to_elasticsearch(
Saves a parsed DMARC aggregate report to Elasticsearch
Args:
aggregate_report (OrderedDict): A parsed forensic report
aggregate_report (dict): A parsed forensic report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily indexes
@@ -538,7 +538,7 @@ def save_aggregate_report_to_elasticsearch(
def save_forensic_report_to_elasticsearch(
forensic_report: SMTPTLSReport,
forensic_report: dict[str, Any],
index_suffix: Optional[Any] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
@@ -549,7 +549,7 @@ def save_forensic_report_to_elasticsearch(
Saves a parsed DMARC forensic report to Elasticsearch
Args:
forensic_report (OrderedDict): A parsed forensic report
forensic_report (dict): A parsed forensic report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily
@@ -569,7 +569,7 @@ def save_forensic_report_to_elasticsearch(
sample_date = forensic_report["parsed_sample"]["date"]
sample_date = human_timestamp_to_datetime(sample_date)
original_headers = forensic_report["parsed_sample"]["headers"]
headers = {}
headers = dict()
for original_header in original_headers:
headers[original_header.lower()] = original_headers[original_header]
@@ -596,7 +596,7 @@ def save_forensic_report_to_elasticsearch(
else:
headers["from"] = " <".join(headers["from"]) + ">"
from_ = {}
from_ = dict()
from_["sample.headers.from"] = headers["from"]
from_query = Q(dict(match_phrase=from_))
q = q & from_query
@@ -608,7 +608,7 @@ def save_forensic_report_to_elasticsearch(
else:
headers["to"] = " <".join(headers["to"]) + ">"
to_ = {}
to_ = dict()
to_["sample.headers.to"] = headers["to"]
to_query = Q(dict(match_phrase=to_))
q = q & to_query
@@ -705,7 +705,7 @@ def save_forensic_report_to_elasticsearch(
def save_smtp_tls_report_to_elasticsearch(
report: SMTPTLSReport,
report: dict[str, Any],
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
@@ -716,7 +716,7 @@ def save_smtp_tls_report_to_elasticsearch(
Saves a parsed SMTP TLS report to Elasticsearch
Args:
report (OrderedDict): A parsed SMTP TLS report
report (dict): A parsed SMTP TLS report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily indexes

View File

@@ -3,7 +3,6 @@
from __future__ import annotations
from typing import Any
from parsedmarc import AggregateReport, ForensicReport, SMTPTLSReport, ParseResults
import logging
import logging.handlers
@@ -54,7 +53,7 @@ class GelfClient(object):
self.logger.addHandler(self.handler)
def save_aggregate_report_to_gelf(
self, aggregate_reports: list[AggregateReport]
self, aggregate_reports: list[dict[str, Any]]
):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows:
@@ -64,13 +63,13 @@ class GelfClient(object):
log_context_data.parsedmarc = None
def save_forensic_report_to_gelf(
self, forensic_reports: list[ForensicReport]
self, forensic_reports: list[dict[str, Any]]
):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows:
self.logger.info(json.dumps(row))
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: SMTPTLSReport):
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: dict[str, Any]):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
for row in rows:
self.logger.info(json.dumps(row))

View File

@@ -3,7 +3,6 @@
from __future__ import annotations
from typing import Any, Optional, Union
from parsedmarc import AggregateReport, ForensicReport, SMTPTLSReport, ParseResults
from ssl import SSLContext
import json
@@ -99,7 +98,7 @@ class KafkaClient(object):
def save_aggregate_reports_to_kafka(
self,
aggregate_reports: Union[AggregateReport, list[AggregateReport]],
aggregate_reports: Union[dict[str, Any], list[dict[str, Any]]],
aggregate_topic: str,
):
"""
@@ -111,9 +110,7 @@ class KafkaClient(object):
aggregate_topic (str): The name of the Kafka topic
"""
if isinstance(aggregate_reports, dict) or isinstance(
aggregate_reports, OrderedDict
):
if isinstance(aggregate_reports, dict):
aggregate_reports = [aggregate_reports]
if len(aggregate_reports) < 1:
@@ -146,7 +143,7 @@ class KafkaClient(object):
def save_forensic_reports_to_kafka(
self,
forensic_reports: Union[AggregateReport, list[AggregateReport]],
forensic_reports: Union[dict[str, Any], list[dict[str, Any]]],
forensic_topic: str,
):
"""
@@ -180,7 +177,7 @@ class KafkaClient(object):
def save_smtp_tls_reports_to_kafka(
self,
smtp_tls_reports: Union[list[SMTPTLSReport], SMTPTLSReport],
smtp_tls_reports: Union[list[dict[str, Any]], dict[str, Any]],
smtp_tls_topic: str,
):
"""

View File

@@ -3,7 +3,6 @@
from __future__ import annotations
from typing import Any
from parsedmarc import AggregateReport, ForensicReport, SMTPTLSReport, ParseResults
from parsedmarc.log import logger
from azure.core.exceptions import HttpResponseError
@@ -133,7 +132,7 @@ class LogAnalyticsClient(object):
def publish_results(
self,
results: ParseResults,
results: dict[str, dict[str, Any]],
save_aggregate: bool,
save_forensic: bool,
save_smtp_tls: bool,

View File

@@ -24,7 +24,7 @@ from opensearchpy.helpers import reindex
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
from parsedmarc import InvalidForensicReport, AggregateReport, ForensicReport, SMTPTLSReport
from parsedmarc import InvalidForensicReport
class OpenSearchError(Exception):
@@ -376,7 +376,7 @@ def migrate_indexes(
def save_aggregate_report_to_opensearch(
aggregate_report: AggregateReport,
aggregate_report: dict[str, Any],
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
@@ -387,7 +387,7 @@ def save_aggregate_report_to_opensearch(
Saves a parsed DMARC aggregate report to OpenSearch
Args:
aggregate_report (OrderedDict): A parsed forensic report
aggregate_report (dict): A parsed forensic report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily indexes
@@ -538,7 +538,7 @@ def save_aggregate_report_to_opensearch(
def save_forensic_report_to_opensearch(
forensic_report: ForensicReport,
forensic_report: dict[str, Any],
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
@@ -549,7 +549,7 @@ def save_forensic_report_to_opensearch(
Saves a parsed DMARC forensic report to OpenSearch
Args:
forensic_report (OrderedDict): A parsed forensic report
forensic_report (dict): A parsed forensic report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily
@@ -569,7 +569,7 @@ def save_forensic_report_to_opensearch(
sample_date = forensic_report["parsed_sample"]["date"]
sample_date = human_timestamp_to_datetime(sample_date)
original_headers = forensic_report["parsed_sample"]["headers"]
headers = {}
headers = dict()
for original_header in original_headers:
headers[original_header.lower()] = original_headers[original_header]
@@ -596,7 +596,7 @@ def save_forensic_report_to_opensearch(
else:
headers["from"] = " <".join(headers["from"]) + ">"
from_ = {}
from_ = dict()
from_["sample.headers.from"] = headers["from"]
from_query = Q(dict(match_phrase=from_))
q = q & from_query
@@ -608,7 +608,7 @@ def save_forensic_report_to_opensearch(
else:
headers["to"] = " <".join(headers["to"]) + ">"
to_ = {}
to_ = dict()
to_["sample.headers.to"] = headers["to"]
to_query = Q(dict(match_phrase=to_))
q = q & to_query
@@ -716,7 +716,7 @@ def save_smtp_tls_report_to_opensearch(
Saves a parsed SMTP TLS report to OpenSearch
Args:
report (OrderedDict): A parsed SMTP TLS report
report (dict): A parsed SMTP TLS report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily indexes

View File

@@ -3,12 +3,10 @@
from __future__ import annotations
from typing import Any
from parsedmarc import AggregateReport, ForensicReport, SMTPTLSReport, ParseResults
import json
import boto3
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
@@ -53,18 +51,18 @@ class S3Client(object):
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key,
)
self.bucket = self.s3.Bucket(self.bucket_name) # type: ignore
self.bucket: Any = self.s3.Bucket(self.bucket_name)
def save_aggregate_report_to_s3(self, report: Union[AggregateReport, ForensicReport, SMTPTLSReport]):
def save_aggregate_report_to_s3(self, report: dict[str, Any]):
self.save_report_to_s3(report, "aggregate")
def save_forensic_report_to_s3(self, report: Union[AggregateReport, ForensicReport, SMTPTLSReport]):
def save_forensic_report_to_s3(self, report: dict[str, Any]):
self.save_report_to_s3(report, "forensic")
def save_smtp_tls_report_to_s3(self, report: Union[AggregateReport, ForensicReport, SMTPTLSReport]):
def save_smtp_tls_report_to_s3(self, report: dict[str, Any]):
self.save_report_to_s3(report, "smtp_tls")
def save_report_to_s3(self, report: Union[AggregateReport, ForensicReport, SMTPTLSReport], report_type: str):
def save_report_to_s3(self, report: dict[str, Any], report_type: str):
if report_type == "smtp_tls":
report_date = report["begin_date"]
report_id = report["report_id"]

View File

@@ -3,7 +3,6 @@
from __future__ import annotations
from typing import Any, Union
from parsedmarc import AggregateReport, ForensicReport, SMTPTLSReport, ParseResults
from urllib.parse import urlparse
@@ -73,7 +72,7 @@ class HECClient(object):
def save_aggregate_reports_to_splunk(
self,
aggregate_reports: Union[list[AggregateReport], AggregateReport],
aggregate_reports: Union[list[dict[str, Any]], dict[str, Any]],
):
"""
Saves aggregate DMARC reports to Splunk
@@ -94,7 +93,7 @@ class HECClient(object):
json_str = ""
for report in aggregate_reports:
for record in report["records"]:
new_report: dict[str, Union[str, int, float, dict]] = {}
new_report: dict[str, Union[str, int, float, dict]] = dict()
for metadata in report["report_metadata"]:
new_report[metadata] = report["report_metadata"][metadata]
new_report["interval_begin"] = record["interval_begin"]
@@ -139,7 +138,7 @@ class HECClient(object):
def save_forensic_reports_to_splunk(
self,
forensic_reports: Union[list[AggregateReport], AggregateReport],
forensic_reports: Union[list[dict[str, Any]], dict[str, Any]],
):
"""
Saves forensic DMARC reports to Splunk
@@ -175,7 +174,7 @@ class HECClient(object):
raise SplunkError(response["text"])
def save_smtp_tls_reports_to_splunk(
self, reports: Union[list[AggregateReport], AggregateReport]
self, reports: Union[list[dict[str, Any]], dict[str, Any]]
):
"""
Saves aggregate DMARC reports to Splunk

View File

@@ -7,7 +7,6 @@ import logging
import logging.handlers
from typing import Any
from parsedmarc import AggregateReport, ForensicReport, SMTPTLSReport, ParseResults
import json
@@ -37,21 +36,21 @@ class SyslogClient(object):
self.logger.addHandler(log_handler)
def save_aggregate_report_to_syslog(
self, aggregate_reports: list[AggregateReport]
self, aggregate_reports: list[dict[str, Any]]
):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows:
self.logger.info(json.dumps(row))
def save_forensic_report_to_syslog(
self, forensic_reports: list[ForensicReport]
self, forensic_reports: list[dict[str, Any]]
):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows:
self.logger.info(json.dumps(row))
def save_smtp_tls_report_to_syslog(
self, smtp_tls_reports: list[SMTPTLSReport]
self, smtp_tls_reports: list[dict[str, Any]]
):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
for row in rows:

View File

@@ -4,7 +4,7 @@
from __future__ import annotations
from typing import Optional, Union, TypedDict
from typing import Optional, Union, TypedDict, Any
import logging
import os
@@ -45,32 +45,6 @@ import parsedmarc.resources.dbip
import parsedmarc.resources.maps
from parsedmarc.constants import USER_AGENT
# TypedDict definitions for better typing
class IPAddressInfo(TypedDict, total=False):
"""Information about an IP address"""
ip_address: str
country: Optional[str]
reverse_dns: Optional[str]
base_domain: Optional[str]
name: Optional[str]
type: Optional[str]
class EmailAddress(TypedDict, total=False):
"""Parsed email address information"""
display_name: Optional[str]
address: str
local: Optional[str]
domain: Optional[str]
class ReverseDNSService(TypedDict):
"""Reverse DNS service information"""
name: str
type: Optional[str]
parenthesis_regex = re.compile(r"\s*\(.*\)\s*")
null_file = open(os.devnull, "w")
@@ -92,7 +66,15 @@ class DownloadError(RuntimeError):
"""Raised when an error occurs when downloading a file"""
def decode_base64(data) -> bytes:
class EmailAddress(TypedDict):
"""Parsed email address information"""
display_name: Optional[str]
address: str
local: Optional[str]
domain: Optional[str]
def decode_base64(data: str) -> bytes:
"""
Decodes a base64 string, with padding being optional
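A self-contained sketch of what padding-optional decoding can look like (an assumption about the approach, not the project's exact implementation):

import base64

def decode_base64_padded(data: str) -> bytes:
    """Decode base64 text whether or not the trailing '=' padding is present."""
    data = data.strip()
    if len(data) % 4:
        # b64decode requires a length that is a multiple of 4, so restore padding.
        data += "=" * (4 - len(data) % 4)
    return base64.b64decode(data)

# decode_base64_padded("aGVsbG8") == decode_base64_padded("aGVsbG8=") == b"hello"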
@@ -360,14 +342,14 @@ def get_ip_address_country(ip_address: str, *, db_path: Optional[str] = None) ->
def get_service_from_reverse_dns_base_domain(
base_domain,
base_domain: str,
*,
always_use_local_file: Optional[bool] = False,
local_file_path: Optional[bool] = None,
url: Optional[bool] = None,
local_file_path: Optional[str] = None,
url: Optional[str] = None,
offline: Optional[bool] = False,
reverse_dns_map: Optional[bool] = None,
) -> ReverseDNSService:
reverse_dns_map: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
"""
Returns the service name of a given base domain name from reverse DNS.
@@ -388,7 +370,7 @@ def get_service_from_reverse_dns_base_domain(
reader = csv.DictReader(_csv_file)
for row in reader:
key = row["base_reverse_dns"].lower().strip()
reverse_dns_map[key] = {"name": row["name"], "type": row["type"]}
reverse_dns_map[key] = dict(name=row["name"], type=row["type"])
base_domain = base_domain.lower().strip()
if url is None:
@@ -398,7 +380,7 @@ def get_service_from_reverse_dns_base_domain(
"resources/maps/base_reverse_dns_map.csv"
)
if reverse_dns_map is None:
reverse_dns_map = {}
reverse_dns_map = dict()
csv_file = io.StringIO()
if not (offline or always_use_local_file) and len(reverse_dns_map) == 0:
@@ -430,24 +412,24 @@ def get_service_from_reverse_dns_base_domain(
try:
service = reverse_dns_map[base_domain]
except KeyError:
service = {"name": base_domain, "type": None}
service = dict(name=base_domain, type=None)
return service
def get_ip_address_info(
ip_address,
ip_address: str,
*,
ip_db_path: Optional[str] = None,
reverse_dns_map_path: Optional[str] = None,
always_use_local_files: Optional[bool] = False,
reverse_dns_map_url: Optional[str] = None,
cache: Optional[ExpiringDict] = None,
reverse_dns_map: Optional[dict] = None,
reverse_dns_map: Optional[dict[str, Any]] = None,
offline: Optional[bool] = False,
nameservers: Optional[list[str]] = None,
timeout: Optional[float] = 2.0,
) -> IPAddressInfo:
) -> dict[str, Any]:
"""
Returns reverse DNS and country information for the given IP address
@@ -465,7 +447,7 @@ def get_ip_address_info(
timeout (float): Sets the DNS timeout in seconds
Returns:
OrderedDict: ``ip_address``, ``reverse_dns``, ``country``
dict: ``ip_address``, ``reverse_dns``, ``country``
"""
ip_address = ip_address.lower()
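A brief usage sketch of the annotated utility (the IP address is the documentation example range; offline=True skips network lookups, so some keys may be absent):

from parsedmarc.utils import get_ip_address_info

info = get_ip_address_info("192.0.2.1", offline=True)
print(info["ip_address"], info.get("reverse_dns"), info.get("country"))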
@@ -474,7 +456,7 @@ def get_ip_address_info(
if info:
logger.debug(f"IP address {ip_address} was found in cache")
return info
info = {}
info = dict()
info["ip_address"] = ip_address
if offline:
reverse_dns = None
@@ -576,7 +558,7 @@ def is_mbox(path: str) -> bool:
return _is_mbox
def is_outlook_msg(content) -> bool:
def is_outlook_msg(content: Union[bytes, Any]) -> bool:
"""
Checks if the given content is an Outlook msg OLE/MSG file
@@ -609,13 +591,14 @@ def convert_outlook_msg(msg_bytes: bytes) -> str:
os.chdir(tmp_dir)
with open("sample.msg", "wb") as msg_file:
msg_file.write(msg_bytes)
rfc822_bytes: bytes
try:
subprocess.check_call(
["msgconvert", "sample.msg"], stdout=null_file, stderr=null_file
)
eml_path = "sample.eml"
with open(eml_path, "rb") as eml_file:
rfc822 = eml_file.read()
rfc822_bytes = eml_file.read()
except FileNotFoundError:
raise EmailParserError(
"Failed to convert Outlook MSG: msgconvert utility not found"
@@ -624,12 +607,12 @@ def convert_outlook_msg(msg_bytes: bytes) -> str:
os.chdir(orig_dir)
shutil.rmtree(tmp_dir)
return rfc822
return rfc822_bytes.decode("utf-8", errors="replace")
def parse_email(
data: Union[bytes, str], *, strip_attachment_payloads: Optional[bool] = False
):
) -> dict[str, Any]:
"""
A simplified email parser
@@ -644,7 +627,8 @@ def parse_email(
if isinstance(data, bytes):
if is_outlook_msg(data):
data = convert_outlook_msg(data)
data = data.decode("utf-8", errors="replace")
else:
data = data.decode("utf-8", errors="replace")
parsed_email = mailparser.parse_from_string(data)
headers = json.loads(parsed_email.headers_json).copy()
parsed_email = json.loads(parsed_email.mail_json).copy()