From b1356f7dfc6025bfac3ed6868c6aedcd849c2848 Mon Sep 17 00:00:00 2001 From: Sean Whalen Date: Mon, 1 Dec 2025 18:57:23 -0500 Subject: [PATCH] 9.0.1 - Allow multiple `records` for the same aggregate DMARC report in Elasticsearch and Opensearch (fixes issue in 9.0.0) - Fix typos --- .vscode/settings.json | 6 +++ CHANGELOG.md | 10 ++++- parsedmarc/constants.py | 2 +- parsedmarc/elastic.py | 91 ++++++++++++++++++++++------------------ parsedmarc/opensearch.py | 90 +++++++++++++++++++++------------------ 5 files changed, 115 insertions(+), 84 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 27e19b5..dc1da07 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -36,6 +36,7 @@ "exampleuser", "expiringdict", "fieldlist", + "GELF", "genindex", "geoip", "geoipupdate", @@ -65,17 +66,20 @@ "mailrelay", "mailsuite", "maxdepth", + "MAXHEADERS", "maxmind", "mbox", "mfrom", "michaeldavie", "mikesiegel", + "Mimecast", "mitigations", "MMDB", "modindex", "msgconvert", "msgraph", "MSSP", + "multiprocess", "Munge", "ndjson", "newkey", @@ -86,6 +90,7 @@ "nosniff", "nwettbewerb", "opensearch", + "opensearchpy", "parsedmarc", "passsword", "Postorius", @@ -123,6 +128,7 @@ "truststore", "Übersicht", "uids", + "Uncategorized", "unparasable", "uper", "urllib", diff --git a/CHANGELOG.md b/CHANGELOG.md index 957a747..65be704 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## 9.0.1 + +### Fixes + +- Allow multiple `records` for the same aggregate DMARC report in Elasticsearch and Opensearch + ## 9.0.0 (yanked) - Normalize aggregate DMARC report volumes when a report timespan exceeds 24 hours @@ -113,7 +119,7 @@ Removed improper spaces from `base_reverse_dns_map.csv` (Closes #612) ## 8.15.0 -- Fix processing of SMTP-TLS reports ([#549](https://github.com/domainaware/parsedmarc/issues/549)), which broke in commit [410663d ](https://github.com/domainaware/parsedmarc/commit/410663dbcaba019ca3d3744946348b56a635480b)(PR 
[#530](https://github.com/domainaware/parsedmarc/pull/530)) +- Fix processing of SMTP-TLS reports ([#549](https://github.com/domainaware/parsedmarc/issues/549)), which broke in commit [410663d](https://github.com/domainaware/parsedmarc/commit/410663dbcaba019ca3d3744946348b56a635480b)(PR [#530](https://github.com/domainaware/parsedmarc/pull/530)) - This PR enforced a stricter check for base64-encoded strings, which SMTP TLS reports from Google did not pass - Removing the check introduced its own issue, because some file paths were treated as base64-encoded strings - Create a separate `extract_report_from_file_path()` function for processioning reports based on a file path @@ -131,7 +137,7 @@ Removed improper spaces from `base_reverse_dns_map.csv` (Closes #612) - Skip invalid aggregate report rows without calling the whole report invalid - Some providers such as GoDaddy will send reports with some rows missing a source IP address, while other rows are fine -- Fix Dovecot support by using the seperator provided by the IPMAP namespace when possible (PR #552 closes #551) +- Fix Dovecot support by using the separator provided by the IMAP namespace when possible (PR #552 closes #551) - Only download `base_reverse_dns_map.csv` once (fixes #542) - Update included `base_reverse_dns_map.csv` - Replace University category with Education to be more inclusive diff --git a/parsedmarc/constants.py b/parsedmarc/constants.py index 62f6605..5a326cb 100644 --- a/parsedmarc/constants.py +++ b/parsedmarc/constants.py @@ -1,2 +1,2 @@ -__version__ = "9.0.0" +__version__ = "9.0.1" USER_AGENT = f"parsedmarc/{__version__}" diff --git a/parsedmarc/elastic.py b/parsedmarc/elastic.py index 2eec5f2..ed3ceab 100644 --- a/parsedmarc/elastic.py +++ b/parsedmarc/elastic.py @@ -395,7 +395,51 @@ def save_aggregate_report_to_elasticsearch( org_name = metadata["org_name"] report_id = metadata["report_id"] domain = aggregate_report["policy_published"]["domain"] + begin_date = 
human_timestamp_to_datetime(metadata["begin_date"], to_utc=True)
+    end_date = human_timestamp_to_datetime(metadata["end_date"], to_utc=True)
+    if monthly_indexes:
+        index_date = begin_date.strftime("%Y-%m")
+    else:
+        index_date = begin_date.strftime("%Y-%m-%d")
+
+    org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
+    report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
+    domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
+    begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
+    end_date_query = Q(dict(match=dict(date_end=end_date)))
+
+    if index_suffix is not None:
+        search_index = "dmarc_aggregate_{0}*".format(index_suffix)
+    else:
+        search_index = "dmarc_aggregate*"
+    if index_prefix is not None:
+        search_index = "{0}{1}".format(index_prefix, search_index)
+    search = Search(index=search_index)
+    query = org_name_query & report_id_query & domain_query
+    query = query & begin_date_query & end_date_query
+    search.query = query
+
+    begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
+    end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
+
+    try:
+        existing = search.execute()
+    except Exception as error_:
+        raise ElasticsearchError(
+            "Elasticsearch's search for existing report \
+            error: {}".format(error_.__str__())
+        )
+
+    if len(existing) > 0:
+        raise AlreadySaved(
+            "An aggregate report ID {0} from {1} about {2} "
+            "with a date range of {3} UTC to {4} UTC already "
+            "exists in "
+            "Elasticsearch".format(
+                report_id, org_name, domain, begin_date_human, end_date_human
+            )
+        )
     published_policy = _PublishedPolicy(
         domain=aggregate_report["policy_published"]["domain"],
         adkim=aggregate_report["policy_published"]["adkim"],
@@ -409,8 +453,8 @@
     for record in aggregate_report["records"]:
         begin_date = human_timestamp_to_datetime(record["interval_begin"], to_utc=True)
         end_date = human_timestamp_to_datetime(record["interval_end"], to_utc=True)
-        begin_date_human = 
begin_date.strftime("%Y-%m-%d %H:%M:%SZ") - end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ") + normalized_timespan = record["normalized_timespan"] + if monthly_indexes: index_date = begin_date.strftime("%Y-%m") else: @@ -418,41 +462,6 @@ def save_aggregate_report_to_elasticsearch( aggregate_report["begin_date"] = begin_date aggregate_report["end_date"] = end_date date_range = [aggregate_report["begin_date"], aggregate_report["end_date"]] - - org_name_query = Q(dict(match_phrase=dict(org_name=org_name))) - report_id_query = Q(dict(match_phrase=dict(report_id=report_id))) - domain_query = Q(dict(match_phrase={"published_policy.domain": domain})) - begin_date_query = Q(dict(match=dict(date_begin=begin_date))) - end_date_query = Q(dict(match=dict(date_end=end_date))) - - if index_suffix is not None: - search_index = "dmarc_aggregate_{0}*".format(index_suffix) - else: - search_index = "dmarc_aggregate*" - if index_prefix is not None: - search_index = "{0}{1}".format(index_prefix, search_index) - search = Search(index=search_index) - query = org_name_query & report_id_query & domain_query - query = query & begin_date_query & end_date_query - search.query = query - - try: - existing = search.execute() - except Exception as error_: - raise ElasticsearchError( - "Elasticsearch's search for existing report \ - error: {}".format(error_.__str__()) - ) - - if len(existing) > 0: - raise AlreadySaved( - "An aggregate report ID {0} from {1} about {2} " - "with a date range of {3} UTC to {4} UTC already " - "exists in " - "Elasticsearch".format( - report_id, org_name, domain, begin_date_human, end_date_human - ) - ) agg_doc = _AggregateReportDoc( xml_schema=aggregate_report["xml_schema"], org_name=metadata["org_name"], @@ -460,9 +469,9 @@ def save_aggregate_report_to_elasticsearch( org_extra_contact_info=metadata["org_extra_contact_info"], report_id=metadata["report_id"], date_range=date_range, - date_begin=aggregate_report["begin_date"], - 
date_end=aggregate_report["end_date"], - normalized_timespan=record["normalized_timespan"], + date_begin=begin_date, + date_end=end_date, + normalized_timespan=normalized_timespan, errors=metadata["errors"], published_policy=published_policy, source_ip_address=record["source"]["ip_address"], @@ -785,7 +794,7 @@ def save_smtp_tls_report_to_elasticsearch( policy_doc = _SMTPTLSPolicyDoc( policy_domain=policy["policy_domain"], policy_type=policy["policy_type"], - succesful_session_count=policy["successful_session_count"], + successful_session_count=policy["successful_session_count"], failed_session_count=policy["failed_session_count"], policy_string=policy_strings, mx_host_patterns=mx_host_patterns, diff --git a/parsedmarc/opensearch.py b/parsedmarc/opensearch.py index 61b5773..8786284 100644 --- a/parsedmarc/opensearch.py +++ b/parsedmarc/opensearch.py @@ -395,7 +395,51 @@ def save_aggregate_report_to_opensearch( org_name = metadata["org_name"] report_id = metadata["report_id"] domain = aggregate_report["policy_published"]["domain"] + begin_date = human_timestamp_to_datetime(metadata["begin_date"], to_utc=True) + end_date = human_timestamp_to_datetime(metadata["end_date"], to_utc=True) + if monthly_indexes: + index_date = begin_date.strftime("%Y-%m") + else: + index_date = begin_date.strftime("%Y-%m-%d") + + org_name_query = Q(dict(match_phrase=dict(org_name=org_name))) + report_id_query = Q(dict(match_phrase=dict(report_id=report_id))) + domain_query = Q(dict(match_phrase={"published_policy.domain": domain})) + begin_date_query = Q(dict(match=dict(date_begin=begin_date))) + end_date_query = Q(dict(match=dict(date_end=end_date))) + + if index_suffix is not None: + search_index = "dmarc_aggregate_{0}*".format(index_suffix) + else: + search_index = "dmarc_aggregate*" + if index_prefix is not None: + search_index = "{0}{1}".format(index_prefix, search_index) + search = Search(index=search_index) + query = org_name_query & report_id_query & domain_query + query = query & 
begin_date_query & end_date_query
+    search.query = query
+
+    begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
+    end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
+
+    try:
+        existing = search.execute()
+    except Exception as error_:
+        raise OpenSearchError(
+            "Opensearch's search for existing report \
+            error: {}".format(error_.__str__())
+        )
+
+    if len(existing) > 0:
+        raise AlreadySaved(
+            "An aggregate report ID {0} from {1} about {2} "
+            "with a date range of {3} UTC to {4} UTC already "
+            "exists in "
+            "Opensearch".format(
+                report_id, org_name, domain, begin_date_human, end_date_human
+            )
+        )
     published_policy = _PublishedPolicy(
         domain=aggregate_report["policy_published"]["domain"],
         adkim=aggregate_report["policy_published"]["adkim"],
@@ -409,8 +453,8 @@
     for record in aggregate_report["records"]:
         begin_date = human_timestamp_to_datetime(record["interval_begin"], to_utc=True)
         end_date = human_timestamp_to_datetime(record["interval_end"], to_utc=True)
-        begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
-        end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
+        normalized_timespan = record["normalized_timespan"]
+
         if monthly_indexes:
             index_date = begin_date.strftime("%Y-%m")
         else:
@@ -418,41 +462,6 @@
         aggregate_report["begin_date"] = begin_date
         aggregate_report["end_date"] = end_date
         date_range = [aggregate_report["begin_date"], aggregate_report["end_date"]]
-
-        org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
-        report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
-        domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
-        begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
-        end_date_query = Q(dict(match=dict(date_end=end_date)))
-
-        if index_suffix is not None:
-            search_index = "dmarc_aggregate_{0}*".format(index_suffix)
-        else:
-            search_index = "dmarc_aggregate*"
-        if index_prefix is not None:
-            
search_index = "{0}{1}".format(index_prefix, search_index) - search = Search(index=search_index) - query = org_name_query & report_id_query & domain_query - query = query & begin_date_query & end_date_query - search.query = query - - try: - existing = search.execute() - except Exception as error_: - raise OpenSearchError( - "OpenSearch's search for existing report \ - error: {}".format(error_.__str__()) - ) - - if len(existing) > 0: - raise AlreadySaved( - "An aggregate report ID {0} from {1} about {2} " - "with a date range of {3} UTC to {4} UTC already " - "exists in " - "OpenSearch".format( - report_id, org_name, domain, begin_date_human, end_date_human - ) - ) agg_doc = _AggregateReportDoc( xml_schema=aggregate_report["xml_schema"], org_name=metadata["org_name"], @@ -460,8 +469,9 @@ def save_aggregate_report_to_opensearch( org_extra_contact_info=metadata["org_extra_contact_info"], report_id=metadata["report_id"], date_range=date_range, - date_begin=aggregate_report["begin_date"], - date_end=aggregate_report["end_date"], + date_begin=begin_date, + date_end=end_date, + normalized_timespan=normalized_timespan, errors=metadata["errors"], published_policy=published_policy, source_ip_address=record["source"]["ip_address"], @@ -784,7 +794,7 @@ def save_smtp_tls_report_to_opensearch( policy_doc = _SMTPTLSPolicyDoc( policy_domain=policy["policy_domain"], policy_type=policy["policy_type"], - succesful_session_count=policy["successful_session_count"], + successful_session_count=policy["successful_session_count"], failed_session_count=policy["failed_session_count"], policy_string=policy_strings, mx_host_patterns=mx_host_patterns,