diff --git a/.vscode/launch.json b/.vscode/launch.json
index 2cc3124..f83087f 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -19,20 +19,11 @@
             "console": "integratedTerminal"
         },
         {
-            "name": "sample.eml",
+            "name": "sample",
             "type": "debugpy",
             "request": "launch",
             "module": "parsedmarc.cli",
-            "args": ["samples/private/sample.eml"]
-        },
-        {
-            "name": "find_sus_domains.py",
-            "type": "debugpy",
-            "request": "launch",
-            "program": "find_sus_domains.py",
-            "args": ["-i", "unknown_domains.txt", "-o", "sus_domains.csv"],
-            "cwd": "${workspaceFolder}/parsedmarc/resources/maps",
-            "console": "integratedTerminal"
+            "args": ["samples/private/sample"]
         },
         {
             "name": "sortlists.py",
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 17091d1..91de095 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,11 @@
 Changelog
 =========
 
+9.0.0
+------
+
+- Normalize aggregate DMARC report volumes when a report timespan exceeds 24 hours
+
 8.19.1
 ------
 
diff --git a/docs/source/output.md b/docs/source/output.md
index 61b8273..a8d19e4 100644
--- a/docs/source/output.md
+++ b/docs/source/output.md
@@ -23,6 +23,8 @@ of the report schema.
       "report_id": "9391651994964116463",
       "begin_date": "2012-04-27 20:00:00",
       "end_date": "2012-04-28 19:59:59",
+      "timespan_requires_normalization": false,
+      "original_timespan_seconds": 86399,
       "errors": []
     },
     "policy_published": {
@@ -39,8 +41,10 @@ of the report schema.
       "source": {
         "ip_address": "72.150.241.94",
         "country": "US",
-        "reverse_dns": "adsl-72-150-241-94.shv.bellsouth.net",
-        "base_domain": "bellsouth.net"
+        "reverse_dns": null,
+        "base_domain": null,
+        "name": null,
+        "type": null
       },
       "count": 2,
       "alignment": {
@@ -74,7 +78,10 @@ of the report schema.
           "result": "pass"
         }
       ]
-    }
+    },
+    "normalized_timespan": false,
+    "interval_begin": "2012-04-27 20:00:00",
+    "interval_end": "2012-04-28 19:59:59"
     }
   ]
 }
@@ -83,8 +90,8 @@ of the report schema.
 ### CSV aggregate report
 
 ```text
-xml_schema,org_name,org_email,org_extra_contact_info,report_id,begin_date,end_date,errors,domain,adkim,aspf,p,sp,pct,fo,source_ip_address,source_country,source_reverse_dns,source_base_domain,count,spf_aligned,dkim_aligned,dmarc_aligned,disposition,policy_override_reasons,policy_override_comments,envelope_from,header_from,envelope_to,dkim_domains,dkim_selectors,dkim_results,spf_domains,spf_scopes,spf_results
-draft,acme.com,noreply-dmarc-support@acme.com,http://acme.com/dmarc/support,9391651994964116463,2012-04-27 20:00:00,2012-04-28 19:59:59,,example.com,r,r,none,none,100,0,72.150.241.94,US,adsl-72-150-241-94.shv.bellsouth.net,bellsouth.net,2,True,False,True,none,,,example.com,example.com,,example.com,none,fail,example.com,mfrom,pass
+xml_schema,org_name,org_email,org_extra_contact_info,report_id,begin_date,end_date,normalized_timespan,errors,domain,adkim,aspf,p,sp,pct,fo,source_ip_address,source_country,source_reverse_dns,source_base_domain,source_name,source_type,count,spf_aligned,dkim_aligned,dmarc_aligned,disposition,policy_override_reasons,policy_override_comments,envelope_from,header_from,envelope_to,dkim_domains,dkim_selectors,dkim_results,spf_domains,spf_scopes,spf_results
+draft,acme.com,noreply-dmarc-support@acme.com,http://acme.com/dmarc/support,9391651994964116463,2012-04-27 20:00:00,2012-04-28 19:59:59,False,,example.com,r,r,none,none,100,0,72.150.241.94,US,,,,,2,True,False,True,none,,,example.com,example.com,,example.com,none,fail,example.com,mfrom,pass
 ```
 
 ## Sample forensic report output
diff --git a/docs/source/usage.md b/docs/source/usage.md
index 00d7e28..85eec61 100644
--- a/docs/source/usage.md
+++ b/docs/source/usage.md
@@ -4,47 +4,50 @@
 
 ```text
 usage: parsedmarc [-h] [-c CONFIG_FILE] [--strip-attachment-payloads] [-o OUTPUT]
-                  [--aggregate-json-filename AGGREGATE_JSON_FILENAME]
-                  [--forensic-json-filename FORENSIC_JSON_FILENAME]
-                  [--aggregate-csv-filename AGGREGATE_CSV_FILENAME]
-                  [--forensic-csv-filename FORENSIC_CSV_FILENAME]
-                  [-n NAMESERVERS [NAMESERVERS ...]] [-t DNS_TIMEOUT] [--offline]
-                  [-s] [--verbose] [--debug] [--log-file LOG_FILE] [-v]
-                  [file_path ...]
+                  [--aggregate-json-filename AGGREGATE_JSON_FILENAME] [--forensic-json-filename FORENSIC_JSON_FILENAME]
+                  [--smtp-tls-json-filename SMTP_TLS_JSON_FILENAME] [--aggregate-csv-filename AGGREGATE_CSV_FILENAME]
+                  [--forensic-csv-filename FORENSIC_CSV_FILENAME] [--smtp-tls-csv-filename SMTP_TLS_CSV_FILENAME]
+                  [-n NAMESERVERS [NAMESERVERS ...]] [-t DNS_TIMEOUT] [--offline] [-s] [-w] [--verbose] [--debug]
+                  [--log-file LOG_FILE] [--no-prettify-json] [-v]
+                  [file_path ...]
- Parses DMARC reports +Parses DMARC reports - positional arguments: - file_path one or more paths to aggregate or forensic report - files, emails, or mbox files' +positional arguments: + file_path one or more paths to aggregate or forensic report files, emails, or mbox files' - optional arguments: - -h, --help show this help message and exit - -c CONFIG_FILE, --config-file CONFIG_FILE - a path to a configuration file (--silent implied) - --strip-attachment-payloads - remove attachment payloads from forensic report output - -o OUTPUT, --output OUTPUT - write output files to the given directory - --aggregate-json-filename AGGREGATE_JSON_FILENAME - filename for the aggregate JSON output file - --forensic-json-filename FORENSIC_JSON_FILENAME - filename for the forensic JSON output file - --aggregate-csv-filename AGGREGATE_CSV_FILENAME - filename for the aggregate CSV output file - --forensic-csv-filename FORENSIC_CSV_FILENAME - filename for the forensic CSV output file - -n NAMESERVERS [NAMESERVERS ...], --nameservers NAMESERVERS [NAMESERVERS ...] - nameservers to query - -t DNS_TIMEOUT, --dns_timeout DNS_TIMEOUT - number of seconds to wait for an answer from DNS - (default: 2.0) - --offline do not make online queries for geolocation or DNS - -s, --silent only print errors and warnings - --verbose more verbose output - --debug print debugging information - --log-file LOG_FILE output logging to a file - -v, --version show program's version number and exit +options: + -h, --help show this help message and exit + -c CONFIG_FILE, --config-file CONFIG_FILE + a path to a configuration file (--silent implied) + --strip-attachment-payloads + remove attachment payloads from forensic report output + -o OUTPUT, --output OUTPUT + write output files to the given directory + --aggregate-json-filename AGGREGATE_JSON_FILENAME + filename for the aggregate JSON output file + --forensic-json-filename FORENSIC_JSON_FILENAME + filename for the forensic JSON output file + --smtp-tls-json-filename SMTP_TLS_JSON_FILENAME + filename for the SMTP TLS JSON output file + --aggregate-csv-filename AGGREGATE_CSV_FILENAME + filename for the aggregate CSV output file + --forensic-csv-filename FORENSIC_CSV_FILENAME + filename for the forensic CSV output file + --smtp-tls-csv-filename SMTP_TLS_CSV_FILENAME + filename for the SMTP TLS CSV output file + -n NAMESERVERS [NAMESERVERS ...], --nameservers NAMESERVERS [NAMESERVERS ...] 
+                        nameservers to query
+  -t DNS_TIMEOUT, --dns_timeout DNS_TIMEOUT
+                        number of seconds to wait for an answer from DNS (default: 2.0)
+  --offline             do not make online queries for geolocation or DNS
+  -s, --silent          only print errors
+  -w, --warnings        print warnings in addition to errors
+  --verbose             more verbose output
+  --debug               print debugging information
+  --log-file LOG_FILE   output logging to a file
+  --no-prettify-json    output JSON in a single line without indentation
+  -v, --version         show program's version number and exit
 ```
 
 :::{note}
diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py
index a3e6a5c..c353213 100644
--- a/parsedmarc/__init__.py
+++ b/parsedmarc/__init__.py
@@ -2,6 +2,10 @@
 
 """A Python package for parsing DMARC reports"""
 
+from __future__ import annotations
+
+from typing import Dict, List, Any, Union, IO, Callable
+
 import binascii
 import email
 import email.utils
@@ -17,9 +21,8 @@ import zlib
 from base64 import b64decode
 from collections import OrderedDict
 from csv import DictWriter
-from datetime import datetime, timedelta, timezone
+from datetime import datetime, timedelta, timezone, tzinfo
 from io import BytesIO, StringIO
-from typing import Callable
 
 import mailparser
 import xmltodict
@@ -79,15 +82,196 @@ class InvalidForensicReport(InvalidDMARCReport):
     """Raised when an invalid DMARC forensic report is encountered"""
 
 
+def _bucket_interval_by_day(
+    begin: datetime,
+    end: datetime,
+    total_count: int,
+) -> List[Dict[str, Any]]:
+    """
+    Split the interval [begin, end) into daily buckets and distribute
+    `total_count` proportionally across those buckets.
+
+    The function:
+    1. Identifies each calendar day touched by [begin, end)
+    2. Computes how many seconds of the interval fall into each day
+    3. Assigns counts in proportion to those overlaps
+    4. Ensures the final counts sum exactly to total_count
+
+    Args:
+        begin: timezone-aware datetime, inclusive start of interval
+        end: timezone-aware datetime, exclusive end of interval
+        total_count: number of messages to distribute
+
+    Returns:
+        A list of dicts like:
+            {
+                "begin": datetime,
+                "end": datetime,
+                "count": int
+            }
+    """
+    # --- Input validation ----------------------------------------------------
+    if begin > end:
+        raise ValueError("begin must not be later than end")
+    if begin.tzinfo is None or end.tzinfo is None:
+        raise ValueError("begin and end must be timezone-aware")
+    if begin.tzinfo != end.tzinfo:
+        raise ValueError("begin and end must have the same tzinfo")
+    if total_count < 0:
+        raise ValueError("total_count must be non-negative")
+
+    # --- Short-circuit trivial cases -----------------------------------------
+    interval_seconds = (end - begin).total_seconds()
+    if interval_seconds <= 0 or total_count == 0:
+        return []
+
+    tz: tzinfo = begin.tzinfo
+
+    # --- Step 1: Determine all calendar days touched by [begin, end) ----------
+    #
+    # For example:
+    #   begin = Jan 1 12:00
+    #   end   = Jan 3 06:00
+    #
+    # We need buckets for:
+    #   Jan 1 12:00 → Jan 2 00:00
+    #   Jan 2 00:00 → Jan 3 00:00
+    #   Jan 3 00:00 → Jan 3 06:00
+    #
+
+    # Start at midnight on the day of `begin`.
+    day_cursor = datetime(begin.year, begin.month, begin.day, tzinfo=tz)
+
+    # That midnight is normally at or before `begin`, so the cursor usually
+    # needs no adjustment.
+ # If `begin` is BEFORE that midnight (rare unless tz shifts), adjust: + if day_cursor > begin: + day_cursor -= timedelta(days=1) + + day_buckets: List[Dict[str, Any]] = [] + + while day_cursor < end: + day_start = day_cursor + day_end = day_cursor + timedelta(days=1) + + # Overlap between [begin, end) and this day + overlap_start = max(begin, day_start) + overlap_end = min(end, day_end) + + overlap_seconds = (overlap_end - overlap_start).total_seconds() + + if overlap_seconds > 0: + day_buckets.append( + { + "begin": overlap_start, + "end": overlap_end, + "seconds": overlap_seconds, + } + ) + + day_cursor = day_end + + # --- Step 2: Pro-rate counts across buckets ------------------------------- + # + # Compute the exact fractional count for each bucket: + # bucket_fraction = bucket_seconds / interval_seconds + # bucket_exact = total_count * bucket_fraction + # + # Then apply a "largest remainder" rounding strategy to ensure the sum + # equals exactly total_count. + + exact_values: List[float] = [ + (b["seconds"] / interval_seconds) * total_count for b in day_buckets + ] + + floor_values: List[int] = [int(x) for x in exact_values] + fractional_parts: List[float] = [x - int(x) for x in exact_values] + + # How many counts do we still need to distribute after flooring? + remainder = total_count - sum(floor_values) + + # Sort buckets by descending fractional remainder + indices_by_fraction = sorted( + range(len(day_buckets)), + key=lambda i: fractional_parts[i], + reverse=True, + ) + + # Start with floor values + final_counts = floor_values[:] + + # Add +1 to the buckets with the largest fractional parts + for idx in indices_by_fraction[:remainder]: + final_counts[idx] += 1 + + # --- Step 3: Build the final per-day result list ------------------------- + results: List[Dict[str, Any]] = [] + for bucket, count in zip(day_buckets, final_counts): + if count > 0: + results.append( + { + "begin": bucket["begin"], + "end": bucket["end"], + "count": count, + } + ) + + return results + + +def _append_parsed_record( + parsed_record: Dict[str, Any], + records: List[Dict[str, Any]], + begin_dt: datetime, + end_dt: datetime, + normalize: bool, +) -> None: + """ + Append a parsed DMARC record either unchanged or normalized. + + Args: + parsed_record: The record returned by _parse_report_record(). + records: Accumulating list of output records. + begin_dt: Report-level begin datetime (UTC). + end_dt: Report-level end datetime (UTC). + normalize: Whether this report exceeded the allowed timespan + and should be normalized per-day. 
+    """
+
+    if not normalize:
+        parsed_record["normalized_timespan"] = False
+        parsed_record["interval_begin"] = begin_dt.strftime("%Y-%m-%d %H:%M:%S")
+        parsed_record["interval_end"] = end_dt.strftime("%Y-%m-%d %H:%M:%S")
+
+        records.append(parsed_record)
+        return
+
+    # Normalization path: break the record into daily buckets
+    total_count = int(parsed_record.get("count", 0))
+    buckets = _bucket_interval_by_day(begin_dt, end_dt, total_count)
+    if not buckets:
+        return
+
+    for bucket in buckets:
+        new_rec = parsed_record.copy()
+        new_rec["count"] = bucket["count"]
+        new_rec["normalized_timespan"] = True
+
+        new_rec["interval_begin"] = bucket["begin"].strftime("%Y-%m-%d %H:%M:%S")
+        new_rec["interval_end"] = bucket["end"].strftime("%Y-%m-%d %H:%M:%S")
+
+        records.append(new_rec)
+
+
 def _parse_report_record(
-    record,
-    ip_db_path=None,
-    always_use_local_files=False,
-    reverse_dns_map_path=None,
-    reverse_dns_map_url=None,
-    offline=False,
-    nameservers=None,
-    dns_timeout=2.0,
+    record: dict,
+    ip_db_path: str = None,
+    always_use_local_files: bool = False,
+    reverse_dns_map_path: str = None,
+    reverse_dns_map_url: str = None,
+    offline: bool = False,
+    nameservers: list[str] = None,
+    dns_timeout: float = 2.0,
 ):
     """
     Converts a record from a DMARC aggregate report into a more consistent
@@ -242,7 +426,7 @@ def _parse_report_record(
     return new_record
 
 
-def _parse_smtp_tls_failure_details(failure_details):
+def _parse_smtp_tls_failure_details(failure_details: dict):
     try:
         new_failure_details = OrderedDict(
             result_type=failure_details["result-type"],
@@ -278,7 +462,7 @@ def _parse_smtp_tls_failure_details(failure_details):
         raise InvalidSMTPTLSReport(str(e))
 
 
-def _parse_smtp_tls_report_policy(policy):
+def _parse_smtp_tls_report_policy(policy: dict):
     policy_types = ["tlsa", "sts", "no-policy-found"]
     try:
         policy_domain = policy["policy"]["policy-domain"]
@@ -315,7 +499,7 @@ def _parse_smtp_tls_report_policy(policy):
         raise InvalidSMTPTLSReport(str(e))
 
 
-def parse_smtp_tls_report_json(report):
+def parse_smtp_tls_report_json(report: dict):
     """Parses and validates an SMTP TLS report"""
     required_fields = [
         "organization-name",
@@ -354,7 +538,7 @@ def parse_smtp_tls_report_json(report):
         raise InvalidSMTPTLSReport(str(e))
 
 
-def parsed_smtp_tls_reports_to_csv_rows(reports):
+def parsed_smtp_tls_reports_to_csv_rows(reports: dict):
     """Converts one oor more parsed SMTP TLS reports into a list of single
     layer OrderedDict objects suitable for use in a CSV"""
     if type(reports) is OrderedDict:
@@ -389,7 +573,7 @@ def parsed_smtp_tls_reports_to_csv_rows(reports):
     return rows
 
 
-def parsed_smtp_tls_reports_to_csv(reports):
+def parsed_smtp_tls_reports_to_csv(reports: dict):
     """
     Converts one or more parsed SMTP TLS reports to flat CSV format,
     including headers
@@ -435,15 +619,16 @@
 
 
 def parse_aggregate_report_xml(
-    xml,
-    ip_db_path=None,
-    always_use_local_files=False,
-    reverse_dns_map_path=None,
-    reverse_dns_map_url=None,
-    offline=False,
-    nameservers=None,
-    timeout=2.0,
-    keep_alive=None,
+    xml: str,
+    ip_db_path: str = None,
+    always_use_local_files: bool = False,
+    reverse_dns_map_path: str = None,
+    reverse_dns_map_url: str = None,
+    offline: bool = False,
+    nameservers: list[str] = None,
+    timeout: float = 2.0,
+    keep_alive: Callable = None,
+    normalize_timespan_threshold_hours: float = 24.0,
 ):
     """Parses a DMARC XML report string and returns a consistent OrderedDict
 
@@ -458,6 +643,7 @@
             (Cloudflare's public DNS
resolvers by default) timeout (float): Sets the DNS timeout in seconds keep_alive (callable): Keep alive function + normalize_timespan_threshold_hours (float): Normalize timespans beyond this Returns: OrderedDict: The parsed aggregate DMARC report @@ -522,13 +708,27 @@ def parse_aggregate_report_xml( report_id = report_id.replace("<", "").replace(">", "").split("@")[0] new_report_metadata["report_id"] = report_id date_range = report["report_metadata"]["date_range"] - if int(date_range["end"]) - int(date_range["begin"]) > 2 * 86400: - _error = "Time span > 24 hours - RFC 7489 section 7.2" - raise InvalidAggregateReport(_error) - date_range["begin"] = timestamp_to_human(date_range["begin"]) - date_range["end"] = timestamp_to_human(date_range["end"]) + + begin_ts = int(date_range["begin"]) + end_ts = int(date_range["end"]) + span_seconds = end_ts - begin_ts + + normalize_timespan = span_seconds > normalize_timespan_threshold_hours * 3600 + + date_range["begin"] = timestamp_to_human(begin_ts) + date_range["end"] = timestamp_to_human(end_ts) + new_report_metadata["begin_date"] = date_range["begin"] new_report_metadata["end_date"] = date_range["end"] + new_report_metadata["timespan_requires_normalization"] = normalize_timespan + new_report_metadata["original_timespan_seconds"] = span_seconds + begin_dt = human_timestamp_to_datetime( + new_report_metadata["begin_date"], to_utc=True + ) + end_dt = human_timestamp_to_datetime( + new_report_metadata["end_date"], to_utc=True + ) + if "error" in report["report_metadata"]: if not isinstance(report["report_metadata"]["error"], list): errors = [report["report_metadata"]["error"]] @@ -587,7 +787,13 @@ def parse_aggregate_report_xml( nameservers=nameservers, dns_timeout=timeout, ) - records.append(report_record) + _append_parsed_record( + parsed_record=report_record, + records=records, + begin_dt=begin_dt, + end_dt=end_dt, + normalize=normalize_timespan, + ) except Exception as e: logger.warning("Could not parse record: {0}".format(e)) @@ -602,7 +808,13 @@ def parse_aggregate_report_xml( nameservers=nameservers, dns_timeout=timeout, ) - records.append(report_record) + _append_parsed_record( + parsed_record=report_record, + records=records, + begin_dt=begin_dt, + end_dt=end_dt, + normalize=normalize_timespan, + ) new_report["records"] = records @@ -620,7 +832,7 @@ def parse_aggregate_report_xml( raise InvalidAggregateReport("Unexpected error: {0}".format(error.__str__())) -def extract_report(content): +def extract_report(content: Union[bytes, str, IO[Any]]): """ Extracts text from a zip or gzip file, as a base64-encoded string, file-like object, or bytes. @@ -684,15 +896,16 @@ def extract_report_from_file_path(file_path): def parse_aggregate_report_file( - _input, - offline=False, - always_use_local_files=None, - reverse_dns_map_path=None, - reverse_dns_map_url=None, - ip_db_path=None, - nameservers=None, - dns_timeout=2.0, - keep_alive=None, + _input: Union[str, bytes, IO[Any]], + offline: bool = False, + always_use_local_files: bool = None, + reverse_dns_map_path: str = None, + reverse_dns_map_url: str = None, + ip_db_path: str = None, + nameservers: list[str] = None, + dns_timeout: float = 2.0, + keep_alive: Callable = None, + normalize_timespan_threshold_hours: float = 24.0, ): """Parses a file at the given path, a file-like object. 
or bytes as an aggregate DMARC report @@ -708,6 +921,7 @@ def parse_aggregate_report_file( (Cloudflare's public DNS resolvers by default) dns_timeout (float): Sets the DNS timeout in seconds keep_alive (callable): Keep alive function + normalize_timespan_threshold_hours (float): Normalize timespans beyond this Returns: OrderedDict: The parsed DMARC aggregate report @@ -728,10 +942,11 @@ def parse_aggregate_report_file( nameservers=nameservers, timeout=dns_timeout, keep_alive=keep_alive, + normalize_timespan_threshold_hours=normalize_timespan_threshold_hours, ) -def parsed_aggregate_reports_to_csv_rows(reports): +def parsed_aggregate_reports_to_csv_rows(reports: list[dict]): """ Converts one or more parsed aggregate reports to list of dicts in flat CSV format @@ -760,6 +975,9 @@ def parsed_aggregate_reports_to_csv_rows(reports): report_id = report["report_metadata"]["report_id"] begin_date = report["report_metadata"]["begin_date"] end_date = report["report_metadata"]["end_date"] + normalized_timespan = report["report_metadata"][ + "timespan_requires_normalization" + ] errors = "|".join(report["report_metadata"]["errors"]) domain = report["policy_published"]["domain"] adkim = report["policy_published"]["adkim"] @@ -777,6 +995,7 @@ def parsed_aggregate_reports_to_csv_rows(reports): report_id=report_id, begin_date=begin_date, end_date=end_date, + normalized_timespan=normalized_timespan, errors=errors, domain=domain, adkim=adkim, @@ -789,6 +1008,8 @@ def parsed_aggregate_reports_to_csv_rows(reports): for record in report["records"]: row = report_dict.copy() + row["begin_date"] = record["interval_begin"] + row["end_date"] = record["interval_end"] row["source_ip_address"] = record["source"]["ip_address"] row["source_country"] = record["source"]["country"] row["source_reverse_dns"] = record["source"]["reverse_dns"] @@ -849,7 +1070,7 @@ def parsed_aggregate_reports_to_csv_rows(reports): return rows -def parsed_aggregate_reports_to_csv(reports): +def parsed_aggregate_reports_to_csv(reports: list[OrderedDict]): """ Converts one or more parsed aggregate reports to flat CSV format, including headers @@ -869,6 +1090,7 @@ def parsed_aggregate_reports_to_csv(reports): "report_id", "begin_date", "end_date", + "normalized_timespan", "errors", "domain", "adkim", @@ -915,17 +1137,17 @@ def parsed_aggregate_reports_to_csv(reports): def parse_forensic_report( - feedback_report, - sample, - msg_date, - always_use_local_files=False, - reverse_dns_map_path=None, - reverse_dns_map_url=None, - offline=False, - ip_db_path=None, - nameservers=None, - dns_timeout=2.0, - strip_attachment_payloads=False, + feedback_report: str, + sample: str, + msg_date: datetime, + always_use_local_files: bool = False, + reverse_dns_map_path: str = None, + reverse_dns_map_url: str = None, + offline: bool = False, + ip_db_path: str = None, + nameservers: list[str] = None, + dns_timeout: float = 2.0, + strip_attachment_payloads: bool = False, ): """ Converts a DMARC forensic report and sample to a ``OrderedDict`` @@ -1054,7 +1276,7 @@ def parse_forensic_report( raise InvalidForensicReport("Unexpected error: {0}".format(error.__str__())) -def parsed_forensic_reports_to_csv_rows(reports): +def parsed_forensic_reports_to_csv_rows(reports: list[OrderedDict]): """ Converts one or more parsed forensic reports to a list of dicts in flat CSV format @@ -1090,7 +1312,7 @@ def parsed_forensic_reports_to_csv_rows(reports): return rows -def parsed_forensic_reports_to_csv(reports): +def parsed_forensic_reports_to_csv(reports: list[dict]): """ Converts 
one or more parsed forensic reports to flat CSV format, including headers @@ -1143,16 +1365,17 @@ def parsed_forensic_reports_to_csv(reports): def parse_report_email( - input_, - offline=False, - ip_db_path=None, - always_use_local_files=False, - reverse_dns_map_path=None, - reverse_dns_map_url=None, - nameservers=None, - dns_timeout=2.0, - strip_attachment_payloads=False, - keep_alive=None, + input_: Union[bytes, str], + offline: bool = False, + ip_db_path: str = None, + always_use_local_files: bool = False, + reverse_dns_map_path: str = None, + reverse_dns_map_url: str = None, + nameservers: list[str] = None, + dns_timeout: float = 2.0, + strip_attachment_payloads: bool = False, + keep_alive: callable = None, + normalize_timespan_threshold_hours: float = 24.0, ): """ Parses a DMARC report from an email @@ -1169,6 +1392,7 @@ def parse_report_email( strip_attachment_payloads (bool): Remove attachment payloads from forensic report results keep_alive (callable): keep alive function + normalize_timespan_threshold_hours (float): Normalize timespans beyond this Returns: OrderedDict: @@ -1281,6 +1505,7 @@ def parse_report_email( nameservers=nameservers, timeout=dns_timeout, keep_alive=keep_alive, + normalize_timespan_threshold_hours=normalize_timespan_threshold_hours, ) result = OrderedDict( [("report_type", "aggregate"), ("report", aggregate_report)] @@ -1337,16 +1562,17 @@ def parse_report_email( def parse_report_file( - input_, - nameservers=None, - dns_timeout=2.0, - strip_attachment_payloads=False, - ip_db_path=None, - always_use_local_files=False, - reverse_dns_map_path=None, - reverse_dns_map_url=None, - offline=False, - keep_alive=None, + input_: Union[bytes, str, IO[Any]], + nameservers: list[str] = None, + dns_timeout: float = 2.0, + strip_attachment_payloads: bool = False, + ip_db_path: str = None, + always_use_local_files: bool = False, + reverse_dns_map_path: str = None, + reverse_dns_map_url: str = None, + offline: bool = False, + keep_alive: Callable = None, + normalize_timespan_threshold_hours: float = 24, ): """Parses a DMARC aggregate or forensic file at the given path, a file-like object. 
     or bytes
 
@@ -1389,6 +1615,7 @@
                 nameservers=nameservers,
                 dns_timeout=dns_timeout,
                 keep_alive=keep_alive,
+                normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
             )
             results = OrderedDict([("report_type", "aggregate"), ("report", report)])
         except InvalidAggregateReport:
@@ -1409,6 +1636,7 @@
             dns_timeout=dns_timeout,
             strip_attachment_payloads=sa,
             keep_alive=keep_alive,
+            normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
         )
     except InvalidDMARCReport:
         raise ParserError("Not a valid report")
@@ -1416,15 +1644,16 @@
 
 
 def get_dmarc_reports_from_mbox(
-    input_,
-    nameservers=None,
-    dns_timeout=2.0,
-    strip_attachment_payloads=False,
-    ip_db_path=None,
-    always_use_local_files=False,
-    reverse_dns_map_path=None,
-    reverse_dns_map_url=None,
-    offline=False,
+    input_: str,
+    nameservers: list[str] = None,
+    dns_timeout: float = 2.0,
+    strip_attachment_payloads: bool = False,
+    ip_db_path: str = None,
+    always_use_local_files: bool = False,
+    reverse_dns_map_path: str = None,
+    reverse_dns_map_url: str = None,
+    offline: bool = False,
+    normalize_timespan_threshold_hours: float = 24.0,
 ):
     """Parses a mailbox in mbox format containing e-mails with attached
     DMARC reports
@@ -1441,6 +1670,7 @@ def get_dmarc_reports_from_mbox(
         reverse_dns_map_url (str): URL to a reverse DNS map file
         ip_db_path (str): Path to a MMDB file from MaxMind or DBIP
         offline (bool): Do not make online queries for geolocation or DNS
+        normalize_timespan_threshold_hours (float): Normalize timespans beyond this
 
     Returns:
         OrderedDict: Lists of ``aggregate_reports`` and ``forensic_reports``
@@ -1470,6 +1700,7 @@ def get_dmarc_reports_from_mbox(
                     nameservers=nameservers,
                     dns_timeout=dns_timeout,
                     strip_attachment_payloads=sa,
+                    normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
                 )
                 if parsed_email["report_type"] == "aggregate":
                     report_org = parsed_email["report"]["report_metadata"]["org_name"]
@@ -1502,22 +1733,23 @@
 
 def get_dmarc_reports_from_mailbox(
     connection: MailboxConnection,
-    reports_folder="INBOX",
-    archive_folder="Archive",
-    delete=False,
-    test=False,
-    ip_db_path=None,
-    always_use_local_files=False,
-    reverse_dns_map_path=None,
-    reverse_dns_map_url=None,
-    offline=False,
-    nameservers=None,
-    dns_timeout=6.0,
-    strip_attachment_payloads=False,
-    results=None,
-    batch_size=10,
-    since=None,
-    create_folders=True,
+    reports_folder: str = "INBOX",
+    archive_folder: str = "Archive",
+    delete: bool = False,
+    test: bool = False,
+    ip_db_path: str = None,
+    always_use_local_files: bool = False,
+    reverse_dns_map_path: str = None,
+    reverse_dns_map_url: str = None,
+    offline: bool = False,
+    nameservers: list[str] = None,
+    dns_timeout: float = 6.0,
+    strip_attachment_payloads: bool = False,
+    results: dict = None,
+    batch_size: int = 10,
+    since: datetime = None,
+    create_folders: bool = True,
+    normalize_timespan_threshold_hours: float = 24.0,
 ):
     """
     Fetches and parses DMARC reports from a mailbox
@@ -1544,6 +1776,7 @@ def get_dmarc_reports_from_mailbox(
             (units - {"m":"minutes", "h":"hours", "d":"days", "w":"weeks"})
         create_folders (bool): Whether to create the destination folders
             (not used in watch)
+        normalize_timespan_threshold_hours (float): Normalize timespans beyond this
 
     Returns:
         OrderedDict: Lists of ``aggregate_reports`` and ``forensic_reports``
@@ -1661,6 +1894,7 @@ def get_dmarc_reports_from_mailbox(
                     offline=offline,
                     strip_attachment_payloads=sa,
keep_alive=connection.keepalive, + normalize_timespan_threshold_hours=normalize_timespan_threshold_hours, ) if parsed_email["report_type"] == "aggregate": report_org = parsed_email["report"]["report_metadata"]["org_name"] @@ -1812,6 +2046,7 @@ def get_dmarc_reports_from_mailbox( reverse_dns_map_url=reverse_dns_map_url, offline=offline, since=current_time, + normalize_timespan_threshold_hours=normalize_timespan_threshold_hours, ) return results @@ -1820,20 +2055,21 @@ def get_dmarc_reports_from_mailbox( def watch_inbox( mailbox_connection: MailboxConnection, callback: Callable, - reports_folder="INBOX", - archive_folder="Archive", - delete=False, - test=False, - check_timeout=30, - ip_db_path=None, - always_use_local_files=False, - reverse_dns_map_path=None, - reverse_dns_map_url=None, - offline=False, - nameservers=None, - dns_timeout=6.0, - strip_attachment_payloads=False, - batch_size=None, + reports_folder: str = "INBOX", + archive_folder: str = "Archive", + delete: bool = False, + test: bool = False, + check_timeout: int = 30, + ip_db_path: str = None, + always_use_local_files: bool = False, + reverse_dns_map_path: str = None, + reverse_dns_map_url: str = None, + offline: bool = False, + nameservers: list[str] = None, + dns_timeout: float = 6.0, + strip_attachment_payloads: bool = False, + batch_size: int = None, + normalize_timespan_threshold_hours: float = 24, ): """ Watches the mailbox for new messages and @@ -1859,6 +2095,7 @@ def watch_inbox( strip_attachment_payloads (bool): Replace attachment payloads in forensic report samples with None batch_size (int): Number of messages to read and process before saving + normalize_timespan_threshold_hours (float): Normalize timespans beyond this """ def check_callback(connection): @@ -1879,6 +2116,7 @@ def watch_inbox( strip_attachment_payloads=sa, batch_size=batch_size, create_folders=False, + normalize_timespan_threshold_hours=normalize_timespan_threshold_hours, ) callback(res) @@ -1921,14 +2159,14 @@ def append_csv(filename, csv): def save_output( - results, - output_directory="output", - aggregate_json_filename="aggregate.json", - forensic_json_filename="forensic.json", - smtp_tls_json_filename="smtp_tls.json", - aggregate_csv_filename="aggregate.csv", - forensic_csv_filename="forensic.csv", - smtp_tls_csv_filename="smtp_tls.csv", + results: OrderedDict, + output_directory: str = "output", + aggregate_json_filename: str = "aggregate.json", + forensic_json_filename: str = "forensic.json", + smtp_tls_json_filename: str = "smtp_tls.json", + aggregate_csv_filename: str = "aggregate.csv", + forensic_csv_filename: str = "forensic.csv", + smtp_tls_csv_filename: str = "smtp_tls.csv", ): """ Save report data in the given directory @@ -2006,7 +2244,7 @@ def save_output( sample_file.write(sample) -def get_report_zip(results): +def get_report_zip(results: OrderedDict): """ Creates a zip file of parsed report output diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index 14de880..ee4a96e 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -77,6 +77,7 @@ def cli_parse( always_use_local_files, reverse_dns_map_path, reverse_dns_map_url, + normalize_timespan_threshold_hours, conn, ): """Separated this function for multiprocessing""" @@ -91,6 +92,7 @@ def cli_parse( nameservers=nameservers, dns_timeout=dns_timeout, strip_attachment_payloads=sa, + normalize_timespan_threshold_hours=normalize_timespan_threshold_hours, ) conn.send([file_results, file_path]) except ParserError as error: @@ -659,6 +661,7 @@ def _main(): webhook_forensic_url=None, 
         webhook_smtp_tls_url=None,
         webhook_timeout=60,
+        normalize_timespan_threshold_hours=24.0,
     )
 
     args = arg_parser.parse_args()
@@ -674,8 +677,11 @@ def _main():
     if "general" in config.sections():
         general_config = config["general"]
         if "silent" in general_config:
-            if general_config["silent"].lower() == "false":
-                opts.silent = False
+            opts.silent = general_config.getboolean("silent")
+        if "normalize_timespan_threshold_hours" in general_config:
+            opts.normalize_timespan_threshold_hours = general_config.getfloat(
+                "normalize_timespan_threshold_hours"
+            )
         if "index_prefix_domain_map" in general_config:
             with open(general_config["index_prefix_domain_map"]) as f:
                 index_prefix_domain_map = yaml.safe_load(f)
@@ -1456,6 +1462,7 @@ def _main():
                     opts.always_use_local_files,
                     opts.reverse_dns_map_path,
                     opts.reverse_dns_map_url,
+                    opts.normalize_timespan_threshold_hours,
                     child_conn,
                 ),
             )
@@ -1506,6 +1513,7 @@ def _main():
                 reverse_dns_map_path=opts.reverse_dns_map_path,
                 reverse_dns_map_url=opts.reverse_dns_map_url,
                 offline=opts.offline,
+                normalize_timespan_threshold_hours=opts.normalize_timespan_threshold_hours,
             )
             aggregate_reports += reports["aggregate_reports"]
             forensic_reports += reports["forensic_reports"]
@@ -1615,6 +1623,7 @@ def _main():
                 test=opts.mailbox_test,
                 strip_attachment_payloads=opts.strip_attachment_payloads,
                 since=opts.mailbox_since,
+                normalize_timespan_threshold_hours=opts.normalize_timespan_threshold_hours,
             )
 
             aggregate_reports += reports["aggregate_reports"]
@@ -1677,6 +1686,7 @@ def _main():
                 reverse_dns_map_path=opts.reverse_dns_map_path,
                 reverse_dns_map_url=opts.reverse_dns_map_url,
                 offline=opts.offline,
+                normalize_timespan_threshold_hours=opts.normalize_timespan_threshold_hours,
             )
         except FileExistsError as error:
             logger.error("{0}".format(error.__str__()))
diff --git a/parsedmarc/constants.py b/parsedmarc/constants.py
index 94a103a..62f6605 100644
--- a/parsedmarc/constants.py
+++ b/parsedmarc/constants.py
@@ -1,2 +1,2 @@
-__version__ = "8.19.1"
+__version__ = "9.0.0"
 USER_AGENT = f"parsedmarc/{__version__}"
diff --git a/parsedmarc/elastic.py b/parsedmarc/elastic.py
index 9c71f20..0bd274a 100644
--- a/parsedmarc/elastic.py
+++ b/parsedmarc/elastic.py
@@ -67,6 +67,8 @@ class _AggregateReportDoc(Document):
     date_range = Date()
     date_begin = Date()
     date_end = Date()
+    normalized_timespan = Boolean()
+    original_timespan_seconds = Integer()
     errors = Text()
     published_policy = Object(_PublishedPolicy)
     source_ip_address = Ip()
@@ -393,52 +395,7 @@ def save_aggregate_report_to_elasticsearch(
     org_name = metadata["org_name"]
     report_id = metadata["report_id"]
    domain = aggregate_report["policy_published"]["domain"]
-    begin_date = human_timestamp_to_datetime(metadata["begin_date"], to_utc=True)
-    end_date = human_timestamp_to_datetime(metadata["end_date"], to_utc=True)
-    begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
-    end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
-    if monthly_indexes:
-        index_date = begin_date.strftime("%Y-%m")
-    else:
-        index_date = begin_date.strftime("%Y-%m-%d")
-    aggregate_report["begin_date"] = begin_date
-    aggregate_report["end_date"] = end_date
-    date_range = [aggregate_report["begin_date"], aggregate_report["end_date"]]
-
-    org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
-    report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
-    domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
-    begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
-    end_date_query = Q(dict(match=dict(date_end=end_date)))
-
-    if index_suffix is not None:
-        search_index = "dmarc_aggregate_{0}*".format(index_suffix)
-    else:
-        search_index = "dmarc_aggregate*"
-    if index_prefix is not None:
-        search_index = "{0}{1}".format(index_prefix, search_index)
-    search = Search(index=search_index)
-    query = org_name_query & report_id_query & domain_query
-    query = query & begin_date_query & end_date_query
-    search.query = query
-
-    try:
-        existing = search.execute()
-    except Exception as error_:
-        raise ElasticsearchError(
-            "Elasticsearch's search for existing report \
-            error: {}".format(error_.__str__())
-        )
-
-    if len(existing) > 0:
-        raise AlreadySaved(
-            "An aggregate report ID {0} from {1} about {2} "
-            "with a date range of {3} UTC to {4} UTC already "
-            "exists in "
-            "Elasticsearch".format(
-                report_id, org_name, domain, begin_date_human, end_date_human
-            )
-        )
+
     published_policy = _PublishedPolicy(
         domain=aggregate_report["policy_published"]["domain"],
         adkim=aggregate_report["policy_published"]["adkim"],
@@ -450,6 +407,52 @@
     )
 
     for record in aggregate_report["records"]:
+        begin_date = human_timestamp_to_datetime(record["interval_begin"], to_utc=True)
+        end_date = human_timestamp_to_datetime(record["interval_end"], to_utc=True)
+        begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
+        end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
+        if monthly_indexes:
+            index_date = begin_date.strftime("%Y-%m")
+        else:
+            index_date = begin_date.strftime("%Y-%m-%d")
+        aggregate_report["begin_date"] = begin_date
+        aggregate_report["end_date"] = end_date
+        date_range = [aggregate_report["begin_date"], aggregate_report["end_date"]]
+
+        org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
+        report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
+        domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
+        begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
+        end_date_query = Q(dict(match=dict(date_end=end_date)))
+
+        if index_suffix is not None:
+            search_index = "dmarc_aggregate_{0}*".format(index_suffix)
+        else:
+            search_index = "dmarc_aggregate*"
+        if index_prefix is not None:
+            search_index = "{0}{1}".format(index_prefix, search_index)
+        search = Search(index=search_index)
+        query = org_name_query & report_id_query & domain_query
+        query = query & begin_date_query & end_date_query
+        search.query = query
+
+        try:
+            existing = search.execute()
+        except Exception as error_:
+            raise ElasticsearchError(
+                "Elasticsearch's search for existing report \
+                error: {}".format(error_.__str__())
+            )
+
+        if len(existing) > 0:
+            raise AlreadySaved(
+                "An aggregate report ID {0} from {1} about {2} "
+                "with a date range of {3} UTC to {4} UTC already "
+                "exists in "
+                "Elasticsearch".format(
+                    report_id, org_name, domain, begin_date_human, end_date_human
+                )
+            )
         agg_doc = _AggregateReportDoc(
             xml_schema=aggregate_report["xml_schema"],
             org_name=metadata["org_name"],
@@ -459,6 +462,7 @@
             date_range=date_range,
             date_begin=aggregate_report["begin_date"],
             date_end=aggregate_report["end_date"],
+            normalized_timespan=record["normalized_timespan"],
             errors=metadata["errors"],
             published_policy=published_policy,
             source_ip_address=record["source"]["ip_address"],
diff --git a/parsedmarc/opensearch.py b/parsedmarc/opensearch.py
index 003400c..9d5b228 100644
--- a/parsedmarc/opensearch.py
+++ b/parsedmarc/opensearch.py
@@ -67,6 +67,8 @@ class _AggregateReportDoc(Document):
     date_range = Date()
     date_begin = Date()
     date_end = Date()
+    normalized_timespan = Boolean()
+    original_timespan_seconds = Integer()
     errors = Text()
     published_policy = Object(_PublishedPolicy)
     source_ip_address = Ip()
@@ -393,52 +395,7 @@ def save_aggregate_report_to_opensearch(
     org_name = metadata["org_name"]
     report_id = metadata["report_id"]
     domain = aggregate_report["policy_published"]["domain"]
-    begin_date = human_timestamp_to_datetime(metadata["begin_date"], to_utc=True)
-    end_date = human_timestamp_to_datetime(metadata["end_date"], to_utc=True)
-    begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
-    end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
-    if monthly_indexes:
-        index_date = begin_date.strftime("%Y-%m")
-    else:
-        index_date = begin_date.strftime("%Y-%m-%d")
-    aggregate_report["begin_date"] = begin_date
-    aggregate_report["end_date"] = end_date
-    date_range = [aggregate_report["begin_date"], aggregate_report["end_date"]]
-
-    org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
-    report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
-    domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
-    begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
-    end_date_query = Q(dict(match=dict(date_end=end_date)))
-
-    if index_suffix is not None:
-        search_index = "dmarc_aggregate_{0}*".format(index_suffix)
-    else:
-        search_index = "dmarc_aggregate*"
-    if index_prefix is not None:
-        search_index = "{0}{1}".format(index_prefix, search_index)
-    search = Search(index=search_index)
-    query = org_name_query & report_id_query & domain_query
-    query = query & begin_date_query & end_date_query
-    search.query = query
-
-    try:
-        existing = search.execute()
-    except Exception as error_:
-        raise OpenSearchError(
-            "OpenSearch's search for existing report \
-            error: {}".format(error_.__str__())
-        )
-
-    if len(existing) > 0:
-        raise AlreadySaved(
-            "An aggregate report ID {0} from {1} about {2} "
-            "with a date range of {3} UTC to {4} UTC already "
-            "exists in "
-            "OpenSearch".format(
-                report_id, org_name, domain, begin_date_human, end_date_human
-            )
-        )
+
     published_policy = _PublishedPolicy(
         domain=aggregate_report["policy_published"]["domain"],
         adkim=aggregate_report["policy_published"]["adkim"],
@@ -450,6 +407,52 @@ def save_aggregate_report_to_opensearch(
     )
 
     for record in aggregate_report["records"]:
+        begin_date = human_timestamp_to_datetime(record["interval_begin"], to_utc=True)
+        end_date = human_timestamp_to_datetime(record["interval_end"], to_utc=True)
+        begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
+        end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
+        if monthly_indexes:
+            index_date = begin_date.strftime("%Y-%m")
+        else:
+            index_date = begin_date.strftime("%Y-%m-%d")
+        aggregate_report["begin_date"] = begin_date
+        aggregate_report["end_date"] = end_date
+        date_range = [aggregate_report["begin_date"], aggregate_report["end_date"]]
+
+        org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
+        report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
+        domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
+        begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
+        end_date_query = Q(dict(match=dict(date_end=end_date)))
+
+        if index_suffix is not None:
+            search_index = "dmarc_aggregate_{0}*".format(index_suffix)
+        else:
+            search_index = "dmarc_aggregate*"
+        if index_prefix is not None:
+            search_index = "{0}{1}".format(index_prefix, search_index)
+        search = Search(index=search_index)
+        query = org_name_query & report_id_query & domain_query
+        query = query & begin_date_query & end_date_query
+        search.query = query
+
+        try:
+            existing = search.execute()
+        except Exception as error_:
+            raise OpenSearchError(
+                "OpenSearch's search for existing report \
+                error: {}".format(error_.__str__())
+            )
+
+        if len(existing) > 0:
+            raise AlreadySaved(
+                "An aggregate report ID {0} from {1} about {2} "
+                "with a date range of {3} UTC to {4} UTC already "
+                "exists in "
+                "OpenSearch".format(
+                    report_id, org_name, domain, begin_date_human, end_date_human
+                )
+            )
         agg_doc = _AggregateReportDoc(
             xml_schema=aggregate_report["xml_schema"],
             org_name=metadata["org_name"],
@@ -459,6 +462,7 @@ def save_aggregate_report_to_opensearch(
             date_range=date_range,
             date_begin=aggregate_report["begin_date"],
             date_end=aggregate_report["end_date"],
+            normalized_timespan=record["normalized_timespan"],
             errors=metadata["errors"],
             published_policy=published_policy,
             source_ip_address=record["source"]["ip_address"],
diff --git a/parsedmarc/splunk.py b/parsedmarc/splunk.py
index 3e218fb..18307d5 100644
--- a/parsedmarc/splunk.py
+++ b/parsedmarc/splunk.py
@@ -78,6 +78,9 @@ class HECClient(object):
             new_report = dict()
             for metadata in report["report_metadata"]:
                 new_report[metadata] = report["report_metadata"][metadata]
+            new_report["interval_begin"] = record["interval_begin"]
+            new_report["interval_end"] = record["interval_end"]
+            new_report["normalized_timespan"] = record["normalized_timespan"]
             new_report["published_policy"] = report["policy_published"]
             new_report["source_ip_address"] = record["source"]["ip_address"]
             new_report["source_country"] = record["source"]["country"]
@@ -98,7 +101,9 @@ class HECClient(object):
             new_report["spf_results"] = record["auth_results"]["spf"]
 
             data["sourcetype"] = "dmarc:aggregate"
-            timestamp = human_timestamp_to_unix_timestamp(new_report["begin_date"])
+            timestamp = human_timestamp_to_unix_timestamp(
+                new_report["interval_begin"]
+            )
            data["time"] = timestamp
            data["event"] = new_report.copy()
            json_str += "{0}\n".format(json.dumps(data))
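
A quick sanity check of the bucketing logic added in `parsedmarc/__init__.py`. This is an illustrative sketch, not part of the patch; it assumes the patched package is installed and imports the module-private `_bucket_interval_by_day` directly:

```python
from datetime import datetime, timezone

from parsedmarc import _bucket_interval_by_day

# A 48-hour report window that touches three calendar days.
begin = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
end = datetime(2024, 1, 3, 12, 0, tzinfo=timezone.utc)

for bucket in _bucket_interval_by_day(begin, end, total_count=10):
    print(bucket["begin"], bucket["end"], bucket["count"])

# Exact shares are 2.5 / 5.0 / 2.5 messages. Flooring leaves one unit to
# distribute; it goes to the bucket with the largest fractional part, and
# the 0.5 tie resolves to the earlier bucket because Python's sort is
# stable, so the counts come out 3, 5, 2 -- summing exactly to 10.
```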
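Threshold plumbing, end to end: the CLI reads `normalize_timespan_threshold_hours` from the `[general]` config section with `getfloat` (see the cli.py hunk) and passes it through every parsing entry point. A minimal library-level sketch using the public API from this patch, with a hypothetical report path:

```python
from parsedmarc import parse_report_file

# offline=True skips DNS and geolocation lookups; the threshold below is
# the default, so any report spanning more than 24 hours is split per-day.
results = parse_report_file(
    "samples/aggregate/weekly_report.xml",  # hypothetical path
    offline=True,
    normalize_timespan_threshold_hours=24.0,
)

report = results["report"]
meta = report["report_metadata"]
print(meta["timespan_requires_normalization"], meta["original_timespan_seconds"])
for record in report["records"]:
    # Normalized records each carry a daily interval and a pro-rated count.
    print(record["interval_begin"], record["interval_end"], record["count"])
```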
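One invariant worth pinning down in a test (hypothetical, not included in the patch): normalization conserves volume. The largest-remainder pass hands out exactly `total_count - sum(floor_values)` extra units, so the per-day counts always sum to the original record `count`:

```python
from datetime import datetime, timedelta, timezone

from parsedmarc import _bucket_interval_by_day

begin = datetime(2024, 3, 1, 7, 13, tzinfo=timezone.utc)
for hours in (30, 72, 168):  # 1.25-day, 3-day, and 7-day report spans
    for total in (1, 2, 7, 1000):
        end = begin + timedelta(hours=hours)
        buckets = _bucket_interval_by_day(begin, end, total)
        assert sum(b["count"] for b in buckets) == total
```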