* Normalize report volumes when a report timespan exceeds 24 hours
Sean Whalen
2025-12-01 17:06:58 -05:00
committed by GitHub
parent 48bff504b4
commit 1fc9f638e2
10 changed files with 542 additions and 274 deletions

.vscode/launch.json vendored
View File

@@ -19,20 +19,11 @@
"console": "integratedTerminal"
},
{
"name": "sample.eml",
"name": "sample",
"type": "debugpy",
"request": "launch",
"module": "parsedmarc.cli",
"args": ["samples/private/sample.eml"]
},
{
"name": "find_sus_domains.py",
"type": "debugpy",
"request": "launch",
"program": "find_sus_domains.py",
"args": ["-i", "unknown_domains.txt", "-o", "sus_domains.csv"],
"cwd": "${workspaceFolder}/parsedmarc/resources/maps",
"console": "integratedTerminal"
"args": ["samples/private/sample"]
},
{
"name": "sortlists.py",

View File

@@ -1,6 +1,11 @@
Changelog
=========
9.0.0
------
- Normalize aggregate DMARC report volumes when a report timespan exceeds 24 hours
8.19.1
------

View File

@@ -23,6 +23,8 @@ of the report schema.
"report_id": "9391651994964116463",
"begin_date": "2012-04-27 20:00:00",
"end_date": "2012-04-28 19:59:59",
"timespan_requires_normalization": false,
"original_timespan_seconds": 86399,
"errors": []
},
"policy_published": {
@@ -39,8 +41,10 @@ of the report schema.
"source": {
"ip_address": "72.150.241.94",
"country": "US",
"reverse_dns": "adsl-72-150-241-94.shv.bellsouth.net",
"base_domain": "bellsouth.net"
"reverse_dns": null,
"base_domain": null,
"name": null,
"type": null
},
"count": 2,
"alignment": {
@@ -74,7 +78,10 @@ of the report schema.
"result": "pass"
}
]
}
},
"normalized_timespan": false,
"interval_begin": "2012-04-28 00:00:00",
"interval_end": "2012-04-28 23:59:59"
}
]
}
@@ -83,8 +90,10 @@ of the report schema.
### CSV aggregate report
```text
xml_schema,org_name,org_email,org_extra_contact_info,report_id,begin_date,end_date,errors,domain,adkim,aspf,p,sp,pct,fo,source_ip_address,source_country,source_reverse_dns,source_base_domain,count,spf_aligned,dkim_aligned,dmarc_aligned,disposition,policy_override_reasons,policy_override_comments,envelope_from,header_from,envelope_to,dkim_domains,dkim_selectors,dkim_results,spf_domains,spf_scopes,spf_results
draft,acme.com,noreply-dmarc-support@acme.com,http://acme.com/dmarc/support,9391651994964116463,2012-04-27 20:00:00,2012-04-28 19:59:59,,example.com,r,r,none,none,100,0,72.150.241.94,US,adsl-72-150-241-94.shv.bellsouth.net,bellsouth.net,2,True,False,True,none,,,example.com,example.com,,example.com,none,fail,example.com,mfrom,pass
xml_schema,org_name,org_email,org_extra_contact_info,report_id,begin_date,end_date,normalized_timespan,errors,domain,adkim,aspf,p,sp,pct,fo,source_ip_address,source_country,source_reverse_dns,source_base_domain,source_name,source_type,count,spf_aligned,dkim_aligned,dmarc_aligned,disposition,policy_override_reasons,policy_override_comments,envelope_from,header_from,envelope_to,dkim_domains,dkim_selectors,dkim_results,spf_domains,spf_scopes,spf_results
draft,acme.com,noreply-dmarc-support@acme.com,http://acme.com/dmarc/support,9391651994964116463,2012-04-28 00:00:00,2012-04-28 23:59:59,False,,example.com,r,r,none,none,100,0,72.150.241.94,US,,,,,2,True,False,True,none,,,example.com,example.com,,example.com,none,fail,example.com,mfrom,pass
draft,acme.com,noreply-dmarc-support@acme.com,http://acme.com/dmarc/support,9391651994964116463,2012-04-28 00:00:00,2012-04-28 23:59:59,False,,example.com,r,r,none,none,100,0,72.150.241.94,US,,,,,2,True,False,True,none,,,example.com,example.com,,example.com,none,fail,example.com,mfrom,pass
```
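To make the new columns easier to work with, here is a minimal sketch (not part of this commit) that tallies message counts per day from the aggregate CSV; the `aggregate.csv` filename matches the default used by `save_output()`, and the column names come from the header shown above.

```python
# Illustrative sketch, not part of this commit: tally message counts per
# normalized day from an aggregate CSV. "aggregate.csv" is the default
# filename written by save_output(); adjust the path as needed.
import csv
from collections import Counter

counts_per_day = Counter()
with open("aggregate.csv", newline="") as f:
    for row in csv.DictReader(f):
        day = row["begin_date"][:10]  # begin_date now reflects the record's interval
        counts_per_day[day] += int(row["count"])

for day, total in sorted(counts_per_day.items()):
    print(day, total, sep="\t")
```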
## Sample forensic report output

View File

@@ -4,21 +4,19 @@
```text
usage: parsedmarc [-h] [-c CONFIG_FILE] [--strip-attachment-payloads] [-o OUTPUT]
[--aggregate-json-filename AGGREGATE_JSON_FILENAME]
[--forensic-json-filename FORENSIC_JSON_FILENAME]
[--aggregate-csv-filename AGGREGATE_CSV_FILENAME]
[--forensic-csv-filename FORENSIC_CSV_FILENAME]
[-n NAMESERVERS [NAMESERVERS ...]] [-t DNS_TIMEOUT] [--offline]
[-s] [--verbose] [--debug] [--log-file LOG_FILE] [-v]
[--aggregate-json-filename AGGREGATE_JSON_FILENAME] [--forensic-json-filename FORENSIC_JSON_FILENAME]
[--smtp-tls-json-filename SMTP_TLS_JSON_FILENAME] [--aggregate-csv-filename AGGREGATE_CSV_FILENAME]
[--forensic-csv-filename FORENSIC_CSV_FILENAME] [--smtp-tls-csv-filename SMTP_TLS_CSV_FILENAME]
[-n NAMESERVERS [NAMESERVERS ...]] [-t DNS_TIMEOUT] [--offline] [-s] [-w] [--verbose] [--debug]
[--log-file LOG_FILE] [--no-prettify-json] [-v]
[file_path ...]
Parses DMARC reports
positional arguments:
file_path one or more paths to aggregate or forensic report
files, emails, or mbox files'
file_path one or more paths to aggregate or forensic report files, emails, or mbox files'
optional arguments:
options:
-h, --help show this help message and exit
-c CONFIG_FILE, --config-file CONFIG_FILE
a path to a configuration file (--silent implied)
@@ -30,20 +28,25 @@ usage: parsedmarc [-h] [-c CONFIG_FILE] [--strip-attachment-payloads] [-o OUTPUT
filename for the aggregate JSON output file
--forensic-json-filename FORENSIC_JSON_FILENAME
filename for the forensic JSON output file
--smtp-tls-json-filename SMTP_TLS_JSON_FILENAME
filename for the SMTP TLS JSON output file
--aggregate-csv-filename AGGREGATE_CSV_FILENAME
filename for the aggregate CSV output file
--forensic-csv-filename FORENSIC_CSV_FILENAME
filename for the forensic CSV output file
--smtp-tls-csv-filename SMTP_TLS_CSV_FILENAME
filename for the SMTP TLS CSV output file
-n NAMESERVERS [NAMESERVERS ...], --nameservers NAMESERVERS [NAMESERVERS ...]
nameservers to query
-t DNS_TIMEOUT, --dns_timeout DNS_TIMEOUT
number of seconds to wait for an answer from DNS
(default: 2.0)
number of seconds to wait for an answer from DNS (default: 2.0)
--offline do not make online queries for geolocation or DNS
-s, --silent only print errors and warnings
-s, --silent only print errors
-w, --warnings print warnings in addition to errors
--verbose more verbose output
--debug print debugging information
--log-file LOG_FILE output logging to a file
--no-prettify-json output JSON in a single line without indentation
-v, --version show program's version number and exit
```

View File

@@ -2,6 +2,10 @@
"""A Python package for parsing DMARC reports"""
from __future__ import annotations
from typing import Dict, List, Any, Union, IO, Callable
import binascii
import email
import email.utils
@@ -17,9 +21,8 @@ import zlib
from base64 import b64decode
from collections import OrderedDict
from csv import DictWriter
from datetime import datetime, timedelta, timezone
from datetime import datetime, timedelta, timezone, tzinfo
from io import BytesIO, StringIO
from typing import Callable
import mailparser
import xmltodict
@@ -79,15 +82,196 @@ class InvalidForensicReport(InvalidDMARCReport):
"""Raised when an invalid DMARC forensic report is encountered"""
def _bucket_interval_by_day(
begin: datetime,
end: datetime,
total_count: int,
) -> List[Dict[str, Any]]:
"""
Split the interval [begin, end) into daily buckets and distribute
`total_count` proportionally across those buckets.
The function:
1. Identifies each calendar day touched by [begin, end)
2. Computes how many seconds of the interval fall into each day
3. Assigns counts in proportion to those overlaps
4. Ensures the final counts sum exactly to total_count
Args:
begin: timezone-aware datetime, inclusive start of interval
end: timezone-aware datetime, exclusive end of interval
total_count: number of messages to distribute
Returns:
A list of dicts like:
{
"begin": datetime,
"end": datetime,
"count": int
}
"""
# --- Input validation ----------------------------------------------------
if begin > end:
raise ValueError("begin must be earlier than end")
if begin.tzinfo is None or end.tzinfo is None:
raise ValueError("begin and end must be timezone-aware")
if begin.tzinfo is not end.tzinfo:
raise ValueError("begin and end must have the same tzinfo")
if total_count < 0:
raise ValueError("total_count must be non-negative")
# --- Short-circuit trivial cases -----------------------------------------
interval_seconds = (end - begin).total_seconds()
if interval_seconds <= 0 or total_count == 0:
return []
tz: tzinfo = begin.tzinfo
# --- Step 1: Determine all calendar days touched by [begin, end) ----------
#
# For example:
# begin = Jan 1 12:00
# end = Jan 3 06:00
#
# We need buckets for:
# Jan 1 12:00 → Jan 2 00:00
# Jan 2 00:00 → Jan 3 00:00
# Jan 3 00:00 → Jan 3 06:00
#
# Start at midnight on the day of `begin`.
day_cursor = datetime(begin.year, begin.month, begin.day, tzinfo=tz)
# day_cursor is midnight on begin's calendar day in the same timezone,
# so it should never be later than begin; guard anyway in case an
# unusual tzinfo makes it so:
if day_cursor > begin:
day_cursor -= timedelta(days=1)
day_buckets: List[Dict[str, Any]] = []
while day_cursor < end:
day_start = day_cursor
day_end = day_cursor + timedelta(days=1)
# Overlap between [begin, end) and this day
overlap_start = max(begin, day_start)
overlap_end = min(end, day_end)
overlap_seconds = (overlap_end - overlap_start).total_seconds()
if overlap_seconds > 0:
day_buckets.append(
{
"begin": overlap_start,
"end": overlap_end,
"seconds": overlap_seconds,
}
)
day_cursor = day_end
# --- Step 2: Pro-rate counts across buckets -------------------------------
#
# Compute the exact fractional count for each bucket:
# bucket_fraction = bucket_seconds / interval_seconds
# bucket_exact = total_count * bucket_fraction
#
# Then apply a "largest remainder" rounding strategy to ensure the sum
# equals exactly total_count.
exact_values: List[float] = [
(b["seconds"] / interval_seconds) * total_count for b in day_buckets
]
floor_values: List[int] = [int(x) for x in exact_values]
fractional_parts: List[float] = [x - int(x) for x in exact_values]
# How many counts do we still need to distribute after flooring?
remainder = total_count - sum(floor_values)
# Sort buckets by descending fractional remainder
indices_by_fraction = sorted(
range(len(day_buckets)),
key=lambda i: fractional_parts[i],
reverse=True,
)
# Start with floor values
final_counts = floor_values[:]
# Add +1 to the buckets with the largest fractional parts
for idx in indices_by_fraction[:remainder]:
final_counts[idx] += 1
# --- Step 3: Build the final per-day result list -------------------------
results: List[Dict[str, Any]] = []
for bucket, count in zip(day_buckets, final_counts):
if count > 0:
results.append(
{
"begin": bucket["begin"],
"end": bucket["end"],
"count": count,
}
)
return results
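To illustrate the largest-remainder rounding concretely, the sketch below (not part of the diff) feeds a 2.5-day interval with 7 messages through the helper; it assumes the function can be imported from the `parsedmarc` package, where this diff defines it.

```python
# Illustrative only: a 2.5-day interval with 7 messages is split into
# daily buckets whose counts always sum back to 7. Exact fractional
# counts are floored, then the largest remainders absorb the leftovers.
from datetime import datetime, timezone

from parsedmarc import _bucket_interval_by_day  # helper defined in this diff

begin = datetime(2012, 4, 27, 12, 0, tzinfo=timezone.utc)
end = datetime(2012, 4, 30, 0, 0, tzinfo=timezone.utc)

for bucket in _bucket_interval_by_day(begin, end, total_count=7):
    print(bucket["begin"], "->", bucket["end"], bucket["count"])
```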
def _append_parsed_record(
parsed_record: Dict[str, Any],
records: List[Dict[str, Any]],
begin_dt: datetime,
end_dt: datetime,
normalize: bool,
) -> None:
"""
Append a parsed DMARC record either unchanged or normalized.
Args:
parsed_record: The record returned by _parse_report_record().
records: Accumulating list of output records.
begin_dt: Report-level begin datetime (UTC).
end_dt: Report-level end datetime (UTC).
normalize: Whether this report exceeded the allowed timespan
and should be normalized per-day.
"""
if not normalize:
parsed_record["normalized_timespan"] = False
parsed_record["interval_begin"] = begin_dt.strftime("%Y-%m-%d %H:%M:%S")
parsed_record["interval_end"] = end_dt.strftime("%Y-%m-%d %H:%M:%S")
records.append(parsed_record)
return
# Normalization path: break record into daily buckets
total_count = int(parsed_record.get("count", 0))
buckets = _bucket_interval_by_day(begin_dt, end_dt, total_count)
if not buckets:
return
for bucket in buckets:
new_rec = parsed_record.copy()
new_rec["count"] = bucket["count"]
new_rec["normalized_timespan"] = True
new_rec["interval_begin"] = bucket["begin"].strftime("%Y-%m-%d %H:%M:%S")
new_rec["interval_end"] = bucket["end"].strftime("%Y-%m-%d %H:%M:%S")
records.append(new_rec)
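A quick illustration of the splitting path, assuming the helper is importable from `parsedmarc`; the bare dict below stands in for the full record a real caller would get from `_parse_report_record()`.

```python
# Illustrative only: one record with count=10 covering three whole days is
# replaced by three per-day records whose counts sum to 10, each tagged
# with normalized_timespan=True and its own interval_begin/interval_end.
from datetime import datetime, timezone

from parsedmarc import _append_parsed_record  # helper defined in this diff

records = []
_append_parsed_record(
    parsed_record={"count": 10},  # minimal stand-in for a parsed record
    records=records,
    begin_dt=datetime(2012, 4, 27, tzinfo=timezone.utc),
    end_dt=datetime(2012, 4, 30, tzinfo=timezone.utc),
    normalize=True,
)
for rec in records:
    print(rec["interval_begin"], rec["interval_end"], rec["count"])
```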
def _parse_report_record(
record,
ip_db_path=None,
always_use_local_files=False,
reverse_dns_map_path=None,
reverse_dns_map_url=None,
offline=False,
nameservers=None,
dns_timeout=2.0,
record: dict,
ip_db_path: str = None,
always_use_local_files: bool = False,
reverse_dns_map_path: str = None,
reverse_dns_map_url: str = None,
offline: bool = False,
nameservers: list[str] = None,
dns_timeout: float = 2.0,
):
"""
Converts a record from a DMARC aggregate report into a more consistent
@@ -242,7 +426,7 @@ def _parse_report_record(
return new_record
def _parse_smtp_tls_failure_details(failure_details):
def _parse_smtp_tls_failure_details(failure_details: dict):
try:
new_failure_details = OrderedDict(
result_type=failure_details["result-type"],
@@ -278,7 +462,7 @@ def _parse_smtp_tls_failure_details(failure_details):
raise InvalidSMTPTLSReport(str(e))
def _parse_smtp_tls_report_policy(policy):
def _parse_smtp_tls_report_policy(policy: dict):
policy_types = ["tlsa", "sts", "no-policy-found"]
try:
policy_domain = policy["policy"]["policy-domain"]
@@ -315,7 +499,7 @@ def _parse_smtp_tls_report_policy(policy):
raise InvalidSMTPTLSReport(str(e))
def parse_smtp_tls_report_json(report):
def parse_smtp_tls_report_json(report: dict):
"""Parses and validates an SMTP TLS report"""
required_fields = [
"organization-name",
@@ -354,7 +538,7 @@ def parse_smtp_tls_report_json(report):
raise InvalidSMTPTLSReport(str(e))
def parsed_smtp_tls_reports_to_csv_rows(reports):
def parsed_smtp_tls_reports_to_csv_rows(reports: dict):
"""Converts one oor more parsed SMTP TLS reports into a list of single
layer OrderedDict objects suitable for use in a CSV"""
if type(reports) is OrderedDict:
@@ -389,7 +573,7 @@ def parsed_smtp_tls_reports_to_csv_rows(reports):
return rows
def parsed_smtp_tls_reports_to_csv(reports):
def parsed_smtp_tls_reports_to_csv(reports: dict):
"""
Converts one or more parsed SMTP TLS reports to flat CSV format, including
headers
@@ -435,15 +619,16 @@ def parsed_smtp_tls_reports_to_csv(reports):
def parse_aggregate_report_xml(
xml,
ip_db_path=None,
always_use_local_files=False,
reverse_dns_map_path=None,
reverse_dns_map_url=None,
offline=False,
nameservers=None,
timeout=2.0,
keep_alive=None,
xml: str,
ip_db_path: str = None,
always_use_local_files: bool = False,
reverse_dns_map_path: str = None,
reverse_dns_map_url: str = None,
offline: bool = False,
nameservers: list[str] = None,
timeout: float = 2.0,
keep_alive: Callable = None,
normalize_timespan_threshold_hours: float = 24.0,
):
"""Parses a DMARC XML report string and returns a consistent OrderedDict
@@ -458,6 +643,7 @@ def parse_aggregate_report_xml(
(Cloudflare's public DNS resolvers by default)
timeout (float): Sets the DNS timeout in seconds
keep_alive (callable): Keep alive function
normalize_timespan_threshold_hours (float): Normalize report timespans longer than this many hours
Returns:
OrderedDict: The parsed aggregate DMARC report
@@ -522,13 +708,27 @@ def parse_aggregate_report_xml(
report_id = report_id.replace("<", "").replace(">", "").split("@")[0]
new_report_metadata["report_id"] = report_id
date_range = report["report_metadata"]["date_range"]
if int(date_range["end"]) - int(date_range["begin"]) > 2 * 86400:
_error = "Time span > 24 hours - RFC 7489 section 7.2"
raise InvalidAggregateReport(_error)
date_range["begin"] = timestamp_to_human(date_range["begin"])
date_range["end"] = timestamp_to_human(date_range["end"])
begin_ts = int(date_range["begin"])
end_ts = int(date_range["end"])
span_seconds = end_ts - begin_ts
normalize_timespan = span_seconds > normalize_timespan_threshold_hours * 3600
date_range["begin"] = timestamp_to_human(begin_ts)
date_range["end"] = timestamp_to_human(end_ts)
new_report_metadata["begin_date"] = date_range["begin"]
new_report_metadata["end_date"] = date_range["end"]
new_report_metadata["timespan_requires_normalization"] = normalize_timespan
new_report_metadata["original_timespan_seconds"] = span_seconds
begin_dt = human_timestamp_to_datetime(
new_report_metadata["begin_date"], to_utc=True
)
end_dt = human_timestamp_to_datetime(
new_report_metadata["end_date"], to_utc=True
)
if "error" in report["report_metadata"]:
if not isinstance(report["report_metadata"]["error"], list):
errors = [report["report_metadata"]["error"]]
@@ -587,7 +787,13 @@ def parse_aggregate_report_xml(
nameservers=nameservers,
dns_timeout=timeout,
)
records.append(report_record)
_append_parsed_record(
parsed_record=report_record,
records=records,
begin_dt=begin_dt,
end_dt=end_dt,
normalize=normalize_timespan,
)
except Exception as e:
logger.warning("Could not parse record: {0}".format(e))
@@ -602,7 +808,13 @@ def parse_aggregate_report_xml(
nameservers=nameservers,
dns_timeout=timeout,
)
records.append(report_record)
_append_parsed_record(
parsed_record=report_record,
records=records,
begin_dt=begin_dt,
end_dt=end_dt,
normalize=normalize_timespan,
)
new_report["records"] = records
@@ -620,7 +832,7 @@ def parse_aggregate_report_xml(
raise InvalidAggregateReport("Unexpected error: {0}".format(error.__str__()))
def extract_report(content):
def extract_report(content: Union[bytes, str, IO[Any]]):
"""
Extracts text from a zip or gzip file, as a base64-encoded string,
file-like object, or bytes.
@@ -684,15 +896,16 @@ def extract_report_from_file_path(file_path):
def parse_aggregate_report_file(
_input,
offline=False,
always_use_local_files=None,
reverse_dns_map_path=None,
reverse_dns_map_url=None,
ip_db_path=None,
nameservers=None,
dns_timeout=2.0,
keep_alive=None,
_input: Union[str, bytes, IO[Any]],
offline: bool = False,
always_use_local_files: bool = None,
reverse_dns_map_path: str = None,
reverse_dns_map_url: str = None,
ip_db_path: str = None,
nameservers: list[str] = None,
dns_timeout: float = 2.0,
keep_alive: Callable = None,
normalize_timespan_threshold_hours: float = 24.0,
):
"""Parses a file at the given path, a file-like object. or bytes as an
aggregate DMARC report
@@ -708,6 +921,7 @@ def parse_aggregate_report_file(
(Cloudflare's public DNS resolvers by default)
dns_timeout (float): Sets the DNS timeout in seconds
keep_alive (callable): Keep alive function
normalize_timespan_threshold_hours (float): Normalize report timespans longer than this many hours
Returns:
OrderedDict: The parsed DMARC aggregate report
@@ -728,10 +942,11 @@ def parse_aggregate_report_file(
nameservers=nameservers,
timeout=dns_timeout,
keep_alive=keep_alive,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
)
def parsed_aggregate_reports_to_csv_rows(reports):
def parsed_aggregate_reports_to_csv_rows(reports: list[dict]):
"""
Converts one or more parsed aggregate reports to list of dicts in flat CSV
format
@@ -760,6 +975,9 @@ def parsed_aggregate_reports_to_csv_rows(reports):
report_id = report["report_metadata"]["report_id"]
begin_date = report["report_metadata"]["begin_date"]
end_date = report["report_metadata"]["end_date"]
normalized_timespan = report["report_metadata"][
"timespan_requires_normalization"
]
errors = "|".join(report["report_metadata"]["errors"])
domain = report["policy_published"]["domain"]
adkim = report["policy_published"]["adkim"]
@@ -777,6 +995,7 @@ def parsed_aggregate_reports_to_csv_rows(reports):
report_id=report_id,
begin_date=begin_date,
end_date=end_date,
normalized_timespan=normalized_timespan,
errors=errors,
domain=domain,
adkim=adkim,
@@ -789,6 +1008,8 @@ def parsed_aggregate_reports_to_csv_rows(reports):
for record in report["records"]:
row = report_dict.copy()
row["begin_date"] = record["interval_begin"]
row["end_date"] = record["interval_end"]
row["source_ip_address"] = record["source"]["ip_address"]
row["source_country"] = record["source"]["country"]
row["source_reverse_dns"] = record["source"]["reverse_dns"]
@@ -849,7 +1070,7 @@ def parsed_aggregate_reports_to_csv_rows(reports):
return rows
def parsed_aggregate_reports_to_csv(reports):
def parsed_aggregate_reports_to_csv(reports: list[OrderedDict]):
"""
Converts one or more parsed aggregate reports to flat CSV format, including
headers
@@ -869,6 +1090,7 @@ def parsed_aggregate_reports_to_csv(reports):
"report_id",
"begin_date",
"end_date",
"normalized_timespan",
"errors",
"domain",
"adkim",
@@ -915,17 +1137,17 @@ def parsed_aggregate_reports_to_csv(reports):
def parse_forensic_report(
feedback_report,
sample,
msg_date,
always_use_local_files=False,
reverse_dns_map_path=None,
reverse_dns_map_url=None,
offline=False,
ip_db_path=None,
nameservers=None,
dns_timeout=2.0,
strip_attachment_payloads=False,
feedback_report: str,
sample: str,
msg_date: datetime,
always_use_local_files: bool = False,
reverse_dns_map_path: str = None,
reverse_dns_map_url: str = None,
offline: bool = False,
ip_db_path: str = None,
nameservers: list[str] = None,
dns_timeout: float = 2.0,
strip_attachment_payloads: bool = False,
):
"""
Converts a DMARC forensic report and sample to an ``OrderedDict``
@@ -1054,7 +1276,7 @@ def parse_forensic_report(
raise InvalidForensicReport("Unexpected error: {0}".format(error.__str__()))
def parsed_forensic_reports_to_csv_rows(reports):
def parsed_forensic_reports_to_csv_rows(reports: list[OrderedDict]):
"""
Converts one or more parsed forensic reports to a list of dicts in flat CSV
format
@@ -1090,7 +1312,7 @@ def parsed_forensic_reports_to_csv_rows(reports):
return rows
def parsed_forensic_reports_to_csv(reports):
def parsed_forensic_reports_to_csv(reports: list[dict]):
"""
Converts one or more parsed forensic reports to flat CSV format, including
headers
@@ -1143,16 +1365,17 @@ def parsed_forensic_reports_to_csv(reports):
def parse_report_email(
input_,
offline=False,
ip_db_path=None,
always_use_local_files=False,
reverse_dns_map_path=None,
reverse_dns_map_url=None,
nameservers=None,
dns_timeout=2.0,
strip_attachment_payloads=False,
keep_alive=None,
input_: Union[bytes, str],
offline: bool = False,
ip_db_path: str = None,
always_use_local_files: bool = False,
reverse_dns_map_path: str = None,
reverse_dns_map_url: str = None,
nameservers: list[str] = None,
dns_timeout: float = 2.0,
strip_attachment_payloads: bool = False,
keep_alive: Callable = None,
normalize_timespan_threshold_hours: float = 24.0,
):
"""
Parses a DMARC report from an email
@@ -1169,6 +1392,7 @@ def parse_report_email(
strip_attachment_payloads (bool): Remove attachment payloads from
forensic report results
keep_alive (callable): keep alive function
normalize_timespan_threshold_hours (float): Normalize report timespans longer than this many hours
Returns:
OrderedDict:
@@ -1281,6 +1505,7 @@ def parse_report_email(
nameservers=nameservers,
timeout=dns_timeout,
keep_alive=keep_alive,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
)
result = OrderedDict(
[("report_type", "aggregate"), ("report", aggregate_report)]
@@ -1337,16 +1562,17 @@ def parse_report_email(
def parse_report_file(
input_,
nameservers=None,
dns_timeout=2.0,
strip_attachment_payloads=False,
ip_db_path=None,
always_use_local_files=False,
reverse_dns_map_path=None,
reverse_dns_map_url=None,
offline=False,
keep_alive=None,
input_: Union[bytes, str, IO[Any]],
nameservers: list[str] = None,
dns_timeout: float = 2.0,
strip_attachment_payloads: bool = False,
ip_db_path: str = None,
always_use_local_files: bool = False,
reverse_dns_map_path: str = None,
reverse_dns_map_url: str = None,
offline: bool = False,
keep_alive: Callable = None,
normalize_timespan_threshold_hours: float = 24.0,
):
"""Parses a DMARC aggregate or forensic file at the given path, a
file-like object, or bytes
@@ -1389,6 +1615,7 @@ def parse_report_file(
nameservers=nameservers,
dns_timeout=dns_timeout,
keep_alive=keep_alive,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
)
results = OrderedDict([("report_type", "aggregate"), ("report", report)])
except InvalidAggregateReport:
@@ -1409,6 +1636,7 @@ def parse_report_file(
dns_timeout=dns_timeout,
strip_attachment_payloads=sa,
keep_alive=keep_alive,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
)
except InvalidDMARCReport:
raise ParserError("Not a valid report")
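From the caller's side, the new keyword is simply passed through. A hedged usage sketch follows; the file path and the 48-hour threshold are hypothetical.

```python
# Illustrative sketch: parse a single report file and raise the
# normalization threshold to 48 hours. The path is hypothetical.
from parsedmarc import parse_report_file

results = parse_report_file(
    "aggregate_report.xml",
    offline=True,  # skip DNS/geolocation lookups for a quick local run
    normalize_timespan_threshold_hours=48.0,
)
if results["report_type"] == "aggregate":
    for record in results["report"]["records"]:
        print(
            record["interval_begin"],
            record["interval_end"],
            record["count"],
            record["normalized_timespan"],
        )
```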
@@ -1416,15 +1644,16 @@ def parse_report_file(
def get_dmarc_reports_from_mbox(
input_,
nameservers=None,
dns_timeout=2.0,
strip_attachment_payloads=False,
ip_db_path=None,
always_use_local_files=False,
reverse_dns_map_path=None,
reverse_dns_map_url=None,
offline=False,
input_: str,
nameservers: list[str] = None,
dns_timeout: float = 2.0,
strip_attachment_payloads: bool = False,
ip_db_path: str = None,
always_use_local_files: bool = False,
reverse_dns_map_path: str = None,
reverse_dns_map_url: str = None,
offline: bool = False,
normalize_timespan_threshold_hours: float = 24.0,
):
"""Parses a mailbox in mbox format containing e-mails with attached
DMARC reports
@@ -1441,6 +1670,7 @@ def get_dmarc_reports_from_mbox(
reverse_dns_map_url (str): URL to a reverse DNS map file
ip_db_path (str): Path to a MMDB file from MaxMind or DBIP
offline (bool): Do not make online queries for geolocation or DNS
normalize_timespan_threshold_hours (float): Normalize report timespans longer than this many hours
Returns:
OrderedDict: Lists of ``aggregate_reports`` and ``forensic_reports``
@@ -1470,6 +1700,7 @@ def get_dmarc_reports_from_mbox(
nameservers=nameservers,
dns_timeout=dns_timeout,
strip_attachment_payloads=sa,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
)
if parsed_email["report_type"] == "aggregate":
report_org = parsed_email["report"]["report_metadata"]["org_name"]
@@ -1502,22 +1733,23 @@ def get_dmarc_reports_from_mbox(
def get_dmarc_reports_from_mailbox(
connection: MailboxConnection,
reports_folder="INBOX",
archive_folder="Archive",
delete=False,
test=False,
ip_db_path=None,
always_use_local_files=False,
reverse_dns_map_path=None,
reverse_dns_map_url=None,
offline=False,
nameservers=None,
dns_timeout=6.0,
strip_attachment_payloads=False,
results=None,
batch_size=10,
since=None,
create_folders=True,
reports_folder: str = "INBOX",
archive_folder: str = "Archive",
delete: bool = False,
test: bool = False,
ip_db_path: str = None,
always_use_local_files: bool = False,
reverse_dns_map_path: str = None,
reverse_dns_map_url: str = None,
offline: bool = False,
nameservers: list[str] = None,
dns_timeout: float = 6.0,
strip_attachment_payloads: bool = False,
results: dict = None,
batch_size: int = 10,
since: datetime = None,
create_folders: bool = True,
normalize_timespan_threshold_hours: float = 24.0,
):
"""
Fetches and parses DMARC reports from a mailbox
@@ -1544,6 +1776,7 @@ def get_dmarc_reports_from_mailbox(
(units - {"m":"minutes", "h":"hours", "d":"days", "w":"weeks"})
create_folders (bool): Whether to create the destination folders
(not used in watch)
normalize_timespan_threshold_hours (float): Normalize report timespans longer than this many hours
Returns:
OrderedDict: Lists of ``aggregate_reports`` and ``forensic_reports``
@@ -1661,6 +1894,7 @@ def get_dmarc_reports_from_mailbox(
offline=offline,
strip_attachment_payloads=sa,
keep_alive=connection.keepalive,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
)
if parsed_email["report_type"] == "aggregate":
report_org = parsed_email["report"]["report_metadata"]["org_name"]
@@ -1812,6 +2046,7 @@ def get_dmarc_reports_from_mailbox(
reverse_dns_map_url=reverse_dns_map_url,
offline=offline,
since=current_time,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
)
return results
@@ -1820,20 +2055,21 @@ def get_dmarc_reports_from_mailbox(
def watch_inbox(
mailbox_connection: MailboxConnection,
callback: Callable,
reports_folder="INBOX",
archive_folder="Archive",
delete=False,
test=False,
check_timeout=30,
ip_db_path=None,
always_use_local_files=False,
reverse_dns_map_path=None,
reverse_dns_map_url=None,
offline=False,
nameservers=None,
dns_timeout=6.0,
strip_attachment_payloads=False,
batch_size=None,
reports_folder: str = "INBOX",
archive_folder: str = "Archive",
delete: bool = False,
test: bool = False,
check_timeout: int = 30,
ip_db_path: str = None,
always_use_local_files: bool = False,
reverse_dns_map_path: str = None,
reverse_dns_map_url: str = None,
offline: bool = False,
nameservers: list[str] = None,
dns_timeout: float = 6.0,
strip_attachment_payloads: bool = False,
batch_size: int = None,
normalize_timespan_threshold_hours: float = 24.0,
):
"""
Watches the mailbox for new messages and
@@ -1859,6 +2095,7 @@ def watch_inbox(
strip_attachment_payloads (bool): Replace attachment payloads in
forensic report samples with None
batch_size (int): Number of messages to read and process before saving
normalize_timespan_threshold_hours (float): Normalize report timespans longer than this many hours
"""
def check_callback(connection):
@@ -1879,6 +2116,7 @@ def watch_inbox(
strip_attachment_payloads=sa,
batch_size=batch_size,
create_folders=False,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
)
callback(res)
@@ -1921,14 +2159,14 @@ def append_csv(filename, csv):
def save_output(
results,
output_directory="output",
aggregate_json_filename="aggregate.json",
forensic_json_filename="forensic.json",
smtp_tls_json_filename="smtp_tls.json",
aggregate_csv_filename="aggregate.csv",
forensic_csv_filename="forensic.csv",
smtp_tls_csv_filename="smtp_tls.csv",
results: OrderedDict,
output_directory: str = "output",
aggregate_json_filename: str = "aggregate.json",
forensic_json_filename: str = "forensic.json",
smtp_tls_json_filename: str = "smtp_tls.json",
aggregate_csv_filename: str = "aggregate.csv",
forensic_csv_filename: str = "forensic.csv",
smtp_tls_csv_filename: str = "smtp_tls.csv",
):
"""
Save report data in the given directory
@@ -2006,7 +2244,7 @@ def save_output(
sample_file.write(sample)
def get_report_zip(results):
def get_report_zip(results: OrderedDict):
"""
Creates a zip file of parsed report output

View File

@@ -77,6 +77,7 @@ def cli_parse(
always_use_local_files,
reverse_dns_map_path,
reverse_dns_map_url,
normalize_timespan_threshold_hours,
conn,
):
"""Separated this function for multiprocessing"""
@@ -91,6 +92,7 @@ def cli_parse(
nameservers=nameservers,
dns_timeout=dns_timeout,
strip_attachment_payloads=sa,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
)
conn.send([file_results, file_path])
except ParserError as error:
@@ -659,6 +661,7 @@ def _main():
webhook_forensic_url=None,
webhook_smtp_tls_url=None,
webhook_timeout=60,
normalize_timespan_threshold_hours=24.0,
)
args = arg_parser.parse_args()
@@ -674,8 +677,11 @@ def _main():
if "general" in config.sections():
general_config = config["general"]
if "silent" in general_config:
if general_config["silent"].lower() == "false":
opts.silent = False
opts.silent = general_config.getboolean("silent")
if "normalize_timespan_threshold_hours" in general_config:
opts.normalize_timespan_threshold_hours = general_config.getfloat(
"normalize_timespan_threshold_hours"
)
if "index_prefix_domain_map" in general_config:
with open(general_config["index_prefix_domain_map"]) as f:
index_prefix_domain_map = yaml.safe_load(f)
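The new option is read from the `[general]` section with `getfloat()`, next to the simplified boolean handling of `silent`. The sketch below mirrors those calls on a hypothetical configuration excerpt (not a file shipped with parsedmarc).

```python
# Illustrative only: mirrors the getboolean()/getfloat() calls above on a
# hypothetical [general] section of a parsedmarc configuration file.
from configparser import ConfigParser

config = ConfigParser()
config.read_string(
    """
[general]
silent = True
normalize_timespan_threshold_hours = 36
"""
)
general = config["general"]
print(general.getboolean("silent"))                            # True
print(general.getfloat("normalize_timespan_threshold_hours"))  # 36.0
```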
@@ -1456,6 +1462,7 @@ def _main():
opts.always_use_local_files,
opts.reverse_dns_map_path,
opts.reverse_dns_map_url,
opts.normalize_timespan_threshold_hours,
child_conn,
),
)
@@ -1506,6 +1513,7 @@ def _main():
reverse_dns_map_path=opts.reverse_dns_map_path,
reverse_dns_map_url=opts.reverse_dns_map_url,
offline=opts.offline,
normalize_timespan_threshold_hours=opts.normalize_timespan_threshold_hours,
)
aggregate_reports += reports["aggregate_reports"]
forensic_reports += reports["forensic_reports"]
@@ -1615,6 +1623,7 @@ def _main():
test=opts.mailbox_test,
strip_attachment_payloads=opts.strip_attachment_payloads,
since=opts.mailbox_since,
normalize_timespan_threshold_hours=opts.normalize_timespan_threshold_hours,
)
aggregate_reports += reports["aggregate_reports"]
@@ -1677,6 +1686,7 @@ def _main():
reverse_dns_map_path=opts.reverse_dns_map_path,
reverse_dns_map_url=opts.reverse_dns_map_url,
offline=opts.offline,
normalize_timespan_threshold_hours=opts.normalize_timespan_threshold_hours,
)
except FileExistsError as error:
logger.error("{0}".format(error.__str__()))

View File

@@ -1,2 +1,2 @@
__version__ = "8.19.1"
__version__ = "9.0.0"
USER_AGENT = f"parsedmarc/{__version__}"

View File

@@ -67,6 +67,8 @@ class _AggregateReportDoc(Document):
date_range = Date()
date_begin = Date()
date_end = Date()
normalized_timespan = Boolean()
original_timespan_seconds = Integer()
errors = Text()
published_policy = Object(_PublishedPolicy)
source_ip_address = Ip()
@@ -393,8 +395,20 @@ def save_aggregate_report_to_elasticsearch(
org_name = metadata["org_name"]
report_id = metadata["report_id"]
domain = aggregate_report["policy_published"]["domain"]
begin_date = human_timestamp_to_datetime(metadata["begin_date"], to_utc=True)
end_date = human_timestamp_to_datetime(metadata["end_date"], to_utc=True)
published_policy = _PublishedPolicy(
domain=aggregate_report["policy_published"]["domain"],
adkim=aggregate_report["policy_published"]["adkim"],
aspf=aggregate_report["policy_published"]["aspf"],
p=aggregate_report["policy_published"]["p"],
sp=aggregate_report["policy_published"]["sp"],
pct=aggregate_report["policy_published"]["pct"],
fo=aggregate_report["policy_published"]["fo"],
)
for record in aggregate_report["records"]:
begin_date = human_timestamp_to_datetime(record["interval_begin"], to_utc=True)
end_date = human_timestamp_to_datetime(record["interval_end"], to_utc=True)
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
if monthly_indexes:
@@ -439,17 +453,6 @@ def save_aggregate_report_to_elasticsearch(
report_id, org_name, domain, begin_date_human, end_date_human
)
)
published_policy = _PublishedPolicy(
domain=aggregate_report["policy_published"]["domain"],
adkim=aggregate_report["policy_published"]["adkim"],
aspf=aggregate_report["policy_published"]["aspf"],
p=aggregate_report["policy_published"]["p"],
sp=aggregate_report["policy_published"]["sp"],
pct=aggregate_report["policy_published"]["pct"],
fo=aggregate_report["policy_published"]["fo"],
)
for record in aggregate_report["records"]:
agg_doc = _AggregateReportDoc(
xml_schema=aggregate_report["xml_schema"],
org_name=metadata["org_name"],
@@ -459,6 +462,7 @@ def save_aggregate_report_to_elasticsearch(
date_range=date_range,
date_begin=aggregate_report["begin_date"],
date_end=aggregate_report["end_date"],
normalized_timespan=record["normalized_timespan"],
errors=metadata["errors"],
published_policy=published_policy,
source_ip_address=record["source"]["ip_address"],

View File

@@ -67,6 +67,8 @@ class _AggregateReportDoc(Document):
date_range = Date()
date_begin = Date()
date_end = Date()
normalized_timespan = Boolean()
original_timespan_seconds = Integer()
errors = Text()
published_policy = Object(_PublishedPolicy)
source_ip_address = Ip()
@@ -393,8 +395,20 @@ def save_aggregate_report_to_opensearch(
org_name = metadata["org_name"]
report_id = metadata["report_id"]
domain = aggregate_report["policy_published"]["domain"]
begin_date = human_timestamp_to_datetime(metadata["begin_date"], to_utc=True)
end_date = human_timestamp_to_datetime(metadata["end_date"], to_utc=True)
published_policy = _PublishedPolicy(
domain=aggregate_report["policy_published"]["domain"],
adkim=aggregate_report["policy_published"]["adkim"],
aspf=aggregate_report["policy_published"]["aspf"],
p=aggregate_report["policy_published"]["p"],
sp=aggregate_report["policy_published"]["sp"],
pct=aggregate_report["policy_published"]["pct"],
fo=aggregate_report["policy_published"]["fo"],
)
for record in aggregate_report["records"]:
begin_date = human_timestamp_to_datetime(record["interval_begin"], to_utc=True)
end_date = human_timestamp_to_datetime(record["interval_end"], to_utc=True)
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
if monthly_indexes:
@@ -439,17 +453,6 @@ def save_aggregate_report_to_opensearch(
report_id, org_name, domain, begin_date_human, end_date_human
)
)
published_policy = _PublishedPolicy(
domain=aggregate_report["policy_published"]["domain"],
adkim=aggregate_report["policy_published"]["adkim"],
aspf=aggregate_report["policy_published"]["aspf"],
p=aggregate_report["policy_published"]["p"],
sp=aggregate_report["policy_published"]["sp"],
pct=aggregate_report["policy_published"]["pct"],
fo=aggregate_report["policy_published"]["fo"],
)
for record in aggregate_report["records"]:
agg_doc = _AggregateReportDoc(
xml_schema=aggregate_report["xml_schema"],
org_name=metadata["org_name"],

View File

@@ -78,6 +78,9 @@ class HECClient(object):
new_report = dict()
for metadata in report["report_metadata"]:
new_report[metadata] = report["report_metadata"][metadata]
new_report["interval_begin"] = record["interval_begin"]
new_report["interval_end"] = record["interval_end"]
new_report["normalized_timespan"] = record["normalized_timespan"]
new_report["published_policy"] = report["policy_published"]
new_report["source_ip_address"] = record["source"]["ip_address"]
new_report["source_country"] = record["source"]["country"]
@@ -98,7 +101,9 @@ class HECClient(object):
new_report["spf_results"] = record["auth_results"]["spf"]
data["sourcetype"] = "dmarc:aggregate"
timestamp = human_timestamp_to_unix_timestamp(new_report["begin_date"])
timestamp = human_timestamp_to_unix_timestamp(
new_report["interval_begin"]
)
data["time"] = timestamp
data["event"] = new_report.copy()
json_str += "{0}\n".format(json.dumps(data))