Fixes

- Fix saving reports to OpenSearch ([#637](https://github.com/domainaware/parsedmarc/issues/637))
- Fix parsing certain DMARC failure/forensic reports
- Some fixes to type hints (incomplete, but published as-is due to the above bugs)
Sean Whalen
2025-12-08 13:26:59 -05:00
parent 445c9565a4
commit 6799f10364
12 changed files with 98 additions and 73 deletions

View File

@@ -1,5 +1,13 @@
# Changelog
## 9.0.4
### Fixes
- Fix saving reports to OpenSearch ([#637](https://github.com/domainaware/parsedmarc/issues/637))
- Fix parsing certain DMARC failure/forensic reports
- Some fixes to type hints (incomplete, but published as-is due to the above bugs)
## 9.0.3
### Fixes

View File

@@ -86,7 +86,7 @@ def _bucket_interval_by_day(
begin: datetime,
end: datetime,
total_count: int,
) -> List[Dict[Any]]:
) -> List[Dict[str, Any]]:
"""
Split the interval [begin, end) into daily buckets and distribute
`total_count` proportionally across those buckets.
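
Only this helper's signature and docstring appear in the hunk; as a rough sketch of the proportional split the docstring describes (an assumption about the approach, not the library's actual body), the idea looks like this:

```python
# Illustrative sketch only; _bucket_interval_by_day's real body is not shown in this diff.
from datetime import datetime, timedelta
from typing import Any, Dict, List


def bucket_by_day(begin: datetime, end: datetime, total_count: int) -> List[Dict[str, Any]]:
    """Split [begin, end) at midnight boundaries and give each day a share of
    total_count proportional to its share of the total duration."""
    total_seconds = (end - begin).total_seconds()
    buckets: List[Dict[str, Any]] = []
    cursor = begin
    while cursor < end:
        next_midnight = datetime.combine(cursor.date() + timedelta(days=1), datetime.min.time())
        bucket_end = min(next_midnight, end)
        share = (bucket_end - cursor).total_seconds() / total_seconds
        buckets.append({"date": cursor.date().isoformat(), "count": round(total_count * share)})
        cursor = bucket_end
    return buckets


# A 12-hour span straddling midnight yields two buckets of roughly 50 each.
print(bucket_by_day(datetime(2025, 12, 7, 18), datetime(2025, 12, 8, 6), 100))
```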
@@ -221,7 +221,7 @@ def _bucket_interval_by_day(
def _append_parsed_record(
parsed_record: OrderedDict[str, Any],
records: OrderedDict[str, Any],
records: List[OrderedDict[str, Any]],
begin_dt: datetime,
end_dt: datetime,
normalize: bool,
@@ -267,10 +267,10 @@ def _parse_report_record(
record: OrderedDict,
*,
ip_db_path: Optional[str] = None,
always_use_local_files: bool = False,
always_use_local_files: Optional[bool] = False,
reverse_dns_map_path: Optional[str] = None,
reverse_dns_map_url: Optional[str] = None,
offline: bool = False,
offline: Optional[bool] = False,
nameservers: Optional[list[str]] = None,
dns_timeout: Optional[float] = 2.0,
) -> OrderedDict[str, Any]:
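
Most of the type-hint fixes in this commit follow the pattern visible in this signature: flags and paths that callers may legitimately pass as None are annotated Optional[...] so tools like mypy accept the call. A minimal illustration, not parsedmarc code:

```python
from typing import Optional


def enrich(offline: Optional[bool] = False, dns_timeout: Optional[float] = 2.0) -> str:
    # Treat an explicit None the same as the default, so unset config values
    # can be forwarded without a separate branch at every call site.
    offline = False if offline is None else offline
    dns_timeout = 2.0 if dns_timeout is None else dns_timeout
    return f"offline={offline}, dns_timeout={dns_timeout}"


print(enrich(offline=None))  # passes a type check only because the parameter is Optional
```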
@@ -463,7 +463,7 @@ def _parse_smtp_tls_failure_details(failure_details: dict[str, Any]):
raise InvalidSMTPTLSReport(str(e))
def _parse_smtp_tls_report_policy(policy: dict[str, Any]):
def _parse_smtp_tls_report_policy(policy: OrderedDict[str, Any]):
policy_types = ["tlsa", "sts", "no-policy-found"]
try:
policy_domain = policy["policy"]["policy-domain"]
@@ -500,7 +500,7 @@ def _parse_smtp_tls_report_policy(policy: dict[str, Any]):
raise InvalidSMTPTLSReport(str(e))
def parse_smtp_tls_report_json(report: dict[str, Any]):
def parse_smtp_tls_report_json(report: str):
"""Parses and validates an SMTP TLS report"""
required_fields = [
"organization-name",
@@ -512,22 +512,22 @@ def parse_smtp_tls_report_json(report: dict[str, Any]):
try:
policies = []
report = json.loads(report)
report_dict = json.loads(report)
for required_field in required_fields:
if required_field not in report:
if required_field not in report_dict:
raise Exception(f"Missing required field: {required_field}")
if not isinstance(report["policies"], list):
policies_type = type(report["policies"])
if not isinstance(report_dict["policies"], list):
policies_type = type(report_dict["policies"])
raise InvalidSMTPTLSReport(f"policies must be a list, not {policies_type}")
for policy in report["policies"]:
for policy in report_dict["policies"]:
policies.append(_parse_smtp_tls_report_policy(policy))
new_report = OrderedDict(
organization_name=report["organization-name"],
begin_date=report["date-range"]["start-datetime"],
end_date=report["date-range"]["end-datetime"],
contact_info=report["contact-info"],
report_id=report["report-id"],
organization_name=report_dict["organization-name"],
begin_date=report_dict["date-range"]["start-datetime"],
end_date=report_dict["date-range"]["end-datetime"],
contact_info=report_dict["contact-info"],
report_id=report_dict["report-id"],
policies=policies,
)
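
With `report_dict = json.loads(report)`, the function now clearly takes the raw JSON text rather than a pre-parsed dict. A hedged usage sketch with a minimal RFC 8460-shaped report; field names beyond those visible in this hunk follow the RFC and are assumptions about what the full validator requires:

```python
import json
from parsedmarc import parse_smtp_tls_report_json  # package-level export assumed

sample = {
    "organization-name": "Example Org",
    "date-range": {
        "start-datetime": "2025-12-07T00:00:00Z",
        "end-datetime": "2025-12-08T00:00:00Z",
    },
    "contact-info": "tlsrpt@example.com",
    "report-id": "2025-12-07T00:00:00Z_example.com",
    "policies": [
        {
            "policy": {"policy-type": "sts", "policy-domain": "example.com"},
            "summary": {
                "total-successful-session-count": 100,
                "total-failure-session-count": 0,
            },
        }
    ],
}

parsed = parse_smtp_tls_report_json(json.dumps(sample))  # a JSON string, not a dict
print(parsed["organization_name"], parsed["report_id"])
```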
@@ -539,7 +539,9 @@ def parse_smtp_tls_report_json(report: dict[str, Any]):
raise InvalidSMTPTLSReport(str(e))
def parsed_smtp_tls_reports_to_csv_rows(reports: OrderedDict[str, Any]):
def parsed_smtp_tls_reports_to_csv_rows(
reports: Union[OrderedDict[str, Any], List[OrderedDict[str, Any]]],
):
"""Converts one oor more parsed SMTP TLS reports into a list of single
layer OrderedDict objects suitable for use in a CSV"""
if type(reports) is OrderedDict:
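
The widened annotation matches what the body presumably already does here: a single OrderedDict gets wrapped into a one-element list before iteration. The general accept-one-or-many pattern, as a standalone illustration:

```python
from collections import OrderedDict
from typing import List, Union


def as_report_list(reports: Union[OrderedDict, List[OrderedDict]]) -> List[OrderedDict]:
    # Accept either a single parsed report or a list of them; always return a list.
    if isinstance(reports, OrderedDict):
        return [reports]
    return reports


single = OrderedDict(report_id="abc")
assert as_report_list(single) == [single]
assert as_report_list([single]) == [single]
```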
@@ -622,15 +624,15 @@ def parsed_smtp_tls_reports_to_csv(reports: OrderedDict[str, Any]) -> str:
def parse_aggregate_report_xml(
xml: str,
*,
ip_db_path: Optional[bool] = None,
ip_db_path: Optional[str] = None,
always_use_local_files: Optional[bool] = False,
reverse_dns_map_path: Optional[bool] = None,
reverse_dns_map_url: Optional[bool] = None,
reverse_dns_map_path: Optional[str] = None,
reverse_dns_map_url: Optional[str] = None,
offline: Optional[bool] = False,
nameservers: Optional[list[str]] = None,
timeout: Optional[float] = 2.0,
keep_alive: Optional[callable] = None,
normalize_timespan_threshold_hours: Optional[float] = 24.0,
keep_alive: Optional[Callable] = None,
normalize_timespan_threshold_hours: float = 24.0,
) -> OrderedDict[str, Any]:
"""Parses a DMARC XML report string and returns a consistent OrderedDict
@@ -1148,7 +1150,7 @@ def parse_forensic_report(
*,
always_use_local_files: Optional[bool] = False,
reverse_dns_map_path: Optional[str] = None,
reverse_dns_map_url: str = None,
reverse_dns_map_url: Optional[str] = None,
offline: Optional[bool] = False,
ip_db_path: Optional[str] = None,
nameservers: Optional[list[str]] = None,
@@ -1451,7 +1453,7 @@ def parse_report_email(
feedback_report = feedback_report.replace("\\n", "\n")
except (ValueError, TypeError, binascii.Error):
feedback_report = payload
elif content_type == "text/rfc822-headers":
elif content_type == "text/rfc822-headers" or "message/rfc-822":
sample = payload
elif content_type == "message/rfc822":
sample = payload
@@ -1750,14 +1752,14 @@ def get_dmarc_reports_from_mailbox(
delete: Optional[bool] = False,
test: Optional[bool] = False,
ip_db_path: Optional[str] = None,
always_use_local_files: Optional[str] = False,
always_use_local_files: Optional[bool] = False,
reverse_dns_map_path: Optional[str] = None,
reverse_dns_map_url: Optional[str] = None,
offline: Optional[bool] = False,
nameservers: Optional[list[str]] = None,
dns_timeout: Optional[float] = 6.0,
strip_attachment_payloads: Optional[bool] = False,
results: Optional[OrderedDict[str, any]] = None,
results: Optional[OrderedDict[str, Any]] = None,
batch_size: Optional[int] = 10,
since: Optional[datetime] = None,
create_folders: Optional[bool] = True,
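
A usage sketch for this mailbox helper; the connection class name, its arguments, and the first positional parameter are assumptions, since only the keyword options appear in this hunk:

```python
from parsedmarc import get_dmarc_reports_from_mailbox  # package-level export assumed
from parsedmarc.mail import IMAPConnection             # assumed helper, not part of this diff

# Connection details are illustrative placeholders.
connection = IMAPConnection(host="imap.example.com", user="dmarc@example.com", password="hunter2")

# Keyword names below come from the signature in this hunk.
results = get_dmarc_reports_from_mailbox(connection, offline=True, batch_size=10)
```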

View File

@@ -1,2 +1,3 @@
__version__ = "9.0.3"
__version__ = "9.0.4"
USER_AGENT = f"parsedmarc/{__version__}"

View File

@@ -203,11 +203,11 @@ class _SMTPTLSPolicyDoc(InnerDoc):
def add_failure_details(
self,
result_type: str,
ip_address: str,
receiving_ip: str,
receiving_mx_helo: str,
failed_session_count: int,
result_type: Optional[str] = None,
ip_address: Optional[str] = None,
receiving_ip: Optional[str] = None,
receiving_mx_helo: Optional[str] = None,
failed_session_count: Optional[int] = None,
sending_mta_ip: Optional[str] = None,
receiving_mx_hostname: Optional[str] = None,
additional_information_uri: Optional[str] = None,
@@ -297,7 +297,7 @@ def set_hosts(
conn_params["ca_certs"] = ssl_cert_path
else:
conn_params["verify_certs"] = False
if username:
if username and password:
conn_params["http_auth"] = username + ":" + password
if api_key:
conn_params["api_key"] = api_key
@@ -540,7 +540,7 @@ def save_aggregate_report_to_elasticsearch(
def save_forensic_report_to_elasticsearch(
forensic_report: OrderedDict[str, Any],
index_suffix: Optional[any] = None,
index_suffix: Optional[Any] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
number_of_shards: int = 1,
@@ -707,8 +707,8 @@ def save_forensic_report_to_elasticsearch(
def save_smtp_tls_report_to_elasticsearch(
report: OrderedDict[str, Any],
index_suffix: str = None,
index_prefix: str = None,
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
number_of_shards: Optional[int] = 1,
number_of_replicas: Optional[int] = 0,

View File

@@ -53,7 +53,9 @@ class GelfClient(object):
)
self.logger.addHandler(self.handler)
def save_aggregate_report_to_gelf(self, aggregate_reports: OrderedDict[str, Any]):
def save_aggregate_report_to_gelf(
self, aggregate_reports: list[OrderedDict[str, Any]]
):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows:
log_context_data.parsedmarc = row
@@ -61,7 +63,9 @@ class GelfClient(object):
log_context_data.parsedmarc = None
def save_forensic_report_to_gelf(self, forensic_reports: OrderedDict[str, Any]):
def save_forensic_report_to_gelf(
self, forensic_reports: list[OrderedDict[str, Any]]
):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows:
self.logger.info(json.dumps(row))

View File

@@ -2,7 +2,7 @@
from __future__ import annotations
from typing import Any, Optional
from typing import Any, Optional, Union
from ssl import SSLContext
import json
@@ -98,7 +98,9 @@ class KafkaClient(object):
return date_range
def save_aggregate_reports_to_kafka(
self, aggregate_reports: list[OrderedDict][str, Any], aggregate_topic: str
self,
aggregate_reports: Union[OrderedDict[str, Any], list[OrderedDict[str, Any]]],
aggregate_topic: str,
):
"""
Saves aggregate DMARC reports to Kafka
@@ -143,7 +145,9 @@ class KafkaClient(object):
raise KafkaError("Kafka error: {0}".format(e.__str__()))
def save_forensic_reports_to_kafka(
self, forensic_reports: OrderedDict[str, Any], forensic_topic: str
self,
forensic_reports: Union[OrderedDict[str, Any], list[OrderedDict[str, Any]]],
forensic_topic: str,
):
"""
Saves forensic DMARC reports to Kafka, sends individual
@@ -175,7 +179,9 @@ class KafkaClient(object):
raise KafkaError("Kafka error: {0}".format(e.__str__()))
def save_smtp_tls_reports_to_kafka(
self, smtp_tls_reports: list[OrderedDict[str, Any]], smtp_tls_topic: str
self,
smtp_tls_reports: Union[list[OrderedDict[str, Any]], OrderedDict[str, Any]],
smtp_tls_topic: str,
):
"""
Saves SMTP TLS reports to Kafka, sends individual

View File

@@ -110,7 +110,7 @@ class LogAnalyticsClient(object):
def publish_json(
self,
results: OrderedDict[str, OrderedDict[str, Any]],
results,
logs_client: LogsIngestionClient,
dcr_stream: str,
):

View File

@@ -203,11 +203,11 @@ class _SMTPTLSPolicyDoc(InnerDoc):
def add_failure_details(
self,
result_type: str,
ip_address: str,
receiving_ip: str,
receiving_mx_helo: str,
failed_session_count: int,
result_type: Optional[str] = None,
ip_address: Optional[str] = None,
receiving_ip: Optional[str] = None,
receiving_mx_helo: Optional[str] = None,
failed_session_count: Optional[int] = None,
sending_mta_ip: Optional[str] = None,
receiving_mx_hostname: Optional[str] = None,
additional_information_uri: Optional[str] = None,
@@ -297,7 +297,7 @@ def set_hosts(
conn_params["ca_certs"] = ssl_cert_path
else:
conn_params["verify_certs"] = False
if username:
if username and password:
conn_params["http_auth"] = username + ":" + password
if api_key:
conn_params["api_key"] = api_key
@@ -376,7 +376,7 @@ def migrate_indexes(
pass
def save_aggregate_report_to_elasticsearch(
def save_aggregate_report_to_opensearch(
aggregate_report: OrderedDict[str, Any],
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
@@ -538,9 +538,9 @@ def save_aggregate_report_to_elasticsearch(
raise OpenSearchError("OpenSearch error: {0}".format(e.__str__()))
def save_forensic_report_to_elasticsearch(
def save_forensic_report_to_opensearch(
forensic_report: OrderedDict[str, Any],
index_suffix: Optional[any] = None,
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
number_of_shards: int = 1,
@@ -705,10 +705,10 @@ def save_forensic_report_to_elasticsearch(
)
def save_smtp_tls_report_to_elasticsearch(
def save_smtp_tls_report_to_opensearch(
report: OrderedDict[str, Any],
index_suffix: str = None,
index_prefix: str = None,
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
number_of_shards: Optional[int] = 1,
number_of_replicas: Optional[int] = 0,
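
These renames give the OpenSearch variants their own *_to_opensearch names instead of reusing the Elasticsearch ones, which appears to be the heart of the #637 fix. A hedged usage sketch, assuming these helpers live in parsedmarc.opensearch and that set_hosts takes the keyword names seen in the hunk above:

```python
from parsedmarc import opensearch  # module path assumed; this diff shows only the renamed functions

opensearch.set_hosts(["https://opensearch.example.com:9200"], username="admin", password="secret")

# parsed_aggregate_report would come from an earlier parse_aggregate_report_xml() call.
opensearch.save_aggregate_report_to_opensearch(parsed_aggregate_report, index_prefix="dmarc_")
```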

View File

@@ -53,7 +53,7 @@ class S3Client(object):
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key,
)
self.bucket = self.s3.Bucket(self.bucket_name)
self.bucket = self.s3.Bucket(self.bucket_name) # type: ignore
def save_aggregate_report_to_s3(self, report: OrderedDict[str, Any]):
self.save_report_to_s3(report, "aggregate")

View File

@@ -2,7 +2,7 @@
from __future__ import annotations
from typing import Any
from typing import Any, Union
from collections import OrderedDict
@@ -35,7 +35,7 @@ class HECClient(object):
url: str,
access_token: str,
index: str,
source: bool = "parsedmarc",
source: str = "parsedmarc",
verify=True,
timeout=60,
):
@@ -51,9 +51,9 @@ class HECClient(object):
timeout (float): Number of seconds to wait for the server to send
data before giving up
"""
url = urlparse(url)
parsed_url = urlparse(url)
self.url = "{0}://{1}/services/collector/event/1.0".format(
url.scheme, url.netloc
parsed_url.scheme, parsed_url.netloc
)
self.access_token = access_token.lstrip("Splunk ")
self.index = index
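
Binding urlparse's result to a new name stops the url parameter (annotated as str) from being re-bound to a ParseResult, which is what tripped the type checker; only the scheme and host are used to build the collector endpoint. A hedged construction sketch, assuming the class lives in parsedmarc.splunk:

```python
from parsedmarc.splunk import HECClient  # module path assumed

hec = HECClient(
    url="https://splunk.example.com:8088",
    access_token="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",  # placeholder HEC token
    index="dmarc",
    source="parsedmarc",
)
```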
@@ -62,7 +62,9 @@ class HECClient(object):
self.session = requests.Session()
self.timeout = timeout
self.session.verify = verify
self._common_data = dict(host=self.host, source=self.source, index=self.index)
self._common_data: dict[str, Union[str, int, float, dict]] = dict(
host=self.host, source=self.source, index=self.index
)
self.session.headers = {
"User-Agent": USER_AGENT,
@@ -70,7 +72,8 @@ class HECClient(object):
}
def save_aggregate_reports_to_splunk(
self, aggregate_reports: list[OrderedDict[str, Any]]
self,
aggregate_reports: Union[list[OrderedDict[str, Any]], OrderedDict[str, Any]],
):
"""
Saves aggregate DMARC reports to Splunk
@@ -91,7 +94,7 @@ class HECClient(object):
json_str = ""
for report in aggregate_reports:
for record in report["records"]:
new_report = dict()
new_report: dict[str, Union[str, int, float, dict]] = dict()
for metadata in report["report_metadata"]:
new_report[metadata] = report["report_metadata"][metadata]
new_report["interval_begin"] = record["interval_begin"]
@@ -135,7 +138,8 @@ class HECClient(object):
raise SplunkError(response["text"])
def save_forensic_reports_to_splunk(
self, forensic_reports: list[OrderedDict[str, Any]]
self,
forensic_reports: Union[list[OrderedDict[str, Any]], OrderedDict[str, Any]],
):
"""
Saves forensic DMARC reports to Splunk
@@ -170,7 +174,9 @@ class HECClient(object):
if response["code"] != 0:
raise SplunkError(response["text"])
def save_smtp_tls_reports_to_splunk(self, reports: OrderedDict[str, Any]):
def save_smtp_tls_reports_to_splunk(
self, reports: Union[list[OrderedDict[str, Any]], OrderedDict[str, Any]]
):
"""
Saves SMTP TLS reports to Splunk

View File

@@ -67,7 +67,7 @@ class DownloadError(RuntimeError):
"""Raised when an error occurs when downloading a file"""
def decode_base64(data: str) -> bytes:
def decode_base64(data) -> bytes:
"""
Decodes a base64 string, with padding being optional
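
The docstring promises tolerance for stripped '=' padding; the usual standard-library approach is to pad the input back to a multiple of four before decoding. An illustrative standalone sketch (the hunk omits the function's actual body):

```python
import base64


def decode_base64_lenient(data: str) -> bytes:
    """Decode base64 text whose trailing '=' padding may have been stripped."""
    data = data.strip()
    missing = -len(data) % 4  # 0-3 padding characters needed
    return base64.b64decode(data + "=" * missing)


assert decode_base64_lenient("cGFyc2VkbWFyYw") == b"parsedmarc"
```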
@@ -416,9 +416,9 @@ def get_ip_address_info(
ip_db_path: Optional[str] = None,
reverse_dns_map_path: Optional[str] = None,
always_use_local_files: Optional[bool] = False,
reverse_dns_map_url: Optional[bool] = None,
reverse_dns_map_url: Optional[str] = None,
cache: Optional[ExpiringDict] = None,
reverse_dns_map: Optional[bool] = None,
reverse_dns_map: Optional[dict] = None,
offline: Optional[bool] = False,
nameservers: Optional[list[str]] = None,
timeout: Optional[float] = 2.0,
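
A hedged call sketch for the corrected signature, assuming these helpers live in parsedmarc.utils and that the first positional argument (not visible in this hunk) is the IP address itself:

```python
from parsedmarc.utils import get_ip_address_info  # module path assumed

# offline=True restricts the lookup to local data sources (no DNS queries).
info = get_ip_address_info("203.0.113.10", offline=True, timeout=2.0)
print(info)
```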

View File

@@ -4,8 +4,6 @@ from __future__ import annotations
from typing import Any, Optional, Union
from collections import OrderedDict
import requests
from parsedmarc import logger
@@ -40,19 +38,19 @@ class WebhookClient(object):
"Content-Type": "application/json",
}
def save_forensic_report_to_webhook(self, report: OrderedDict[str, Any]):
def save_forensic_report_to_webhook(self, report: str):
try:
self._send_to_webhook(self.forensic_url, report)
except Exception as error_:
logger.error("Webhook Error: {0}".format(error_.__str__()))
def save_smtp_tls_report_to_webhook(self, report: OrderedDict[str, Any]):
def save_smtp_tls_report_to_webhook(self, report: str):
try:
self._send_to_webhook(self.smtp_tls_url, report)
except Exception as error_:
logger.error("Webhook Error: {0}".format(error_.__str__()))
def save_aggregate_report_to_webhook(self, report: OrderedDict[str, Any]):
def save_aggregate_report_to_webhook(self, report: str):
try:
self._send_to_webhook(self.aggregate_url, report)
except Exception as error_: