Add TypedDict definitions to replace dict[str, Any] for better typing

- Added comprehensive TypedDict definitions for all major data structures (the core pattern is sketched below)
- IPAddressInfo, EmailAddress, ReverseDNSService in utils.py
- AggregateReport, ForensicReport, SMTPTLSReport and related types in __init__.py
- Updated function signatures throughout codebase to use TypedDict types
- Reduced dict[str, Any] usage from 50+ to 11 (remaining are legitimate generic cases)
- All tests pass

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
copilot-swe-agent[bot]
2025-12-19 21:29:14 +00:00
parent 639f8f674b
commit a4ac5accd2
10 changed files with 245 additions and 48 deletions
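
The core pattern applied across these files: required-key TypedDicts for fully populated structures, and total=False ones where keys may be absent. A minimal sketch (describe_source and the trimmed two-field IPAddressInfo are illustrative; the full definitions appear in the diffs below):

    from typing import Optional, TypedDict

    class IPAddressInfo(TypedDict, total=False):
        ip_address: str
        country: Optional[str]

    def describe_source(info: IPAddressInfo) -> str:
        # total=False keys may be missing, so use .get() instead of indexing
        return "{0} ({1})".format(info.get("ip_address", "?"), info.get("country") or "unknown")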

View File

@@ -4,7 +4,7 @@
 from __future__ import annotations
-from typing import Dict, List, Any, Union, Optional, IO, Callable
+from typing import Dict, List, Any, Union, Optional, IO, Callable, TypedDict
 import binascii
 import email
@@ -38,12 +38,177 @@ from parsedmarc.mail import (
 )
 from parsedmarc.constants import __version__
-from parsedmarc.utils import get_base_domain, get_ip_address_info
+from parsedmarc.utils import get_base_domain, get_ip_address_info, IPAddressInfo
 from parsedmarc.utils import is_outlook_msg, convert_outlook_msg
 from parsedmarc.utils import parse_email
 from parsedmarc.utils import timestamp_to_human, human_timestamp_to_datetime
+
+
+# TypedDict definitions for DMARC report structures
+class PolicyOverrideReason(TypedDict, total=False):
+    """Reason for DMARC policy override"""
+    type: str
+    comment: Optional[str]
+
+
+class PolicyEvaluated(TypedDict):
+    """DMARC policy evaluation result"""
+    disposition: str
+    dkim: str
+    spf: str
+    policy_override_reasons: list[PolicyOverrideReason]
+
+
+class Alignment(TypedDict):
+    """DMARC alignment information"""
+    spf: bool
+    dkim: bool
+    dmarc: bool
+
+
+class DKIMResult(TypedDict, total=False):
+    """DKIM authentication result"""
+    domain: str
+    selector: str
+    result: str
+
+
+class SPFResult(TypedDict, total=False):
+    """SPF authentication result"""
+    domain: str
+    scope: str
+    result: str
+
+
+class AuthResults(TypedDict):
+    """Authentication results"""
+    dkim: list[DKIMResult]
+    spf: list[SPFResult]
+
+
+class DMARCIdentifiers(TypedDict):
+    """DMARC identifiers"""
+    header_from: str
+    envelope_from: str
+    envelope_to: Optional[str]
+
+
+class DMARCRecord(TypedDict):
+    """Parsed DMARC aggregate record"""
+    source: IPAddressInfo
+    count: int
+    alignment: Alignment
+    policy_evaluated: PolicyEvaluated
+    identifiers: DMARCIdentifiers
+    auth_results: AuthResults
+
+
+class PublishedPolicy(TypedDict):
+    """Published DMARC policy"""
+    domain: str
+    adkim: str
+    aspf: str
+    p: str
+    sp: str
+    pct: str
+    fo: str
+
+
+class ReportMetadata(TypedDict, total=False):
+    """DMARC report metadata"""
+    org_name: str
+    org_email: str
+    org_extra_contact_info: Optional[str]
+    report_id: str
+    begin_date: str
+    end_date: str
+    errors: list[str]
+    timespan_requires_normalization: bool
+    original_timespan_seconds: int
+
+
+class AggregateReport(TypedDict):
+    """Parsed DMARC aggregate report"""
+    xml_schema: str
+    report_metadata: ReportMetadata
+    policy_published: PublishedPolicy
+    records: list[DMARCRecord]
+
+
+class SMTPTLSFailureDetails(TypedDict, total=False):
+    """SMTP TLS failure details"""
+    result_type: str
+    failed_session_count: int
+    sending_mta_ip: Optional[str]
+    receiving_ip: Optional[str]
+    receiving_mx_hostname: Optional[str]
+    receiving_mx_helo: Optional[str]
+    additional_info_uri: Optional[str]
+    failure_reason_code: Optional[str]
+
+
+class SMTPTLSPolicy(TypedDict, total=False):
+    """SMTP TLS policy"""
+    policy_domain: str
+    policy_type: str
+    policy_strings: Optional[list[str]]
+    mx_host_patterns: Optional[list[str]]
+    successful_session_count: int
+    total_successful_session_count: int
+    total_failure_session_count: int
+    failure_details: list[SMTPTLSFailureDetails]
+
+
+class SMTPTLSReport(TypedDict):
+    """Parsed SMTP TLS report"""
+    organization_name: str
+    begin_date: str
+    end_date: str
+    contact_info: str
+    report_id: str
+    policies: list[SMTPTLSPolicy]
+
+
+class ForensicReport(TypedDict, total=False):
+    """Parsed DMARC forensic report"""
+    feedback_type: str
+    user_agent: Optional[str]
+    version: Optional[str]
+    original_envelope_id: Optional[str]
+    original_mail_from: Optional[str]
+    original_rcpt_to: Optional[str]
+    arrival_date: str
+    arrival_date_utc: str
+    subject: Optional[str]
+    message_id: Optional[str]
+    authentication_results: Optional[str]
+    dkim_domain: Optional[str]
+    source_ip_address: Optional[str]
+    source_country: Optional[str]
+    source_reverse_dns: Optional[str]
+    source_base_domain: Optional[str]
+    delivery_result: Optional[str]
+    auth_failure: Optional[list[str]]
+    reported_domain: Optional[str]
+    sample: Optional[str]
+    parsed_sample: Optional[dict]
+    sample_headers_only: bool
+
+
+class ParsedReport(TypedDict):
+    """Container for parsed report with type"""
+    report_type: str
+    report: Union[AggregateReport, ForensicReport, SMTPTLSReport]
+
+
+class ParseResults(TypedDict):
+    """Results from parsing multiple reports"""
+    aggregate_reports: list[AggregateReport]
+    forensic_reports: list[ForensicReport]
+    smtp_tls_reports: list[SMTPTLSReport]
 logger.debug("parsedmarc v{0}".format(__version__))
 feedback_report_regex = re.compile(r"^([\w\-]+): (.+)$", re.MULTILINE)
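
A hedged sketch of how the definitions above behave under a static type checker such as mypy (variable names are illustrative; at runtime these are plain dicts):

    alignment: Alignment = {"spf": True, "dkim": True, "dmarc": False}  # all three keys required
    reason: PolicyOverrideReason = {"type": "forwarded"}  # total=False: "comment" may be omitted
    partial: Alignment = {"spf": True}  # rejected by the checker: missing "dkim" and "dmarc"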
@@ -230,8 +395,8 @@ def _bucket_interval_by_day(
 def _append_parsed_record(
-    parsed_record: dict[str, Any],
-    records: list[dict[str, Any]],
+    parsed_record: DMARCRecord,
+    records: list[DMARCRecord],
     begin_dt: datetime,
     end_dt: datetime,
     normalize: bool,
@@ -283,7 +448,7 @@ def _parse_report_record(
     offline: Optional[bool] = False,
     nameservers: Optional[list[str]] = None,
     dns_timeout: Optional[float] = 2.0,
-) -> dict[str, Any]:
+) -> DMARCRecord:
     """
     Converts a record from a DMARC aggregate report into a more consistent
     format
@@ -512,7 +677,7 @@ def _parse_smtp_tls_report_policy(policy: dict[str, Any]):
         raise InvalidSMTPTLSReport(str(e))
 
 
-def parse_smtp_tls_report_json(report: str):
+def parse_smtp_tls_report_json(report: str) -> SMTPTLSReport:
     """Parses and validates an SMTP TLS report"""
     required_fields = [
         "organization-name",
@@ -552,7 +717,7 @@ def parse_smtp_tls_report_json(report: str):
 def parsed_smtp_tls_reports_to_csv_rows(
-    reports: Union[dict[str, Any], List[dict[str, Any]]],
+    reports: Union[SMTPTLSReport, list[SMTPTLSReport]],
 ):
     """Converts one or more parsed SMTP TLS reports into a list of single
     layer OrderedDict objects suitable for use in a CSV"""
@@ -588,7 +753,7 @@ def parsed_smtp_tls_reports_to_csv_rows(
     return rows
 
 
-def parsed_smtp_tls_reports_to_csv(reports: dict[str, Any]) -> str:
+def parsed_smtp_tls_reports_to_csv(reports: SMTPTLSReport) -> str:
     """
     Converts one or more parsed SMTP TLS reports to flat CSV format, including
     headers
@@ -645,7 +810,7 @@ def parse_aggregate_report_xml(
     timeout: Optional[float] = 2.0,
     keep_alive: Optional[Callable] = None,
     normalize_timespan_threshold_hours: float = 24.0,
-) -> dict[str, Any]:
+) -> AggregateReport:
     """Parses a DMARC XML report string and returns a consistent OrderedDict
 
     Args:
@@ -923,7 +1088,7 @@ def parse_aggregate_report_file(
     dns_timeout: Optional[float] = 2.0,
     keep_alive: Optional[Callable] = None,
     normalize_timespan_threshold_hours: Optional[float] = 24.0,
-) -> dict[str, Any]:
+) -> AggregateReport:
     """Parses a file at the given path, a file-like object, or bytes as an
     aggregate DMARC report
@@ -964,7 +1129,7 @@
 def parsed_aggregate_reports_to_csv_rows(
-    reports: list[dict[str, Any]],
+    reports: list[AggregateReport],
 ) -> list[dict[str, Any]]:
     """
     Converts one or more parsed aggregate reports to a list of dicts in flat CSV
@@ -1089,7 +1254,7 @@ def parsed_aggregate_reports_to_csv_rows(
     return rows
 
 
-def parsed_aggregate_reports_to_csv(reports: list[dict[str, Any]]) -> str:
+def parsed_aggregate_reports_to_csv(reports: list[AggregateReport]) -> str:
     """
     Converts one or more parsed aggregate reports to flat CSV format, including
     headers
@@ -1168,7 +1333,7 @@ def parse_forensic_report(
     nameservers: Optional[list[str]] = None,
     dns_timeout: Optional[float] = 2.0,
     strip_attachment_payloads: Optional[bool] = False,
-) -> dict[str, Any]:
+) -> ForensicReport:
     """
     Converts a DMARC forensic report and sample to an ``OrderedDict``
@@ -1296,7 +1461,7 @@ def parse_forensic_report(
         raise InvalidForensicReport("Unexpected error: {0}".format(error.__str__()))
 
 
-def parsed_forensic_reports_to_csv_rows(reports: list[dict[str, Any]]):
+def parsed_forensic_reports_to_csv_rows(reports: list[ForensicReport]):
     """
     Converts one or more parsed forensic reports to a list of dicts in flat CSV
     format
@@ -1332,7 +1497,7 @@ def parsed_forensic_reports_to_csv_rows(reports: list[dict[str, Any]]):
     return rows
 
 
-def parsed_forensic_reports_to_csv(reports: list[dict[str, Any]]) -> str:
+def parsed_forensic_reports_to_csv(reports: list[ForensicReport]) -> str:
     """
     Converts one or more parsed forensic reports to flat CSV format, including
     headers
@@ -1397,7 +1562,7 @@ def parse_report_email(
     strip_attachment_payloads: Optional[bool] = False,
     keep_alive: Optional[callable] = None,
     normalize_timespan_threshold_hours: Optional[float] = 24.0,
-) -> dict[str, Any]:
+) -> ParsedReport:
     """
     Parses a DMARC report from an email
@@ -1602,7 +1767,7 @@ def parse_report_file(
     offline: Optional[bool] = False,
     keep_alive: Optional[Callable] = None,
     normalize_timespan_threshold_hours: Optional[float] = 24,
-) -> dict[str, Any]:
+) -> ParsedReport:
     """Parses a DMARC aggregate or forensic file at the given path, a
     file-like object, or bytes
@@ -1692,7 +1857,7 @@ def get_dmarc_reports_from_mbox(
     reverse_dns_map_url: Optional[str] = None,
     offline: Optional[bool] = False,
     normalize_timespan_threshold_hours: Optional[float] = 24.0,
-) -> dict[str, dict[str, Any]]:
+) -> ParseResults:
     """Parses a mailbox in mbox format containing e-mails with attached
     DMARC reports
@@ -1787,7 +1952,7 @@ def get_dmarc_reports_from_mailbox(
     since: Optional[datetime] = None,
     create_folders: Optional[bool] = True,
     normalize_timespan_threshold_hours: Optional[float] = 24,
-) -> dict[str, dict[str, Any]]:
+) -> ParseResults:
     """
     Fetches and parses DMARC reports from a mailbox
@@ -2281,7 +2446,7 @@ def save_output(
             sample_file.write(sample)
 
 
-def get_report_zip(results: dict[str, Any]) -> bytes:
+def get_report_zip(results: ParseResults) -> bytes:
     """
     Creates a zip file of parsed report output
View File

@@ -24,7 +24,7 @@ from elasticsearch.helpers import reindex
 from parsedmarc.log import logger
 from parsedmarc.utils import human_timestamp_to_datetime
-from parsedmarc import InvalidForensicReport
+from parsedmarc import InvalidForensicReport, AggregateReport, ForensicReport, SMTPTLSReport
 
 
 class ElasticsearchError(Exception):
@@ -376,7 +376,7 @@ def migrate_indexes(
 def save_aggregate_report_to_elasticsearch(
-    aggregate_report: dict[str, Any],
+    aggregate_report: AggregateReport,
     index_suffix: Optional[str] = None,
     index_prefix: Optional[str] = None,
     monthly_indexes: Optional[bool] = False,
@@ -538,7 +538,7 @@ def save_aggregate_report_to_elasticsearch(
 def save_forensic_report_to_elasticsearch(
-    forensic_report: dict[str, Any],
+    forensic_report: ForensicReport,
     index_suffix: Optional[Any] = None,
     index_prefix: Optional[str] = None,
     monthly_indexes: Optional[bool] = False,
@@ -705,7 +705,7 @@ def save_forensic_report_to_elasticsearch(
 def save_smtp_tls_report_to_elasticsearch(
-    report: dict[str, Any],
+    report: SMTPTLSReport,
     index_suffix: Optional[str] = None,
     index_prefix: Optional[str] = None,
     monthly_indexes: Optional[bool] = False,

View File

@@ -3,6 +3,7 @@
 from __future__ import annotations
 from typing import Any
+from parsedmarc import AggregateReport, ForensicReport, SMTPTLSReport, ParseResults
 
 import logging
 import logging.handlers
@@ -53,7 +54,7 @@ class GelfClient(object):
         self.logger.addHandler(self.handler)
 
     def save_aggregate_report_to_gelf(
-        self, aggregate_reports: list[dict[str, Any]]
+        self, aggregate_reports: list[AggregateReport]
     ):
         rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
         for row in rows:
@@ -63,13 +64,13 @@ class GelfClient(object):
         log_context_data.parsedmarc = None
 
     def save_forensic_report_to_gelf(
-        self, forensic_reports: list[dict[str, Any]]
+        self, forensic_reports: list[ForensicReport]
     ):
         rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
         for row in rows:
             self.logger.info(json.dumps(row))
 
-    def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: dict[str, Any]):
+    def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: SMTPTLSReport):
         rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
         for row in rows:
             self.logger.info(json.dumps(row))

View File

@@ -3,6 +3,7 @@
 from __future__ import annotations
 from typing import Any, Optional, Union
+from parsedmarc import AggregateReport, ForensicReport, SMTPTLSReport, ParseResults
 
 from ssl import SSLContext
 import json
@@ -98,7 +99,7 @@ class KafkaClient(object):
     def save_aggregate_reports_to_kafka(
         self,
-        aggregate_reports: Union[dict[str, Any], list[dict[str, Any]]],
+        aggregate_reports: Union[AggregateReport, list[AggregateReport]],
         aggregate_topic: str,
     ):
         """
@@ -145,7 +146,7 @@ class KafkaClient(object):
     def save_forensic_reports_to_kafka(
         self,
-        forensic_reports: Union[dict[str, Any], list[dict[str, Any]]],
+        forensic_reports: Union[ForensicReport, list[ForensicReport]],
         forensic_topic: str,
     ):
         """
@@ -179,7 +180,7 @@ class KafkaClient(object):
     def save_smtp_tls_reports_to_kafka(
         self,
-        smtp_tls_reports: Union[list[dict[str, Any]], dict[str, Any]],
+        smtp_tls_reports: Union[list[SMTPTLSReport], SMTPTLSReport],
         smtp_tls_topic: str,
     ):
         """

View File

@@ -3,6 +3,7 @@
 from __future__ import annotations
 from typing import Any
+from parsedmarc import AggregateReport, ForensicReport, SMTPTLSReport, ParseResults
 
 from parsedmarc.log import logger
 from azure.core.exceptions import HttpResponseError
@@ -132,7 +133,7 @@ class LogAnalyticsClient(object):
     def publish_results(
         self,
-        results: dict[str, dict[str, Any]],
+        results: ParseResults,
         save_aggregate: bool,
         save_forensic: bool,
         save_smtp_tls: bool,

View File

@@ -24,7 +24,7 @@ from opensearchpy.helpers import reindex
 from parsedmarc.log import logger
 from parsedmarc.utils import human_timestamp_to_datetime
-from parsedmarc import InvalidForensicReport
+from parsedmarc import InvalidForensicReport, AggregateReport, ForensicReport, SMTPTLSReport
 
 
 class OpenSearchError(Exception):
@@ -376,7 +376,7 @@ def migrate_indexes(
 def save_aggregate_report_to_opensearch(
-    aggregate_report: dict[str, Any],
+    aggregate_report: AggregateReport,
     index_suffix: Optional[str] = None,
     index_prefix: Optional[str] = None,
     monthly_indexes: Optional[bool] = False,
@@ -538,7 +538,7 @@ def save_aggregate_report_to_opensearch(
 def save_forensic_report_to_opensearch(
-    forensic_report: dict[str, Any],
+    forensic_report: ForensicReport,
     index_suffix: Optional[str] = None,
     index_prefix: Optional[str] = None,
     monthly_indexes: Optional[bool] = False,

View File

@@ -3,6 +3,7 @@
 from __future__ import annotations
-from typing import Any
+from typing import Any, Union
+from parsedmarc import AggregateReport, ForensicReport, SMTPTLSReport, ParseResults
 
 import json
 import boto3
@@ -54,16 +55,16 @@ class S3Client(object):
         )
         self.bucket = self.s3.Bucket(self.bucket_name)  # type: ignore
 
-    def save_aggregate_report_to_s3(self, report: dict[str, Any]):
+    def save_aggregate_report_to_s3(self, report: Union[AggregateReport, ForensicReport, SMTPTLSReport]):
         self.save_report_to_s3(report, "aggregate")
 
-    def save_forensic_report_to_s3(self, report: dict[str, Any]):
+    def save_forensic_report_to_s3(self, report: Union[AggregateReport, ForensicReport, SMTPTLSReport]):
         self.save_report_to_s3(report, "forensic")
 
-    def save_smtp_tls_report_to_s3(self, report: dict[str, Any]):
+    def save_smtp_tls_report_to_s3(self, report: Union[AggregateReport, ForensicReport, SMTPTLSReport]):
         self.save_report_to_s3(report, "smtp_tls")
 
-    def save_report_to_s3(self, report: dict[str, Any], report_type: str):
+    def save_report_to_s3(self, report: Union[AggregateReport, ForensicReport, SMTPTLSReport], report_type: str):
         if report_type == "smtp_tls":
             report_date = report["begin_date"]
             report_id = report["report_id"]
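
Since TypedDicts are plain dicts at runtime, isinstance() cannot narrow the Union in these signatures; a hedged sketch of narrowing on the report_type tag instead (report_date_for is a hypothetical helper, not part of this commit):

    from typing import Union, cast

    def report_date_for(
        report: Union[AggregateReport, ForensicReport, SMTPTLSReport], report_type: str
    ) -> str:
        if report_type == "smtp_tls":
            return cast(SMTPTLSReport, report)["begin_date"]  # required key
        if report_type == "forensic":
            return cast(ForensicReport, report).get("arrival_date_utc", "")  # total=False
        return cast(AggregateReport, report)["report_metadata"].get("begin_date", "")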

View File

@@ -3,6 +3,7 @@
 from __future__ import annotations
 from typing import Any, Union
+from parsedmarc import AggregateReport, ForensicReport, SMTPTLSReport, ParseResults
 
 from urllib.parse import urlparse
@@ -72,7 +73,7 @@ class HECClient(object):
     def save_aggregate_reports_to_splunk(
         self,
-        aggregate_reports: Union[list[dict[str, Any]], dict[str, Any]],
+        aggregate_reports: Union[list[AggregateReport], AggregateReport],
     ):
         """
         Saves aggregate DMARC reports to Splunk
@@ -138,7 +139,7 @@ class HECClient(object):
     def save_forensic_reports_to_splunk(
         self,
-        forensic_reports: Union[list[dict[str, Any]], dict[str, Any]],
+        forensic_reports: Union[list[ForensicReport], ForensicReport],
     ):
         """
         Saves forensic DMARC reports to Splunk
@@ -174,7 +175,7 @@ class HECClient(object):
         raise SplunkError(response["text"])
 
     def save_smtp_tls_reports_to_splunk(
-        self, reports: Union[list[dict[str, Any]], dict[str, Any]]
+        self, reports: Union[list[SMTPTLSReport], SMTPTLSReport]
     ):
         """
         Saves SMTP TLS reports to Splunk

View File

@@ -7,6 +7,7 @@ import logging
 import logging.handlers
 from typing import Any
+from parsedmarc import AggregateReport, ForensicReport, SMTPTLSReport, ParseResults
 
 import json
@@ -36,21 +37,21 @@ class SyslogClient(object):
         self.logger.addHandler(log_handler)
 
     def save_aggregate_report_to_syslog(
-        self, aggregate_reports: list[dict[str, Any]]
+        self, aggregate_reports: list[AggregateReport]
     ):
         rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
         for row in rows:
             self.logger.info(json.dumps(row))
 
     def save_forensic_report_to_syslog(
-        self, forensic_reports: list[dict[str, Any]]
+        self, forensic_reports: list[ForensicReport]
     ):
         rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
         for row in rows:
             self.logger.info(json.dumps(row))
 
     def save_smtp_tls_report_to_syslog(
-        self, smtp_tls_reports: list[dict[str, Any]]
+        self, smtp_tls_reports: list[SMTPTLSReport]
    ):
        rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
        for row in rows:

View File

@@ -4,7 +4,7 @@
 from __future__ import annotations
-from typing import Optional, Union
+from typing import Optional, Union, TypedDict
 import logging
 import os
@@ -45,6 +45,32 @@ import parsedmarc.resources.dbip
 import parsedmarc.resources.maps
 from parsedmarc.constants import USER_AGENT
+
+
+# TypedDict definitions for better typing
+class IPAddressInfo(TypedDict, total=False):
+    """Information about an IP address"""
+    ip_address: str
+    country: Optional[str]
+    reverse_dns: Optional[str]
+    base_domain: Optional[str]
+    name: Optional[str]
+    type: Optional[str]
+
+
+class EmailAddress(TypedDict, total=False):
+    """Parsed email address information"""
+    display_name: Optional[str]
+    address: str
+    local: Optional[str]
+    domain: Optional[str]
+
+
+class ReverseDNSService(TypedDict):
+    """Reverse DNS service information"""
+    name: str
+    type: Optional[str]
+
+
 parenthesis_regex = re.compile(r"\s*\(.*\)\s*")
 
 null_file = open(os.devnull, "w")
@@ -341,7 +367,7 @@ def get_service_from_reverse_dns_base_domain(
     url: Optional[bool] = None,
     offline: Optional[bool] = False,
     reverse_dns_map: Optional[bool] = None,
-) -> str:
+) -> ReverseDNSService:
     """
     Returns the service name of a given base domain name from reverse DNS.
@@ -421,7 +447,7 @@ def get_ip_address_info(
     offline: Optional[bool] = False,
     nameservers: Optional[list[str]] = None,
     timeout: Optional[float] = 2.0,
-) -> dict[str, str]:
+) -> IPAddressInfo:
     """
     Returns reverse DNS and country information for the given IP address
@@ -486,7 +512,7 @@ def get_ip_address_info(
     return info
 
 
-def parse_email_address(original_address: str) -> dict[str, str]:
+def parse_email_address(original_address: str) -> EmailAddress:
     if original_address[0] == "":
         display_name = None
     else:
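
A hedged sketch of the utils.py types in use (the IP address is a documentation-range value, and the single-positional-argument call to get_ip_address_info is assumed from the signature above):

    info: IPAddressInfo = get_ip_address_info("203.0.113.7", offline=True)
    print(info.get("reverse_dns"), info.get("country"))

    # EmailAddress literals get their keys and value types checked statically
    addr: EmailAddress = {"display_name": "Example Reporter", "address": "noreply@example.com"}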