Compare commits

...

27 Commits

Author SHA1 Message Date
Sean Whalen
110c6e507d Update docs 2025-12-01 17:04:37 -05:00
Sean Whalen
c8cdd90a1e Normalize timespans for aggregate reports in Elasticsearch and Opensearch 2025-12-01 16:34:40 -05:00
Sean Whalen
46a62cc10a Update launch configuration and metadata key for timespan in aggregate report 2025-12-01 16:10:41 -05:00
Sean Whalen
67fe009145 Add sources my name table to the Kibana DMARC Summary dashboard
This matches the table in the Splunk DMARC  Aggregate reports dashboard
2025-11-30 19:43:14 -05:00
Sean Whalen
e405e8fa53 Update changelog to correct timespan threshold for DMARC report normalization 2025-11-30 16:17:07 -05:00
Sean Whalen
a72d08ceb7 Refactor configuration loading for normalize_timespan_threshold_hours 2025-11-30 16:16:32 -05:00
Sean Whalen
2785e3df34 More fixes for normalize_timespan_threshold_hours: 2025-11-30 13:56:50 -05:00
Sean Whalen
f4470a7dd2 Fix normalize_timespan_threshold_hours 2025-11-30 13:46:21 -05:00
Sean Whalen
18b9894a1f Code formatting 2025-11-30 12:40:09 -05:00
Sean Whalen
d1791a97d3 Make timespan normalization hours configurable, with a 24.0 default 2025-11-30 12:23:38 -05:00
Sean Whalen
47ca6561c1 Fix changelog version 2025-11-30 10:46:48 -05:00
Sean Whalen
a0e18206ce Bump version to 9.0.0 2025-11-29 23:01:04 -05:00
Sean Whalen
9e4ffdd54c Add interval_begin, interval_end, and normalized_timespan to the Splunk report 2025-11-29 21:32:33 -05:00
Sean Whalen
434bd49eb3 Fix normalized_timespan in CSV output for aggregate reports 2025-11-29 21:23:39 -05:00
Sean Whalen
589038d2c9 Add normalized_timespan to CSV output for aggregate reports 2025-11-29 21:17:27 -05:00
Sean Whalen
c558224671 Rename normalized_timespan to timespan_requires_normalization and include interval_begin and interval_end in CSV output 2025-11-29 21:16:30 -05:00
Sean Whalen
044aa9e9a0 Include interval_begin in splunk output for accurate timestamping 2025-11-29 20:50:13 -05:00
Sean Whalen
6270468d30 Remove unneeded fields 2025-11-29 17:13:24 -05:00
Sean Whalen
832be7cfa3 Clean up imports 2025-11-29 16:56:12 -05:00
Sean Whalen
04dd11cf54 Fix formatting 2025-11-29 16:51:57 -05:00
Sean Whalen
0b41942916 Always include interval_begin and interval_end in records 2025-11-29 16:46:03 -05:00
Sean Whalen
f14a34202f Add morse type hints 2025-11-29 16:33:40 -05:00
Sean Whalen
daa6653c29 Bump version to 8.20.0 and update changelog for new report volume normalization 2025-11-29 15:26:25 -05:00
Sean Whalen
45d1093a99 Normalize report volumes when a report timespan exceed 24 hours 2025-11-29 14:52:57 -05:00
Sean Whalen
c1a757ca29 Remove outdated launch config 2025-11-29 14:45:21 -05:00
Sean Whalen
69b9d25a99 Revert code formatting 2025-11-29 14:14:54 -05:00
Sean Whalen
94d65f979d Code formatting 2025-11-29 14:04:20 -05:00
11 changed files with 567 additions and 299 deletions

13
.vscode/launch.json vendored
View File

@@ -19,20 +19,11 @@
"console": "integratedTerminal" "console": "integratedTerminal"
}, },
{ {
"name": "sample.eml", "name": "sample",
"type": "debugpy", "type": "debugpy",
"request": "launch", "request": "launch",
"module": "parsedmarc.cli", "module": "parsedmarc.cli",
"args": ["samples/private/sample.eml"] "args": ["samples/private/sample"]
},
{
"name": "find_sus_domains.py",
"type": "debugpy",
"request": "launch",
"program": "find_sus_domains.py",
"args": ["-i", "unknown_domains.txt", "-o", "sus_domains.csv"],
"cwd": "${workspaceFolder}/parsedmarc/resources/maps",
"console": "integratedTerminal"
}, },
{ {
"name": "sortlists.py", "name": "sortlists.py",

View File

@@ -1,6 +1,11 @@
Changelog Changelog
========= =========
9.0.0
------
- Normalize aggregate DMARC report volumes when a report timespan exceeds 24 hours
8.19.1 8.19.1
------ ------

View File

@@ -23,6 +23,8 @@ of the report schema.
"report_id": "9391651994964116463", "report_id": "9391651994964116463",
"begin_date": "2012-04-27 20:00:00", "begin_date": "2012-04-27 20:00:00",
"end_date": "2012-04-28 19:59:59", "end_date": "2012-04-28 19:59:59",
"timespan_requires_normalization": false,
"original_timespan_seconds": 86399,
"errors": [] "errors": []
}, },
"policy_published": { "policy_published": {
@@ -39,8 +41,10 @@ of the report schema.
"source": { "source": {
"ip_address": "72.150.241.94", "ip_address": "72.150.241.94",
"country": "US", "country": "US",
"reverse_dns": "adsl-72-150-241-94.shv.bellsouth.net", "reverse_dns": null,
"base_domain": "bellsouth.net" "base_domain": null,
"name": null,
"type": null
}, },
"count": 2, "count": 2,
"alignment": { "alignment": {
@@ -74,7 +78,10 @@ of the report schema.
"result": "pass" "result": "pass"
} }
] ]
} },
"normalized_timespan": false,
"interval_begin": "2012-04-28 00:00:00",
"interval_end": "2012-04-28 23:59:59"
} }
] ]
} }
@@ -83,8 +90,10 @@ of the report schema.
### CSV aggregate report ### CSV aggregate report
```text ```text
xml_schema,org_name,org_email,org_extra_contact_info,report_id,begin_date,end_date,errors,domain,adkim,aspf,p,sp,pct,fo,source_ip_address,source_country,source_reverse_dns,source_base_domain,count,spf_aligned,dkim_aligned,dmarc_aligned,disposition,policy_override_reasons,policy_override_comments,envelope_from,header_from,envelope_to,dkim_domains,dkim_selectors,dkim_results,spf_domains,spf_scopes,spf_results xml_schema,org_name,org_email,org_extra_contact_info,report_id,begin_date,end_date,normalized_timespan,errors,domain,adkim,aspf,p,sp,pct,fo,source_ip_address,source_country,source_reverse_dns,source_base_domain,source_name,source_type,count,spf_aligned,dkim_aligned,dmarc_aligned,disposition,policy_override_reasons,policy_override_comments,envelope_from,header_from,envelope_to,dkim_domains,dkim_selectors,dkim_results,spf_domains,spf_scopes,spf_results
draft,acme.com,noreply-dmarc-support@acme.com,http://acme.com/dmarc/support,9391651994964116463,2012-04-27 20:00:00,2012-04-28 19:59:59,,example.com,r,r,none,none,100,0,72.150.241.94,US,adsl-72-150-241-94.shv.bellsouth.net,bellsouth.net,2,True,False,True,none,,,example.com,example.com,,example.com,none,fail,example.com,mfrom,pass draft,acme.com,noreply-dmarc-support@acme.com,http://acme.com/dmarc/support,9391651994964116463,2012-04-28 00:00:00,2012-04-28 23:59:59,False,,example.com,r,r,none,none,100,0,72.150.241.94,US,,,,,2,True,False,True,none,,,example.com,example.com,,example.com,none,fail,example.com,mfrom,pass
draft,acme.com,noreply-dmarc-support@acme.com,http://acme.com/dmarc/support,9391651994964116463,2012-04-28 00:00:00,2012-04-28 23:59:59,False,,example.com,r,r,none,none,100,0,72.150.241.94,US,,,,,2,True,False,True,none,,,example.com,example.com,,example.com,none,fail,example.com,mfrom,pass
``` ```
## Sample forensic report output ## Sample forensic report output

View File

@@ -4,47 +4,50 @@
```text ```text
usage: parsedmarc [-h] [-c CONFIG_FILE] [--strip-attachment-payloads] [-o OUTPUT] usage: parsedmarc [-h] [-c CONFIG_FILE] [--strip-attachment-payloads] [-o OUTPUT]
[--aggregate-json-filename AGGREGATE_JSON_FILENAME] [--aggregate-json-filename AGGREGATE_JSON_FILENAME] [--forensic-json-filename FORENSIC_JSON_FILENAME]
[--forensic-json-filename FORENSIC_JSON_FILENAME] [--smtp-tls-json-filename SMTP_TLS_JSON_FILENAME] [--aggregate-csv-filename AGGREGATE_CSV_FILENAME]
[--aggregate-csv-filename AGGREGATE_CSV_FILENAME] [--forensic-csv-filename FORENSIC_CSV_FILENAME] [--smtp-tls-csv-filename SMTP_TLS_CSV_FILENAME]
[--forensic-csv-filename FORENSIC_CSV_FILENAME] [-n NAMESERVERS [NAMESERVERS ...]] [-t DNS_TIMEOUT] [--offline] [-s] [-w] [--verbose] [--debug]
[-n NAMESERVERS [NAMESERVERS ...]] [-t DNS_TIMEOUT] [--offline] [--log-file LOG_FILE] [--no-prettify-json] [-v]
[-s] [--verbose] [--debug] [--log-file LOG_FILE] [-v] [file_path ...]
[file_path ...]
Parses DMARC reports Parses DMARC reports
positional arguments: positional arguments:
file_path one or more paths to aggregate or forensic report file_path one or more paths to aggregate or forensic report files, emails, or mbox files'
files, emails, or mbox files'
optional arguments: options:
-h, --help show this help message and exit -h, --help show this help message and exit
-c CONFIG_FILE, --config-file CONFIG_FILE -c CONFIG_FILE, --config-file CONFIG_FILE
a path to a configuration file (--silent implied) a path to a configuration file (--silent implied)
--strip-attachment-payloads --strip-attachment-payloads
remove attachment payloads from forensic report output remove attachment payloads from forensic report output
-o OUTPUT, --output OUTPUT -o OUTPUT, --output OUTPUT
write output files to the given directory write output files to the given directory
--aggregate-json-filename AGGREGATE_JSON_FILENAME --aggregate-json-filename AGGREGATE_JSON_FILENAME
filename for the aggregate JSON output file filename for the aggregate JSON output file
--forensic-json-filename FORENSIC_JSON_FILENAME --forensic-json-filename FORENSIC_JSON_FILENAME
filename for the forensic JSON output file filename for the forensic JSON output file
--aggregate-csv-filename AGGREGATE_CSV_FILENAME --smtp-tls-json-filename SMTP_TLS_JSON_FILENAME
filename for the aggregate CSV output file filename for the SMTP TLS JSON output file
--forensic-csv-filename FORENSIC_CSV_FILENAME --aggregate-csv-filename AGGREGATE_CSV_FILENAME
filename for the forensic CSV output file filename for the aggregate CSV output file
-n NAMESERVERS [NAMESERVERS ...], --nameservers NAMESERVERS [NAMESERVERS ...] --forensic-csv-filename FORENSIC_CSV_FILENAME
nameservers to query filename for the forensic CSV output file
-t DNS_TIMEOUT, --dns_timeout DNS_TIMEOUT --smtp-tls-csv-filename SMTP_TLS_CSV_FILENAME
number of seconds to wait for an answer from DNS filename for the SMTP TLS CSV output file
(default: 2.0) -n NAMESERVERS [NAMESERVERS ...], --nameservers NAMESERVERS [NAMESERVERS ...]
--offline do not make online queries for geolocation or DNS nameservers to query
-s, --silent only print errors and warnings -t DNS_TIMEOUT, --dns_timeout DNS_TIMEOUT
--verbose more verbose output number of seconds to wait for an answer from DNS (default: 2.0)
--debug print debugging information --offline do not make online queries for geolocation or DNS
--log-file LOG_FILE output logging to a file -s, --silent only print errors
-v, --version show program's version number and exit -w, --warnings print warnings in addition to errors
--verbose more verbose output
--debug print debugging information
--log-file LOG_FILE output logging to a file
--no-prettify-json output JSON in a single line without indentation
-v, --version show program's version number and exit
``` ```
:::{note} :::{note}

File diff suppressed because one or more lines are too long

View File

@@ -2,6 +2,10 @@
"""A Python package for parsing DMARC reports""" """A Python package for parsing DMARC reports"""
from __future__ import annotations
from typing import Dict, List, Any, Union, IO, Callable
import binascii import binascii
import email import email
import email.utils import email.utils
@@ -17,9 +21,8 @@ import zlib
from base64 import b64decode from base64 import b64decode
from collections import OrderedDict from collections import OrderedDict
from csv import DictWriter from csv import DictWriter
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone, tzinfo
from io import BytesIO, StringIO from io import BytesIO, StringIO
from typing import Callable
import mailparser import mailparser
import xmltodict import xmltodict
@@ -79,15 +82,196 @@ class InvalidForensicReport(InvalidDMARCReport):
"""Raised when an invalid DMARC forensic report is encountered""" """Raised when an invalid DMARC forensic report is encountered"""
def _bucket_interval_by_day(
begin: datetime,
end: datetime,
total_count: int,
) -> List[Dict[Any]]:
"""
Split the interval [begin, end) into daily buckets and distribute
`total_count` proportionally across those buckets.
The function:
1. Identifies each calendar day touched by [begin, end)
2. Computes how many seconds of the interval fall into each day
3. Assigns counts in proportion to those overlaps
4. Ensures the final counts sum exactly to total_count
Args:
begin: timezone-aware datetime, inclusive start of interval
end: timezone-aware datetime, exclusive end of interval
total_count: number of messages to distribute
Returns:
A list of dicts like:
{
"begin": datetime,
"end": datetime,
"count": int
}
"""
# --- Input validation ----------------------------------------------------
if begin > end:
raise ValueError("begin must be earlier than end")
if begin.tzinfo is None or end.tzinfo is None:
raise ValueError("begin and end must be timezone-aware")
if begin.tzinfo is not end.tzinfo:
raise ValueError("begin and end must have the same tzinfo")
if total_count < 0:
raise ValueError("total_count must be non-negative")
# --- Short-circuit trivial cases -----------------------------------------
interval_seconds = (end - begin).total_seconds()
if interval_seconds <= 0 or total_count == 0:
return []
tz: tzinfo = begin.tzinfo
# --- Step 1: Determine all calendar days touched by [begin, end) ----------
#
# For example:
# begin = Jan 1 12:00
# end = Jan 3 06:00
#
# We need buckets for:
# Jan 1 12:00 → Jan 2 00:00
# Jan 2 00:00 → Jan 3 00:00
# Jan 3 00:00 → Jan 3 06:00
#
# Start at midnight on the day of `begin`.
day_cursor = datetime(begin.year, begin.month, begin.day, tzinfo=tz)
# If `begin` is earlier on that day (e.g. 10:00), we want that midnight.
# If `begin` is past that midnight (e.g. 00:30), this is correct.
# If `begin` is BEFORE that midnight (rare unless tz shifts), adjust:
if day_cursor > begin:
day_cursor -= timedelta(days=1)
day_buckets: List[Dict[str, Any]] = []
while day_cursor < end:
day_start = day_cursor
day_end = day_cursor + timedelta(days=1)
# Overlap between [begin, end) and this day
overlap_start = max(begin, day_start)
overlap_end = min(end, day_end)
overlap_seconds = (overlap_end - overlap_start).total_seconds()
if overlap_seconds > 0:
day_buckets.append(
{
"begin": overlap_start,
"end": overlap_end,
"seconds": overlap_seconds,
}
)
day_cursor = day_end
# --- Step 2: Pro-rate counts across buckets -------------------------------
#
# Compute the exact fractional count for each bucket:
# bucket_fraction = bucket_seconds / interval_seconds
# bucket_exact = total_count * bucket_fraction
#
# Then apply a "largest remainder" rounding strategy to ensure the sum
# equals exactly total_count.
exact_values: List[float] = [
(b["seconds"] / interval_seconds) * total_count for b in day_buckets
]
floor_values: List[int] = [int(x) for x in exact_values]
fractional_parts: List[float] = [x - int(x) for x in exact_values]
# How many counts do we still need to distribute after flooring?
remainder = total_count - sum(floor_values)
# Sort buckets by descending fractional remainder
indices_by_fraction = sorted(
range(len(day_buckets)),
key=lambda i: fractional_parts[i],
reverse=True,
)
# Start with floor values
final_counts = floor_values[:]
# Add +1 to the buckets with the largest fractional parts
for idx in indices_by_fraction[:remainder]:
final_counts[idx] += 1
# --- Step 3: Build the final per-day result list -------------------------
results: List[Dict[str, Any]] = []
for bucket, count in zip(day_buckets, final_counts):
if count > 0:
results.append(
{
"begin": bucket["begin"],
"end": bucket["end"],
"count": count,
}
)
return results
def _append_parsed_record(
    parsed_record: Dict[str, Any],
    records: List[Dict[str, Any]],
    begin_dt: datetime,
    end_dt: datetime,
    normalize: bool,
) -> None:
    """
    Append a parsed DMARC record either unchanged or normalized.

    Args:
        parsed_record: The record returned by _parse_report_record().
        records: Accumulating list of output records (mutated in place).
        begin_dt: Report-level begin datetime (UTC).
        end_dt: Report-level end datetime (UTC).
        normalize: Whether this report exceeded the allowed timespan
            and should be normalized per-day.
    """
    if not normalize:
        parsed_record["normalized_timespan"] = False
        parsed_record["interval_begin"] = begin_dt.strftime("%Y-%m-%d %H:%M:%S")
        parsed_record["interval_end"] = end_dt.strftime("%Y-%m-%d %H:%M:%S")
        records.append(parsed_record)
        return

    # Normalization path: break record into daily buckets
    total_count = int(parsed_record.get("count", 0))
    buckets = _bucket_interval_by_day(begin_dt, end_dt, total_count)
    if not buckets:
        # Zero-length interval or zero count: nothing to distribute
        return

    for bucket in buckets:
        # NOTE: copy() is shallow — nested dicts (e.g. "source") are shared
        # between the per-day records; only top-level keys are rebound here.
        new_rec = parsed_record.copy()
        new_rec["count"] = bucket["count"]
        new_rec["normalized_timespan"] = True
        new_rec["interval_begin"] = bucket["begin"].strftime("%Y-%m-%d %H:%M:%S")
        new_rec["interval_end"] = bucket["end"].strftime("%Y-%m-%d %H:%M:%S")
        records.append(new_rec)
def _parse_report_record( def _parse_report_record(
record, record: dict,
ip_db_path=None, ip_db_path: str = None,
always_use_local_files=False, always_use_local_files: bool = False,
reverse_dns_map_path=None, reverse_dns_map_path: str = None,
reverse_dns_map_url=None, reverse_dns_map_url: str = None,
offline=False, offline: bool = False,
nameservers=None, nameservers: list[str] = None,
dns_timeout=2.0, dns_timeout: float = 2.0,
): ):
""" """
Converts a record from a DMARC aggregate report into a more consistent Converts a record from a DMARC aggregate report into a more consistent
@@ -242,7 +426,7 @@ def _parse_report_record(
return new_record return new_record
def _parse_smtp_tls_failure_details(failure_details): def _parse_smtp_tls_failure_details(failure_details: dict):
try: try:
new_failure_details = OrderedDict( new_failure_details = OrderedDict(
result_type=failure_details["result-type"], result_type=failure_details["result-type"],
@@ -278,7 +462,7 @@ def _parse_smtp_tls_failure_details(failure_details):
raise InvalidSMTPTLSReport(str(e)) raise InvalidSMTPTLSReport(str(e))
def _parse_smtp_tls_report_policy(policy): def _parse_smtp_tls_report_policy(policy: dict):
policy_types = ["tlsa", "sts", "no-policy-found"] policy_types = ["tlsa", "sts", "no-policy-found"]
try: try:
policy_domain = policy["policy"]["policy-domain"] policy_domain = policy["policy"]["policy-domain"]
@@ -315,7 +499,7 @@ def _parse_smtp_tls_report_policy(policy):
raise InvalidSMTPTLSReport(str(e)) raise InvalidSMTPTLSReport(str(e))
def parse_smtp_tls_report_json(report): def parse_smtp_tls_report_json(report: dict):
"""Parses and validates an SMTP TLS report""" """Parses and validates an SMTP TLS report"""
required_fields = [ required_fields = [
"organization-name", "organization-name",
@@ -354,7 +538,7 @@ def parse_smtp_tls_report_json(report):
raise InvalidSMTPTLSReport(str(e)) raise InvalidSMTPTLSReport(str(e))
def parsed_smtp_tls_reports_to_csv_rows(reports): def parsed_smtp_tls_reports_to_csv_rows(reports: dict):
"""Converts one oor more parsed SMTP TLS reports into a list of single """Converts one oor more parsed SMTP TLS reports into a list of single
layer OrderedDict objects suitable for use in a CSV""" layer OrderedDict objects suitable for use in a CSV"""
if type(reports) is OrderedDict: if type(reports) is OrderedDict:
@@ -389,7 +573,7 @@ def parsed_smtp_tls_reports_to_csv_rows(reports):
return rows return rows
def parsed_smtp_tls_reports_to_csv(reports): def parsed_smtp_tls_reports_to_csv(reports: dict):
""" """
Converts one or more parsed SMTP TLS reports to flat CSV format, including Converts one or more parsed SMTP TLS reports to flat CSV format, including
headers headers
@@ -435,15 +619,16 @@ def parsed_smtp_tls_reports_to_csv(reports):
def parse_aggregate_report_xml( def parse_aggregate_report_xml(
xml, xml: str,
ip_db_path=None, ip_db_path: bool = None,
always_use_local_files=False, always_use_local_files: bool = False,
reverse_dns_map_path=None, reverse_dns_map_path: bool = None,
reverse_dns_map_url=None, reverse_dns_map_url: bool = None,
offline=False, offline: bool = False,
nameservers=None, nameservers: bool = None,
timeout=2.0, timeout: float = 2.0,
keep_alive=None, keep_alive: callable = None,
normalize_timespan_threshold_hours: float = 24.0,
): ):
"""Parses a DMARC XML report string and returns a consistent OrderedDict """Parses a DMARC XML report string and returns a consistent OrderedDict
@@ -458,6 +643,7 @@ def parse_aggregate_report_xml(
(Cloudflare's public DNS resolvers by default) (Cloudflare's public DNS resolvers by default)
timeout (float): Sets the DNS timeout in seconds timeout (float): Sets the DNS timeout in seconds
keep_alive (callable): Keep alive function keep_alive (callable): Keep alive function
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
Returns: Returns:
OrderedDict: The parsed aggregate DMARC report OrderedDict: The parsed aggregate DMARC report
@@ -522,13 +708,27 @@ def parse_aggregate_report_xml(
report_id = report_id.replace("<", "").replace(">", "").split("@")[0] report_id = report_id.replace("<", "").replace(">", "").split("@")[0]
new_report_metadata["report_id"] = report_id new_report_metadata["report_id"] = report_id
date_range = report["report_metadata"]["date_range"] date_range = report["report_metadata"]["date_range"]
if int(date_range["end"]) - int(date_range["begin"]) > 2 * 86400:
_error = "Time span > 24 hours - RFC 7489 section 7.2" begin_ts = int(date_range["begin"])
raise InvalidAggregateReport(_error) end_ts = int(date_range["end"])
date_range["begin"] = timestamp_to_human(date_range["begin"]) span_seconds = end_ts - begin_ts
date_range["end"] = timestamp_to_human(date_range["end"])
normalize_timespan = span_seconds > normalize_timespan_threshold_hours * 3600
date_range["begin"] = timestamp_to_human(begin_ts)
date_range["end"] = timestamp_to_human(end_ts)
new_report_metadata["begin_date"] = date_range["begin"] new_report_metadata["begin_date"] = date_range["begin"]
new_report_metadata["end_date"] = date_range["end"] new_report_metadata["end_date"] = date_range["end"]
new_report_metadata["timespan_requires_normalization"] = normalize_timespan
new_report_metadata["original_timespan_seconds"] = span_seconds
begin_dt = human_timestamp_to_datetime(
new_report_metadata["begin_date"], to_utc=True
)
end_dt = human_timestamp_to_datetime(
new_report_metadata["end_date"], to_utc=True
)
if "error" in report["report_metadata"]: if "error" in report["report_metadata"]:
if not isinstance(report["report_metadata"]["error"], list): if not isinstance(report["report_metadata"]["error"], list):
errors = [report["report_metadata"]["error"]] errors = [report["report_metadata"]["error"]]
@@ -587,7 +787,13 @@ def parse_aggregate_report_xml(
nameservers=nameservers, nameservers=nameservers,
dns_timeout=timeout, dns_timeout=timeout,
) )
records.append(report_record) _append_parsed_record(
parsed_record=report_record,
records=records,
begin_dt=begin_dt,
end_dt=end_dt,
normalize=normalize_timespan,
)
except Exception as e: except Exception as e:
logger.warning("Could not parse record: {0}".format(e)) logger.warning("Could not parse record: {0}".format(e))
@@ -602,7 +808,13 @@ def parse_aggregate_report_xml(
nameservers=nameservers, nameservers=nameservers,
dns_timeout=timeout, dns_timeout=timeout,
) )
records.append(report_record) _append_parsed_record(
parsed_record=report_record,
records=records,
begin_dt=begin_dt,
end_dt=end_dt,
normalize=normalize_timespan,
)
new_report["records"] = records new_report["records"] = records
@@ -620,7 +832,7 @@ def parse_aggregate_report_xml(
raise InvalidAggregateReport("Unexpected error: {0}".format(error.__str__())) raise InvalidAggregateReport("Unexpected error: {0}".format(error.__str__()))
def extract_report(content): def extract_report(content: Union[bytes, str, IO[Any]]):
""" """
Extracts text from a zip or gzip file, as a base64-encoded string, Extracts text from a zip or gzip file, as a base64-encoded string,
file-like object, or bytes. file-like object, or bytes.
@@ -684,15 +896,16 @@ def extract_report_from_file_path(file_path):
def parse_aggregate_report_file( def parse_aggregate_report_file(
_input, _input: Union[str, bytes, IO[Any]],
offline=False, offline: bool = False,
always_use_local_files=None, always_use_local_files: bool = None,
reverse_dns_map_path=None, reverse_dns_map_path: str = None,
reverse_dns_map_url=None, reverse_dns_map_url: str = None,
ip_db_path=None, ip_db_path: str = None,
nameservers=None, nameservers: list[str] = None,
dns_timeout=2.0, dns_timeout: float = 2.0,
keep_alive=None, keep_alive: Callable = None,
normalize_timespan_threshold_hours: float = 24.0,
): ):
"""Parses a file at the given path, a file-like object. or bytes as an """Parses a file at the given path, a file-like object. or bytes as an
aggregate DMARC report aggregate DMARC report
@@ -708,6 +921,7 @@ def parse_aggregate_report_file(
(Cloudflare's public DNS resolvers by default) (Cloudflare's public DNS resolvers by default)
dns_timeout (float): Sets the DNS timeout in seconds dns_timeout (float): Sets the DNS timeout in seconds
keep_alive (callable): Keep alive function keep_alive (callable): Keep alive function
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
Returns: Returns:
OrderedDict: The parsed DMARC aggregate report OrderedDict: The parsed DMARC aggregate report
@@ -728,10 +942,11 @@ def parse_aggregate_report_file(
nameservers=nameservers, nameservers=nameservers,
timeout=dns_timeout, timeout=dns_timeout,
keep_alive=keep_alive, keep_alive=keep_alive,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
) )
def parsed_aggregate_reports_to_csv_rows(reports): def parsed_aggregate_reports_to_csv_rows(reports: list[dict]):
""" """
Converts one or more parsed aggregate reports to list of dicts in flat CSV Converts one or more parsed aggregate reports to list of dicts in flat CSV
format format
@@ -760,6 +975,9 @@ def parsed_aggregate_reports_to_csv_rows(reports):
report_id = report["report_metadata"]["report_id"] report_id = report["report_metadata"]["report_id"]
begin_date = report["report_metadata"]["begin_date"] begin_date = report["report_metadata"]["begin_date"]
end_date = report["report_metadata"]["end_date"] end_date = report["report_metadata"]["end_date"]
normalized_timespan = report["report_metadata"][
"timespan_requires_normalization"
]
errors = "|".join(report["report_metadata"]["errors"]) errors = "|".join(report["report_metadata"]["errors"])
domain = report["policy_published"]["domain"] domain = report["policy_published"]["domain"]
adkim = report["policy_published"]["adkim"] adkim = report["policy_published"]["adkim"]
@@ -777,6 +995,7 @@ def parsed_aggregate_reports_to_csv_rows(reports):
report_id=report_id, report_id=report_id,
begin_date=begin_date, begin_date=begin_date,
end_date=end_date, end_date=end_date,
normalized_timespan=normalized_timespan,
errors=errors, errors=errors,
domain=domain, domain=domain,
adkim=adkim, adkim=adkim,
@@ -789,6 +1008,8 @@ def parsed_aggregate_reports_to_csv_rows(reports):
for record in report["records"]: for record in report["records"]:
row = report_dict.copy() row = report_dict.copy()
row["begin_date"] = record["interval_begin"]
row["end_date"] = record["interval_end"]
row["source_ip_address"] = record["source"]["ip_address"] row["source_ip_address"] = record["source"]["ip_address"]
row["source_country"] = record["source"]["country"] row["source_country"] = record["source"]["country"]
row["source_reverse_dns"] = record["source"]["reverse_dns"] row["source_reverse_dns"] = record["source"]["reverse_dns"]
@@ -849,7 +1070,7 @@ def parsed_aggregate_reports_to_csv_rows(reports):
return rows return rows
def parsed_aggregate_reports_to_csv(reports): def parsed_aggregate_reports_to_csv(reports: list[OrderedDict]):
""" """
Converts one or more parsed aggregate reports to flat CSV format, including Converts one or more parsed aggregate reports to flat CSV format, including
headers headers
@@ -869,6 +1090,7 @@ def parsed_aggregate_reports_to_csv(reports):
"report_id", "report_id",
"begin_date", "begin_date",
"end_date", "end_date",
"normalized_timespan",
"errors", "errors",
"domain", "domain",
"adkim", "adkim",
@@ -915,17 +1137,17 @@ def parsed_aggregate_reports_to_csv(reports):
def parse_forensic_report( def parse_forensic_report(
feedback_report, feedback_report: str,
sample, sample: str,
msg_date, msg_date: datetime,
always_use_local_files=False, always_use_local_files: bool = False,
reverse_dns_map_path=None, reverse_dns_map_path: str = None,
reverse_dns_map_url=None, reverse_dns_map_url: str = None,
offline=False, offline: bool = False,
ip_db_path=None, ip_db_path: str = None,
nameservers=None, nameservers: list[str] = None,
dns_timeout=2.0, dns_timeout: float = 2.0,
strip_attachment_payloads=False, strip_attachment_payloads: bool = False,
): ):
""" """
Converts a DMARC forensic report and sample to a ``OrderedDict`` Converts a DMARC forensic report and sample to a ``OrderedDict``
@@ -1054,7 +1276,7 @@ def parse_forensic_report(
raise InvalidForensicReport("Unexpected error: {0}".format(error.__str__())) raise InvalidForensicReport("Unexpected error: {0}".format(error.__str__()))
def parsed_forensic_reports_to_csv_rows(reports): def parsed_forensic_reports_to_csv_rows(reports: list[OrderedDict]):
""" """
Converts one or more parsed forensic reports to a list of dicts in flat CSV Converts one or more parsed forensic reports to a list of dicts in flat CSV
format format
@@ -1090,7 +1312,7 @@ def parsed_forensic_reports_to_csv_rows(reports):
return rows return rows
def parsed_forensic_reports_to_csv(reports): def parsed_forensic_reports_to_csv(reports: list[dict]):
""" """
Converts one or more parsed forensic reports to flat CSV format, including Converts one or more parsed forensic reports to flat CSV format, including
headers headers
@@ -1143,16 +1365,17 @@ def parsed_forensic_reports_to_csv(reports):
def parse_report_email( def parse_report_email(
input_, input_: Union[bytes, str],
offline=False, offline: bool = False,
ip_db_path=None, ip_db_path: str = None,
always_use_local_files=False, always_use_local_files: bool = False,
reverse_dns_map_path=None, reverse_dns_map_path: str = None,
reverse_dns_map_url=None, reverse_dns_map_url: str = None,
nameservers=None, nameservers: list[str] = None,
dns_timeout=2.0, dns_timeout: float = 2.0,
strip_attachment_payloads=False, strip_attachment_payloads: bool = False,
keep_alive=None, keep_alive: callable = None,
normalize_timespan_threshold_hours: float = 24.0,
): ):
""" """
Parses a DMARC report from an email Parses a DMARC report from an email
@@ -1169,6 +1392,7 @@ def parse_report_email(
strip_attachment_payloads (bool): Remove attachment payloads from strip_attachment_payloads (bool): Remove attachment payloads from
forensic report results forensic report results
keep_alive (callable): keep alive function keep_alive (callable): keep alive function
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
Returns: Returns:
OrderedDict: OrderedDict:
@@ -1281,6 +1505,7 @@ def parse_report_email(
nameservers=nameservers, nameservers=nameservers,
timeout=dns_timeout, timeout=dns_timeout,
keep_alive=keep_alive, keep_alive=keep_alive,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
) )
result = OrderedDict( result = OrderedDict(
[("report_type", "aggregate"), ("report", aggregate_report)] [("report_type", "aggregate"), ("report", aggregate_report)]
@@ -1337,16 +1562,17 @@ def parse_report_email(
def parse_report_file( def parse_report_file(
input_, input_: Union[bytes, str, IO[Any]],
nameservers=None, nameservers: list[str] = None,
dns_timeout=2.0, dns_timeout: float = 2.0,
strip_attachment_payloads=False, strip_attachment_payloads: bool = False,
ip_db_path=None, ip_db_path: str = None,
always_use_local_files=False, always_use_local_files: bool = False,
reverse_dns_map_path=None, reverse_dns_map_path: str = None,
reverse_dns_map_url=None, reverse_dns_map_url: str = None,
offline=False, offline: bool = False,
keep_alive=None, keep_alive: Callable = None,
normalize_timespan_threshold_hours: float = 24,
): ):
"""Parses a DMARC aggregate or forensic file at the given path, a """Parses a DMARC aggregate or forensic file at the given path, a
file-like object. or bytes file-like object. or bytes
@@ -1389,6 +1615,7 @@ def parse_report_file(
nameservers=nameservers, nameservers=nameservers,
dns_timeout=dns_timeout, dns_timeout=dns_timeout,
keep_alive=keep_alive, keep_alive=keep_alive,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
) )
results = OrderedDict([("report_type", "aggregate"), ("report", report)]) results = OrderedDict([("report_type", "aggregate"), ("report", report)])
except InvalidAggregateReport: except InvalidAggregateReport:
@@ -1409,6 +1636,7 @@ def parse_report_file(
dns_timeout=dns_timeout, dns_timeout=dns_timeout,
strip_attachment_payloads=sa, strip_attachment_payloads=sa,
keep_alive=keep_alive, keep_alive=keep_alive,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
) )
except InvalidDMARCReport: except InvalidDMARCReport:
raise ParserError("Not a valid report") raise ParserError("Not a valid report")
@@ -1416,15 +1644,16 @@ def parse_report_file(
def get_dmarc_reports_from_mbox( def get_dmarc_reports_from_mbox(
input_, input_: str,
nameservers=None, nameservers: list[str] = None,
dns_timeout=2.0, dns_timeout: float = 2.0,
strip_attachment_payloads=False, strip_attachment_payloads: bool = False,
ip_db_path=None, ip_db_path: str = None,
always_use_local_files=False, always_use_local_files: bool = False,
reverse_dns_map_path=None, reverse_dns_map_path: str = None,
reverse_dns_map_url=None, reverse_dns_map_url: str = None,
offline=False, offline: bool = False,
normalize_timespan_threshold_hours: float = 24.0,
): ):
"""Parses a mailbox in mbox format containing e-mails with attached """Parses a mailbox in mbox format containing e-mails with attached
DMARC reports DMARC reports
@@ -1441,6 +1670,7 @@ def get_dmarc_reports_from_mbox(
reverse_dns_map_url (str): URL to a reverse DNS map file reverse_dns_map_url (str): URL to a reverse DNS map file
ip_db_path (str): Path to a MMDB file from MaxMind or DBIP ip_db_path (str): Path to a MMDB file from MaxMind or DBIP
offline (bool): Do not make online queries for geolocation or DNS offline (bool): Do not make online queries for geolocation or DNS
normalize_timespan_threshold_hours (float): Normalize timespans beyond this many hours
Returns: Returns:
OrderedDict: Lists of ``aggregate_reports`` and ``forensic_reports`` OrderedDict: Lists of ``aggregate_reports`` and ``forensic_reports``
@@ -1470,6 +1700,7 @@ def get_dmarc_reports_from_mbox(
nameservers=nameservers, nameservers=nameservers,
dns_timeout=dns_timeout, dns_timeout=dns_timeout,
strip_attachment_payloads=sa, strip_attachment_payloads=sa,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
) )
if parsed_email["report_type"] == "aggregate": if parsed_email["report_type"] == "aggregate":
report_org = parsed_email["report"]["report_metadata"]["org_name"] report_org = parsed_email["report"]["report_metadata"]["org_name"]
@@ -1502,22 +1733,23 @@ def get_dmarc_reports_from_mbox(
def get_dmarc_reports_from_mailbox( def get_dmarc_reports_from_mailbox(
connection: MailboxConnection, connection: MailboxConnection,
reports_folder="INBOX", reports_folder: str = "INBOX",
archive_folder="Archive", archive_folder: str = "Archive",
delete=False, delete: bool = False,
test=False, test: bool = False,
ip_db_path=None, ip_db_path: str = None,
always_use_local_files=False, always_use_local_files: bool = False,
reverse_dns_map_path=None, reverse_dns_map_path: str = None,
reverse_dns_map_url=None, reverse_dns_map_url: str = None,
offline=False, offline: bool = False,
nameservers=None, nameservers: list[str] = None,
dns_timeout=6.0, dns_timeout: float = 6.0,
strip_attachment_payloads=False, strip_attachment_payloads: bool = False,
results=None, results: dict = None,
batch_size=10, batch_size: int = 10,
since=None, since: datetime = None,
create_folders=True, create_folders: bool = True,
normalize_timespan_threshold_hours: float = 24.0,
): ):
""" """
Fetches and parses DMARC reports from a mailbox Fetches and parses DMARC reports from a mailbox
@@ -1544,6 +1776,7 @@ def get_dmarc_reports_from_mailbox(
(units - {"m":"minutes", "h":"hours", "d":"days", "w":"weeks"}) (units - {"m":"minutes", "h":"hours", "d":"days", "w":"weeks"})
create_folders (bool): Whether to create the destination folders create_folders (bool): Whether to create the destination folders
(not used in watch) (not used in watch)
normalize_timespan_threshold_hours (float): Normalize timespans beyond this many hours
Returns: Returns:
OrderedDict: Lists of ``aggregate_reports`` and ``forensic_reports`` OrderedDict: Lists of ``aggregate_reports`` and ``forensic_reports``
@@ -1661,6 +1894,7 @@ def get_dmarc_reports_from_mailbox(
offline=offline, offline=offline,
strip_attachment_payloads=sa, strip_attachment_payloads=sa,
keep_alive=connection.keepalive, keep_alive=connection.keepalive,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
) )
if parsed_email["report_type"] == "aggregate": if parsed_email["report_type"] == "aggregate":
report_org = parsed_email["report"]["report_metadata"]["org_name"] report_org = parsed_email["report"]["report_metadata"]["org_name"]
@@ -1812,6 +2046,7 @@ def get_dmarc_reports_from_mailbox(
reverse_dns_map_url=reverse_dns_map_url, reverse_dns_map_url=reverse_dns_map_url,
offline=offline, offline=offline,
since=current_time, since=current_time,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
) )
return results return results
@@ -1820,20 +2055,21 @@ def get_dmarc_reports_from_mailbox(
def watch_inbox( def watch_inbox(
mailbox_connection: MailboxConnection, mailbox_connection: MailboxConnection,
callback: Callable, callback: Callable,
reports_folder="INBOX", reports_folder: str = "INBOX",
archive_folder="Archive", archive_folder: str = "Archive",
delete=False, delete: bool = False,
test=False, test: bool = False,
check_timeout=30, check_timeout: int = 30,
ip_db_path=None, ip_db_path: str = None,
always_use_local_files=False, always_use_local_files: bool = False,
reverse_dns_map_path=None, reverse_dns_map_path: str = None,
reverse_dns_map_url=None, reverse_dns_map_url: str = None,
offline=False, offline: bool = False,
nameservers=None, nameservers: list[str] = None,
dns_timeout=6.0, dns_timeout: float = 6.0,
strip_attachment_payloads=False, strip_attachment_payloads: bool = False,
batch_size=None, batch_size: int = None,
normalize_timespan_threshold_hours: float = 24.0,
): ):
""" """
Watches the mailbox for new messages and Watches the mailbox for new messages and
@@ -1859,6 +2095,7 @@ def watch_inbox(
strip_attachment_payloads (bool): Replace attachment payloads in strip_attachment_payloads (bool): Replace attachment payloads in
forensic report samples with None forensic report samples with None
batch_size (int): Number of messages to read and process before saving batch_size (int): Number of messages to read and process before saving
normalize_timespan_threshold_hours (float): Normalize timespans beyond this many hours
""" """
def check_callback(connection): def check_callback(connection):
@@ -1879,6 +2116,7 @@ def watch_inbox(
strip_attachment_payloads=sa, strip_attachment_payloads=sa,
batch_size=batch_size, batch_size=batch_size,
create_folders=False, create_folders=False,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
) )
callback(res) callback(res)
@@ -1921,14 +2159,14 @@ def append_csv(filename, csv):
def save_output( def save_output(
results, results: OrderedDict,
output_directory="output", output_directory: str = "output",
aggregate_json_filename="aggregate.json", aggregate_json_filename: str = "aggregate.json",
forensic_json_filename="forensic.json", forensic_json_filename: str = "forensic.json",
smtp_tls_json_filename="smtp_tls.json", smtp_tls_json_filename: str = "smtp_tls.json",
aggregate_csv_filename="aggregate.csv", aggregate_csv_filename: str = "aggregate.csv",
forensic_csv_filename="forensic.csv", forensic_csv_filename: str = "forensic.csv",
smtp_tls_csv_filename="smtp_tls.csv", smtp_tls_csv_filename: str = "smtp_tls.csv",
): ):
""" """
Save report data in the given directory Save report data in the given directory
@@ -2006,7 +2244,7 @@ def save_output(
sample_file.write(sample) sample_file.write(sample)
def get_report_zip(results): def get_report_zip(results: OrderedDict):
""" """
Creates a zip file of parsed report output Creates a zip file of parsed report output

View File

@@ -77,6 +77,7 @@ def cli_parse(
always_use_local_files, always_use_local_files,
reverse_dns_map_path, reverse_dns_map_path,
reverse_dns_map_url, reverse_dns_map_url,
normalize_timespan_threshold_hours,
conn, conn,
): ):
"""Separated this function for multiprocessing""" """Separated this function for multiprocessing"""
@@ -91,6 +92,7 @@ def cli_parse(
nameservers=nameservers, nameservers=nameservers,
dns_timeout=dns_timeout, dns_timeout=dns_timeout,
strip_attachment_payloads=sa, strip_attachment_payloads=sa,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
) )
conn.send([file_results, file_path]) conn.send([file_results, file_path])
except ParserError as error: except ParserError as error:
@@ -659,6 +661,7 @@ def _main():
webhook_forensic_url=None, webhook_forensic_url=None,
webhook_smtp_tls_url=None, webhook_smtp_tls_url=None,
webhook_timeout=60, webhook_timeout=60,
normalize_timespan_threshold_hours=24.0,
) )
args = arg_parser.parse_args() args = arg_parser.parse_args()
@@ -674,8 +677,11 @@ def _main():
if "general" in config.sections(): if "general" in config.sections():
general_config = config["general"] general_config = config["general"]
if "silent" in general_config: if "silent" in general_config:
if general_config["silent"].lower() == "false": opts.silent = general_config.getboolean("silent")
opts.silent = False if "normalize_timespan_threshold_hours" in general_config:
opts.normalize_timespan_threshold_hours = general_config.getfloat(
"normalize_timespan_threshold_hours"
)
if "index_prefix_domain_map" in general_config: if "index_prefix_domain_map" in general_config:
with open(general_config["index_prefix_domain_map"]) as f: with open(general_config["index_prefix_domain_map"]) as f:
index_prefix_domain_map = yaml.safe_load(f) index_prefix_domain_map = yaml.safe_load(f)
@@ -1445,6 +1451,7 @@ def _main():
opts.always_use_local_files, opts.always_use_local_files,
opts.reverse_dns_map_path, opts.reverse_dns_map_path,
opts.reverse_dns_map_url, opts.reverse_dns_map_url,
opts.normalize_timespan_threshold_hours,
child_conn, child_conn,
), ),
) )
@@ -1495,6 +1502,7 @@ def _main():
reverse_dns_map_path=opts.reverse_dns_map_path, reverse_dns_map_path=opts.reverse_dns_map_path,
reverse_dns_map_url=opts.reverse_dns_map_url, reverse_dns_map_url=opts.reverse_dns_map_url,
offline=opts.offline, offline=opts.offline,
normalize_timespan_threshold_hours=opts.normalize_timespan_threshold_hours,
) )
aggregate_reports += reports["aggregate_reports"] aggregate_reports += reports["aggregate_reports"]
forensic_reports += reports["forensic_reports"] forensic_reports += reports["forensic_reports"]
@@ -1604,6 +1612,7 @@ def _main():
test=opts.mailbox_test, test=opts.mailbox_test,
strip_attachment_payloads=opts.strip_attachment_payloads, strip_attachment_payloads=opts.strip_attachment_payloads,
since=opts.mailbox_since, since=opts.mailbox_since,
normalize_timespan_threshold_hours=opts.normalize_timespan_threshold_hours,
) )
aggregate_reports += reports["aggregate_reports"] aggregate_reports += reports["aggregate_reports"]
@@ -1666,6 +1675,7 @@ def _main():
reverse_dns_map_path=opts.reverse_dns_map_path, reverse_dns_map_path=opts.reverse_dns_map_path,
reverse_dns_map_url=opts.reverse_dns_map_url, reverse_dns_map_url=opts.reverse_dns_map_url,
offline=opts.offline, offline=opts.offline,
normalize_timespan_threshold_hours=opts.normalize_timespan_threshold_hours,
) )
except FileExistsError as error: except FileExistsError as error:
logger.error("{0}".format(error.__str__())) logger.error("{0}".format(error.__str__()))

View File

@@ -1,2 +1,2 @@
__version__ = "8.19.1" __version__ = "9.0.0"
USER_AGENT = f"parsedmarc/{__version__}" USER_AGENT = f"parsedmarc/{__version__}"

View File

@@ -67,6 +67,8 @@ class _AggregateReportDoc(Document):
date_range = Date() date_range = Date()
date_begin = Date() date_begin = Date()
date_end = Date() date_end = Date()
normalized_timespan = Boolean()
original_timespan_seconds = Integer()
errors = Text() errors = Text()
published_policy = Object(_PublishedPolicy) published_policy = Object(_PublishedPolicy)
source_ip_address = Ip() source_ip_address = Ip()
@@ -393,52 +395,7 @@ def save_aggregate_report_to_elasticsearch(
org_name = metadata["org_name"] org_name = metadata["org_name"]
report_id = metadata["report_id"] report_id = metadata["report_id"]
domain = aggregate_report["policy_published"]["domain"] domain = aggregate_report["policy_published"]["domain"]
begin_date = human_timestamp_to_datetime(metadata["begin_date"], to_utc=True)
end_date = human_timestamp_to_datetime(metadata["end_date"], to_utc=True)
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
if monthly_indexes:
index_date = begin_date.strftime("%Y-%m")
else:
index_date = begin_date.strftime("%Y-%m-%d")
aggregate_report["begin_date"] = begin_date
aggregate_report["end_date"] = end_date
date_range = [aggregate_report["begin_date"], aggregate_report["end_date"]]
org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
end_date_query = Q(dict(match=dict(date_end=end_date)))
if index_suffix is not None:
search_index = "dmarc_aggregate_{0}*".format(index_suffix)
else:
search_index = "dmarc_aggregate*"
if index_prefix is not None:
search_index = "{0}{1}".format(index_prefix, search_index)
search = Search(index=search_index)
query = org_name_query & report_id_query & domain_query
query = query & begin_date_query & end_date_query
search.query = query
try:
existing = search.execute()
except Exception as error_:
raise ElasticsearchError(
"Elasticsearch's search for existing report \
error: {}".format(error_.__str__())
)
if len(existing) > 0:
raise AlreadySaved(
"An aggregate report ID {0} from {1} about {2} "
"with a date range of {3} UTC to {4} UTC already "
"exists in "
"Elasticsearch".format(
report_id, org_name, domain, begin_date_human, end_date_human
)
)
published_policy = _PublishedPolicy( published_policy = _PublishedPolicy(
domain=aggregate_report["policy_published"]["domain"], domain=aggregate_report["policy_published"]["domain"],
adkim=aggregate_report["policy_published"]["adkim"], adkim=aggregate_report["policy_published"]["adkim"],
@@ -450,6 +407,52 @@ def save_aggregate_report_to_elasticsearch(
) )
for record in aggregate_report["records"]: for record in aggregate_report["records"]:
begin_date = human_timestamp_to_datetime(record["interval_begin"], to_utc=True)
end_date = human_timestamp_to_datetime(record["interval_end"], to_utc=True)
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
if monthly_indexes:
index_date = begin_date.strftime("%Y-%m")
else:
index_date = begin_date.strftime("%Y-%m-%d")
aggregate_report["begin_date"] = begin_date
aggregate_report["end_date"] = end_date
date_range = [aggregate_report["begin_date"], aggregate_report["end_date"]]
org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
end_date_query = Q(dict(match=dict(date_end=end_date)))
if index_suffix is not None:
search_index = "dmarc_aggregate_{0}*".format(index_suffix)
else:
search_index = "dmarc_aggregate*"
if index_prefix is not None:
search_index = "{0}{1}".format(index_prefix, search_index)
search = Search(index=search_index)
query = org_name_query & report_id_query & domain_query
query = query & begin_date_query & end_date_query
search.query = query
try:
existing = search.execute()
except Exception as error_:
raise ElasticsearchError(
"Elasticsearch's search for existing report \
error: {}".format(error_.__str__())
)
if len(existing) > 0:
raise AlreadySaved(
"An aggregate report ID {0} from {1} about {2} "
"with a date range of {3} UTC to {4} UTC already "
"exists in "
"Elasticsearch".format(
report_id, org_name, domain, begin_date_human, end_date_human
)
)
agg_doc = _AggregateReportDoc( agg_doc = _AggregateReportDoc(
xml_schema=aggregate_report["xml_schema"], xml_schema=aggregate_report["xml_schema"],
org_name=metadata["org_name"], org_name=metadata["org_name"],
@@ -459,6 +462,7 @@ def save_aggregate_report_to_elasticsearch(
date_range=date_range, date_range=date_range,
date_begin=aggregate_report["begin_date"], date_begin=aggregate_report["begin_date"],
date_end=aggregate_report["end_date"], date_end=aggregate_report["end_date"],
normalized_timespan=record["normalized_timespan"],
errors=metadata["errors"], errors=metadata["errors"],
published_policy=published_policy, published_policy=published_policy,
source_ip_address=record["source"]["ip_address"], source_ip_address=record["source"]["ip_address"],

View File

@@ -67,6 +67,8 @@ class _AggregateReportDoc(Document):
date_range = Date() date_range = Date()
date_begin = Date() date_begin = Date()
date_end = Date() date_end = Date()
normalized_timespan = Boolean()
original_timespan_seconds = Integer()
errors = Text() errors = Text()
published_policy = Object(_PublishedPolicy) published_policy = Object(_PublishedPolicy)
source_ip_address = Ip() source_ip_address = Ip()
@@ -393,52 +395,7 @@ def save_aggregate_report_to_opensearch(
org_name = metadata["org_name"] org_name = metadata["org_name"]
report_id = metadata["report_id"] report_id = metadata["report_id"]
domain = aggregate_report["policy_published"]["domain"] domain = aggregate_report["policy_published"]["domain"]
begin_date = human_timestamp_to_datetime(metadata["begin_date"], to_utc=True)
end_date = human_timestamp_to_datetime(metadata["end_date"], to_utc=True)
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
if monthly_indexes:
index_date = begin_date.strftime("%Y-%m")
else:
index_date = begin_date.strftime("%Y-%m-%d")
aggregate_report["begin_date"] = begin_date
aggregate_report["end_date"] = end_date
date_range = [aggregate_report["begin_date"], aggregate_report["end_date"]]
org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
end_date_query = Q(dict(match=dict(date_end=end_date)))
if index_suffix is not None:
search_index = "dmarc_aggregate_{0}*".format(index_suffix)
else:
search_index = "dmarc_aggregate*"
if index_prefix is not None:
search_index = "{0}{1}".format(index_prefix, search_index)
search = Search(index=search_index)
query = org_name_query & report_id_query & domain_query
query = query & begin_date_query & end_date_query
search.query = query
try:
existing = search.execute()
except Exception as error_:
raise OpenSearchError(
"OpenSearch's search for existing report \
error: {}".format(error_.__str__())
)
if len(existing) > 0:
raise AlreadySaved(
"An aggregate report ID {0} from {1} about {2} "
"with a date range of {3} UTC to {4} UTC already "
"exists in "
"OpenSearch".format(
report_id, org_name, domain, begin_date_human, end_date_human
)
)
published_policy = _PublishedPolicy( published_policy = _PublishedPolicy(
domain=aggregate_report["policy_published"]["domain"], domain=aggregate_report["policy_published"]["domain"],
adkim=aggregate_report["policy_published"]["adkim"], adkim=aggregate_report["policy_published"]["adkim"],
@@ -450,6 +407,52 @@ def save_aggregate_report_to_opensearch(
) )
for record in aggregate_report["records"]: for record in aggregate_report["records"]:
begin_date = human_timestamp_to_datetime(record["interval_begin"], to_utc=True)
end_date = human_timestamp_to_datetime(record["interval_end"], to_utc=True)
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
if monthly_indexes:
index_date = begin_date.strftime("%Y-%m")
else:
index_date = begin_date.strftime("%Y-%m-%d")
aggregate_report["begin_date"] = begin_date
aggregate_report["end_date"] = end_date
date_range = [aggregate_report["begin_date"], aggregate_report["end_date"]]
org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
end_date_query = Q(dict(match=dict(date_end=end_date)))
if index_suffix is not None:
search_index = "dmarc_aggregate_{0}*".format(index_suffix)
else:
search_index = "dmarc_aggregate*"
if index_prefix is not None:
search_index = "{0}{1}".format(index_prefix, search_index)
search = Search(index=search_index)
query = org_name_query & report_id_query & domain_query
query = query & begin_date_query & end_date_query
search.query = query
try:
existing = search.execute()
except Exception as error_:
raise OpenSearchError(
"OpenSearch's search for existing report \
error: {}".format(error_.__str__())
)
if len(existing) > 0:
raise AlreadySaved(
"An aggregate report ID {0} from {1} about {2} "
"with a date range of {3} UTC to {4} UTC already "
"exists in "
"OpenSearch".format(
report_id, org_name, domain, begin_date_human, end_date_human
)
)
agg_doc = _AggregateReportDoc( agg_doc = _AggregateReportDoc(
xml_schema=aggregate_report["xml_schema"], xml_schema=aggregate_report["xml_schema"],
org_name=metadata["org_name"], org_name=metadata["org_name"],

View File

@@ -78,6 +78,9 @@ class HECClient(object):
new_report = dict() new_report = dict()
for metadata in report["report_metadata"]: for metadata in report["report_metadata"]:
new_report[metadata] = report["report_metadata"][metadata] new_report[metadata] = report["report_metadata"][metadata]
new_report["interval_begin"] = record["interval_begin"]
new_report["interval_end"] = record["interval_end"]
new_report["normalized_timespan"] = record["normalized_timespan"]
new_report["published_policy"] = report["policy_published"] new_report["published_policy"] = report["policy_published"]
new_report["source_ip_address"] = record["source"]["ip_address"] new_report["source_ip_address"] = record["source"]["ip_address"]
new_report["source_country"] = record["source"]["country"] new_report["source_country"] = record["source"]["country"]
@@ -98,7 +101,9 @@ class HECClient(object):
new_report["spf_results"] = record["auth_results"]["spf"] new_report["spf_results"] = record["auth_results"]["spf"]
data["sourcetype"] = "dmarc:aggregate" data["sourcetype"] = "dmarc:aggregate"
timestamp = human_timestamp_to_unix_timestamp(new_report["begin_date"]) timestamp = human_timestamp_to_unix_timestamp(
new_report["interval_begin"]
)
data["time"] = timestamp data["time"] = timestamp
data["event"] = new_report.copy() data["event"] = new_report.copy()
json_str += "{0}\n".format(json.dumps(data)) json_str += "{0}\n".format(json.dumps(data))