From c8cdd90a1e0587517d6456fab2a2ddf734a04514 Mon Sep 17 00:00:00 2001 From: Sean Whalen Date: Mon, 1 Dec 2025 16:34:40 -0500 Subject: [PATCH] Normalize timespans for aggregate reports in Elasticsearch and Opensearch --- parsedmarc/elastic.py | 96 +++++++++++++++++++++------------------- parsedmarc/opensearch.py | 95 ++++++++++++++++++++------------------- 2 files changed, 99 insertions(+), 92 deletions(-) diff --git a/parsedmarc/elastic.py b/parsedmarc/elastic.py index 5c7ca91..3fa7606 100644 --- a/parsedmarc/elastic.py +++ b/parsedmarc/elastic.py @@ -67,6 +67,8 @@ class _AggregateReportDoc(Document): date_range = Date() date_begin = Date() date_end = Date() + normalized_timespan = Boolean() + original_timespan_seconds = Integer errors = Text() published_policy = Object(_PublishedPolicy) source_ip_address = Ip() @@ -393,52 +395,7 @@ def save_aggregate_report_to_elasticsearch( org_name = metadata["org_name"] report_id = metadata["report_id"] domain = aggregate_report["policy_published"]["domain"] - begin_date = human_timestamp_to_datetime(metadata["begin_date"], to_utc=True) - end_date = human_timestamp_to_datetime(metadata["end_date"], to_utc=True) - begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ") - end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ") - if monthly_indexes: - index_date = begin_date.strftime("%Y-%m") - else: - index_date = begin_date.strftime("%Y-%m-%d") - aggregate_report["begin_date"] = begin_date - aggregate_report["end_date"] = end_date - date_range = [aggregate_report["begin_date"], aggregate_report["end_date"]] - - org_name_query = Q(dict(match_phrase=dict(org_name=org_name))) - report_id_query = Q(dict(match_phrase=dict(report_id=report_id))) - domain_query = Q(dict(match_phrase={"published_policy.domain": domain})) - begin_date_query = Q(dict(match=dict(date_begin=begin_date))) - end_date_query = Q(dict(match=dict(date_end=end_date))) - - if index_suffix is not None: - search_index = "dmarc_aggregate_{0}*".format(index_suffix) - else: - search_index = "dmarc_aggregate*" - if index_prefix is not None: - search_index = "{0}{1}".format(index_prefix, search_index) - search = Search(index=search_index) - query = org_name_query & report_id_query & domain_query - query = query & begin_date_query & end_date_query - search.query = query - - try: - existing = search.execute() - except Exception as error_: - raise ElasticsearchError( - "Elasticsearch's search for existing report \ - error: {}".format(error_.__str__()) - ) - - if len(existing) > 0: - raise AlreadySaved( - "An aggregate report ID {0} from {1} about {2} " - "with a date range of {3} UTC to {4} UTC already " - "exists in " - "Elasticsearch".format( - report_id, org_name, domain, begin_date_human, end_date_human - ) - ) + published_policy = _PublishedPolicy( domain=aggregate_report["policy_published"]["domain"], adkim=aggregate_report["policy_published"]["adkim"], @@ -450,6 +407,52 @@ def save_aggregate_report_to_elasticsearch( ) for record in aggregate_report["records"]: + begin_date = human_timestamp_to_datetime(record["interval_begin"], to_utc=True) + end_date = human_timestamp_to_datetime(record["interval_end"], to_utc=True) + begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ") + end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ") + if monthly_indexes: + index_date = begin_date.strftime("%Y-%m") + else: + index_date = begin_date.strftime("%Y-%m-%d") + aggregate_report["begin_date"] = begin_date + aggregate_report["end_date"] = end_date + date_range = [aggregate_report["begin_date"], aggregate_report["end_date"]] + + org_name_query = Q(dict(match_phrase=dict(org_name=org_name))) + report_id_query = Q(dict(match_phrase=dict(report_id=report_id))) + domain_query = Q(dict(match_phrase={"published_policy.domain": domain})) + begin_date_query = Q(dict(match=dict(date_begin=begin_date))) + end_date_query = Q(dict(match=dict(date_end=end_date))) + + if index_suffix is not None: + search_index = "dmarc_aggregate_{0}*".format(index_suffix) + else: + search_index = "dmarc_aggregate*" + if index_prefix is not None: + search_index = "{0}{1}".format(index_prefix, search_index) + search = Search(index=search_index) + query = org_name_query & report_id_query & domain_query + query = query & begin_date_query & end_date_query + search.query = query + + try: + existing = search.execute() + except Exception as error_: + raise ElasticsearchError( + "Elasticsearch's search for existing report \ + error: {}".format(error_.__str__()) + ) + + if len(existing) > 0: + raise AlreadySaved( + "An aggregate report ID {0} from {1} about {2} " + "with a date range of {3} UTC to {4} UTC already " + "exists in " + "Elasticsearch".format( + report_id, org_name, domain, begin_date_human, end_date_human + ) + ) agg_doc = _AggregateReportDoc( xml_schema=aggregate_report["xml_schema"], org_name=metadata["org_name"], @@ -459,6 +462,7 @@ def save_aggregate_report_to_elasticsearch( date_range=date_range, date_begin=aggregate_report["begin_date"], date_end=aggregate_report["end_date"], + normalized_timespan=record["normalized_timespan"], errors=metadata["errors"], published_policy=published_policy, source_ip_address=record["source"]["ip_address"], diff --git a/parsedmarc/opensearch.py b/parsedmarc/opensearch.py index d947ea7..b82ba9c 100644 --- a/parsedmarc/opensearch.py +++ b/parsedmarc/opensearch.py @@ -67,6 +67,8 @@ class _AggregateReportDoc(Document): date_range = Date() date_begin = Date() date_end = Date() + normalized_timespan = Boolean() + original_timespan_seconds = Integer errors = Text() published_policy = Object(_PublishedPolicy) source_ip_address = Ip() @@ -393,52 +395,7 @@ def save_aggregate_report_to_opensearch( org_name = metadata["org_name"] report_id = metadata["report_id"] domain = aggregate_report["policy_published"]["domain"] - begin_date = human_timestamp_to_datetime(metadata["begin_date"], to_utc=True) - end_date = human_timestamp_to_datetime(metadata["end_date"], to_utc=True) - begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ") - end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ") - if monthly_indexes: - index_date = begin_date.strftime("%Y-%m") - else: - index_date = begin_date.strftime("%Y-%m-%d") - aggregate_report["begin_date"] = begin_date - aggregate_report["end_date"] = end_date - date_range = [aggregate_report["begin_date"], aggregate_report["end_date"]] - - org_name_query = Q(dict(match_phrase=dict(org_name=org_name))) - report_id_query = Q(dict(match_phrase=dict(report_id=report_id))) - domain_query = Q(dict(match_phrase={"published_policy.domain": domain})) - begin_date_query = Q(dict(match=dict(date_begin=begin_date))) - end_date_query = Q(dict(match=dict(date_end=end_date))) - - if index_suffix is not None: - search_index = "dmarc_aggregate_{0}*".format(index_suffix) - else: - search_index = "dmarc_aggregate*" - if index_prefix is not None: - search_index = "{0}{1}".format(index_prefix, search_index) - search = Search(index=search_index) - query = org_name_query & report_id_query & domain_query - query = query & begin_date_query & end_date_query - search.query = query - - try: - existing = search.execute() - except Exception as error_: - raise OpenSearchError( - "OpenSearch's search for existing report \ - error: {}".format(error_.__str__()) - ) - - if len(existing) > 0: - raise AlreadySaved( - "An aggregate report ID {0} from {1} about {2} " - "with a date range of {3} UTC to {4} UTC already " - "exists in " - "OpenSearch".format( - report_id, org_name, domain, begin_date_human, end_date_human - ) - ) + published_policy = _PublishedPolicy( domain=aggregate_report["policy_published"]["domain"], adkim=aggregate_report["policy_published"]["adkim"], @@ -450,6 +407,52 @@ def save_aggregate_report_to_opensearch( ) for record in aggregate_report["records"]: + begin_date = human_timestamp_to_datetime(record["interval_begin"], to_utc=True) + end_date = human_timestamp_to_datetime(record["interval_end"], to_utc=True) + begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ") + end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ") + if monthly_indexes: + index_date = begin_date.strftime("%Y-%m") + else: + index_date = begin_date.strftime("%Y-%m-%d") + aggregate_report["begin_date"] = begin_date + aggregate_report["end_date"] = end_date + date_range = [aggregate_report["begin_date"], aggregate_report["end_date"]] + + org_name_query = Q(dict(match_phrase=dict(org_name=org_name))) + report_id_query = Q(dict(match_phrase=dict(report_id=report_id))) + domain_query = Q(dict(match_phrase={"published_policy.domain": domain})) + begin_date_query = Q(dict(match=dict(date_begin=begin_date))) + end_date_query = Q(dict(match=dict(date_end=end_date))) + + if index_suffix is not None: + search_index = "dmarc_aggregate_{0}*".format(index_suffix) + else: + search_index = "dmarc_aggregate*" + if index_prefix is not None: + search_index = "{0}{1}".format(index_prefix, search_index) + search = Search(index=search_index) + query = org_name_query & report_id_query & domain_query + query = query & begin_date_query & end_date_query + search.query = query + + try: + existing = search.execute() + except Exception as error_: + raise OpenSearchError( + "OpenSearch's search for existing report \ + error: {}".format(error_.__str__()) + ) + + if len(existing) > 0: + raise AlreadySaved( + "An aggregate report ID {0} from {1} about {2} " + "with a date range of {3} UTC to {4} UTC already " + "exists in " + "OpenSearch".format( + report_id, org_name, domain, begin_date_human, end_date_human + ) + ) agg_doc = _AggregateReportDoc( xml_schema=aggregate_report["xml_schema"], org_name=metadata["org_name"],