Normalize report volumes when a report timespan exceed 24 hours

This commit is contained in:
Sean Whalen
2025-11-29 14:52:57 -05:00
parent c1a757ca29
commit 45d1093a99
+224 -8
View File
@@ -2,6 +2,10 @@
"""A Python package for parsing DMARC reports"""
from __future__ import annotations
from typing import Dict, List, Any
import binascii
import email
import email.utils
@@ -17,7 +21,7 @@ import zlib
from base64 import b64decode
from collections import OrderedDict
from csv import DictWriter
from datetime import datetime, timedelta, timezone
from datetime import datetime, timedelta, timezone, tzinfo
from io import BytesIO, StringIO
from typing import Callable
@@ -79,6 +83,189 @@ class InvalidForensicReport(InvalidDMARCReport):
"""Raised when an invalid DMARC forensic report is encountered"""
def _bucket_interval_by_day(
begin: datetime,
end: datetime,
total_count: int,
) -> List[Dict[str, Any]]:
"""
Split the interval [begin, end) into daily buckets and distribute
`total_count` proportionally across those buckets.
The function:
1. Identifies each calendar day touched by [begin, end)
2. Computes how many seconds of the interval fall into each day
3. Assigns counts in proportion to those overlaps
4. Ensures the final counts sum exactly to total_count
Args:
begin: timezone-aware datetime, inclusive start of interval
end: timezone-aware datetime, exclusive end of interval
total_count: number of messages to distribute
Returns:
A list of dicts like:
{
"begin": datetime,
"end": datetime,
"count": int
}
"""
# --- Input validation ----------------------------------------------------
if begin > end:
raise ValueError("begin must be earlier than end")
if begin.tzinfo is None or end.tzinfo is None:
raise ValueError("begin and end must be timezone-aware")
if begin.tzinfo is not end.tzinfo:
raise ValueError("begin and end must have the same tzinfo")
if total_count < 0:
raise ValueError("total_count must be non-negative")
# --- Short-circuit trivial cases -----------------------------------------
interval_seconds = (end - begin).total_seconds()
if interval_seconds <= 0 or total_count == 0:
return []
tz: tzinfo = begin.tzinfo
# --- Step 1: Determine all calendar days touched by [begin, end) ----------
#
# For example:
# begin = Jan 1 12:00
# end = Jan 3 06:00
#
# We need buckets for:
# Jan 1 12:00 → Jan 2 00:00
# Jan 2 00:00 → Jan 3 00:00
# Jan 3 00:00 → Jan 3 06:00
#
# Start at midnight on the day of `begin`.
day_cursor = datetime(begin.year, begin.month, begin.day, tzinfo=tz)
# If `begin` is earlier on that day (e.g. 10:00), we want that midnight.
# If `begin` is past that midnight (e.g. 00:30), this is correct.
# If `begin` is BEFORE that midnight (rare unless tz shifts), adjust:
if day_cursor > begin:
day_cursor -= timedelta(days=1)
day_buckets: List[Dict[str, Any]] = []
while day_cursor < end:
day_start = day_cursor
day_end = day_cursor + timedelta(days=1)
# Overlap between [begin, end) and this day
overlap_start = max(begin, day_start)
overlap_end = min(end, day_end)
overlap_seconds = (overlap_end - overlap_start).total_seconds()
if overlap_seconds > 0:
day_buckets.append(
{
"begin": overlap_start,
"end": overlap_end,
"seconds": overlap_seconds,
}
)
day_cursor = day_end
# --- Step 2: Pro-rate counts across buckets -------------------------------
#
# Compute the exact fractional count for each bucket:
# bucket_fraction = bucket_seconds / interval_seconds
# bucket_exact = total_count * bucket_fraction
#
# Then apply a "largest remainder" rounding strategy to ensure the sum
# equals exactly total_count.
exact_values: List[float] = [
(b["seconds"] / interval_seconds) * total_count for b in day_buckets
]
floor_values: List[int] = [int(x) for x in exact_values]
fractional_parts: List[float] = [x - int(x) for x in exact_values]
# How many counts do we still need to distribute after flooring?
remainder = total_count - sum(floor_values)
# Sort buckets by descending fractional remainder
indices_by_fraction = sorted(
range(len(day_buckets)),
key=lambda i: fractional_parts[i],
reverse=True,
)
# Start with floor values
final_counts = floor_values[:]
# Add +1 to the buckets with the largest fractional parts
for idx in indices_by_fraction[:remainder]:
final_counts[idx] += 1
# --- Step 3: Build the final per-day result list -------------------------
results: List[Dict[str, Any]] = []
for bucket, count in zip(day_buckets, final_counts):
if count > 0:
results.append(
{
"begin": bucket["begin"],
"end": bucket["end"],
"count": count,
}
)
return results
def _append_parsed_record(
parsed_record: Dict[str, Any],
records: List[Dict[str, Any]],
begin_dt: datetime,
end_dt: datetime,
normalize: bool,
) -> None:
"""
Append a parsed DMARC record either unchanged or normalized.
Args:
parsed_record: The record returned by _parse_report_record().
records: Accumulating list of output records.
begin_dt: Report-level begin datetime (UTC).
end_dt: Report-level end datetime (UTC).
normalize: Whether this report exceeded the allowed timespan
and should be normalized per-day.
"""
# No normalization needed → append as-is.
if not normalize:
parsed_record["normalized_timespan"] = False
records.append(parsed_record)
return
# Normalize case: split the record's count into daily buckets.
total_count = int(parsed_record.get("count", 0))
buckets = _bucket_interval_by_day(begin_dt, end_dt, total_count)
if not buckets:
return
num_parts = len(buckets)
for idx, bucket in enumerate(buckets):
new_rec = parsed_record.copy()
new_rec["count"] = bucket["count"]
new_rec["normalized_timespan"] = True
new_rec["normalized_timespan_parts"] = num_parts
new_rec["normalized_timespan_part_index"] = idx
new_rec["interval_begin"] = bucket["begin"].strftime("%Y-%m-%d %H:%M:%S")
new_rec["interval_end"] = bucket["end"].strftime("%Y-%m-%d %H:%M:%S")
records.append(new_rec)
def _parse_report_record(
record,
ip_db_path=None,
@@ -522,13 +709,30 @@ def parse_aggregate_report_xml(
report_id = report_id.replace("<", "").replace(">", "").split("@")[0]
new_report_metadata["report_id"] = report_id
date_range = report["report_metadata"]["date_range"]
if int(date_range["end"]) - int(date_range["begin"]) > 2 * 86400:
_error = "Time span > 24 hours - RFC 7489 section 7.2"
raise InvalidAggregateReport(_error)
date_range["begin"] = timestamp_to_human(date_range["begin"])
date_range["end"] = timestamp_to_human(date_range["end"])
begin_ts = int(date_range["begin"])
end_ts = int(date_range["end"])
span_seconds = end_ts - begin_ts
# 27h gives some slack around “daily”.
NORMALIZE_SPAN_THRESHOLD_SECONDS = 27 * 3600
normalize_timespan = span_seconds > NORMALIZE_SPAN_THRESHOLD_SECONDS
date_range["begin"] = timestamp_to_human(begin_ts)
date_range["end"] = timestamp_to_human(end_ts)
new_report_metadata["begin_date"] = date_range["begin"]
new_report_metadata["end_date"] = date_range["end"]
new_report_metadata["normalized_timespan"] = normalize_timespan
new_report_metadata["original_span_seconds"] = span_seconds
begin_dt = human_timestamp_to_datetime(
new_report_metadata["begin_date"], to_utc=True
)
end_dt = human_timestamp_to_datetime(
new_report_metadata["end_date"], to_utc=True
)
if "error" in report["report_metadata"]:
if not isinstance(report["report_metadata"]["error"], list):
errors = [report["report_metadata"]["error"]]
@@ -587,7 +791,13 @@ def parse_aggregate_report_xml(
nameservers=nameservers,
dns_timeout=timeout,
)
records.append(report_record)
_append_parsed_record(
parsed_record=report_record,
records=records,
begin_dt=begin_dt,
end_dt=end_dt,
normalize=normalize_timespan,
)
except Exception as e:
logger.warning("Could not parse record: {0}".format(e))
@@ -602,7 +812,13 @@ def parse_aggregate_report_xml(
nameservers=nameservers,
dns_timeout=timeout,
)
records.append(report_record)
_append_parsed_record(
parsed_record=report_record,
records=records,
begin_dt=begin_dt,
end_dt=end_dt,
normalize=normalize_timespan,
)
new_report["records"] = records