- Allow multiple `records` for the same aggregate DMARC report in Elasticsearch and Opensearch (fixes issue in 9.0.0)
- Fix typos
This commit is contained in:
Sean Whalen
2025-12-01 18:57:23 -05:00
parent 1969196e1a
commit b1356f7dfc
5 changed files with 115 additions and 84 deletions
+6
View File
@@ -36,6 +36,7 @@
"exampleuser",
"expiringdict",
"fieldlist",
"GELF",
"genindex",
"geoip",
"geoipupdate",
@@ -65,17 +66,20 @@
"mailrelay",
"mailsuite",
"maxdepth",
"MAXHEADERS",
"maxmind",
"mbox",
"mfrom",
"michaeldavie",
"mikesiegel",
"Mimecast",
"mitigations",
"MMDB",
"modindex",
"msgconvert",
"msgraph",
"MSSP",
"multiprocess",
"Munge",
"ndjson",
"newkey",
@@ -86,6 +90,7 @@
"nosniff",
"nwettbewerb",
"opensearch",
"opensearchpy",
"parsedmarc",
"passsword",
"Postorius",
@@ -123,6 +128,7 @@
"truststore",
"Übersicht",
"uids",
"Uncategorized",
"unparasable",
"uper",
"urllib",
+8 -2
View File
@@ -1,5 +1,11 @@
# Changelog
## 9.0.1
### Fixes
- Allow multiple `records` for the same aggregate DMARC report in Elasticsearch and Opensearch
## 9.0.0 (yanked)
- Normalize aggregate DMARC report volumes when a report timespan exceeds 24 hours
@@ -113,7 +119,7 @@ Removed improper spaces from `base_reverse_dns_map.csv` (Closes #612)
## 8.15.0
- Fix processing of SMTP-TLS reports ([#549](https://github.com/domainaware/parsedmarc/issues/549)), which broke in commit [410663d ](https://github.com/domainaware/parsedmarc/commit/410663dbcaba019ca3d3744946348b56a635480b)(PR [#530](https://github.com/domainaware/parsedmarc/pull/530))
- Fix processing of SMTP-TLS reports ([#549](https://github.com/domainaware/parsedmarc/issues/549)), which broke in commit [410663d](https://github.com/domainaware/parsedmarc/commit/410663dbcaba019ca3d3744946348b56a635480b)(PR [#530](https://github.com/domainaware/parsedmarc/pull/530))
- This PR enforced a stricter check for base64-encoded strings, which SMTP TLS reports from Google did not pass
- Removing the check introduced its own issue, because some file paths were treated as base64-encoded strings
- Create a separate `extract_report_from_file_path()` function for processioning reports based on a file path
@@ -131,7 +137,7 @@ Removed improper spaces from `base_reverse_dns_map.csv` (Closes #612)
- Skip invalid aggregate report rows without calling the whole report invalid
- Some providers such as GoDaddy will send reports with some rows missing a source IP address, while other rows are fine
- Fix Dovecot support by using the seperator provided by the IPMAP namespace when possible (PR #552 closes #551)
- Fix Dovecot support by using the separator provided by the IMAP namespace when possible (PR #552 closes #551)
- Only download `base_reverse_dns_map.csv` once (fixes #542)
- Update included `base_reverse_dns_map.csv`
- Replace University category with Education to be more inclusive
+1 -1
View File
@@ -1,2 +1,2 @@
__version__ = "9.0.0"
__version__ = "9.0.1"
USER_AGENT = f"parsedmarc/{__version__}"
+50 -41
View File
@@ -395,7 +395,51 @@ def save_aggregate_report_to_elasticsearch(
org_name = metadata["org_name"]
report_id = metadata["report_id"]
domain = aggregate_report["policy_published"]["domain"]
begin_date = human_timestamp_to_datetime(metadata["begin_date"], to_utc=True)
end_date = human_timestamp_to_datetime(metadata["end_date"], to_utc=True)
if monthly_indexes:
index_date = begin_date.strftime("%Y-%m")
else:
index_date = begin_date.strftime("%Y-%m-%d")
org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
end_date_query = Q(dict(match=dict(date_end=end_date)))
if index_suffix is not None:
search_index = "dmarc_aggregate_{0}*".format(index_suffix)
else:
search_index = "dmarc_aggregate*"
if index_prefix is not None:
search_index = "{0}{1}".format(index_prefix, search_index)
search = Search(index=search_index)
query = org_name_query & report_id_query & domain_query
query = query & begin_date_query & end_date_query
search.query = query
try:
existing = search.execute()
except Exception as error_:
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
raise ElasticsearchError(
"Elasticsearch's search for existing report \
error: {}".format(error_.__str__())
)
if len(existing) > 0:
raise AlreadySaved(
"An aggregate report ID {0} from {1} about {2} "
"with a date range of {3} UTC to {4} UTC already "
"exists in "
"Elasticsearch".format(
report_id, org_name, domain, begin_date_human, end_date_human
)
)
published_policy = _PublishedPolicy(
domain=aggregate_report["policy_published"]["domain"],
adkim=aggregate_report["policy_published"]["adkim"],
@@ -409,8 +453,8 @@ def save_aggregate_report_to_elasticsearch(
for record in aggregate_report["records"]:
begin_date = human_timestamp_to_datetime(record["interval_begin"], to_utc=True)
end_date = human_timestamp_to_datetime(record["interval_end"], to_utc=True)
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
normalized_timespan = record["normalized_timespan"]
if monthly_indexes:
index_date = begin_date.strftime("%Y-%m")
else:
@@ -418,41 +462,6 @@ def save_aggregate_report_to_elasticsearch(
aggregate_report["begin_date"] = begin_date
aggregate_report["end_date"] = end_date
date_range = [aggregate_report["begin_date"], aggregate_report["end_date"]]
org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
end_date_query = Q(dict(match=dict(date_end=end_date)))
if index_suffix is not None:
search_index = "dmarc_aggregate_{0}*".format(index_suffix)
else:
search_index = "dmarc_aggregate*"
if index_prefix is not None:
search_index = "{0}{1}".format(index_prefix, search_index)
search = Search(index=search_index)
query = org_name_query & report_id_query & domain_query
query = query & begin_date_query & end_date_query
search.query = query
try:
existing = search.execute()
except Exception as error_:
raise ElasticsearchError(
"Elasticsearch's search for existing report \
error: {}".format(error_.__str__())
)
if len(existing) > 0:
raise AlreadySaved(
"An aggregate report ID {0} from {1} about {2} "
"with a date range of {3} UTC to {4} UTC already "
"exists in "
"Elasticsearch".format(
report_id, org_name, domain, begin_date_human, end_date_human
)
)
agg_doc = _AggregateReportDoc(
xml_schema=aggregate_report["xml_schema"],
org_name=metadata["org_name"],
@@ -460,9 +469,9 @@ def save_aggregate_report_to_elasticsearch(
org_extra_contact_info=metadata["org_extra_contact_info"],
report_id=metadata["report_id"],
date_range=date_range,
date_begin=aggregate_report["begin_date"],
date_end=aggregate_report["end_date"],
normalized_timespan=record["normalized_timespan"],
date_begin=begin_date,
date_end=end_date,
normalized_timespan=normalized_timespan,
errors=metadata["errors"],
published_policy=published_policy,
source_ip_address=record["source"]["ip_address"],
@@ -785,7 +794,7 @@ def save_smtp_tls_report_to_elasticsearch(
policy_doc = _SMTPTLSPolicyDoc(
policy_domain=policy["policy_domain"],
policy_type=policy["policy_type"],
succesful_session_count=policy["successful_session_count"],
successful_session_count=policy["successful_session_count"],
failed_session_count=policy["failed_session_count"],
policy_string=policy_strings,
mx_host_patterns=mx_host_patterns,
+50 -40
View File
@@ -395,7 +395,51 @@ def save_aggregate_report_to_opensearch(
org_name = metadata["org_name"]
report_id = metadata["report_id"]
domain = aggregate_report["policy_published"]["domain"]
begin_date = human_timestamp_to_datetime(metadata["begin_date"], to_utc=True)
end_date = human_timestamp_to_datetime(metadata["end_date"], to_utc=True)
if monthly_indexes:
index_date = begin_date.strftime("%Y-%m")
else:
index_date = begin_date.strftime("%Y-%m-%d")
org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
end_date_query = Q(dict(match=dict(date_end=end_date)))
if index_suffix is not None:
search_index = "dmarc_aggregate_{0}*".format(index_suffix)
else:
search_index = "dmarc_aggregate*"
if index_prefix is not None:
search_index = "{0}{1}".format(index_prefix, search_index)
search = Search(index=search_index)
query = org_name_query & report_id_query & domain_query
query = query & begin_date_query & end_date_query
search.query = query
try:
existing = search.execute()
except Exception as error_:
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
raise OpenSearchError(
"Opensearch's search for existing report \
error: {}".format(error_.__str__())
)
if len(existing) > 0:
raise AlreadySaved(
"An aggregate report ID {0} from {1} about {2} "
"with a date range of {3} UTC to {4} UTC already "
"exists in "
"Opensearch".format(
report_id, org_name, domain, begin_date_human, end_date_human
)
)
published_policy = _PublishedPolicy(
domain=aggregate_report["policy_published"]["domain"],
adkim=aggregate_report["policy_published"]["adkim"],
@@ -409,8 +453,8 @@ def save_aggregate_report_to_opensearch(
for record in aggregate_report["records"]:
begin_date = human_timestamp_to_datetime(record["interval_begin"], to_utc=True)
end_date = human_timestamp_to_datetime(record["interval_end"], to_utc=True)
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
normalized_timespan = record["normalized_timespan"]
if monthly_indexes:
index_date = begin_date.strftime("%Y-%m")
else:
@@ -418,41 +462,6 @@ def save_aggregate_report_to_opensearch(
aggregate_report["begin_date"] = begin_date
aggregate_report["end_date"] = end_date
date_range = [aggregate_report["begin_date"], aggregate_report["end_date"]]
org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
end_date_query = Q(dict(match=dict(date_end=end_date)))
if index_suffix is not None:
search_index = "dmarc_aggregate_{0}*".format(index_suffix)
else:
search_index = "dmarc_aggregate*"
if index_prefix is not None:
search_index = "{0}{1}".format(index_prefix, search_index)
search = Search(index=search_index)
query = org_name_query & report_id_query & domain_query
query = query & begin_date_query & end_date_query
search.query = query
try:
existing = search.execute()
except Exception as error_:
raise OpenSearchError(
"OpenSearch's search for existing report \
error: {}".format(error_.__str__())
)
if len(existing) > 0:
raise AlreadySaved(
"An aggregate report ID {0} from {1} about {2} "
"with a date range of {3} UTC to {4} UTC already "
"exists in "
"OpenSearch".format(
report_id, org_name, domain, begin_date_human, end_date_human
)
)
agg_doc = _AggregateReportDoc(
xml_schema=aggregate_report["xml_schema"],
org_name=metadata["org_name"],
@@ -460,8 +469,9 @@ def save_aggregate_report_to_opensearch(
org_extra_contact_info=metadata["org_extra_contact_info"],
report_id=metadata["report_id"],
date_range=date_range,
date_begin=aggregate_report["begin_date"],
date_end=aggregate_report["end_date"],
date_begin=begin_date,
date_end=end_date,
normalized_timespan=normalized_timespan,
errors=metadata["errors"],
published_policy=published_policy,
source_ip_address=record["source"]["ip_address"],
@@ -784,7 +794,7 @@ def save_smtp_tls_report_to_opensearch(
policy_doc = _SMTPTLSPolicyDoc(
policy_domain=policy["policy_domain"],
policy_type=policy["policy_type"],
succesful_session_count=policy["successful_session_count"],
successful_session_count=policy["successful_session_count"],
failed_session_count=policy["failed_session_count"],
policy_string=policy_strings,
mx_host_patterns=mx_host_patterns,