From 45d1093a997d31be82732a691cc253d9cb2d8130 Mon Sep 17 00:00:00 2001 From: Sean Whalen Date: Sat, 29 Nov 2025 14:52:57 -0500 Subject: [PATCH] Normalize report volumes when a report timespan exceed 24 hours --- parsedmarc/__init__.py | 232 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 224 insertions(+), 8 deletions(-) diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py index b6a2209..2999185 100644 --- a/parsedmarc/__init__.py +++ b/parsedmarc/__init__.py @@ -2,6 +2,10 @@ """A Python package for parsing DMARC reports""" +from __future__ import annotations + +from typing import Dict, List, Any + import binascii import email import email.utils @@ -17,7 +21,7 @@ import zlib from base64 import b64decode from collections import OrderedDict from csv import DictWriter -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta, timezone, tzinfo from io import BytesIO, StringIO from typing import Callable @@ -79,6 +83,189 @@ class InvalidForensicReport(InvalidDMARCReport): """Raised when an invalid DMARC forensic report is encountered""" +def _bucket_interval_by_day( + begin: datetime, + end: datetime, + total_count: int, +) -> List[Dict[str, Any]]: + """ + Split the interval [begin, end) into daily buckets and distribute + `total_count` proportionally across those buckets. + + The function: + 1. Identifies each calendar day touched by [begin, end) + 2. Computes how many seconds of the interval fall into each day + 3. Assigns counts in proportion to those overlaps + 4. Ensures the final counts sum exactly to total_count + + Args: + begin: timezone-aware datetime, inclusive start of interval + end: timezone-aware datetime, exclusive end of interval + total_count: number of messages to distribute + + Returns: + A list of dicts like: + { + "begin": datetime, + "end": datetime, + "count": int + } + """ + # --- Input validation ---------------------------------------------------- + if begin > end: + raise ValueError("begin must be earlier than end") + if begin.tzinfo is None or end.tzinfo is None: + raise ValueError("begin and end must be timezone-aware") + if begin.tzinfo is not end.tzinfo: + raise ValueError("begin and end must have the same tzinfo") + if total_count < 0: + raise ValueError("total_count must be non-negative") + + # --- Short-circuit trivial cases ----------------------------------------- + interval_seconds = (end - begin).total_seconds() + if interval_seconds <= 0 or total_count == 0: + return [] + + tz: tzinfo = begin.tzinfo + + # --- Step 1: Determine all calendar days touched by [begin, end) ---------- + # + # For example: + # begin = Jan 1 12:00 + # end = Jan 3 06:00 + # + # We need buckets for: + # Jan 1 12:00 → Jan 2 00:00 + # Jan 2 00:00 → Jan 3 00:00 + # Jan 3 00:00 → Jan 3 06:00 + # + + # Start at midnight on the day of `begin`. + day_cursor = datetime(begin.year, begin.month, begin.day, tzinfo=tz) + + # If `begin` is earlier on that day (e.g. 10:00), we want that midnight. + # If `begin` is past that midnight (e.g. 00:30), this is correct. + # If `begin` is BEFORE that midnight (rare unless tz shifts), adjust: + if day_cursor > begin: + day_cursor -= timedelta(days=1) + + day_buckets: List[Dict[str, Any]] = [] + + while day_cursor < end: + day_start = day_cursor + day_end = day_cursor + timedelta(days=1) + + # Overlap between [begin, end) and this day + overlap_start = max(begin, day_start) + overlap_end = min(end, day_end) + + overlap_seconds = (overlap_end - overlap_start).total_seconds() + + if overlap_seconds > 0: + day_buckets.append( + { + "begin": overlap_start, + "end": overlap_end, + "seconds": overlap_seconds, + } + ) + + day_cursor = day_end + + # --- Step 2: Pro-rate counts across buckets ------------------------------- + # + # Compute the exact fractional count for each bucket: + # bucket_fraction = bucket_seconds / interval_seconds + # bucket_exact = total_count * bucket_fraction + # + # Then apply a "largest remainder" rounding strategy to ensure the sum + # equals exactly total_count. + + exact_values: List[float] = [ + (b["seconds"] / interval_seconds) * total_count for b in day_buckets + ] + + floor_values: List[int] = [int(x) for x in exact_values] + fractional_parts: List[float] = [x - int(x) for x in exact_values] + + # How many counts do we still need to distribute after flooring? + remainder = total_count - sum(floor_values) + + # Sort buckets by descending fractional remainder + indices_by_fraction = sorted( + range(len(day_buckets)), + key=lambda i: fractional_parts[i], + reverse=True, + ) + + # Start with floor values + final_counts = floor_values[:] + + # Add +1 to the buckets with the largest fractional parts + for idx in indices_by_fraction[:remainder]: + final_counts[idx] += 1 + + # --- Step 3: Build the final per-day result list ------------------------- + results: List[Dict[str, Any]] = [] + for bucket, count in zip(day_buckets, final_counts): + if count > 0: + results.append( + { + "begin": bucket["begin"], + "end": bucket["end"], + "count": count, + } + ) + + return results + + +def _append_parsed_record( + parsed_record: Dict[str, Any], + records: List[Dict[str, Any]], + begin_dt: datetime, + end_dt: datetime, + normalize: bool, +) -> None: + """ + Append a parsed DMARC record either unchanged or normalized. + + Args: + parsed_record: The record returned by _parse_report_record(). + records: Accumulating list of output records. + begin_dt: Report-level begin datetime (UTC). + end_dt: Report-level end datetime (UTC). + normalize: Whether this report exceeded the allowed timespan + and should be normalized per-day. + """ + # No normalization needed → append as-is. + if not normalize: + parsed_record["normalized_timespan"] = False + records.append(parsed_record) + return + + # Normalize case: split the record's count into daily buckets. + total_count = int(parsed_record.get("count", 0)) + buckets = _bucket_interval_by_day(begin_dt, end_dt, total_count) + + if not buckets: + return + + num_parts = len(buckets) + + for idx, bucket in enumerate(buckets): + new_rec = parsed_record.copy() + new_rec["count"] = bucket["count"] + new_rec["normalized_timespan"] = True + new_rec["normalized_timespan_parts"] = num_parts + new_rec["normalized_timespan_part_index"] = idx + + new_rec["interval_begin"] = bucket["begin"].strftime("%Y-%m-%d %H:%M:%S") + new_rec["interval_end"] = bucket["end"].strftime("%Y-%m-%d %H:%M:%S") + + records.append(new_rec) + + def _parse_report_record( record, ip_db_path=None, @@ -522,13 +709,30 @@ def parse_aggregate_report_xml( report_id = report_id.replace("<", "").replace(">", "").split("@")[0] new_report_metadata["report_id"] = report_id date_range = report["report_metadata"]["date_range"] - if int(date_range["end"]) - int(date_range["begin"]) > 2 * 86400: - _error = "Time span > 24 hours - RFC 7489 section 7.2" - raise InvalidAggregateReport(_error) - date_range["begin"] = timestamp_to_human(date_range["begin"]) - date_range["end"] = timestamp_to_human(date_range["end"]) + + begin_ts = int(date_range["begin"]) + end_ts = int(date_range["end"]) + span_seconds = end_ts - begin_ts + + # 27h gives some slack around “daily”. + NORMALIZE_SPAN_THRESHOLD_SECONDS = 27 * 3600 + + normalize_timespan = span_seconds > NORMALIZE_SPAN_THRESHOLD_SECONDS + + date_range["begin"] = timestamp_to_human(begin_ts) + date_range["end"] = timestamp_to_human(end_ts) + new_report_metadata["begin_date"] = date_range["begin"] new_report_metadata["end_date"] = date_range["end"] + new_report_metadata["normalized_timespan"] = normalize_timespan + new_report_metadata["original_span_seconds"] = span_seconds + begin_dt = human_timestamp_to_datetime( + new_report_metadata["begin_date"], to_utc=True + ) + end_dt = human_timestamp_to_datetime( + new_report_metadata["end_date"], to_utc=True + ) + if "error" in report["report_metadata"]: if not isinstance(report["report_metadata"]["error"], list): errors = [report["report_metadata"]["error"]] @@ -587,7 +791,13 @@ def parse_aggregate_report_xml( nameservers=nameservers, dns_timeout=timeout, ) - records.append(report_record) + _append_parsed_record( + parsed_record=report_record, + records=records, + begin_dt=begin_dt, + end_dt=end_dt, + normalize=normalize_timespan, + ) except Exception as e: logger.warning("Could not parse record: {0}".format(e)) @@ -602,7 +812,13 @@ def parse_aggregate_report_xml( nameservers=nameservers, dns_timeout=timeout, ) - records.append(report_record) + _append_parsed_record( + parsed_record=report_record, + records=records, + begin_dt=begin_dt, + end_dt=end_dt, + normalize=normalize_timespan, + ) new_report["records"] = records