From 6799f10364e483093983265eed558c1e0ac89534 Mon Sep 17 00:00:00 2001
From: Sean Whalen
Date: Mon, 8 Dec 2025 13:26:59 -0500
Subject: [PATCH] 9.0.4 Fixes

- Fix saving reports to OpenSearch ([#637](https://github.com/domainaware/parsedmarc/issues/637))
- Fix parsing certain DMARC failure/forensic reports
- Some fixes to type hints (incomplete, but published as-is due to the above bugs)
---
 CHANGELOG.md               |  8 ++++++
 parsedmarc/__init__.py     | 54 ++++++++++++++++++++------------------
 parsedmarc/constants.py    |  3 ++-
 parsedmarc/elastic.py      | 18 ++++++-------
 parsedmarc/gelf.py         |  8 ++++--
 parsedmarc/kafkaclient.py  | 14 +++++++---
 parsedmarc/loganalytics.py |  2 +-
 parsedmarc/opensearch.py   | 24 ++++++++---------
 parsedmarc/s3.py           |  2 +-
 parsedmarc/splunk.py       | 24 ++++++++++-------
 parsedmarc/utils.py        |  6 ++---
 parsedmarc/webhook.py      |  8 +++---
 12 files changed, 98 insertions(+), 73 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index e2e7c90..a4da3d0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,13 @@
 # Changelog
 
+## 9.0.4
+
+### Fixes
+
+- Fix saving reports to OpenSearch ([#637](https://github.com/domainaware/parsedmarc/issues/637))
+- Fix parsing certain DMARC failure/forensic reports
+- Some fixes to type hints (incomplete, but published as-is due to the above bugs)
+
 ## 9.0.3
 
 ### Fixes
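Context for the first bullet: parsedmarc/opensearch.py began life as a copy of elastic.py and still exported `save_*_to_elasticsearch` names, so code looking for the OpenSearch entry points failed; the hunks later in this patch rename them to `save_*_to_opensearch`. A hypothetical caller-side sketch (host, credentials, and file path are placeholders, and `set_hosts()` arguments beyond those visible in the diff are assumptions):

```python
from parsedmarc import parse_report_file  # returns {"report_type": ..., "report": ...}
from parsedmarc import opensearch

# Placeholder connection details; see set_hosts() in the opensearch.py hunks below.
opensearch.set_hosts(["https://opensearch.example.com:9200"],
                     username="admin", password="changeme")

result = parse_report_file("aggregate_report.xml")  # placeholder path
if result["report_type"] == "aggregate":
    # Before 9.0.4 this function was accidentally named
    # save_aggregate_report_to_elasticsearch, so this call raised AttributeError.
    opensearch.save_aggregate_report_to_opensearch(result["report"])
```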
diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py
index 8c2c849..d30da73 100644
--- a/parsedmarc/__init__.py
+++ b/parsedmarc/__init__.py
@@ -86,7 +86,7 @@ def _bucket_interval_by_day(
     begin: datetime,
     end: datetime,
     total_count: int,
-) -> List[Dict[Any]]:
+) -> List[Dict[str, Any]]:
     """
     Split the interval [begin, end) into daily buckets and distribute
     `total_count` proportionally across those buckets.
@@ -221,7 +221,7 @@
 
 def _append_parsed_record(
     parsed_record: OrderedDict[str, Any],
-    records: OrderedDict[str, Any],
+    records: List[OrderedDict[str, Any]],
     begin_dt: datetime,
     end_dt: datetime,
     normalize: bool,
@@ -267,10 +267,10 @@ def _parse_report_record(
     record: OrderedDict,
     *,
     ip_db_path: Optional[str] = None,
-    always_use_local_files: bool = False,
+    always_use_local_files: Optional[bool] = False,
     reverse_dns_map_path: Optional[str] = None,
     reverse_dns_map_url: Optional[str] = None,
-    offline: bool = False,
+    offline: Optional[bool] = False,
     nameservers: Optional[list[str]] = None,
     dns_timeout: Optional[float] = 2.0,
 ) -> OrderedDict[str, Any]:
@@ -463,7 +463,7 @@ def _parse_smtp_tls_failure_details(failure_details: dict[str, Any]):
         raise InvalidSMTPTLSReport(str(e))
 
 
-def _parse_smtp_tls_report_policy(policy: dict[str, Any]):
+def _parse_smtp_tls_report_policy(policy: OrderedDict[str, Any]):
     policy_types = ["tlsa", "sts", "no-policy-found"]
     try:
         policy_domain = policy["policy"]["policy-domain"]
@@ -500,7 +500,7 @@ def _parse_smtp_tls_report_policy(policy: dict[str, Any]):
         raise InvalidSMTPTLSReport(str(e))
 
 
-def parse_smtp_tls_report_json(report: dict[str, Any]):
+def parse_smtp_tls_report_json(report: str):
     """Parses and validates an SMTP TLS report"""
     required_fields = [
         "organization-name",
@@ -512,22 +512,22 @@ def parse_smtp_tls_report_json(report: dict[str, Any]):
 
     try:
         policies = []
-        report = json.loads(report)
+        report_dict = json.loads(report)
         for required_field in required_fields:
-            if required_field not in report:
+            if required_field not in report_dict:
                 raise Exception(f"Missing required field: {required_field}")
-        if not isinstance(report["policies"], list):
-            policies_type = type(report["policies"])
+        if not isinstance(report_dict["policies"], list):
+            policies_type = type(report_dict["policies"])
             raise InvalidSMTPTLSReport(f"policies must be a list, not {policies_type}")
-        for policy in report["policies"]:
+        for policy in report_dict["policies"]:
             policies.append(_parse_smtp_tls_report_policy(policy))
         new_report = OrderedDict(
-            organization_name=report["organization-name"],
-            begin_date=report["date-range"]["start-datetime"],
-            end_date=report["date-range"]["end-datetime"],
-            contact_info=report["contact-info"],
-            report_id=report["report-id"],
+            organization_name=report_dict["organization-name"],
+            begin_date=report_dict["date-range"]["start-datetime"],
+            end_date=report_dict["date-range"]["end-datetime"],
+            contact_info=report_dict["contact-info"],
+            report_id=report_dict["report-id"],
             policies=policies,
         )
@@ -539,7 +539,9 @@ def parse_smtp_tls_report_json(report: dict[str, Any]):
         raise InvalidSMTPTLSReport(str(e))
 
 
-def parsed_smtp_tls_reports_to_csv_rows(reports: OrderedDict[str, Any]):
+def parsed_smtp_tls_reports_to_csv_rows(
+    reports: Union[OrderedDict[str, Any], List[OrderedDict[str, Any]]],
+):
     """Converts one or more parsed SMTP TLS reports into a list of single
     layer OrderedDict objects suitable for use in a CSV"""
     if type(reports) is OrderedDict:
@@ -622,15 +624,15 @@ def parsed_smtp_tls_reports_to_csv(reports: OrderedDict[str, Any]) -> str:
 
 def parse_aggregate_report_xml(
     xml: str,
     *,
-    ip_db_path: Optional[bool] = None,
+    ip_db_path: Optional[str] = None,
     always_use_local_files: Optional[bool] = False,
-    reverse_dns_map_path: Optional[bool] = None,
-    reverse_dns_map_url: Optional[bool] = None,
+    reverse_dns_map_path: Optional[str] = None,
+    reverse_dns_map_url: Optional[str] = None,
     offline: Optional[bool] = False,
     nameservers: Optional[list[str]] = None,
     timeout: Optional[float] = 2.0,
-    keep_alive: Optional[callable] = None,
-    normalize_timespan_threshold_hours: Optional[float] = 24.0,
+    keep_alive: Optional[Callable] = None,
+    normalize_timespan_threshold_hours: float = 24.0,
 ) -> OrderedDict[str, Any]:
     """Parses a DMARC XML report string and returns a consistent OrderedDict
@@ -1148,7 +1150,7 @@ def parse_forensic_report(
     *,
     always_use_local_files: Optional[bool] = False,
     reverse_dns_map_path: Optional[str] = None,
-    reverse_dns_map_url: str = None,
+    reverse_dns_map_url: Optional[str] = None,
     offline: Optional[bool] = False,
     ip_db_path: Optional[str] = None,
     nameservers: Optional[list[str]] = None,
@@ -1451,7 +1453,7 @@ def parse_report_email(
                     feedback_report = feedback_report.replace("\\n", "\n")
                 except (ValueError, TypeError, binascii.Error):
                     feedback_report = payload
-            elif content_type == "text/rfc822-headers":
+            elif content_type in ("text/rfc822-headers", "message/rfc-822"):
                 sample = payload
             elif content_type == "message/rfc822":
                 sample = payload
@@ -1750,14 +1752,14 @@ def get_dmarc_reports_from_mailbox(
     delete: Optional[bool] = False,
     test: Optional[bool] = False,
     ip_db_path: Optional[str] = None,
-    always_use_local_files: Optional[str] = False,
+    always_use_local_files: Optional[bool] = False,
     reverse_dns_map_path: Optional[str] = None,
     reverse_dns_map_url: Optional[str] = None,
     offline: Optional[bool] = False,
     nameservers: Optional[list[str]] = None,
     dns_timeout: Optional[float] = 6.0,
     strip_attachment_payloads: Optional[bool] = False,
-    results: Optional[OrderedDict[str, any]] = None,
+    results: Optional[OrderedDict[str, Any]] = None,
     batch_size: Optional[int] = 10,
     since: Optional[datetime] = None,
     create_folders: Optional[bool] = True,
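The first two hunks of this file adjust the annotations on `_bucket_interval_by_day`, whose docstring describes the timespan-normalization step: split `[begin, end)` at day boundaries and spread `total_count` proportionally over the pieces. An independent sketch of that idea (not the patched implementation; the function name and bucket keys here are invented):

```python
from datetime import datetime, timedelta
from typing import Any, Dict, List


def bucket_by_day(begin: datetime, end: datetime, total_count: int) -> List[Dict[str, Any]]:
    """Split [begin, end) at midnight boundaries and apportion total_count
    by each slice's share of the total duration."""
    total_seconds = (end - begin).total_seconds()
    buckets: List[Dict[str, Any]] = []
    cursor, remaining = begin, total_count
    while cursor < end:
        midnight = datetime.combine(cursor.date() + timedelta(days=1), datetime.min.time())
        slice_end = min(midnight, end)
        if slice_end == end:
            count = remaining  # last slice absorbs rounding leftovers
        else:
            count = round(total_count * (slice_end - cursor).total_seconds() / total_seconds)
        buckets.append({"begin": cursor, "end": slice_end, "count": count})
        remaining -= count
        cursor = slice_end
    return buckets


# A 36-hour report of 300 messages splits 6h/24h/6h -> counts 50/200/50.
print(bucket_by_day(datetime(2025, 12, 6, 18), datetime(2025, 12, 8, 6), 300))
```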
diff --git a/parsedmarc/constants.py b/parsedmarc/constants.py
index 8470c08..180fd1a 100644
--- a/parsedmarc/constants.py
+++ b/parsedmarc/constants.py
@@ -1,2 +1,3 @@
-__version__ = "9.0.3"
+__version__ = "9.0.4"
+
 USER_AGENT = f"parsedmarc/{__version__}"
diff --git a/parsedmarc/elastic.py b/parsedmarc/elastic.py
index 92baf41..a3249e2 100644
--- a/parsedmarc/elastic.py
+++ b/parsedmarc/elastic.py
@@ -203,11 +203,11 @@ class _SMTPTLSPolicyDoc(InnerDoc):
 
     def add_failure_details(
         self,
-        result_type: str,
-        ip_address: str,
-        receiving_ip: str,
-        receiving_mx_helo: str,
-        failed_session_count: int,
+        result_type: Optional[str] = None,
+        ip_address: Optional[str] = None,
+        receiving_ip: Optional[str] = None,
+        receiving_mx_helo: Optional[str] = None,
+        failed_session_count: Optional[int] = None,
         sending_mta_ip: Optional[str] = None,
         receiving_mx_hostname: Optional[str] = None,
         additional_information_uri: Optional[str] = None,
@@ -297,7 +297,7 @@ def set_hosts(
             conn_params["ca_certs"] = ssl_cert_path
         else:
             conn_params["verify_certs"] = False
-    if username:
+    if username and password:
         conn_params["http_auth"] = username + ":" + password
     if api_key:
         conn_params["api_key"] = api_key
@@ -540,7 +540,7 @@ def save_aggregate_report_to_elasticsearch(
 
 def save_forensic_report_to_elasticsearch(
     forensic_report: OrderedDict[str, Any],
-    index_suffix: Optional[any] = None,
+    index_suffix: Optional[Any] = None,
     index_prefix: Optional[str] = None,
     monthly_indexes: Optional[bool] = False,
     number_of_shards: int = 1,
@@ -707,8 +707,8 @@ def save_forensic_report_to_elasticsearch(
 
 def save_smtp_tls_report_to_elasticsearch(
     report: OrderedDict[str, Any],
-    index_suffix: str = None,
-    index_prefix: str = None,
+    index_suffix: Optional[str] = None,
+    index_prefix: Optional[str] = None,
     monthly_indexes: Optional[bool] = False,
     number_of_shards: Optional[int] = 1,
     number_of_replicas: Optional[int] = 0,
diff --git a/parsedmarc/gelf.py b/parsedmarc/gelf.py
index f7c811b..e3b5b77 100644
--- a/parsedmarc/gelf.py
+++ b/parsedmarc/gelf.py
@@ -53,7 +53,9 @@ class GelfClient(object):
         )
         self.logger.addHandler(self.handler)
 
-    def save_aggregate_report_to_gelf(self, aggregate_reports: OrderedDict[str, Any]):
+    def save_aggregate_report_to_gelf(
+        self, aggregate_reports: list[OrderedDict[str, Any]]
+    ):
         rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
         for row in rows:
             log_context_data.parsedmarc = row
@@ -61,7 +63,9 @@ class GelfClient(object):
 
         log_context_data.parsedmarc = None
 
-    def save_forensic_report_to_gelf(self, forensic_reports: OrderedDict[str, Any]):
+    def save_forensic_report_to_gelf(
+        self, forensic_reports: list[OrderedDict[str, Any]]
+    ):
         rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
         for row in rows:
             self.logger.info(json.dumps(row))
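The `set_hosts()` change above (mirrored in opensearch.py below) replaces `if username:` with `if username and password:`. With the old guard, a configured username and no password reached `username + ":" + password` and raised TypeError on the `None` concatenation; now partial credentials are simply ignored. A standalone sketch of the guarded behavior (function name invented for illustration):

```python
from typing import Any, Dict, Optional


def build_conn_params(username: Optional[str] = None,
                      password: Optional[str] = None,
                      api_key: Optional[str] = None) -> Dict[str, Any]:
    conn_params: Dict[str, Any] = {}
    # Both values must be present; with `if username:` alone,
    # username + ":" + None would raise TypeError.
    if username and password:
        conn_params["http_auth"] = username + ":" + password
    if api_key:
        conn_params["api_key"] = api_key
    return conn_params


assert build_conn_params(username="admin") == {}            # partial creds ignored
assert "http_auth" in build_conn_params("admin", "secret")  # complete creds used
```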
diff --git a/parsedmarc/kafkaclient.py b/parsedmarc/kafkaclient.py
index 31191d5..9e9c2ce 100644
--- a/parsedmarc/kafkaclient.py
+++ b/parsedmarc/kafkaclient.py
@@ -2,7 +2,7 @@
 
 from __future__ import annotations
 
-from typing import Any, Optional
+from typing import Any, Optional, Union
 from ssl import SSLContext
 import json
 
@@ -98,7 +98,9 @@ class KafkaClient(object):
         return date_range
 
     def save_aggregate_reports_to_kafka(
-        self, aggregate_reports: list[OrderedDict][str, Any], aggregate_topic: str
+        self,
+        aggregate_reports: Union[OrderedDict[str, Any], list[OrderedDict[str, Any]]],
+        aggregate_topic: str,
     ):
         """
         Saves aggregate DMARC reports to Kafka
@@ -143,7 +145,9 @@ class KafkaClient(object):
             raise KafkaError("Kafka error: {0}".format(e.__str__()))
 
     def save_forensic_reports_to_kafka(
-        self, forensic_reports: OrderedDict[str, Any], forensic_topic: str
+        self,
+        forensic_reports: Union[OrderedDict[str, Any], list[OrderedDict[str, Any]]],
+        forensic_topic: str,
     ):
         """
         Saves forensic DMARC reports to Kafka, sends individual
@@ -175,7 +179,9 @@ class KafkaClient(object):
             raise KafkaError("Kafka error: {0}".format(e.__str__()))
 
     def save_smtp_tls_reports_to_kafka(
-        self, smtp_tls_reports: list[OrderedDict[str, Any]], smtp_tls_topic: str
+        self,
+        smtp_tls_reports: Union[list[OrderedDict[str, Any]], OrderedDict[str, Any]],
+        smtp_tls_topic: str,
     ):
         """
         Saves SMTP TLS reports to Kafka, sends individual
diff --git a/parsedmarc/loganalytics.py b/parsedmarc/loganalytics.py
index 14bc3a9..1c5daa2 100644
--- a/parsedmarc/loganalytics.py
+++ b/parsedmarc/loganalytics.py
@@ -110,7 +110,7 @@ class LogAnalyticsClient(object):
 
     def publish_json(
         self,
-        results: OrderedDict[str, OrderedDict[str, Any]],
+        results,
         logs_client: LogsIngestionClient,
         dcr_stream: str,
     ):
diff --git a/parsedmarc/opensearch.py b/parsedmarc/opensearch.py
index 94e724a..16cc3cf 100644
--- a/parsedmarc/opensearch.py
+++ b/parsedmarc/opensearch.py
@@ -203,11 +203,11 @@ class _SMTPTLSPolicyDoc(InnerDoc):
 
     def add_failure_details(
         self,
-        result_type: str,
-        ip_address: str,
-        receiving_ip: str,
-        receiving_mx_helo: str,
-        failed_session_count: int,
+        result_type: Optional[str] = None,
+        ip_address: Optional[str] = None,
+        receiving_ip: Optional[str] = None,
+        receiving_mx_helo: Optional[str] = None,
+        failed_session_count: Optional[int] = None,
         sending_mta_ip: Optional[str] = None,
         receiving_mx_hostname: Optional[str] = None,
         additional_information_uri: Optional[str] = None,
@@ -297,7 +297,7 @@ def set_hosts(
             conn_params["ca_certs"] = ssl_cert_path
         else:
             conn_params["verify_certs"] = False
-    if username:
+    if username and password:
         conn_params["http_auth"] = username + ":" + password
     if api_key:
         conn_params["api_key"] = api_key
@@ -376,7 +376,7 @@ def migrate_indexes(
         pass
 
 
-def save_aggregate_report_to_elasticsearch(
+def save_aggregate_report_to_opensearch(
     aggregate_report: OrderedDict[str, Any],
     index_suffix: Optional[str] = None,
     index_prefix: Optional[str] = None,
@@ -538,9 +538,9 @@ def save_aggregate_report_to_elasticsearch(
         raise OpenSearchError("OpenSearch error: {0}".format(e.__str__()))
 
 
-def save_forensic_report_to_elasticsearch(
+def save_forensic_report_to_opensearch(
     forensic_report: OrderedDict[str, Any],
-    index_suffix: Optional[any] = None,
+    index_suffix: Optional[str] = None,
     index_prefix: Optional[str] = None,
     monthly_indexes: Optional[bool] = False,
     number_of_shards: int = 1,
@@ -705,10 +705,10 @@ def save_forensic_report_to_elasticsearch(
     )
 
 
-def save_smtp_tls_report_to_elasticsearch(
+def save_smtp_tls_report_to_opensearch(
     report: OrderedDict[str, Any],
-    index_suffix: str = None,
-    index_prefix: str = None,
+    index_suffix: Optional[str] = None,
+    index_prefix: Optional[str] = None,
     monthly_indexes: Optional[bool] = False,
     number_of_shards: Optional[int] = 1,
     number_of_replicas: Optional[int] = 0,
diff --git a/parsedmarc/s3.py b/parsedmarc/s3.py
index 4164c67..3fb0a5a 100644
--- a/parsedmarc/s3.py
+++ b/parsedmarc/s3.py
@@ -53,7 +53,7 @@ class S3Client(object):
             aws_access_key_id=access_key_id,
             aws_secret_access_key=secret_access_key,
         )
-        self.bucket = self.s3.Bucket(self.bucket_name)
+        self.bucket = self.s3.Bucket(self.bucket_name)  # type: ignore
 
     def save_aggregate_report_to_s3(self, report: OrderedDict[str, Any]):
         self.save_report_to_s3(report, "aggregate")
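The Kafka hunks above (and the Splunk ones below) widen the `save_*` parameters to `Union[OrderedDict[str, Any], list[OrderedDict[str, Any]]]`: callers may pass a single parsed report or a batch. A sketch of the normalization such a signature implies (helper name invented; the clients' internal handling may differ):

```python
from collections import OrderedDict
from typing import Any, List, Union


def as_report_list(
    reports: Union["OrderedDict[str, Any]", List["OrderedDict[str, Any]"]],
) -> List["OrderedDict[str, Any]"]:
    """Always hand downstream code a list, whether one report or many came in."""
    if isinstance(reports, OrderedDict):
        return [reports]
    return list(reports)


single = OrderedDict(org_name="example.com")
assert as_report_list(single) == [single]
assert as_report_list([single, single]) == [single, single]
```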
diff --git a/parsedmarc/splunk.py b/parsedmarc/splunk.py
index 925c502..0390e44 100644
--- a/parsedmarc/splunk.py
+++ b/parsedmarc/splunk.py
@@ -2,7 +2,7 @@
 
 from __future__ import annotations
 
-from typing import Any
+from typing import Any, Union
 
 from collections import OrderedDict
 
@@ -35,7 +35,7 @@ class HECClient(object):
         url: str,
         access_token: str,
         index: str,
-        source: bool = "parsedmarc",
+        source: str = "parsedmarc",
         verify=True,
         timeout=60,
     ):
@@ -51,9 +51,9 @@ class HECClient(object):
             timeout (float): Number of seconds to wait for the server to send
                 data before giving up
         """
-        url = urlparse(url)
+        parsed_url = urlparse(url)
         self.url = "{0}://{1}/services/collector/event/1.0".format(
-            url.scheme, url.netloc
+            parsed_url.scheme, parsed_url.netloc
         )
         self.access_token = access_token.lstrip("Splunk ")
         self.index = index
@@ -62,7 +62,9 @@ class HECClient(object):
         self.session = requests.Session()
         self.timeout = timeout
         self.session.verify = verify
-        self._common_data = dict(host=self.host, source=self.source, index=self.index)
+        self._common_data: dict[str, Union[str, int, float, dict]] = dict(
+            host=self.host, source=self.source, index=self.index
+        )
 
         self.session.headers = {
             "User-Agent": USER_AGENT,
@@ -70,7 +72,8 @@ class HECClient(object):
         }
 
     def save_aggregate_reports_to_splunk(
-        self, aggregate_reports: list[OrderedDict[str, Any]]
+        self,
+        aggregate_reports: Union[list[OrderedDict[str, Any]], OrderedDict[str, Any]],
     ):
         """
         Saves aggregate DMARC reports to Splunk
@@ -91,7 +94,7 @@ class HECClient(object):
         json_str = ""
         for report in aggregate_reports:
             for record in report["records"]:
-                new_report = dict()
+                new_report: dict[str, Union[str, int, float, dict]] = dict()
                 for metadata in report["report_metadata"]:
                     new_report[metadata] = report["report_metadata"][metadata]
                 new_report["interval_begin"] = record["interval_begin"]
@@ -135,7 +138,8 @@ class HECClient(object):
             raise SplunkError(response["text"])
 
     def save_forensic_reports_to_splunk(
-        self, forensic_reports: list[OrderedDict[str, Any]]
+        self,
+        forensic_reports: Union[list[OrderedDict[str, Any]], OrderedDict[str, Any]],
     ):
         """
         Saves forensic DMARC reports to Splunk
@@ -170,7 +174,9 @@ class HECClient(object):
         if response["code"] != 0:
             raise SplunkError(response["text"])
 
-    def save_smtp_tls_reports_to_splunk(self, reports: OrderedDict[str, Any]):
+    def save_smtp_tls_reports_to_splunk(
+        self, reports: Union[list[OrderedDict[str, Any]], OrderedDict[str, Any]]
+    ):
         """
         Saves SMTP TLS reports to Splunk
diff --git a/parsedmarc/utils.py b/parsedmarc/utils.py
index 1b24039..545f7e7 100644
--- a/parsedmarc/utils.py
+++ b/parsedmarc/utils.py
@@ -67,7 +67,7 @@ class DownloadError(RuntimeError):
     """Raised when an error occurs when downloading a file"""
 
 
-def decode_base64(data: str) -> bytes:
+def decode_base64(data) -> bytes:
     """
     Decodes a base64 string, with padding being optional
@@ -416,9 +416,9 @@ def get_ip_address_info(
     ip_db_path: Optional[str] = None,
     reverse_dns_map_path: Optional[str] = None,
     always_use_local_files: Optional[bool] = False,
-    reverse_dns_map_url: Optional[bool] = None,
+    reverse_dns_map_url: Optional[str] = None,
     cache: Optional[ExpiringDict] = None,
-    reverse_dns_map: Optional[bool] = None,
+    reverse_dns_map: Optional[dict] = None,
     offline: Optional[bool] = False,
     nameservers: Optional[list[str]] = None,
     timeout: Optional[float] = 2.0,
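`utils.decode_base64`, whose parameter annotation is loosened above, documents that `=` padding is optional in its input (forensic report attachments often arrive with padding stripped). A sketch consistent with that docstring, with an invented name; the shipped implementation may differ:

```python
import base64


def decode_base64_tolerant(data: str) -> bytes:
    """Decode base64 whose trailing '=' padding may be missing."""
    data = data.strip()
    missing = -len(data) % 4  # 0-3 pad characters needed to reach a multiple of 4
    return base64.b64decode(data + "=" * missing)


assert decode_base64_tolerant("cGFyc2VkbWFyYw") == b"parsedmarc"  # unpadded input
```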
diff --git a/parsedmarc/webhook.py b/parsedmarc/webhook.py
index 982808a..5dd05bf 100644
--- a/parsedmarc/webhook.py
+++ b/parsedmarc/webhook.py
@@ -4,8 +4,6 @@
 from __future__ import annotations
 
 from typing import Any, Optional, Union
-from collections import OrderedDict
-
 import requests
 
 from parsedmarc import logger
@@ -40,19 +38,19 @@ class WebhookClient(object):
             "Content-Type": "application/json",
         }
 
-    def save_forensic_report_to_webhook(self, report: OrderedDict[str, Any]):
+    def save_forensic_report_to_webhook(self, report: str):
         try:
             self._send_to_webhook(self.forensic_url, report)
         except Exception as error_:
             logger.error("Webhook Error: {0}".format(error_.__str__()))
 
-    def save_smtp_tls_report_to_webhook(self, report: OrderedDict[str, Any]):
+    def save_smtp_tls_report_to_webhook(self, report: str):
         try:
             self._send_to_webhook(self.smtp_tls_url, report)
         except Exception as error_:
             logger.error("Webhook Error: {0}".format(error_.__str__()))
 
-    def save_aggregate_report_to_webhook(self, report: OrderedDict[str, Any]):
+    def save_aggregate_report_to_webhook(self, report: str):
         try:
             self._send_to_webhook(self.aggregate_url, report)
         except Exception as error_:
             logger.error("Webhook Error: {0}".format(error_.__str__()))
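After this patch the webhook `save_*` methods take `report: str`, an already-serialized JSON document, rather than an OrderedDict. The private `_send_to_webhook()` helper is not shown in the diff, so the following is only a plausible sketch of a compatible sender (name and signature are assumptions):

```python
import requests


def send_report_to_webhook(session: requests.Session, url: str, report_json: str,
                           timeout: float = 60) -> requests.Response:
    """POST a pre-serialized JSON report string to a webhook endpoint."""
    response = session.post(
        url,
        data=report_json,  # already JSON text, so data= rather than json=
        headers={"Content-Type": "application/json"},
        timeout=timeout,
    )
    response.raise_for_status()
    return response
```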