Add RFC 9989/9990/9991 (final DMARC) report support; rename forensic→failure project-wide (#659)

* Add DMARCbis report support; rename forensic→failure project-wide

Rebased on top of master @ 2cda5bf (9.9.0), which added the ASN
source attribution work (#712, #713, #714, #715). Individual Copilot
iteration commits squashed into this single commit — the per-commit
history on the feature branch was iterative (add tests, fix lint,
move field, revert, etc.) and not worth preserving; GitHub squash-
merges PRs anyway.

New fields from the DMARCbis XSD, plumbed through types, parsing, CSV
output, and the Elasticsearch / OpenSearch mappings:

- ``np`` — non-existent subdomain policy (``none`` / ``quarantine`` /
  ``reject``)
- ``testing`` — testing mode flag (``n`` / ``y``), replaces RFC 7489
  ``pct``
- ``discovery_method`` — policy discovery method (``psl`` /
  ``treewalk``)
- ``generator`` — report generator software identifier (metadata)
- ``human_result`` — optional descriptive text on DKIM / SPF results

RFC 7489 reports parse with ``None`` for DMARCbis-only fields.

Forensic reports have been renamed to failure reports throughout the
project to reflect the proper naming since RFC 7489.

- Core: ``types.py``, ``__init__.py`` — ``ForensicReport`` →
  ``FailureReport``, ``parse_forensic_report`` →
  ``parse_failure_report``, report type ``"failure"``.
- Output modules: ``elastic.py``, ``opensearch.py``, ``splunk.py``,
  ``kafkaclient.py``, ``syslog.py``, ``gelf.py``, ``webhook.py``,
  ``loganalytics.py``, ``s3.py``.
- CLI: ``cli.py`` — args, config keys, index names
  (``dmarc_failure``).
- Docs + dashboards: all markdown, Grafana JSON, Kibana NDJSON,
  Splunk XML.

Backward compatibility preserved: old function / type names remain as
aliases (``parse_forensic_report = parse_failure_report``,
``ForensicReport = FailureReport``, etc.), CLI accepts both the old
(``save_forensic``, ``forensic_topic``) and new (``save_failure``,
``failure_topic``) config keys, and updated dashboards query both
old and new index / sourcetype names so data from before and after
the rename appears together.
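
A minimal sketch of the alias pattern described above. The ``FailureReport`` fields and the body of ``parse_failure_report`` are hypothetical, trimmed stand-ins, not the project's real definitions; only the two alias assignments mirror what this commit message states.

```python
from typing import TypedDict


class FailureReport(TypedDict, total=False):
    # Hypothetical, heavily trimmed stand-in for the real TypedDict.
    feedback_type: str
    reported_domain: str


def parse_failure_report(raw: str) -> FailureReport:
    # Placeholder body for illustration; the real parser does far more.
    return {"feedback_type": "auth-failure", "reported_domain": raw.strip()}


# Legacy names are bound to the same objects, so existing imports keep working.
ForensicReport = FailureReport
parse_forensic_report = parse_failure_report
```

Because the aliases are plain assignments, callers of the old names get the new behavior, including the ``"failure"`` report type.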

Merge conflicts resolved in ``parsedmarc/constants.py`` (took bis's
10.0.0 bump), ``parsedmarc/__init__.py`` (combined bis's "failure"
wording with master's IPinfo MMDB mention), ``parsedmarc/elastic.py``
and ``parsedmarc/opensearch.py`` (kept master's ``source_asn`` /
``source_asn_name`` / ``source_asn_domain`` on the failure doc path
while renaming ``forensic_report`` → ``failure_report``), and
``CHANGELOG.md`` (10.0.0 entry now sits above the 9.9.0 entry).

All 324 tests pass; ``ruff check`` / ``ruff format --check`` clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Apply post-RFC review fixes: RFC 9990 detection, langAttrString, CFWS-aware RUF parsing

Aligns the implementation with the final RFCs (9989/9990/9991) instead of
inferring DMARCbis support from the version element or the namespace alone.

Aggregate parsing (RFC 9990):
- _text() helper unwraps langAttrString values (extra_contact_info, error,
  comment, human_result, generator) — when reporters include the lang
  attribute, xmltodict yields {"#text": ..., "@lang": ...} dicts instead
  of strings; the parser now stores the text payload in both shapes.
- New xml_namespace field on AggregateReport records the declared XML
  namespace (urn:ietf:params:xml:ns:dmarc-2.0 for RFC 9990 reports).
- RFC 9990 detection accepts namespaceless reports that follow the
  RFC 9990 shape (presence of np / testing / discovery_method / generator),
  so reporters that don't declare the namespace still receive RFC 9990-
  aware validation.
- Warnings: missing DKIM <selector> (REQUIRED in RFC 9990); legacy
  forwarded / sampled_out policy-override types (removed by RFC 9990);
  unknown policy-override types per the RFC 9990 enumeration.
- xml_namespace added to Elasticsearch and OpenSearch document mappings.

Failure parsing (RFC 9991):
- Identity-Alignment and Auth-Failure are split on commas with CFWS
  whitespace stripped per the RFC 9991 ABNF; previously "dkim, spf"
  yielded ["dkim", " spf"] with a leading space on the second token.
- Warnings logged when either REQUIRED field is missing.
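
A rough sketch of the comma splitting described above. The function name is hypothetical, and the sketch only strips surrounding whitespace (including folded line breaks); it does not handle parenthesized CFWS comments, which the project's parser may treat differently.

```python
def split_header_list(header_value: str) -> list[str]:
    """Split a comma-separated header value and strip each token."""
    tokens = (token.strip() for token in header_value.split(","))
    # Drop empty tokens left by trailing or doubled commas.
    return [token for token in tokens if token]
```

With this approach ``"dkim, spf"`` and ``"dkim,spf"`` both yield ``["dkim", "spf"]``.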

Terminology: every reference to "DMARCbis" in code, tests, sample
filenames, AGENTS.md, and CHANGELOG.md is replaced with the appropriate
RFC number (9989 for the policy spec, 9990 for aggregate reports, 9991
for failure reports). Sample contents are unchanged.

Docs: corrects the prior claim that fo was dropped from RFC 9990 (only
pct was), reframes testing as a new field (not a pct replacement, since
RFC 9989 Appendix A.6 removed pct with no per-message substitute), and
documents the policy_override_reason enum changes (added policy_test_mode;
removed forwarded / sampled_out).

Tests: 8 new tests covering xml_namespace capture, RFC 9990 detection
from field shape, missing-DKIM-selector warning, legacy-override-type
warning, langAttrString unwrapping across all four affected elements,
and CFWS-aware Identity-Alignment / Auth-Failure parsing plus their
missing-field warnings. 276 tests total, all passing; ruff clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Sean Whalen <44679+seanthegeek@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copilot
2026-05-20 18:51:08 -04:00
committed by GitHub
parent 8c5f63620c
commit ae1e5adb66
35 changed files with 3744 additions and 485 deletions
+1
@@ -1,6 +1,7 @@
{
"permissions": {
"allow": [
"Bash(git fetch:*)",
"Bash(python -c \"import py_compile; py_compile.compile\\(''parsedmarc/cli.py'', doraise=True\\)\")",
"Bash(ruff check:*)",
"Bash(ruff format:*)",
+4 -1
@@ -78,7 +78,10 @@ jobs:
run: |
pip install -e .
parsedmarc --debug -c ci.ini samples/aggregate/*
parsedmarc --debug -c ci.ini samples/forensic/*
parsedmarc --debug -c ci.ini samples/failure/*
- name: Test building packages
run: |
hatch build
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v5
with:
+26 -6
@@ -4,7 +4,7 @@ This file provides guidance to AI agents when working with code in this reposito
## Project Overview
parsedmarc is a Python module and CLI utility for parsing DMARC aggregate (RUA), forensic (RUF), and SMTP TLS reports. It reads reports from IMAP, Microsoft Graph, Gmail API, Maildir, mbox files, or direct file paths, and outputs to JSON/CSV, Elasticsearch, OpenSearch, Splunk, Kafka, S3, Azure Log Analytics, syslog, or webhooks.
parsedmarc is a Python module and CLI utility for parsing DMARC aggregate (RUA), failure/forensic (RUF), and SMTP TLS reports. It supports both RFC 7489 / RFC 6591 and the final DMARC RFCs — RFC 9989 (DMARC policy), RFC 9990 (aggregate reporting), and RFC 9991 (failure reporting) — in both directions. It reads reports from IMAP, Microsoft Graph, Gmail API, Maildir, mbox files, or direct file paths, and outputs to JSON/CSV, Elasticsearch, OpenSearch, Splunk, Kafka, S3, Azure Log Analytics, syslog, or webhooks.
## Common Commands
@@ -24,7 +24,7 @@ ruff format .
# Test CLI with sample reports
parsedmarc --debug -c ci.ini samples/aggregate/*
parsedmarc --debug -c ci.ini samples/forensic/*
parsedmarc --debug -c ci.ini samples/failure/*
# Build docs
cd docs && make html
@@ -41,16 +41,36 @@ To skip DNS lookups during testing, set `GITHUB_ACTIONS=true`.
### Key modules
- `parsedmarc/__init__.py` — Core parsing logic. Main functions: `parse_report_file()`, `parse_report_email()`, `parse_aggregate_report_xml()`, `parse_forensic_report()`, `parse_smtp_tls_report_json()`, `get_dmarc_reports_from_mailbox()`, `watch_inbox()`
- `parsedmarc/cli.py` — CLI entry point (`_main`), config file parsing (`_load_config` + `_parse_config`), output orchestration. Supports configuration via INI files, `PARSEDMARC_{SECTION}_{KEY}` environment variables, or both (env vars override file values).
- `parsedmarc/types.py` — TypedDict definitions for all report types (`AggregateReport`, `ForensicReport`, `SMTPTLSReport`, `ParsingResults`)
- `parsedmarc/__init__.py` — Core parsing logic. Main functions: `parse_report_file()`, `parse_report_email()`, `parse_aggregate_report_xml()`, `parse_failure_report()`, `parse_smtp_tls_report_json()`, `get_dmarc_reports_from_mailbox()`, `watch_inbox()`. Legacy aliases (`parse_forensic_report`, etc.) are preserved for backward compatibility.
- `parsedmarc/cli.py` — CLI entry point (`_main`), config file parsing (`_load_config` + `_parse_config`), output orchestration. Supports configuration via INI files, `PARSEDMARC_{SECTION}_{KEY}` environment variables, or both (env vars override file values). Accepts both old (`save_forensic`, `forensic_topic`) and new (`save_failure`, `failure_topic`) config keys.
- `parsedmarc/types.py` — TypedDict definitions for all report types (`AggregateReport`, `FailureReport`, `SMTPTLSReport`, `ParsingResults`). Legacy alias `ForensicReport = FailureReport` preserved.
- `parsedmarc/utils.py` — IP/DNS/GeoIP enrichment, base64 decoding, compression handling
- `parsedmarc/mail/` — Polymorphic mail connections: `IMAPConnection`, `GmailConnection`, `MSGraphConnection`, `MaildirConnection`
- `parsedmarc/{elastic,opensearch,splunk,kafkaclient,loganalytics,syslog,s3,webhook,gelf}.py` — Output integrations
### Report type system
`ReportType = Literal["aggregate", "forensic", "smtp_tls"]`. Exception hierarchy: `ParserError` → `InvalidDMARCReport` → `InvalidAggregateReport`/`InvalidForensicReport`, and `InvalidSMTPTLSReport`.
`ReportType = Literal["aggregate", "failure", "smtp_tls"]`. Exception hierarchy: `ParserError` → `InvalidDMARCReport` → `InvalidAggregateReport`/`InvalidFailureReport`, and `InvalidSMTPTLSReport`. Legacy alias `InvalidForensicReport = InvalidFailureReport` preserved.
### RFC 9989 / RFC 9990 / RFC 9991 support
Aggregate reports parse under both RFC 7489 and RFC 9990 in one code path. RFC 9990 adds these fields, all surfaced through `AggregatePolicyPublished` / `AggregateReportMetadata` / `AggregateAuthResult*`:
- `np` — non-existent subdomain policy (`none`/`quarantine`/`reject`).
- `testing` — `n`/`y` flag reporting whether the published DMARC record sets `t=y`. It is a **new field**, not a replacement for `pct`; RFC 9989 Appendix A.6 removed the `pct` mechanism entirely with no per-message substitute.
- `discovery_method` — `psl`/`treewalk`.
- `generator` — free-text reporter software identifier, in `report_metadata`.
- `human_result` — optional descriptive text on each DKIM/SPF auth result.
`pct` is no longer part of RFC 9990's `PolicyPublishedType` and parses as `None` when absent. `fo` is **still** part of RFC 9990 (`minOccurs="0"`) and is preserved when set; it parses as `None` only when the reporter omits it. Don't repeat the older project shorthand that "RFC 9990 drops both" — only `pct` was dropped.
The parser detects an RFC 9990 report from the `urn:ietf:params:xml:ns:dmarc-2.0` XML namespace **or** the presence of any RFC 9990-only field. Real-world reporters frequently follow the RFC 9990 shape without declaring the namespace, so namespace-less RFC 9990-shaped reports still get RFC 9990-aware validation warnings (missing required DKIM `selector`, removed-in-RFC-9990 policy-override types `forwarded` / `sampled_out`). The namespace value (if any) is preserved on the parsed report as `xml_namespace`.
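A minimal sketch of that detection rule, assuming the report has already been parsed into a dict whose root `<feedback>` attributes appear under an `@xmlns` key. The function name, constant names, and dict layout are assumptions for illustration, not the project's actual code.

```python
RFC_9990_NAMESPACE = "urn:ietf:params:xml:ns:dmarc-2.0"
# Fields that only exist in the RFC 9990 policy_published element.
RFC_9990_POLICY_FIELDS = ("np", "testing", "discovery_method")


def looks_like_rfc9990(feedback: dict) -> bool:
    """Return True if the namespace or any RFC 9990-only field is present."""
    if feedback.get("@xmlns") == RFC_9990_NAMESPACE:
        return True
    policy = feedback.get("policy_published") or {}
    metadata = feedback.get("report_metadata") or {}
    if any(field in policy for field in RFC_9990_POLICY_FIELDS):
        return True
    # generator lives in report_metadata rather than policy_published.
    return "generator" in metadata
```

Checking the namespace first keeps the common case cheap; the field-shape fallback is what lets namespace-less reports still receive the RFC 9990-aware warnings.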
RFC 9990's `PolicyOverrideType` enumeration is `{local_policy, mailing_list, other, policy_test_mode, trusted_forwarder}`. `policy_test_mode` is new (emitted when `t=y` suppresses enforcement); `forwarded` and `sampled_out` were removed. Override types are stored as-is and warned about on mismatch.
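The store-as-is-and-warn behavior could be sketched like this. The enumeration values come from the paragraph above; the function name and the three labels it returns are hypothetical.

```python
RFC_9990_OVERRIDE_TYPES = frozenset(
    {"local_policy", "mailing_list", "other", "policy_test_mode", "trusted_forwarder"}
)
# Valid under RFC 7489 but removed by RFC 9990.
REMOVED_OVERRIDE_TYPES = frozenset({"forwarded", "sampled_out"})


def classify_override_type(value: str) -> str:
    """Label an override type so the caller can decide whether to warn."""
    if value in RFC_9990_OVERRIDE_TYPES:
        return "valid"
    if value in REMOVED_OVERRIDE_TYPES:
        return "legacy"
    return "unknown"
```

A caller would keep the original value in the parsed report regardless of the label and log a warning only for `legacy` or `unknown`.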
Several elements (`extra_contact_info`, `error`, `comment`, `human_result`) are `langAttrString` in RFC 9990 — i.e. xs:string with an optional `lang` attribute. When the reporter sends the attribute, xmltodict turns the element into `{"#text": "...", "@lang": "en"}`; the parser unwraps that to a plain string via `_text()`.
Failure reports (RFC 9991): `Identity-Alignment` and `Auth-Failure` are split on CFWS-aware commas (each token stripped per the RFC 9991 ABNF), and a warning is logged when either REQUIRED field is missing.
### Configuration
+51 -2
@@ -1,5 +1,54 @@
# Changelog
## 10.0.0
### Enhancements
#### Support for RFC 9989 / RFC 9990 / RFC 9991 reports
Adds parsing support for the final DMARC specification (RFC 9989), the new aggregate-report schema (RFC 9990), and the new failure-report format (RFC 9991), while preserving full RFC 7489 / RFC 6591 backward compatibility.
New aggregate-report fields surfaced from the RFC 9990 XSD — added to types, parsing, CSV output, and Elasticsearch/OpenSearch mappings:
- `np` — non-existent subdomain policy (`none`/`quarantine`/`reject`)
- `testing` — testing mode flag (`n`/`y`); reports whether the published DMARC record sets `t=y`. It is a **new field**, not a replacement for `pct`; the `pct` mechanism was removed entirely by RFC 9989 Appendix A.6 with no per-message replacement.
- `discovery_method` — policy discovery method (`psl`/`treewalk`)
- `generator` — report generator software identifier, in `report_metadata`
- `human_result` — optional descriptive text on DKIM/SPF auth results (langAttrString; a possible `lang` attribute is automatically unwrapped)
- `xml_namespace` — the XML namespace declared on the `<feedback>` root, if any. RFC 9990 reports declare `urn:ietf:params:xml:ns:dmarc-2.0`.
`pct` is no longer present in RFC 9990's `PolicyPublishedType` and parses as `None` when absent. `fo` is still part of RFC 9990 and is preserved when set; it parses as `None` only when the reporter omits it.
The parser detects an RFC 9990 report from the dmarc-2.0 XML namespace **or** the presence of any RFC 9990-only field, so namespaceless reports that follow the RFC 9990 shape still receive RFC 9990-aware validation warnings (missing required DKIM `selector`, removed-in-RFC-9990 policy-override types `forwarded` / `sampled_out`). RFC 9990 also added `policy_test_mode` to the policy-override enumeration; it is parsed and stored unchanged.
For failure reports (RFC 9991), `Identity-Alignment` and `Auth-Failure` are split on CFWS-aware commas (whitespace is stripped from each token, per the RFC 9991 ABNF) and a warning is logged when either REQUIRED field is missing.
Several elements that became `langAttrString` in RFC 9990 (`extra_contact_info`, `error`, `comment`, `human_result`) are now safely unwrapped when the reporter sends them with a `lang` attribute.
Backward compatibility with RFC 7489 is maintained.
### Breaking changes
#### Forensic reports have been renamed to failure reports
Forensic reports have been renamed to failure reports throughout the project to reflect the proper naming of the reports since RFC 7489.
- **Core**: `types.py`, `__init__.py` — `ForensicReport` → `FailureReport`, `parse_forensic_report` → `parse_failure_report`, report type `"failure"`
- **Output modules**: `elastic.py`, `opensearch.py`, `splunk.py`, `kafkaclient.py`, `syslog.py`, `gelf.py`, `webhook.py`, `loganalytics.py`, `s3.py`
- **CLI**: `cli.py` — args, config keys, index names (`dmarc_failure`)
- **Docs & dashboards**: all markdown, Grafana JSON, OpenSearch NDJSON, Splunk XML
##### Backward compatibility
- Old function/type names preserved as aliases: `parse_forensic_report = parse_failure_report`, `ForensicReport = FailureReport`, etc.
- CLI config accepts both old (`save_forensic`, `forensic_topic`) and new keys (`save_failure`, `failure_topic`)
- IMAP archive subfolder name is intentionally kept as `Forensic` (under `archive_folder`) so existing deployments don't end up with a split archive across `Forensic/` and `Failure/`.
- RFC 7489 reports parse with `None` for RFC 9990-only fields
- **Updated dashboards with queries are backward compatible**: queries match data indexed under both old (`dmarc_forensic*` / `dmarc:forensic`) and new (`dmarc_failure*` / `dmarc:failure`) names, so dashboards show data from before and after the rename:
- **OpenSearch Dashboards**: Index pattern uses `dmarc_f*` to match both `dmarc_forensic*` and `dmarc_failure*`
- **Splunk**: Base search queries `(sourcetype="dmarc:failure" OR sourcetype="dmarc:forensic")`
- **Elasticsearch/OpenSearch**: Duplicate-check searches query across both `dmarc_failure*` and `dmarc_forensic*` index patterns
## 9.11.2
### Changes
@@ -60,7 +109,7 @@
- Renamed `[general] ip_db_url` to `ipinfo_url` to reflect what it actually overrides (the bundled IPinfo Lite MMDB download URL). The old name is still accepted as a deprecated alias and logs a warning on use; the env-var equivalent is now `PARSEDMARC_GENERAL_IPINFO_URL`, with `PARSEDMARC_GENERAL_IP_DB_URL` also still honored.
- Added an optional IPinfo Lite REST API path for country + ASN lookups, so deployments that want the freshest data can query the API directly instead of waiting for the next MMDB release. Configure `[general] ipinfo_api_token` (or `PARSEDMARC_GENERAL_IPINFO_API_TOKEN`) and every IP lookup hits `https://api.ipinfo.io/lite/<ip>` first. At startup the `https://ipinfo.io/me` account endpoint is hit once to validate the token and log the plan, month-to-date usage, and remaining quota at info level (e.g. `IPinfo API configured — plan: Lite, usage: 12345/50000 this month, 37655 remaining`). An invalid token exits the process with a fatal error. Rate-limit (HTTP 429) and quota-exhausted (HTTP 402) responses put the API in a cooldown (honoring `Retry-After`, with a 5-minute / 1-hour default) and fall through to the bundled/cached MMDB; the first event is logged once at warning level and recovery is logged once at info level when the next lookup succeeds. Transient network errors fall through per-request without triggering a cooldown. The API token is never logged.
- Renamed the ASN name and domain fields to match the IPinfo Lite MMDB's native schema: `asn_name` → `as_name` and `asn_domain` → `as_domain` on every source record (JSON output), and `source_asn_name` → `source_as_name` / `source_asn_domain` → `source_as_domain` in CSV output (aggregate + forensic) and the Elasticsearch / OpenSearch / Splunk integrations. The integer `asn` / `source_asn` field is unchanged. The emitted order is `asn`, `as_name`, `as_domain`.
- Renamed the ASN name and domain fields to match the IPinfo Lite MMDB's native schema: `asn_name` → `as_name` and `asn_domain` → `as_domain` on every source record (JSON output), and `source_asn_name` → `source_as_name` / `source_asn_domain` → `source_as_domain` in CSV output (aggregate + failure) and the Elasticsearch / OpenSearch / Splunk integrations. The integer `asn` / `source_asn` field is unchanged. The emitted order is `asn`, `as_name`, `as_domain`.
### Upgrade notes
@@ -240,7 +289,7 @@
### Fixed
- `get_index_prefix()` crashed on forensic reports with `TypeError` due to `report()` instead of `report[]` dict access.
- `get_index_prefix()` crashed on failure reports with `TypeError` due to `report()` instead of `report[]` dict access.
- Missing `exit(1)` after IMAP user/password validation failure allowed execution to continue with `None` credentials.
## 9.2.1
+3 -1
@@ -1,3 +1,5 @@
# CLAUD.md
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
@AGENTS.md
@@ -83,7 +83,7 @@
"id": 28,
"panels": [
{
"content": "# DMARC Summary\r\nAs the name suggests, this dashboard is the best place to start reviewing your aggregate DMARC data.\r\n\r\nAcross the top of the dashboard, three pie charts display the percentage of alignment pass/fail for SPF, DKIM, and DMARC. Clicking on any chart segment will filter for that value.\r\n\r\n***Note***\r\nMessages should not be considered malicious just because they failed to pass DMARC; especially if you have just started collecting data. It may be a legitimate service that needs SPF and DKIM configured correctly.\r\n\r\nStart by filtering the results to only show failed DKIM alignment. While DMARC passes if a message passes SPF or DKIM alignment, only DKIM alignment remains valid when a message is forwarded without changing the from address, which is often caused by a mailbox forwarding rule. This is because DKIM signatures are part of the message headers, whereas SPF relies on SMTP session headers.\r\n\r\nUnderneath the pie charts. you can see graphs of DMARC passage and message disposition over time.\r\n\r\nUnder the graphs you will find the most useful data tables on the dashboard. On the left, there is a list of organizations that are sending you DMARC reports. In the center, there is a list of sending servers grouped by the base domain in their reverse DNS. On the right, there is a list of email from domains, sorted by message volume.\r\n\r\nBy hovering your mouse over a data table value and using the magnifying glass icons, you can filter on or filter out different values. Start by looking at the Message Sources by Reverse DNS table. Find a sender that you recognize, such as an email marketing service, hover over it, and click on the plus (+) magnifying glass icon, to add a filter that only shows results for that sender. Now, look at the Message From Header table to the right. That shows you the domains that a sender is sending as, which might tell you which brand/business is using a particular service. 
With that information, you can contact them and have them set up DKIM.\r\n\r\n***Note***\r\nIf you have a lot of B2C customers, you may see a high volume of emails as your domains coming from consumer email services, such as Google/Gmail and Yahoo! This occurs when customers have mailbox rules in place that forward emails from an old account to a new account, which is why DKIM authentication is so important, as mentioned earlier. Similar patterns may be observed with businesses who send from reverse DNS addressees of parent, subsidiary, and outdated brands.\r\n\r\n***Note***\r\nYou can add your own custom temporary filters by clicking on Add Filter at the upper right of the page.\r\n\r\n# DMARC Forensic Samples\r\nThe DMARC Forensic Samples section contains information on DMARC forensic reports (also known as failure reports or ruf reports). These reports contain samples of emails that have failed to pass DMARC.\r\n\r\n***Note***\r\nMost recipients do not send forensic/failure/ruf reports at all to avoid privacy leaks. Some recipients (notably Chinese webmail services) will only supply the headers of sample emails. 
Very few provide the entire email.\r\n\r\n# DMARC Alignment Guide\r\nDMARC ensures that SPF and DKIM authentication mechanisms actually authenticate against the same domain that the end user sees.\r\n\r\nA message passes a DMARC check by passing DKIM or SPF, **as long as the related indicators are also in alignment.**\r\n\r\n| \t| DKIM \t| SPF \t|\r\n|-----------\t|--------------------------------------------------------------------------------------------------------------------------------------------------\t|----------------------------------------------------------------------------------------------------------------\t|\r\n| **Passing** \t| The signature in the DKIM header is validated using a public key that is published as a DNS record of the domain name specified in the signature \t| The mail server's IP address is listed in the SPF record of the domain in the SMTP envelope's mail from header \t|\r\n| **Alignment** \t| The signing domain aligns with the domain in the message's from header \t| The domain in the SMTP envelope's mail from header aligns with the domain in the message's from header \t|\r\n\r\n\r\n# Further Reading\r\n[Demystifying DMARC: A guide to preventing email spoofing](https://seanthegeek.net/459/demystifying-dmarc/amp/)\r\n\r\n[DMARC Manual](https://menainfosec.com/wp-content/uploads/2017/12/DMARC_Service_Manual.pdf)\r\n\r\n[What is \u201cExternal Destination Verification\u201d?](https://dmarcian.com/what-is-external-destination-verification/)",
"content": "# DMARC Summary\r\nAs the name suggests, this dashboard is the best place to start reviewing your aggregate DMARC data.\r\n\r\nAcross the top of the dashboard, three pie charts display the percentage of alignment pass/fail for SPF, DKIM, and DMARC. Clicking on any chart segment will filter for that value.\r\n\r\n***Note***\r\nMessages should not be considered malicious just because they failed to pass DMARC; especially if you have just started collecting data. It may be a legitimate service that needs SPF and DKIM configured correctly.\r\n\r\nStart by filtering the results to only show failed DKIM alignment. While DMARC passes if a message passes SPF or DKIM alignment, only DKIM alignment remains valid when a message is forwarded without changing the from address, which is often caused by a mailbox forwarding rule. This is because DKIM signatures are part of the message headers, whereas SPF relies on SMTP session headers.\r\n\r\nUnderneath the pie charts. you can see graphs of DMARC passage and message disposition over time.\r\n\r\nUnder the graphs you will find the most useful data tables on the dashboard. On the left, there is a list of organizations that are sending you DMARC reports. In the center, there is a list of sending servers grouped by the base domain in their reverse DNS. On the right, there is a list of email from domains, sorted by message volume.\r\n\r\nBy hovering your mouse over a data table value and using the magnifying glass icons, you can filter on or filter out different values. Start by looking at the Message Sources by Reverse DNS table. Find a sender that you recognize, such as an email marketing service, hover over it, and click on the plus (+) magnifying glass icon, to add a filter that only shows results for that sender. Now, look at the Message From Header table to the right. That shows you the domains that a sender is sending as, which might tell you which brand/business is using a particular service. 
With that information, you can contact them and have them set up DKIM.\r\n\r\n***Note***\r\nIf you have a lot of B2C customers, you may see a high volume of emails as your domains coming from consumer email services, such as Google/Gmail and Yahoo! This occurs when customers have mailbox rules in place that forward emails from an old account to a new account, which is why DKIM authentication is so important, as mentioned earlier. Similar patterns may be observed with businesses who send from reverse DNS addressees of parent, subsidiary, and outdated brands.\r\n\r\n***Note***\r\nYou can add your own custom temporary filters by clicking on Add Filter at the upper right of the page.\r\n\r\n# DMARC Failure Samples\r\nThe DMARC Failure Samples section contains information on DMARC failure reports (also known as forensic or ruf reports). These reports contain samples of emails that have failed to pass DMARC.\r\n\r\n***Note***\r\nMost recipients do not send failure/ruf reports at all to avoid privacy leaks. Some recipients (notably Chinese webmail services) will only supply the headers of sample emails. 
Very few provide the entire email.\r\n\r\n# DMARC Alignment Guide\r\nDMARC ensures that SPF and DKIM authentication mechanisms actually authenticate against the same domain that the end user sees.\r\n\r\nA message passes a DMARC check by passing DKIM or SPF, **as long as the related indicators are also in alignment.**\r\n\r\n| \t| DKIM \t| SPF \t|\r\n|-----------\t|--------------------------------------------------------------------------------------------------------------------------------------------------\t|----------------------------------------------------------------------------------------------------------------\t|\r\n| **Passing** \t| The signature in the DKIM header is validated using a public key that is published as a DNS record of the domain name specified in the signature \t| The mail server's IP address is listed in the SPF record of the domain in the SMTP envelope's mail from header \t|\r\n| **Alignment** \t| The signing domain aligns with the domain in the message's from header \t| The domain in the SMTP envelope's mail from header aligns with the domain in the message's from header \t|\r\n\r\n\r\n# Further Reading\r\n[Demystifying DMARC: A guide to preventing email spoofing](https://seanthegeek.net/459/demystifying-dmarc/amp/)\r\n\r\n[DMARC Manual](https://menainfosec.com/wp-content/uploads/2017/12/DMARC_Service_Manual.pdf)\r\n\r\n[What is External Destination Verification?](https://dmarcian.com/what-is-external-destination-verification/)",
"datasource": null,
"fieldConfig": {
"defaults": {
@@ -101,7 +101,7 @@
"links": [],
"mode": "markdown",
"options": {
"content": "# DMARC Summary\r\nAs the name suggests, this dashboard is the best place to start reviewing your aggregate DMARC data.\r\n\r\nAcross the top of the dashboard, three pie charts display the percentage of alignment pass/fail for SPF, DKIM, and DMARC. Clicking on any chart segment will filter for that value.\r\n\r\n***Note***\r\nMessages should not be considered malicious just because they failed to pass DMARC; especially if you have just started collecting data. It may be a legitimate service that needs SPF and DKIM configured correctly.\r\n\r\nStart by filtering the results to only show failed DKIM alignment. While DMARC passes if a message passes SPF or DKIM alignment, only DKIM alignment remains valid when a message is forwarded without changing the from address, which is often caused by a mailbox forwarding rule. This is because DKIM signatures are part of the message headers, whereas SPF relies on SMTP session headers.\r\n\r\nUnderneath the pie charts. you can see graphs of DMARC passage and message disposition over time.\r\n\r\nUnder the graphs you will find the most useful data tables on the dashboard. On the left, there is a list of organizations that are sending you DMARC reports. In the center, there is a list of sending servers grouped by the base domain in their reverse DNS. On the right, there is a list of email from domains, sorted by message volume.\r\n\r\nBy hovering your mouse over a data table value and using the magnifying glass icons, you can filter on or filter out different values. Start by looking at the Message Sources by Reverse DNS table. Find a sender that you recognize, such as an email marketing service, hover over it, and click on the plus (+) magnifying glass icon, to add a filter that only shows results for that sender. Now, look at the Message From Header table to the right. That shows you the domains that a sender is sending as, which might tell you which brand/business is using a particular service. 
With that information, you can contact them and have them set up DKIM.\r\n\r\n***Note***\r\nIf you have a lot of B2C customers, you may see a high volume of emails as your domains coming from consumer email services, such as Google/Gmail and Yahoo! This occurs when customers have mailbox rules in place that forward emails from an old account to a new account, which is why DKIM authentication is so important, as mentioned earlier. Similar patterns may be observed with businesses who send from reverse DNS addressees of parent, subsidiary, and outdated brands.\r\n\r\n***Note***\r\nYou can add your own custom temporary filters by clicking on Add Filter at the upper right of the page.\r\n\r\n# DMARC Forensic Samples\r\nThe DMARC Forensic Samples section contains information on DMARC forensic reports (also known as failure reports or ruf reports). These reports contain samples of emails that have failed to pass DMARC.\r\n\r\n***Note***\r\nMost recipients do not send forensic/failure/ruf reports at all to avoid privacy leaks. Some recipients (notably Chinese webmail services) will only supply the headers of sample emails. 
Very few provide the entire email.\r\n\r\n# DMARC Alignment Guide\r\nDMARC ensures that SPF and DKIM authentication mechanisms actually authenticate against the same domain that the end user sees.\r\n\r\nA message passes a DMARC check by passing DKIM or SPF, **as long as the related indicators are also in alignment.**\r\n\r\n| \t| DKIM \t| SPF \t|\r\n|-----------\t|--------------------------------------------------------------------------------------------------------------------------------------------------\t|----------------------------------------------------------------------------------------------------------------\t|\r\n| **Passing** \t| The signature in the DKIM header is validated using a public key that is published as a DNS record of the domain name specified in the signature \t| The mail server's IP address is listed in the SPF record of the domain in the SMTP envelope's mail from header \t|\r\n| **Alignment** \t| The signing domain aligns with the domain in the message's from header \t| The domain in the SMTP envelope's mail from header aligns with the domain in the message's from header \t|\r\n\r\n\r\n# Further Reading\r\n[Demystifying DMARC: A guide to preventing email spoofing](https://seanthegeek.net/459/demystifying-dmarc/amp/)\r\n\r\n[DMARC Manual](https://menainfosec.com/wp-content/uploads/2017/12/DMARC_Service_Manual.pdf)\r\n\r\n[What is \u201cExternal Destination Verification\u201d?](https://dmarcian.com/what-is-external-destination-verification/)",
"content": "# DMARC Summary\r\nAs the name suggests, this dashboard is the best place to start reviewing your aggregate DMARC data.\r\n\r\nAcross the top of the dashboard, three pie charts display the percentage of alignment pass/fail for SPF, DKIM, and DMARC. Clicking on any chart segment will filter for that value.\r\n\r\n***Note***\r\nMessages should not be considered malicious just because they failed to pass DMARC; especially if you have just started collecting data. It may be a legitimate service that needs SPF and DKIM configured correctly.\r\n\r\nStart by filtering the results to only show failed DKIM alignment. While DMARC passes if a message passes SPF or DKIM alignment, only DKIM alignment remains valid when a message is forwarded without changing the from address, which is often caused by a mailbox forwarding rule. This is because DKIM signatures are part of the message headers, whereas SPF relies on SMTP session headers.\r\n\r\nUnderneath the pie charts. you can see graphs of DMARC passage and message disposition over time.\r\n\r\nUnder the graphs you will find the most useful data tables on the dashboard. On the left, there is a list of organizations that are sending you DMARC reports. In the center, there is a list of sending servers grouped by the base domain in their reverse DNS. On the right, there is a list of email from domains, sorted by message volume.\r\n\r\nBy hovering your mouse over a data table value and using the magnifying glass icons, you can filter on or filter out different values. Start by looking at the Message Sources by Reverse DNS table. Find a sender that you recognize, such as an email marketing service, hover over it, and click on the plus (+) magnifying glass icon, to add a filter that only shows results for that sender. Now, look at the Message From Header table to the right. That shows you the domains that a sender is sending as, which might tell you which brand/business is using a particular service. 
With that information, you can contact them and have them set up DKIM.\r\n\r\n***Note***\r\nIf you have a lot of B2C customers, you may see a high volume of emails as your domains coming from consumer email services, such as Google/Gmail and Yahoo! This occurs when customers have mailbox rules in place that forward emails from an old account to a new account, which is why DKIM authentication is so important, as mentioned earlier. Similar patterns may be observed with businesses who send from reverse DNS addresses of parent, subsidiary, and outdated brands.\r\n\r\n***Note***\r\nYou can add your own custom temporary filters by clicking on Add Filter at the upper right of the page.\r\n\r\n# DMARC Failure Samples\r\nThe DMARC Failure Samples section contains information on DMARC failure reports (also known as forensic or ruf reports). These reports contain samples of emails that have failed to pass DMARC.\r\n\r\n***Note***\r\nMost recipients do not send failure/ruf reports at all to avoid privacy leaks. Some recipients (notably Chinese webmail services) will only supply the headers of sample emails. 
Very few provide the entire email.\r\n\r\n# DMARC Alignment Guide\r\nDMARC ensures that SPF and DKIM authentication mechanisms actually authenticate against the same domain that the end user sees.\r\n\r\nA message passes a DMARC check by passing DKIM or SPF, **as long as the related indicators are also in alignment.**\r\n\r\n| \t| DKIM \t| SPF \t|\r\n|-----------\t|--------------------------------------------------------------------------------------------------------------------------------------------------\t|----------------------------------------------------------------------------------------------------------------\t|\r\n| **Passing** \t| The signature in the DKIM header is validated using a public key that is published as a DNS record of the domain name specified in the signature \t| The mail server's IP address is listed in the SPF record of the domain in the SMTP envelope's mail from header \t|\r\n| **Alignment** \t| The signing domain aligns with the domain in the message's from header \t| The domain in the SMTP envelope's mail from header aligns with the domain in the message's from header \t|\r\n\r\n\r\n# Further Reading\r\n[Demystifying DMARC: A guide to preventing email spoofing](https://seanthegeek.net/459/demystifying-dmarc/amp/)\r\n\r\n[DMARC Manual](https://menainfosec.com/wp-content/uploads/2017/12/DMARC_Service_Manual.pdf)\r\n\r\n[What is External Destination Verification?](https://dmarcian.com/what-is-external-destination-verification/)",
"mode": "markdown"
},
"pluginVersion": "7.1.0",
@@ -2,7 +2,7 @@
<label>DMARC Failure Data</label>
<search id="base_search">
<query>
index="email" (sourcetype="dmarc:forensic" sourcetype="dmarc:failure")
index="email" (sourcetype="dmarc:failure" OR sourcetype="dmarc:forensic")
(parsed_sample.headers.From=$header_from$ OR NOT parsed_sample.headers.From=*)
(parsed_sample.headers.To=$header_to$ OR NOT parsed_sample.headers.To=*)
(parsed_sample.headers.Subject=$header_subject$ OR NOT parsed_sample.headers.Subject=*)
+1 -1
@@ -214,7 +214,7 @@ Kibana index patterns with versions that match the upgraded indexes:
1. Log in to Kibana, and click on Management
2. Under Kibana, click on Saved Objects
3. Check the checkboxes for the `dmarc_aggregate` and `dmarc_forensic`
3. Check the checkboxes for the `dmarc_aggregate` and `dmarc_failure`
index patterns
4. Click Delete
5. Click Delete on the confirmation message
+1 -1
@@ -2,7 +2,7 @@
[general]
save_aggregate = True
save_forensic = True
save_failure = True
[imap]
host = imap.example.com
+1 -1
@@ -30,7 +30,7 @@ and Valimail.
## Features
- Parses draft and 1.0 standard aggregate/rua DMARC reports
- Parses forensic/failure/ruf DMARC reports
- Parses failure/ruf DMARC reports
- Parses reports from SMTP TLS Reporting
- Can parse reports from an inbox over IMAP, Microsoft Graph, or Gmail API
- Transparently handles gzip or zip compressed reports
+1 -1
@@ -89,7 +89,7 @@ information on DMARC failure reports (also known as forensic or ruf reports).
These reports contain samples of emails that have failed to pass DMARC.
:::{note}
Most recipients do not send forensic/failure/ruf reports at all to avoid
Most recipients do not send failure/ruf reports at all to avoid
privacy leaks. Some recipients (notably Chinese webmail services) will only
supply the headers of sample emails. Very few provide the entire email.
:::
+4 -4
@@ -99,12 +99,12 @@ draft,acme.com,noreply-dmarc-support@acme.com,http://acme.com/dmarc/support,9391
```
## Sample forensic report output
## Sample failure report output
Thanks to GitHub user [xennn](https://github.com/xennn) for the anonymized
[forensic report email sample](<https://github.com/domainaware/parsedmarc/raw/master/samples/forensic/DMARC%20Failure%20Report%20for%20domain.de%20(mail-from%3Dsharepoint%40domain.de%2C%20ip%3D10.10.10.10).eml>).
[failure report email sample](<https://github.com/domainaware/parsedmarc/raw/master/samples/failure/DMARC%20Failure%20Report%20for%20domain.de%20(mail-from%3Dsharepoint%40domain.de%2C%20ip%3D10.10.10.10).eml>).
### JSON forensic report
### JSON failure report
```json
{
@@ -198,7 +198,7 @@ Thanks to GitHub user [xennn](https://github.com/xennn) for the anonymized
}
```
### CSV forensic report
### CSV failure report
```text
feedback_type,user_agent,version,original_envelope_id,original_mail_from,original_rcpt_to,arrival_date,arrival_date_utc,subject,message_id,authentication_results,dkim_domain,source_ip_address,source_country,source_reverse_dns,source_base_domain,source_name,source_type,source_asn,source_as_name,source_as_domain,delivery_result,auth_failure,reported_domain,authentication_mechanisms,sample_headers_only
+2 -2
@@ -1,10 +1,10 @@
# Splunk
Starting in version 4.3.0 `parsedmarc` supports sending aggregate and/or
forensic DMARC data to a Splunk [HTTP Event collector (HEC)].
failure DMARC data to a Splunk [HTTP Event collector (HEC)].
The project repository contains [XML files] for premade Splunk
dashboards for aggregate and forensic DMARC reports.
dashboards for aggregate and failure DMARC reports.
Copy and paste the contents of each file into a separate Splunk
dashboard XML editor.
+25 -25
@@ -4,9 +4,9 @@
```text
usage: parsedmarc [-h] [-c CONFIG_FILE] [--strip-attachment-payloads] [-o OUTPUT]
[--aggregate-json-filename AGGREGATE_JSON_FILENAME] [--forensic-json-filename FORENSIC_JSON_FILENAME]
[--aggregate-json-filename AGGREGATE_JSON_FILENAME] [--failure-json-filename FAILURE_JSON_FILENAME]
[--smtp-tls-json-filename SMTP_TLS_JSON_FILENAME] [--aggregate-csv-filename AGGREGATE_CSV_FILENAME]
[--forensic-csv-filename FORENSIC_CSV_FILENAME] [--smtp-tls-csv-filename SMTP_TLS_CSV_FILENAME]
[--failure-csv-filename FAILURE_CSV_FILENAME] [--smtp-tls-csv-filename SMTP_TLS_CSV_FILENAME]
[-n NAMESERVERS [NAMESERVERS ...]] [-t DNS_TIMEOUT] [--offline] [-s] [-w] [--verbose] [--debug]
[--log-file LOG_FILE] [--no-prettify-json] [-v]
[file_path ...]
@@ -14,26 +14,26 @@ usage: parsedmarc [-h] [-c CONFIG_FILE] [--strip-attachment-payloads] [-o OUTPUT
Parses DMARC reports
positional arguments:
file_path one or more paths to aggregate or forensic report files, emails, or mbox files'
file_path one or more paths to aggregate or failure report files, emails, or mbox files'
options:
-h, --help show this help message and exit
-c CONFIG_FILE, --config-file CONFIG_FILE
a path to a configuration file (--silent implied)
--strip-attachment-payloads
remove attachment payloads from forensic report output
remove attachment payloads from failure report output
-o OUTPUT, --output OUTPUT
write output files to the given directory
--aggregate-json-filename AGGREGATE_JSON_FILENAME
filename for the aggregate JSON output file
--forensic-json-filename FORENSIC_JSON_FILENAME
filename for the forensic JSON output file
--failure-json-filename FAILURE_JSON_FILENAME
filename for the failure JSON output file
--smtp-tls-json-filename SMTP_TLS_JSON_FILENAME
filename for the SMTP TLS JSON output file
--aggregate-csv-filename AGGREGATE_CSV_FILENAME
filename for the aggregate CSV output file
--forensic-csv-filename FORENSIC_CSV_FILENAME
filename for the forensic CSV output file
--failure-csv-filename FAILURE_CSV_FILENAME
filename for the failure CSV output file
--smtp-tls-csv-filename SMTP_TLS_CSV_FILENAME
filename for the SMTP TLS CSV output file
-n NAMESERVERS [NAMESERVERS ...], --nameservers NAMESERVERS [NAMESERVERS ...]
@@ -70,7 +70,7 @@ For example
[general]
save_aggregate = True
save_forensic = True
save_failure = True
[imap]
host = imap.example.com
@@ -109,7 +109,7 @@ mode = tcp
[webhook]
aggregate_url = https://aggregate_url.example.com
forensic_url = https://forensic_url.example.com
failure_url = https://failure_url.example.com
smtp_tls_url = https://smtp_tls_url.example.com
timeout = 60
```
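The commit message says the old `save_forensic` key is still accepted alongside `save_failure`. The project's actual lookup code is not shown in this part of the diff, so the following is only a minimal sketch of how such a fallback could be read with the standard library `configparser`. The helper name `get_save_failure`, the precedence of the new key over the old one, and the `False` default are all assumptions made for illustration.

```python
import configparser


def get_save_failure(config: configparser.ConfigParser) -> bool:
    """Read save_failure, falling back to the legacy save_forensic key.

    Hypothetical helper: precedence and default are assumptions.
    """
    general = config["general"]
    if "save_failure" in general:
        return general.getboolean("save_failure")
    return general.getboolean("save_forensic", fallback=False)


# A config written with the new key name.
new_cfg = configparser.ConfigParser()
new_cfg.read_string("[general]\nsave_aggregate = True\nsave_failure = True\n")

# A config that still uses the pre-rename key.
old_cfg = configparser.ConfigParser()
old_cfg.read_string("[general]\nsave_aggregate = True\nsave_forensic = True\n")

# A config that sets neither key.
bare_cfg = configparser.ConfigParser()
bare_cfg.read_string("[general]\nsave_aggregate = True\n")
```

Under these assumptions both `new_cfg` and `old_cfg` enable failure report saving, while `bare_cfg` leaves it off.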
@@ -119,7 +119,7 @@ The full set of configuration options are:
- `general`
- `save_aggregate` - bool: Save aggregate report data to
Elasticsearch, Splunk and/or S3
- `save_forensic` - bool: Save forensic report data to
- `save_failure` - bool: Save failure report data to
Elasticsearch, Splunk and/or S3
- `save_smtp_tls` - bool: Save SMTP TLS report data to
Elasticsearch, Splunk and/or S3
@@ -130,7 +130,7 @@ The full set of configuration options are:
- `output` - str: Directory to place JSON and CSV files in. This is required if you set either of the JSON output file options.
- `aggregate_json_filename` - str: filename for the aggregate
JSON output file
- `forensic_json_filename` - str: filename for the forensic
- `failure_json_filename` - str: filename for the failure
JSON output file
- `ip_db_path` - str: An optional custom path to a MMDB file
from IPinfo, MaxMind, or DBIP
@@ -340,7 +340,7 @@ The full set of configuration options are:
- `skip_certificate_verification` - bool: Skip certificate
verification (not recommended)
- `aggregate_topic` - str: The Kafka topic for aggregate reports
- `forensic_topic` - str: The Kafka topic for forensic reports
- `failure_topic` - str: The Kafka topic for failure reports
- `smtp`
- `host` - str: The SMTP hostname
- `port` - int: The SMTP port (Default: `25`)
@@ -458,7 +458,7 @@ The full set of configuration options are:
- `dce` - str: The Data Collection Endpoint (DCE). Example: `https://{DCE-NAME}.{REGION}.ingest.monitor.azure.com`.
- `dcr_immutable_id` - str: The immutable ID of the Data Collection Rule (DCR)
- `dcr_aggregate_stream` - str: The stream name for aggregate reports in the DCR
- `dcr_forensic_stream` - str: The stream name for the forensic reports in the DCR
- `dcr_failure_stream` - str: The stream name for the failure reports in the DCR
- `dcr_smtp_tls_stream` - str: The stream name for the SMTP TLS reports in the DCR
:::{note}
@@ -475,7 +475,7 @@ The full set of configuration options are:
- `webhook` - Post the individual reports to a webhook url with the report as the JSON body
- `aggregate_url` - str: URL of the webhook which should receive the aggregate reports
- `forensic_url` - str: URL of the webhook which should receive the forensic reports
- `failure_url` - str: URL of the webhook which should receive the failure reports
- `smtp_tls_url` - str: URL of the webhook which should receive the smtp_tls reports
- `timeout` - int: Interval after which the webhook call should time out
@@ -490,26 +490,26 @@ blocks DNS requests to outside resolvers.
:::
:::{note}
`save_aggregate` and `save_forensic` are separate options
because you may not want to save forensic reports
(also known as failure reports) to your Elasticsearch instance,
`save_aggregate` and `save_failure` are separate options
because you may not want to save failure reports
(formerly known as forensic reports) to your Elasticsearch instance,
particularly if you are in a highly-regulated industry that
handles sensitive data, such as healthcare or finance. If your
legitimate outgoing email fails DMARC, it is possible
that email may appear later in a forensic report.
that email may appear later in a failure report.
Forensic reports contain the original headers of an email that
Failure reports contain the original headers of an email that
failed a DMARC check, and sometimes may also include the
full message body, depending on the policy of the reporting
organization.
Most reporting organizations do not send forensic reports of any
Most reporting organizations do not send failure reports of any
kind for privacy reasons. While aggregate DMARC reports are sent
at least daily, it is normal to receive very few forensic reports.
at least daily, it is normal to receive very few failure reports.
An alternative approach is to still collect forensic/failure/ruf
An alternative approach is to still collect failure/ruf
reports in your DMARC inbox, but run `parsedmarc` with
```save_forensic = True``` manually on a separate IMAP folder (using
```save_failure = True``` manually on a separate IMAP folder (using
the ```reports_folder``` option), after you have manually moved
known samples you want to save to that folder
(e.g. malicious samples and non-sensitive legitimate samples).
@@ -651,7 +651,7 @@ imports more predictable:
- Use `mailbox.since` to process reports in smaller time windows such as `1d`,
`7d`, or another interval that fits the backlog. This makes it easier to catch
up incrementally instead of loading an entire mailbox history in one run.
- Set `strip_attachment_payloads = True` when forensic reports contain large
- Set `strip_attachment_payloads = True` when failure reports contain large
attachments and you do not need to retain the raw payloads in the parsed
output.
- Prefer running parsedmarc separately from Elasticsearch or OpenSearch, or
+273 -100
@@ -53,7 +53,8 @@ from parsedmarc.mail import (
)
from parsedmarc.types import (
AggregateReport,
ForensicReport,
FailureReport,
ForensicReport as ForensicReport,
ParsedReport,
ParsingResults,
SMTPTLSReport,
@@ -74,10 +75,33 @@ feedback_report_regex = re.compile(r"^([\w\-]+): (.+)$", re.MULTILINE)
xml_header_regex = re.compile(r"^<\?xml .*?>", re.MULTILINE)
xml_schema_regex = re.compile(r"</??xs:schema.*>", re.MULTILINE)
text_report_regex = re.compile(r"\s*([a-zA-Z\s]+):\s(.+)", re.MULTILINE)
# Captures the value of any xmlns (default or prefixed) declaration so the
# RFC 9990 namespace can be detected before xmltodict drops it.
xml_namespace_regex = re.compile(
r"""xmlns(?::[a-zA-Z_][\w.-]*)?\s*=\s*["']([^"']+)["']"""
)
# The XML namespace assigned to DMARC aggregate reports by RFC 9990.
RFC_9990_NAMESPACE = "urn:ietf:params:xml:ns:dmarc-2.0"
# PolicyOverrideType enumeration from RFC 9990. Compared to RFC 7489,
# `policy_test_mode` was added (emitted when t=y suppresses enforcement)
# and `forwarded` / `sampled_out` were removed.
RFC_9990_POLICY_OVERRIDE_TYPES = frozenset(
{
"local_policy",
"mailing_list",
"other",
"policy_test_mode",
"trusted_forwarder",
}
)
RFC_7489_REMOVED_POLICY_OVERRIDE_TYPES = frozenset({"forwarded", "sampled_out"})
MAGIC_ZIP = b"\x50\x4b\x03\x04"
MAGIC_GZIP = b"\x1f\x8b"
MAGIC_XML = b"\x3c\x3f\x78\x6d\x6c\x20"
MAGIC_XML_TAG = b"\x3c" # '<' - XML starting with an element tag (no declaration)
MAGIC_JSON = b"\x7b"  # '{' (b"\7b" would be the two bytes 0x07 0x62)
EMAIL_SAMPLE_CONTENT_TYPES = (
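To make the namespace capture concrete, here is a small standalone sketch that reuses the `xml_namespace_regex` pattern and `RFC_9990_NAMESPACE` constant from the hunk above. The `first_namespace` helper and the three sample documents are hypothetical and exist only for illustration; they are not taken from real reports.

```python
import re

# Same pattern as xml_namespace_regex in the hunk above.
xml_namespace_regex = re.compile(
    r"""xmlns(?::[a-zA-Z_][\w.-]*)?\s*=\s*["']([^"']+)["']"""
)
RFC_9990_NAMESPACE = "urn:ietf:params:xml:ns:dmarc-2.0"


def first_namespace(xml: str):
    """Return the value of the first xmlns declaration, or None."""
    match = xml_namespace_regex.search(xml)
    return match.group(1) if match else None


# Default namespace declared with double quotes.
default_ns = (
    '<feedback xmlns="urn:ietf:params:xml:ns:dmarc-2.0">'
    "<version>1.0</version></feedback>"
)
# Prefixed namespace declared with single quotes.
prefixed_ns = "<d:feedback xmlns:d='urn:ietf:params:xml:ns:dmarc-2.0'/>"
# No namespace declaration at all.
no_ns = "<feedback><version>1.0</version></feedback>"
```

Because the pattern uses `search`, only the first declaration in the document is captured. A report that declares some other namespace first would therefore not be recognised by the namespace alone.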
@@ -112,8 +136,32 @@ class InvalidAggregateReport(InvalidDMARCReport):
"""Raised when an invalid DMARC aggregate report is encountered"""
class InvalidForensicReport(InvalidDMARCReport):
"""Raised when an invalid DMARC forensic report is encountered"""
class InvalidFailureReport(InvalidDMARCReport):
"""Raised when an invalid DMARC failure report is encountered"""
# Backward-compatible alias
InvalidForensicReport = InvalidFailureReport
def _text(value: Any) -> Optional[str]:
"""Unwrap a possibly-langAttrString value parsed by xmltodict.
RFC 9990 changed several aggregate-report elements (extra_contact_info,
error, comment, human_result) to type ``langAttrString`` — an
xs:simpleContent string with an optional ``lang`` attribute. When the
attribute is present, xmltodict parses the element as
``{"#text": "...", "@lang": "en"}`` instead of a plain string. Returns
the text payload for both shapes, ``None`` for unset values, and leaves
other scalar shapes untouched so callers can preserve whatever the
reporter sent.
"""
if value is None:
return None
if isinstance(value, dict):
text = value.get("#text")
return None if text is None else str(text)
return value
def _bucket_interval_by_day(
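The behaviour of `_text` is easiest to see on the shapes its docstring describes. The sketch below copies the function body from the hunk above and feeds it hand-written dicts in place of real `xmltodict` output, so the input values are illustrative assumptions and not captured data.

```python
from typing import Any, Optional


def _text(value: Any) -> Optional[str]:
    """Copy of the helper above: unwrap {"#text": ..., "@lang": ...}."""
    if value is None:
        return None
    if isinstance(value, dict):
        text = value.get("#text")
        return None if text is None else str(text)
    return value


# Plain string element (the RFC 7489 shape).
plain = _text("https://example.com/dmarc")
# Element carrying a lang attribute (the langAttrString shape).
with_lang = _text({"#text": "Forwarded by a mailing list", "@lang": "en"})
# Attribute present but no text payload.
attr_only = _text({"@lang": "en"})
# Element absent from the report.
missing = _text(None)
```

An element that carries only attributes and no text is treated the same as an absent element, which is why the `<email>` handling later in the diff can log and discard it.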
@@ -308,6 +356,7 @@ def _parse_report_record(
nameservers: Optional[list[str]] = None,
dns_timeout: float = DEFAULT_DNS_TIMEOUT,
dns_retries: int = DEFAULT_DNS_MAX_RETRIES,
is_rfc_9990: bool = False,
) -> dict[str, Any]:
"""
Converts a record from a DMARC aggregate report into a more consistent
@@ -357,8 +406,6 @@ def _parse_report_record(
}
if "disposition" in policy_evaluated:
new_policy_evaluated["disposition"] = policy_evaluated["disposition"]
if new_policy_evaluated["disposition"].strip().lower() == "pass":
new_policy_evaluated["disposition"] = "none"
if "dkim" in policy_evaluated:
new_policy_evaluated["dkim"] = policy_evaluated["dkim"]
if "spf" in policy_evaluated:
@@ -383,8 +430,27 @@ def _parse_report_record(
else:
reasons = [policy_evaluated["reason"]]
for reason in reasons:
if "comment" not in reason:
reason["comment"] = None
# `comment` is langAttrString in RFC 9990 — unwrap {"#text": ..., "@lang": ...}
reason["comment"] = _text(reason.get("comment"))
reason_type = reason.get("type")
if is_rfc_9990 and reason_type in RFC_7489_REMOVED_POLICY_OVERRIDE_TYPES:
logger.warning(
"Policy override reason type %r was removed in RFC 9990; "
"expected one of %s",
reason_type,
sorted(RFC_9990_POLICY_OVERRIDE_TYPES),
)
elif (
is_rfc_9990
and reason_type is not None
and reason_type not in RFC_9990_POLICY_OVERRIDE_TYPES
):
logger.warning(
"Unknown policy override reason type %r per RFC 9990; "
"expected one of %s",
reason_type,
sorted(RFC_9990_POLICY_OVERRIDE_TYPES),
)
new_policy_evaluated["policy_override_reasons"] = reasons
new_record["policy_evaluated"] = new_policy_evaluated
if "identities" in record:
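The two warning branches above amount to a three-way classification of the override reason type. As a rough sketch, the same decision can be written as a pure function. `classify_reason_type` and its return labels are hypothetical names introduced here; the two sets are copied from the constants added earlier in this diff.

```python
from typing import Optional

RFC_9990_POLICY_OVERRIDE_TYPES = frozenset(
    {
        "local_policy",
        "mailing_list",
        "other",
        "policy_test_mode",
        "trusted_forwarder",
    }
)
RFC_7489_REMOVED_POLICY_OVERRIDE_TYPES = frozenset({"forwarded", "sampled_out"})


def classify_reason_type(reason_type: Optional[str], is_rfc_9990: bool) -> str:
    """Return "ok", "removed", or "unknown" (labels are illustrative)."""
    # RFC 7489 reports and reasons without a type are never flagged.
    if not is_rfc_9990 or reason_type is None:
        return "ok"
    if reason_type in RFC_7489_REMOVED_POLICY_OVERRIDE_TYPES:
        return "removed"
    if reason_type not in RFC_9990_POLICY_OVERRIDE_TYPES:
        return "unknown"
    return "ok"
```

In the parser itself both non-`"ok"` outcomes only log a warning; the reason is still kept in `policy_override_reasons` as the reporter sent it.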
@@ -414,11 +480,18 @@ def _parse_report_record(
if "selector" in result and result["selector"] is not None:
new_result["selector"] = result["selector"]
else:
if is_rfc_9990:
logger.warning(
"DKIM auth result for %r is missing the 'selector' "
"element, which is REQUIRED by RFC 9990",
result["domain"],
)
new_result["selector"] = "none"
if "result" in result and result["result"] is not None:
new_result["result"] = result["result"]
else:
new_result["result"] = "none"
new_result["human_result"] = _text(result.get("human_result"))
new_record["auth_results"]["dkim"].append(new_result)
if not isinstance(auth_results["spf"], list):
@@ -434,6 +507,7 @@ def _parse_report_record(
new_result["result"] = result["result"]
else:
new_result["result"] = "none"
new_result["human_result"] = _text(result.get("human_result"))
new_record["auth_results"]["spf"].append(new_result)
if "envelope_from" not in new_record["identifiers"]:
@@ -706,6 +780,21 @@ def parse_aggregate_report_xml(
# Parse XML and recover from errors
if isinstance(xml, bytes):
xml = xml.decode(errors="ignore")
# Detect the XML namespace before any rewriting strips it. The dmarc-2.0
# namespace is one of the indicators for an RFC 9990 report but it is
# NOT a reliable sole discriminator: the <version> element value is
# ambiguous (RFC 9990's appendix sample uses <version>1.0</version>
# inside the dmarc-2.0 namespace), and real-world reporters frequently
# emit RFC 9990-shaped reports without declaring the namespace at all.
# The final `is_rfc_9990` decision is made post-parse so that
# RFC 9990-only fields (np, testing, discovery_method, generator,
# human_result) can also vote it in.
xml_namespace: Optional[str] = None
namespace_match = xml_namespace_regex.search(xml)
if namespace_match:
xml_namespace = namespace_match.group(1)
try:
xmltodict.parse(xml)["feedback"]
except Exception as e:
@@ -729,16 +818,24 @@ def parse_aggregate_report_xml(
report = xmltodict.parse(xml)["feedback"]
report_metadata = report["report_metadata"]
# <email> is xs:string in both RFC 7489 and RFC 9990, but defensive
# parsing in the wild: some reporters emit it with an xml:lang or
# similar attribute, which xmltodict turns into a dict.
if isinstance(report_metadata.get("email"), dict):
logger.debug(
"Discarding malformed <email> in report_metadata: %r",
report_metadata["email"],
)
report_metadata["email"] = None
unwrapped = _text(report_metadata["email"])
if unwrapped is None:
logger.debug(
"Discarding malformed <email> in report_metadata: %r",
report_metadata["email"],
)
report_metadata["email"] = unwrapped
schema = "draft"
if "version" in report:
schema = report["version"]
new_report: dict[str, Any] = {"xml_schema": schema}
new_report: dict[str, Any] = {
"xml_schema": schema,
"xml_namespace": xml_namespace,
}
new_report_metadata: dict[str, Any] = {}
if report_metadata["org_name"] is None:
if report_metadata["email"] is not None:
@@ -759,9 +856,9 @@ def parse_aggregate_report_xml(
)
new_report_metadata["org_name"] = org_name
new_report_metadata["org_email"] = report_metadata["email"]
extra = None
if "extra_contact_info" in report_metadata:
extra = report_metadata["extra_contact_info"]
# extra_contact_info is langAttrString in RFC 9990 (xs:string in
# RFC 7489) — unwrap {"#text": ..., "@lang": ...} if present.
extra = _text(report_metadata.get("extra_contact_info"))
new_report_metadata["org_extra_contact_info"] = extra
new_report_metadata["report_id"] = report_metadata["report_id"]
report_id = new_report_metadata["report_id"]
@@ -789,17 +886,38 @@ def parse_aggregate_report_xml(
new_report_metadata["end_date"], to_utc=True
)
# <error> is langAttrString in RFC 9990 (xs:string in RFC 7489) and
# was cardinality-narrowed from "unbounded" to "1" in RFC 9990, but
# the parser still accepts a list for backward compatibility with
# RFC 7489 reports that carry multiple errors.
if "error" in report["report_metadata"]:
if not isinstance(report["report_metadata"]["error"], list):
errors = [report["report_metadata"]["error"]]
else:
errors = report["report_metadata"]["error"]
raw_errors = report["report_metadata"]["error"]
if not isinstance(raw_errors, list):
raw_errors = [raw_errors]
errors = [text for text in (_text(e) for e in raw_errors) if text]
new_report_metadata["errors"] = errors
# <generator> is a plain xs:string in RFC 9990 but apply _text() so
# a malformed reporter that decorates it with attributes still
# yields a string instead of breaking downstream consumers.
generator = _text(report_metadata.get("generator"))
new_report_metadata["generator"] = generator
new_report["report_metadata"] = new_report_metadata
records = []
policy_published = report["policy_published"]
if type(policy_published) is list:
policy_published = policy_published[0]
# Final RFC 9990 detection: the dmarc-2.0 XML namespace OR any
# RFC 9990-only field. Real-world reporters that follow the schema
# without declaring the namespace still get RFC 9990-aware
# warnings (missing DKIM selector, removed override-reason types,
# etc.) and a truthful audit trail in `xml_namespace`.
rfc_9990_only_policy_fields = {"np", "testing", "discovery_method"}
is_rfc_9990 = (
xml_namespace == RFC_9990_NAMESPACE
or "generator" in report_metadata
or any(f in policy_published for f in rfc_9990_only_policy_fields)
)
new_policy_published: dict[str, Any] = {}
new_policy_published["domain"] = policy_published["domain"]
adkim = "r"
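The final detection rule can be exercised in isolation. The following sketch restates the `is_rfc_9990` expression from the hunk above as a function; the name `looks_like_rfc_9990` and the sample dicts are assumptions for illustration, and the dicts stand in for the parsed `report_metadata` and `policy_published` sections.

```python
from typing import Any, Optional

RFC_9990_NAMESPACE = "urn:ietf:params:xml:ns:dmarc-2.0"
RFC_9990_ONLY_POLICY_FIELDS = {"np", "testing", "discovery_method"}


def looks_like_rfc_9990(
    xml_namespace: Optional[str],
    report_metadata: dict[str, Any],
    policy_published: dict[str, Any],
) -> bool:
    """True if the namespace or any RFC 9990-only field is present."""
    return (
        xml_namespace == RFC_9990_NAMESPACE
        or "generator" in report_metadata
        or any(f in policy_published for f in RFC_9990_ONLY_POLICY_FIELDS)
    )


# Hypothetical parsed sections.
legacy_meta = {"org_name": "example.net", "report_id": "1"}
legacy_policy = {"domain": "example.com", "p": "none", "pct": "100"}
bis_policy = {"domain": "example.com", "p": "none", "testing": "n"}
```

Note that the check tests key presence only, so an empty `<testing/>` element is still enough to mark a report as RFC 9990.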
@@ -818,16 +936,39 @@ def parse_aggregate_report_xml(
if policy_published["sp"] is not None:
sp = policy_published["sp"]
new_policy_published["sp"] = sp
pct = "100"
pct = None
if "pct" in policy_published:
if policy_published["pct"] is not None:
pct = policy_published["pct"]
new_policy_published["pct"] = pct
fo = "0"
fo = None
if "fo" in policy_published:
if policy_published["fo"] is not None:
fo = policy_published["fo"]
new_policy_published["fo"] = fo
np_ = None
if "np" in policy_published:
if policy_published["np"] is not None:
np_ = policy_published["np"]
if np_ not in ("none", "quarantine", "reject"):
logger.warning("Invalid np value: {0}".format(np_))
new_policy_published["np"] = np_
testing = None
if "testing" in policy_published:
if policy_published["testing"] is not None:
testing = policy_published["testing"]
if testing not in ("n", "y"):
logger.warning("Invalid testing value: {0}".format(testing))
new_policy_published["testing"] = testing
discovery_method = None
if "discovery_method" in policy_published:
if policy_published["discovery_method"] is not None:
discovery_method = policy_published["discovery_method"]
if discovery_method not in ("psl", "treewalk"):
logger.warning(
"Invalid discovery_method value: {0}".format(discovery_method)
)
new_policy_published["discovery_method"] = discovery_method
new_report["policy_published"] = new_policy_published
if type(report["record"]) is list:
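The three new fields follow the same pattern: default to `None`, copy the reporter's value when present, and warn without rejecting when the value falls outside the enumeration. A compact sketch of that pattern is shown below. `read_enum_field` and the logger name are hypothetical, and the sample policy dict is made up.

```python
import logging
from typing import Any, Optional

logger = logging.getLogger("dmarc_sketch")  # hypothetical logger name


def read_enum_field(
    policy_published: dict[str, Any], name: str, allowed: tuple[str, ...]
) -> Optional[str]:
    """Return the field as sent, warning (not failing) on unknown values."""
    value = policy_published.get(name)
    if value is not None and value not in allowed:
        logger.warning("Invalid %s value: %s", name, value)
    return value


# Hypothetical policy_published section with one out-of-range value.
policy = {"domain": "example.com", "np": "reject", "testing": "maybe"}

np_ = read_enum_field(policy, "np", ("none", "quarantine", "reject"))
testing = read_enum_field(policy, "testing", ("n", "y"))
discovery = read_enum_field(policy, "discovery_method", ("psl", "treewalk"))
```

As in the diff, the invalid `testing` value is preserved in the output so downstream consumers can see exactly what the reporter sent.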
@@ -847,6 +988,7 @@ def parse_aggregate_report_xml(
nameservers=nameservers,
dns_timeout=timeout,
dns_retries=retries,
is_rfc_9990=is_rfc_9990,
)
_append_parsed_record(
parsed_record=report_record,
@@ -869,6 +1011,7 @@ def parse_aggregate_report_xml(
nameservers=nameservers,
dns_timeout=timeout,
dns_retries=retries,
is_rfc_9990=is_rfc_9990,
)
_append_parsed_record(
parsed_record=report_record,
@@ -962,6 +1105,7 @@ def extract_report(content: Union[bytes, str, BinaryIO]) -> str:
)
elif (
header[: len(MAGIC_XML)] == MAGIC_XML
or header[: len(MAGIC_XML_TAG)] == MAGIC_XML_TAG
or header[: len(MAGIC_JSON)] == MAGIC_JSON
):
report = file_object.read().decode(errors="ignore")
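The header check above is a form of magic-byte sniffing. The sketch below shows the idea as a standalone function. `sniff` and its return labels are hypothetical, and the JSON marker is written here as the single byte `{`, which is this sketch's own choice of spelling.

```python
MAGIC_ZIP = b"\x50\x4b\x03\x04"
MAGIC_GZIP = b"\x1f\x8b"
MAGIC_XML = b"\x3c\x3f\x78\x6d\x6c\x20"  # '<?xml '
MAGIC_XML_TAG = b"\x3c"  # '<' with no XML declaration
MAGIC_JSON = b"\x7b"  # '{' (assumption of this sketch)


def sniff(header: bytes) -> str:
    """Classify a payload by its leading bytes (labels are illustrative)."""
    if header.startswith(MAGIC_ZIP):
        return "zip"
    if header.startswith(MAGIC_GZIP):
        return "gzip"
    # MAGIC_XML_TAG also covers MAGIC_XML, since both begin with '<'.
    if header.startswith(MAGIC_XML) or header.startswith(MAGIC_XML_TAG):
        return "xml"
    if header.startswith(MAGIC_JSON):
        return "json"
    return "unknown"
```

With the bare `<` check in place, a report that opens directly with `<feedback>` is read as text, not rejected for lacking an XML declaration.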
@@ -1091,6 +1235,9 @@ def parsed_aggregate_reports_to_csv_rows(
sp = report["policy_published"]["sp"]
pct = report["policy_published"]["pct"]
fo = report["policy_published"]["fo"]
np_ = report["policy_published"].get("np", None)
testing = report["policy_published"].get("testing", None)
discovery_method = report["policy_published"].get("discovery_method", None)
report_dict: dict[str, Any] = dict(
xml_schema=xml_schema,
@@ -1107,8 +1254,11 @@ def parsed_aggregate_reports_to_csv_rows(
aspf=aspf,
p=p,
sp=sp,
np=np_,
pct=pct,
fo=fo,
testing=testing,
discovery_method=discovery_method,
)
for record in report["records"]:
@@ -1207,8 +1357,11 @@ def parsed_aggregate_reports_to_csv(
"aspf",
"p",
"sp",
"np",
"pct",
"fo",
"testing",
"discovery_method",
"source_ip_address",
"source_country",
"source_reverse_dns",
@@ -1249,7 +1402,7 @@ def parsed_aggregate_reports_to_csv(
return csv_file_object.getvalue()
def parse_forensic_report(
def parse_failure_report(
feedback_report: str,
sample: str,
msg_date: datetime,
@@ -1263,9 +1416,9 @@ def parse_forensic_report(
dns_timeout: float = DEFAULT_DNS_TIMEOUT,
dns_retries: int = DEFAULT_DNS_MAX_RETRIES,
strip_attachment_payloads: bool = False,
) -> ForensicReport:
) -> FailureReport:
"""
Converts a DMARC forensic report and sample to a dict
Converts a DMARC failure report and sample to a dict
Args:
feedback_report (str): A message's feedback report as a string
@@ -1282,7 +1435,7 @@ def parse_forensic_report(
dns_retries (int): Number of times to retry DNS queries on timeout
or other transient errors
strip_attachment_payloads (bool): Remove attachment payloads from
forensic report results
failure report results
Returns:
dict: A parsed report and sample
@@ -1298,7 +1451,7 @@ def parse_forensic_report(
if "arrival_date" not in parsed_report:
if msg_date is None:
raise InvalidForensicReport("Forensic sample is not a valid email")
raise InvalidFailureReport("Failure sample is not a valid email")
parsed_report["arrival_date"] = msg_date.isoformat()
if "version" not in parsed_report:
@@ -1340,21 +1493,37 @@ def parse_forensic_report(
parsed_report["source"] = parsed_report_source
del parsed_report["source_ip"]
# Identity-Alignment is REQUIRED per RFC 9991 §4. Default silently for
# backward compatibility with pre-9991 reporters, but log so the
# offending reporter is visible. Values are CFWS-separated per the
# ABNF, so each mechanism is stripped after splitting.
if "identity_alignment" not in parsed_report:
logger.warning(
"Failure report missing required 'Identity-Alignment' "
"field (RFC 9991 §4); defaulting to no aligned mechanisms"
)
parsed_report["authentication_mechanisms"] = []
elif parsed_report["identity_alignment"] == "none":
parsed_report["authentication_mechanisms"] = []
del parsed_report["identity_alignment"]
else:
auth_mechanisms = parsed_report["identity_alignment"]
auth_mechanisms = auth_mechanisms.split(",")
parsed_report["authentication_mechanisms"] = auth_mechanisms
raw_alignment = parsed_report["identity_alignment"].strip()
if raw_alignment.lower() == "none":
parsed_report["authentication_mechanisms"] = []
else:
parsed_report["authentication_mechanisms"] = [
m.strip() for m in raw_alignment.split(",") if m.strip()
]
del parsed_report["identity_alignment"]
# Auth-Failure is REQUIRED per RFC 9991 §4. Comma-separated per ABNF
# so strip each token.
if "auth_failure" not in parsed_report:
logger.warning(
"Failure report missing required 'Auth-Failure' field "
"(RFC 9991 §4); defaulting to 'dmarc'"
)
parsed_report["auth_failure"] = "dmarc"
auth_failure = parsed_report["auth_failure"].split(",")
parsed_report["auth_failure"] = auth_failure
parsed_report["auth_failure"] = [
f.strip() for f in parsed_report["auth_failure"].split(",") if f.strip()
]
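The token handling for `Identity-Alignment` and `Auth-Failure` can be checked on its own. This is a minimal sketch of the same split-and-strip logic; the helper names `split_tokens` and `alignment_mechanisms` are assumptions introduced here, and the input strings are invented examples.

```python
from typing import Optional


def split_tokens(raw: str) -> list[str]:
    """Split a comma-separated field value, dropping blanks and padding."""
    return [token.strip() for token in raw.split(",") if token.strip()]


def alignment_mechanisms(raw: Optional[str]) -> list[str]:
    """Mirror the Identity-Alignment handling: missing or 'none' yields []."""
    if raw is None:
        return []
    raw = raw.strip()
    if raw.lower() == "none":
        return []
    return split_tokens(raw)
```

Compared with a bare `split(",")`, this avoids values such as `" spf"` with leading whitespace and tolerates a trailing comma.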
optional_fields = [
"original_envelope_id",
@@ -1385,27 +1554,27 @@ def parse_forensic_report(
parsed_report["sample"] = sample
parsed_report["parsed_sample"] = parsed_sample
return cast(ForensicReport, parsed_report)
return cast(FailureReport, parsed_report)
except KeyError as error:
raise InvalidForensicReport("Missing value: {0}".format(error.__str__()))
raise InvalidFailureReport("Missing value: {0}".format(error.__str__()))
except Exception as error:
raise InvalidForensicReport("Unexpected error: {0}".format(error.__str__()))
raise InvalidFailureReport("Unexpected error: {0}".format(error.__str__()))
def parsed_forensic_reports_to_csv_rows(
reports: Union[ForensicReport, list[ForensicReport]],
def parsed_failure_reports_to_csv_rows(
reports: Union[FailureReport, list[FailureReport]],
) -> list[dict[str, Any]]:
"""
Converts one or more parsed forensic reports to a list of dicts in flat CSV
Converts one or more parsed failure reports to a list of dicts in flat CSV
format
Args:
reports: A parsed forensic report or list of parsed forensic reports
reports: A parsed failure report or list of parsed failure reports
Returns:
list: Parsed forensic report data as a list of dicts in flat CSV format
list: Parsed failure report data as a list of dicts in flat CSV format
"""
if isinstance(reports, dict):
reports = [reports]
@@ -1435,18 +1604,18 @@ def parsed_forensic_reports_to_csv_rows(
return rows
def parsed_forensic_reports_to_csv(
reports: Union[ForensicReport, list[ForensicReport]],
def parsed_failure_reports_to_csv(
reports: Union[FailureReport, list[FailureReport]],
) -> str:
"""
Converts one or more parsed forensic reports to flat CSV format, including
Converts one or more parsed failure reports to flat CSV format, including
headers
Args:
reports: A parsed forensic report or list of parsed forensic reports
reports: A parsed failure report or list of parsed failure reports
Returns:
str: Parsed forensic report data in flat CSV format, including headers
str: Parsed failure report data in flat CSV format, including headers
"""
fields = [
"feedback_type",
@@ -1481,7 +1650,7 @@ def parsed_forensic_reports_to_csv(
csv_writer = DictWriter(csv_file, fieldnames=fields)
csv_writer.writeheader()
rows = parsed_forensic_reports_to_csv_rows(reports)
rows = parsed_failure_reports_to_csv_rows(reports)
for row in rows:
new_row: dict[str, Any] = {}
@@ -1522,13 +1691,13 @@ def parse_report_email(
dns_retries (int): Number of times to retry DNS queries on timeout
or other transient errors
strip_attachment_payloads (bool): Remove attachment payloads from
forensic report results
failure report results
keep_alive (callable): keep alive function
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
Returns:
dict:
* ``report_type``: ``aggregate`` or ``forensic``
* ``report_type``: ``aggregate`` or ``failure``
* ``report``: The parsed report
"""
result: Optional[ParsedReport] = None
@@ -1672,7 +1841,7 @@ def parse_report_email(
if feedback_report and sample:
try:
forensic_report = parse_forensic_report(
failure_report = parse_failure_report(
feedback_report,
sample,
msg_date,
@@ -1686,17 +1855,17 @@ def parse_report_email(
dns_retries=dns_retries,
strip_attachment_payloads=strip_attachment_payloads,
)
except InvalidForensicReport as e:
except InvalidFailureReport as e:
error = (
'Message with subject "{0}" '
"is not a valid "
"forensic DMARC report: {1}".format(subject, e)
"failure DMARC report: {1}".format(subject, e)
)
raise InvalidForensicReport(error)
raise InvalidFailureReport(error)
except Exception as e:
raise InvalidForensicReport(e.__str__())
raise InvalidFailureReport(e.__str__())
result = {"report_type": "forensic", "report": forensic_report}
result = {"report_type": "failure", "report": failure_report}
return result
if result is None:
@@ -1721,7 +1890,7 @@ def parse_report_file(
keep_alive: Optional[Callable] = None,
normalize_timespan_threshold_hours: float = 24,
) -> ParsedReport:
"""Parses a DMARC aggregate or forensic file at the given path, a
"""Parses a DMARC aggregate or failure file at the given path, a
file-like object, or bytes
Args:
@@ -1733,7 +1902,7 @@ def parse_report_file(
dns_retries (int): Number of times to retry DNS queries on timeout
or other transient errors
strip_attachment_payloads (bool): Remove attachment payloads from
forensic report results
failure report results
ip_db_path (str): Path to a MMDB file from IPinfo, MaxMind, or DBIP
always_use_local_files (bool): Do not download files
reverse_dns_map_path (str): Path to a reverse DNS map
@@ -1829,7 +1998,7 @@ def get_dmarc_reports_from_mbox(
dns_retries (int): Number of times to retry DNS queries on timeout
or other transient errors
strip_attachment_payloads (bool): Remove attachment payloads from
forensic report results
failure report results
always_use_local_files (bool): Do not download files
reverse_dns_map_path (str): Path to a reverse DNS map file
reverse_dns_map_url (str): URL to a reverse DNS map file
@@ -1838,11 +2007,11 @@ def get_dmarc_reports_from_mbox(
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
Returns:
dict: Lists of ``aggregate_reports``, ``forensic_reports``, and ``smtp_tls_reports``
dict: Lists of ``aggregate_reports``, ``failure_reports``, and ``smtp_tls_reports``
"""
aggregate_reports: list[AggregateReport] = []
forensic_reports: list[ForensicReport] = []
failure_reports: list[FailureReport] = []
smtp_tls_reports: list[SMTPTLSReport] = []
try:
mbox = mailbox.mbox(input_)
@@ -1880,8 +2049,8 @@ def get_dmarc_reports_from_mbox(
"Skipping duplicate aggregate report "
f"from {report_org} with ID: {report_id}"
)
elif parsed_email["report_type"] == "forensic":
forensic_reports.append(parsed_email["report"])
elif parsed_email["report_type"] == "failure":
failure_reports.append(parsed_email["report"])
elif parsed_email["report_type"] == "smtp_tls":
smtp_tls_reports.append(parsed_email["report"])
except InvalidDMARCReport as error:
@@ -1890,7 +2059,7 @@ def get_dmarc_reports_from_mbox(
raise InvalidDMARCReport("Mailbox {0} does not exist".format(input_))
return {
"aggregate_reports": aggregate_reports,
"forensic_reports": forensic_reports,
"failure_reports": failure_reports,
"smtp_tls_reports": smtp_tls_reports,
}
@@ -1936,7 +2105,7 @@ def get_dmarc_reports_from_mailbox(
dns_retries (int): Number of times to retry DNS queries on timeout
or other transient errors
strip_attachment_payloads (bool): Remove attachment payloads from
forensic report results
failure report results
results (dict): Results from the previous run
batch_size (int): Number of messages to read and process before saving
(use 0 for no limit)
@@ -1947,7 +2116,7 @@ def get_dmarc_reports_from_mailbox(
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
Returns:
dict: Lists of ``aggregate_reports``, ``forensic_reports``, and ``smtp_tls_reports``
dict: Lists of ``aggregate_reports``, ``failure_reports``, and ``smtp_tls_reports``
"""
if delete and test:
raise ValueError("delete and test options are mutually exclusive")
@@ -1959,25 +2128,25 @@ def get_dmarc_reports_from_mailbox(
current_time: Optional[Union[datetime, date, str]] = None
aggregate_reports: list[AggregateReport] = []
forensic_reports: list[ForensicReport] = []
failure_reports: list[FailureReport] = []
smtp_tls_reports: list[SMTPTLSReport] = []
aggregate_report_msg_uids = []
forensic_report_msg_uids = []
failure_report_msg_uids = []
smtp_tls_msg_uids = []
aggregate_reports_folder = "{0}/Aggregate".format(archive_folder)
forensic_reports_folder = "{0}/Forensic".format(archive_folder)
failure_reports_folder = "{0}/Forensic".format(archive_folder)
smtp_tls_reports_folder = "{0}/SMTP-TLS".format(archive_folder)
invalid_reports_folder = "{0}/Invalid".format(archive_folder)
if results:
aggregate_reports = results["aggregate_reports"].copy()
forensic_reports = results["forensic_reports"].copy()
failure_reports = results["failure_reports"].copy()
smtp_tls_reports = results["smtp_tls_reports"].copy()
if not test and create_folders:
connection.create_folder(archive_folder)
connection.create_folder(aggregate_reports_folder)
connection.create_folder(forensic_reports_folder)
connection.create_folder(failure_reports_folder)
connection.create_folder(smtp_tls_reports_folder)
connection.create_folder(invalid_reports_folder)
@@ -2083,9 +2252,9 @@ def get_dmarc_reports_from_mailbox(
f"Skipping duplicate aggregate report with ID: {report_id}"
)
aggregate_report_msg_uids.append(message_id)
elif parsed_email["report_type"] == "forensic":
forensic_reports.append(parsed_email["report"])
forensic_report_msg_uids.append(message_id)
elif parsed_email["report_type"] == "failure":
failure_reports.append(parsed_email["report"])
failure_report_msg_uids.append(message_id)
elif parsed_email["report_type"] == "smtp_tls":
smtp_tls_reports.append(parsed_email["report"])
smtp_tls_msg_uids.append(message_id)
@@ -2112,7 +2281,7 @@ def get_dmarc_reports_from_mailbox(
if not test:
if delete:
processed_messages = (
aggregate_report_msg_uids + forensic_report_msg_uids + smtp_tls_msg_uids
aggregate_report_msg_uids + failure_report_msg_uids + smtp_tls_msg_uids
)
number_of_processed_msgs = len(processed_messages)
@@ -2152,24 +2321,24 @@ def get_dmarc_reports_from_mailbox(
message = "Error moving message UID"
e = "{0} {1}: {2}".format(message, msg_uid, e)
logger.error("Mailbox error: {0}".format(e))
if len(forensic_report_msg_uids) > 0:
message = "Moving forensic report messages from"
if len(failure_report_msg_uids) > 0:
message = "Moving failure report messages from"
logger.debug(
"{0} {1} to {2}".format(
message, reports_folder, forensic_reports_folder
message, reports_folder, failure_reports_folder
)
)
number_of_forensic_msgs = len(forensic_report_msg_uids)
for i in range(number_of_forensic_msgs):
msg_uid = forensic_report_msg_uids[i]
number_of_failure_msgs = len(failure_report_msg_uids)
for i in range(number_of_failure_msgs):
msg_uid = failure_report_msg_uids[i]
message = "Moving message"
logger.debug(
"{0} {1} of {2}: UID {3}".format(
message, i + 1, number_of_forensic_msgs, msg_uid
message, i + 1, number_of_failure_msgs, msg_uid
)
)
try:
connection.move_message(msg_uid, forensic_reports_folder)
connection.move_message(msg_uid, failure_reports_folder)
except Exception as e:
e = "Error moving message UID {0}: {1}".format(msg_uid, e)
logger.error("Mailbox error: {0}".format(e))
@@ -2196,7 +2365,7 @@ def get_dmarc_reports_from_mailbox(
logger.error("Mailbox error: {0}".format(e))
results = {
"aggregate_reports": aggregate_reports,
"forensic_reports": forensic_reports,
"failure_reports": failure_reports,
"smtp_tls_reports": smtp_tls_reports,
}
@@ -2282,7 +2451,7 @@ def watch_inbox(
dns_retries (int): Number of times to retry DNS queries on timeout
or other transient errors
strip_attachment_payloads (bool): Replace attachment payloads in
forensic report samples with None
failure report samples with None
batch_size (int): Number of messages to read and process before saving
since: Search for messages since certain time
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
@@ -2327,7 +2496,7 @@ def append_json(
filename: str,
reports: Union[
Sequence[AggregateReport],
Sequence[ForensicReport],
Sequence[FailureReport],
Sequence[SMTPTLSReport],
],
) -> None:
@@ -2370,10 +2539,10 @@ def save_output(
*,
output_directory: str = "output",
aggregate_json_filename: str = "aggregate.json",
forensic_json_filename: str = "forensic.json",
failure_json_filename: str = "failure.json",
smtp_tls_json_filename: str = "smtp_tls.json",
aggregate_csv_filename: str = "aggregate.csv",
forensic_csv_filename: str = "forensic.csv",
failure_csv_filename: str = "failure.csv",
smtp_tls_csv_filename: str = "smtp_tls.csv",
):
"""
@@ -2383,15 +2552,15 @@ def save_output(
results: Parsing results
output_directory (str): The path to the directory to save in
aggregate_json_filename (str): Filename for the aggregate JSON file
forensic_json_filename (str): Filename for the forensic JSON file
failure_json_filename (str): Filename for the failure JSON file
smtp_tls_json_filename (str): Filename for the SMTP TLS JSON file
aggregate_csv_filename (str): Filename for the aggregate CSV file
forensic_csv_filename (str): Filename for the forensic CSV file
failure_csv_filename (str): Filename for the failure CSV file
smtp_tls_csv_filename (str): Filename for the SMTP TLS CSV file
"""
aggregate_reports = results["aggregate_reports"]
forensic_reports = results["forensic_reports"]
failure_reports = results["failure_reports"]
smtp_tls_reports = results["smtp_tls_reports"]
output_directory = os.path.expanduser(output_directory)
@@ -2410,13 +2579,11 @@ def save_output(
parsed_aggregate_reports_to_csv(aggregate_reports),
)
append_json(
os.path.join(output_directory, forensic_json_filename), forensic_reports
)
append_json(os.path.join(output_directory, failure_json_filename), failure_reports)
append_csv(
os.path.join(output_directory, forensic_csv_filename),
parsed_forensic_reports_to_csv(forensic_reports),
os.path.join(output_directory, failure_csv_filename),
parsed_failure_reports_to_csv(failure_reports),
)
append_json(
@@ -2433,10 +2600,10 @@ def save_output(
os.makedirs(samples_directory)
sample_filenames = []
for forensic_report in forensic_reports:
sample = forensic_report["sample"]
for failure_report in failure_reports:
sample = failure_report["sample"]
message_count = 0
parsed_sample = forensic_report["parsed_sample"]
parsed_sample = failure_report["parsed_sample"]
subject = (
parsed_sample.get("filename_safe_subject")
or parsed_sample.get("subject")
@@ -2570,3 +2737,9 @@ def email_results(
attachments=attachments,
plain_message=message,
)
# Backward-compatible aliases
parse_forensic_report = parse_failure_report
parsed_forensic_reports_to_csv_rows = parsed_failure_reports_to_csv_rows
parsed_forensic_reports_to_csv = parsed_failure_reports_to_csv
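The aliases above are plain name bindings, so the old and new names refer to the same function object. A small sketch of that behavior, using a stand-in function (the body and single-argument signature here are assumptions for illustration, not the real parser):

```python
def parse_failure_report(feedback_report: str) -> dict:
    """Stand-in for the real parser; returns a trivially small result."""
    return {"report_type": "failure", "feedback_report": feedback_report}


# Backward-compatible alias, as in the diff: the old name points at the
# new function rather than wrapping it.
parse_forensic_report = parse_failure_report

print(parse_forensic_report is parse_failure_report)
print(parse_forensic_report.__name__)
```

Because no wrapper is involved, callers of the old name get no deprecation warning, and tracebacks show the new function name.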
+74 -62
@@ -335,14 +335,18 @@ def _parse_config(config: ConfigParser, opts):
opts.output = _expand_path(general_config["output"])
if "aggregate_json_filename" in general_config:
opts.aggregate_json_filename = general_config["aggregate_json_filename"]
if "forensic_json_filename" in general_config:
opts.forensic_json_filename = general_config["forensic_json_filename"]
if "failure_json_filename" in general_config:
opts.failure_json_filename = general_config["failure_json_filename"]
elif "forensic_json_filename" in general_config:
opts.failure_json_filename = general_config["forensic_json_filename"]
if "smtp_tls_json_filename" in general_config:
opts.smtp_tls_json_filename = general_config["smtp_tls_json_filename"]
if "aggregate_csv_filename" in general_config:
opts.aggregate_csv_filename = general_config["aggregate_csv_filename"]
if "forensic_csv_filename" in general_config:
opts.forensic_csv_filename = general_config["forensic_csv_filename"]
if "failure_csv_filename" in general_config:
opts.failure_csv_filename = general_config["failure_csv_filename"]
elif "forensic_csv_filename" in general_config:
opts.failure_csv_filename = general_config["forensic_csv_filename"]
if "smtp_tls_csv_filename" in general_config:
opts.smtp_tls_csv_filename = general_config["smtp_tls_csv_filename"]
if "dns_timeout" in general_config:
@@ -377,8 +381,10 @@ def _parse_config(config: ConfigParser, opts):
)
if "save_aggregate" in general_config:
opts.save_aggregate = bool(general_config.getboolean("save_aggregate"))
if "save_forensic" in general_config:
opts.save_forensic = bool(general_config.getboolean("save_forensic"))
if "save_failure" in general_config:
opts.save_failure = bool(general_config.getboolean("save_failure"))
elif "save_forensic" in general_config:
opts.save_failure = bool(general_config.getboolean("save_forensic"))
if "save_smtp_tls" in general_config:
opts.save_smtp_tls = bool(general_config.getboolean("save_smtp_tls"))
if "debug" in general_config:
@@ -772,11 +778,13 @@ def _parse_config(config: ConfigParser, opts):
raise ConfigurationError(
"aggregate_topic setting missing from the kafka config section"
)
if "forensic_topic" in kafka_config:
opts.kafka_forensic_topic = kafka_config["forensic_topic"]
if "failure_topic" in kafka_config:
opts.kafka_failure_topic = kafka_config["failure_topic"]
elif "forensic_topic" in kafka_config:
opts.kafka_failure_topic = kafka_config["forensic_topic"]
else:
raise ConfigurationError(
"forensic_topic setting missing from the kafka config section"
"failure_topic setting missing from the kafka config section"
)
if "smtp_tls_topic" in kafka_config:
opts.kafka_smtp_tls_topic = kafka_config["smtp_tls_topic"]
@@ -940,7 +948,9 @@ def _parse_config(config: ConfigParser, opts):
opts.la_dce = log_analytics_config.get("dce")
opts.la_dcr_immutable_id = log_analytics_config.get("dcr_immutable_id")
opts.la_dcr_aggregate_stream = log_analytics_config.get("dcr_aggregate_stream")
opts.la_dcr_forensic_stream = log_analytics_config.get("dcr_forensic_stream")
opts.la_dcr_failure_stream = log_analytics_config.get(
"dcr_failure_stream"
) or log_analytics_config.get("dcr_forensic_stream")
opts.la_dcr_smtp_tls_stream = log_analytics_config.get("dcr_smtp_tls_stream")
if "gelf" in config.sections():
@@ -968,8 +978,10 @@ def _parse_config(config: ConfigParser, opts):
webhook_config = config["webhook"]
if "aggregate_url" in webhook_config:
opts.webhook_aggregate_url = webhook_config["aggregate_url"]
if "forensic_url" in webhook_config:
opts.webhook_forensic_url = webhook_config["forensic_url"]
if "failure_url" in webhook_config:
opts.webhook_failure_url = webhook_config["failure_url"]
elif "forensic_url" in webhook_config:
opts.webhook_failure_url = webhook_config["forensic_url"]
if "smtp_tls_url" in webhook_config:
opts.webhook_smtp_tls_url = webhook_config["smtp_tls_url"]
if "timeout" in webhook_config:
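The config hunks above all follow one pattern: prefer the new `failure_*` key and fall back to the legacy `forensic_*` key. A minimal sketch of that lookup, assuming a hypothetical helper named `get_with_legacy` (the diff itself spells the logic out with `if`/`elif`), and made-up config values:

```python
from configparser import ConfigParser


def get_with_legacy(section, new_key: str, old_key: str, default=None):
    """Return section[new_key] if present, else section[old_key], else default.

    Hypothetical helper for illustration only.
    """
    if new_key in section:
        return section[new_key]
    if old_key in section:
        return section[old_key]
    return default


config = ConfigParser()
config.read_string(
    "[general]\n"
    "forensic_json_filename = old.json\n"
    "[webhook]\n"
    "failure_url = https://example.com/new\n"
    "forensic_url = https://example.com/old\n"
)

# Only the legacy key is set, so it is used.
print(get_with_legacy(config["general"], "failure_json_filename", "forensic_json_filename"))
# Both keys are set, so the new key wins.
print(get_with_legacy(config["webhook"], "failure_url", "forensic_url"))
```

When both keys are present the legacy one is silently ignored, which matches the `if`/`elif` ordering in the diff.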
@@ -1112,13 +1124,13 @@ def _init_output_clients(opts):
try:
if (
opts.webhook_aggregate_url
or opts.webhook_forensic_url
or opts.webhook_failure_url
or opts.webhook_smtp_tls_url
):
logger.debug("Initializing webhook client")
clients["webhook_client"] = webhook.WebhookClient(
aggregate_url=opts.webhook_aggregate_url,
forensic_url=opts.webhook_forensic_url,
failure_url=opts.webhook_failure_url,
smtp_tls_url=opts.webhook_smtp_tls_url,
timeout=opts.webhook_timeout,
)
@@ -1130,7 +1142,7 @@ def _init_output_clients(opts):
# step fails. Initialise them last so that all other clients are created
# successfully first; this minimizes the window for partial-init problems
# during config reload.
if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls:
if opts.save_aggregate or opts.save_failure or opts.save_smtp_tls:
try:
if opts.elasticsearch_hosts:
logger.debug(
@@ -1139,17 +1151,17 @@ def _init_output_clients(opts):
opts.elasticsearch_ssl,
)
es_aggregate_index = "dmarc_aggregate"
es_forensic_index = "dmarc_forensic"
es_failure_index = "dmarc_failure"
es_smtp_tls_index = "smtp_tls"
if opts.elasticsearch_index_suffix:
suffix = opts.elasticsearch_index_suffix
es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix)
es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix)
es_failure_index = "{0}_{1}".format(es_failure_index, suffix)
es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix)
if opts.elasticsearch_index_prefix:
prefix = opts.elasticsearch_index_prefix
es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index)
es_forensic_index = "{0}{1}".format(prefix, es_forensic_index)
es_failure_index = "{0}{1}".format(prefix, es_failure_index)
es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index)
elastic_timeout_value = (
float(opts.elasticsearch_timeout)
@@ -1168,7 +1180,7 @@ def _init_output_clients(opts):
)
elastic.migrate_indexes(
aggregate_indexes=[es_aggregate_index],
forensic_indexes=[es_forensic_index],
failure_indexes=[es_failure_index],
)
clients["elasticsearch"] = _ElasticsearchHandle()
except Exception as e:
@@ -1182,17 +1194,17 @@ def _init_output_clients(opts):
opts.opensearch_ssl,
)
os_aggregate_index = "dmarc_aggregate"
os_forensic_index = "dmarc_forensic"
os_failure_index = "dmarc_failure"
os_smtp_tls_index = "smtp_tls"
if opts.opensearch_index_suffix:
suffix = opts.opensearch_index_suffix
os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix)
os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix)
os_failure_index = "{0}_{1}".format(os_failure_index, suffix)
os_smtp_tls_index = "{0}_{1}".format(os_smtp_tls_index, suffix)
if opts.opensearch_index_prefix:
prefix = opts.opensearch_index_prefix
os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index)
os_forensic_index = "{0}{1}".format(prefix, os_forensic_index)
os_failure_index = "{0}{1}".format(prefix, os_failure_index)
os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index)
opensearch_timeout_value = (
float(opts.opensearch_timeout)
@@ -1214,7 +1226,7 @@ def _init_output_clients(opts):
)
opensearch.migrate_indexes(
aggregate_indexes=[os_aggregate_index],
forensic_indexes=[os_forensic_index],
failure_indexes=[os_failure_index],
)
clients["opensearch"] = _OpenSearchHandle()
except Exception as e:
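The Elasticsearch and OpenSearch hunks above build index names the same way: the suffix is joined with an underscore and the prefix is prepended with no separator. A minimal sketch, assuming a hypothetical helper named `build_index_name` and example prefix and suffix values:

```python
from typing import Optional


def build_index_name(
    base: str, prefix: Optional[str] = None, suffix: Optional[str] = None
) -> str:
    """Compose an index name as the diff does: suffix first, then prefix.

    Hypothetical helper for illustration only.
    """
    name = base
    if suffix:
        name = "{0}_{1}".format(name, suffix)
    if prefix:
        # No separator is added, so the prefix must carry its own.
        name = "{0}{1}".format(prefix, name)
    return name


print(build_index_name("dmarc_failure"))
print(build_index_name("dmarc_failure", prefix="corp_", suffix="prod"))
```

With the rename, deployments that used a prefix or suffix get a new `dmarc_failure`-based index name alongside any existing `dmarc_forensic` one.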
@@ -1306,10 +1318,10 @@ def _main():
reports_,
output_directory=opts.output,
aggregate_json_filename=opts.aggregate_json_filename,
forensic_json_filename=opts.forensic_json_filename,
failure_json_filename=opts.failure_json_filename,
smtp_tls_json_filename=opts.smtp_tls_json_filename,
aggregate_csv_filename=opts.aggregate_csv_filename,
forensic_csv_filename=opts.forensic_csv_filename,
failure_csv_filename=opts.failure_csv_filename,
smtp_tls_csv_filename=opts.smtp_tls_csv_filename,
)
@@ -1321,7 +1333,7 @@ def _main():
webhook_client = clients.get("webhook_client")
kafka_aggregate_topic = opts.kafka_aggregate_topic
kafka_forensic_topic = opts.kafka_forensic_topic
kafka_failure_topic = opts.kafka_failure_topic
kafka_smtp_tls_topic = opts.kafka_smtp_tls_topic
if opts.save_aggregate:
@@ -1409,13 +1421,13 @@ def _main():
except splunk.SplunkError as e:
log_output_error("Splunk HEC", e.__str__())
if opts.save_forensic:
for report in reports_["forensic_reports"]:
if opts.save_failure:
for report in reports_["failure_reports"]:
try:
shards = opts.elasticsearch_number_of_shards
replicas = opts.elasticsearch_number_of_replicas
if opts.elasticsearch_hosts:
elastic.save_forensic_report_to_elasticsearch(
elastic.save_failure_report_to_elasticsearch(
report,
index_suffix=opts.elasticsearch_index_suffix,
index_prefix=opts.elasticsearch_index_prefix
@@ -1435,7 +1447,7 @@ def _main():
shards = opts.opensearch_number_of_shards
replicas = opts.opensearch_number_of_replicas
if opts.opensearch_hosts:
opensearch.save_forensic_report_to_opensearch(
opensearch.save_failure_report_to_opensearch(
report,
index_suffix=opts.opensearch_index_suffix,
index_prefix=opts.opensearch_index_prefix
@@ -1453,34 +1465,34 @@ def _main():
try:
if kafka_client:
kafka_client.save_forensic_reports_to_kafka(
report, kafka_forensic_topic
kafka_client.save_failure_reports_to_kafka(
report, kafka_failure_topic
)
except Exception as error_:
log_output_error("Kafka", error_.__str__())
try:
if s3_client:
s3_client.save_forensic_report_to_s3(report)
s3_client.save_failure_report_to_s3(report)
except Exception as error_:
log_output_error("S3", error_.__str__())
try:
if syslog_client:
syslog_client.save_forensic_report_to_syslog(report)
syslog_client.save_failure_report_to_syslog(report)
except Exception as error_:
log_output_error("Syslog", error_.__str__())
try:
if gelf_client:
gelf_client.save_forensic_report_to_gelf(report)
gelf_client.save_failure_report_to_gelf(report)
except Exception as error_:
log_output_error("GELF", error_.__str__())
try:
if opts.webhook_forensic_url and webhook_client:
if opts.webhook_failure_url and webhook_client:
indent_value = 2 if opts.prettify_json else None
webhook_client.save_forensic_report_to_webhook(
webhook_client.save_failure_report_to_webhook(
json.dumps(report, ensure_ascii=False, indent=indent_value)
)
except Exception as error_:
@@ -1488,9 +1500,9 @@ def _main():
if hec_client:
try:
forensic_reports_ = reports_["forensic_reports"]
if len(forensic_reports_) > 0:
hec_client.save_forensic_reports_to_splunk(forensic_reports_)
failure_reports_ = reports_["failure_reports"]
if len(failure_reports_) > 0:
hec_client.save_failure_reports_to_splunk(failure_reports_)
except splunk.SplunkError as e:
log_output_error("Splunk HEC", e.__str__())
@@ -1588,13 +1600,13 @@ def _main():
dce=opts.la_dce,
dcr_immutable_id=opts.la_dcr_immutable_id,
dcr_aggregate_stream=opts.la_dcr_aggregate_stream,
dcr_forensic_stream=opts.la_dcr_forensic_stream,
dcr_failure_stream=opts.la_dcr_failure_stream,
dcr_smtp_tls_stream=opts.la_dcr_smtp_tls_stream,
)
la_client.publish_results(
reports_,
opts.save_aggregate,
opts.save_forensic,
opts.save_failure,
opts.save_smtp_tls,
)
except loganalytics.LogAnalyticsException as e:
@@ -1618,10 +1630,10 @@ def _main():
arg_parser.add_argument(
"file_path",
nargs="*",
help="one or more paths to aggregate or forensic "
help="one or more paths to aggregate or failure "
"report files, emails, or mbox files",
)
strip_attachment_help = "remove attachment payloads from forensic report output"
strip_attachment_help = "remove attachment payloads from failure report output"
arg_parser.add_argument(
"--strip-attachment-payloads", help=strip_attachment_help, action="store_true"
)
@@ -1634,9 +1646,9 @@ def _main():
default="aggregate.json",
)
arg_parser.add_argument(
"--forensic-json-filename",
help="filename for the forensic JSON output file",
default="forensic.json",
"--failure-json-filename",
help="filename for the failure JSON output file",
default="failure.json",
)
arg_parser.add_argument(
"--smtp-tls-json-filename",
@@ -1649,9 +1661,9 @@ def _main():
default="aggregate.csv",
)
arg_parser.add_argument(
"--forensic-csv-filename",
help="filename for the forensic CSV output file",
default="forensic.csv",
"--failure-csv-filename",
help="filename for the failure CSV output file",
default="failure.csv",
)
arg_parser.add_argument(
"--smtp-tls-csv-filename",
@@ -1706,7 +1718,7 @@ def _main():
arg_parser.add_argument("-v", "--version", action="version", version=__version__)
aggregate_reports = []
forensic_reports = []
failure_reports = []
smtp_tls_reports = []
args = arg_parser.parse_args()
@@ -1719,8 +1731,8 @@ def _main():
output=args.output,
aggregate_csv_filename=args.aggregate_csv_filename,
aggregate_json_filename=args.aggregate_json_filename,
forensic_csv_filename=args.forensic_csv_filename,
forensic_json_filename=args.forensic_json_filename,
failure_csv_filename=args.failure_csv_filename,
failure_json_filename=args.failure_json_filename,
smtp_tls_json_filename=args.smtp_tls_json_filename,
smtp_tls_csv_filename=args.smtp_tls_csv_filename,
nameservers=args.nameservers,
@@ -1733,7 +1745,7 @@ def _main():
verbose=args.verbose,
prettify_json=args.prettify_json,
save_aggregate=False,
save_forensic=False,
save_failure=False,
save_smtp_tls=False,
mailbox_reports_folder="INBOX",
mailbox_archive_folder="Archive",
@@ -1799,7 +1811,7 @@ def _main():
kafka_username=None,
kafka_password=None,
kafka_aggregate_topic=None,
kafka_forensic_topic=None,
kafka_failure_topic=None,
kafka_smtp_tls_topic=None,
kafka_ssl=False,
kafka_skip_certificate_verification=False,
@@ -1854,13 +1866,13 @@ def _main():
la_dce=None,
la_dcr_immutable_id=None,
la_dcr_aggregate_stream=None,
la_dcr_forensic_stream=None,
la_dcr_failure_stream=None,
la_dcr_smtp_tls_stream=None,
gelf_host=None,
gelf_port=None,
gelf_mode=None,
webhook_aggregate_url=None,
webhook_forensic_url=None,
webhook_failure_url=None,
webhook_smtp_tls_url=None,
webhook_timeout=60,
normalize_timespan_threshold_hours=24.0,
@@ -2062,8 +2074,8 @@ def _main():
"Skipping duplicate aggregate report "
f"from {report_org} with ID: {report_id}"
)
elif result[0]["report_type"] == "forensic":
forensic_reports.append(result[0]["report"])
elif result[0]["report_type"] == "failure":
failure_reports.append(result[0]["report"])
elif result[0]["report_type"] == "smtp_tls":
smtp_tls_reports.append(result[0]["report"])
@@ -2088,7 +2100,7 @@ def _main():
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
)
aggregate_reports += reports["aggregate_reports"]
forensic_reports += reports["forensic_reports"]
failure_reports += reports["failure_reports"]
smtp_tls_reports += reports["smtp_tls_reports"]
mailbox_connection = None
@@ -2230,7 +2242,7 @@ def _main():
)
aggregate_reports += reports["aggregate_reports"]
forensic_reports += reports["forensic_reports"]
failure_reports += reports["failure_reports"]
smtp_tls_reports += reports["smtp_tls_reports"]
except Exception:
@@ -2239,7 +2251,7 @@ def _main():
parsing_results: ParsingResults = {
"aggregate_reports": aggregate_reports,
"forensic_reports": forensic_reports,
"failure_reports": failure_reports,
"smtp_tls_reports": smtp_tls_reports,
}
+1 -1
@@ -1,4 +1,4 @@
__version__ = "9.11.2"
__version__ = "10.0.0"
USER_AGENT = f"parsedmarc/{__version__}"
+121 -66
@@ -13,6 +13,7 @@ from elasticsearch_dsl import (
InnerDoc,
Integer,
Ip,
Keyword,
Nested,
Object,
Search,
@@ -21,7 +22,7 @@ from elasticsearch_dsl import (
)
from elasticsearch_dsl.search import Q
from parsedmarc import InvalidForensicReport
from parsedmarc import InvalidFailureReport
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
@@ -43,18 +44,23 @@ class _PublishedPolicy(InnerDoc):
sp = Text()
pct = Integer()
fo = Text()
np = Keyword()
testing = Keyword()
discovery_method = Keyword()
class _DKIMResult(InnerDoc):
domain = Text()
selector = Text()
result = Text()
human_result = Text()
class _SPFResult(InnerDoc):
domain = Text()
scope = Text()
results = Text()
human_result = Text()
class _AggregateReportDoc(Document):
@@ -62,6 +68,7 @@ class _AggregateReportDoc(Document):
name = "dmarc_aggregate"
xml_schema = Text()
xml_namespace = Keyword()
org_name = Text()
org_email = Text()
org_extra_contact_info = Text()
@@ -93,17 +100,45 @@ class _AggregateReportDoc(Document):
envelope_to = Text()
dkim_results = Nested(_DKIMResult)
spf_results = Nested(_SPFResult)
np = Keyword()
testing = Keyword()
discovery_method = Keyword()
generator = Text()
def add_policy_override(self, type_: str, comment: str):
self.policy_overrides.append(_PolicyOverride(type=type_, comment=comment)) # pyright: ignore[reportCallIssue]
def add_dkim_result(self, domain: str, selector: str, result: _DKIMResult):
def add_dkim_result(
self,
domain: str,
selector: str,
result: _DKIMResult,
human_result: Optional[str] = None,
):
self.dkim_results.append(
_DKIMResult(domain=domain, selector=selector, result=result)
_DKIMResult(
domain=domain,
selector=selector,
result=result,
human_result=human_result,
)
) # pyright: ignore[reportCallIssue]
def add_spf_result(self, domain: str, scope: str, result: _SPFResult):
self.spf_results.append(_SPFResult(domain=domain, scope=scope, result=result)) # pyright: ignore[reportCallIssue]
def add_spf_result(
self,
domain: str,
scope: str,
result: _SPFResult,
human_result: Optional[str] = None,
):
self.spf_results.append(
_SPFResult(
domain=domain,
scope=scope,
result=result,
human_result=human_result,
)
) # pyright: ignore[reportCallIssue]
def save(self, **kwargs): # pyright: ignore[reportIncompatibleMethodOverride]
self.passed_dmarc = False
@@ -123,7 +158,7 @@ class _EmailAttachmentDoc(Document):
sha256 = Text()
class _ForensicSampleDoc(InnerDoc):
class _FailureSampleDoc(InnerDoc):
raw = Text()
headers = Object()
headers_only = Boolean()
@@ -160,9 +195,9 @@ class _ForensicSampleDoc(InnerDoc):
) # pyright: ignore[reportCallIssue]
class _ForensicReportDoc(Document):
class _FailureReportDoc(Document):
class Index:
name = "dmarc_forensic"
name = "dmarc_failure"
feedback_type = Text()
user_agent = Text()
@@ -183,7 +218,7 @@ class _ForensicReportDoc(Document):
source_auth_failures = Text()
dkim_domain = Text()
original_rcpt_to = Text()
sample = Object(_ForensicSampleDoc)
sample = Object(_FailureSampleDoc)
class _SMTPTLSFailureDetailsDoc(InnerDoc):
@@ -336,20 +371,20 @@ def create_indexes(names: list[str], settings: Optional[dict[str, Any]] = None):
def migrate_indexes(
aggregate_indexes: Optional[list[str]] = None,
forensic_indexes: Optional[list[str]] = None,
failure_indexes: Optional[list[str]] = None,
):
"""
Updates index mappings
Args:
aggregate_indexes (list): A list of aggregate index names
forensic_indexes (list): A list of forensic index names
failure_indexes (list): A list of failure index names
"""
version = 2
if aggregate_indexes is None:
aggregate_indexes = []
if forensic_indexes is None:
forensic_indexes = []
if failure_indexes is None:
failure_indexes = []
for aggregate_index_name in aggregate_indexes:
if not Index(aggregate_index_name).exists():
continue
@@ -379,7 +414,7 @@ def migrate_indexes(
reindex(connections.get_connection(), aggregate_index_name, new_index_name) # pyright: ignore[reportArgumentType]
Index(aggregate_index_name).delete()
for forensic_index in forensic_indexes:
for failure_index in failure_indexes:
pass
@@ -395,7 +430,7 @@ def save_aggregate_report_to_elasticsearch(
Saves a parsed DMARC aggregate report to Elasticsearch
Args:
aggregate_report (dict): A parsed forensic report
aggregate_report (dict): A parsed aggregate report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily indexes
@@ -463,6 +498,9 @@ def save_aggregate_report_to_elasticsearch(
sp=aggregate_report["policy_published"]["sp"],
pct=aggregate_report["policy_published"]["pct"],
fo=aggregate_report["policy_published"]["fo"],
np=aggregate_report["policy_published"].get("np"),
testing=aggregate_report["policy_published"].get("testing"),
discovery_method=aggregate_report["policy_published"].get("discovery_method"),
)
for record in aggregate_report["records"]:
@@ -479,6 +517,7 @@ def save_aggregate_report_to_elasticsearch(
date_range = [aggregate_report["begin_date"], aggregate_report["end_date"]]
agg_doc = _AggregateReportDoc(
xml_schema=aggregate_report["xml_schema"],
xml_namespace=aggregate_report.get("xml_namespace"),
org_name=metadata["org_name"],
org_email=metadata["org_email"],
org_extra_contact_info=metadata["org_extra_contact_info"],
@@ -507,6 +546,12 @@ def save_aggregate_report_to_elasticsearch(
header_from=record["identifiers"]["header_from"],
envelope_from=record["identifiers"]["envelope_from"],
envelope_to=record["identifiers"]["envelope_to"],
np=aggregate_report["policy_published"].get("np"),
testing=aggregate_report["policy_published"].get("testing"),
discovery_method=aggregate_report["policy_published"].get(
"discovery_method"
),
generator=metadata.get("generator"),
)
for override in record["policy_evaluated"]["policy_override_reasons"]:
@@ -519,6 +564,7 @@ def save_aggregate_report_to_elasticsearch(
domain=dkim_result["domain"],
selector=dkim_result["selector"],
result=dkim_result["result"],
human_result=dkim_result.get("human_result"),
)
for spf_result in record["auth_results"]["spf"]:
@@ -526,6 +572,7 @@ def save_aggregate_report_to_elasticsearch(
domain=spf_result["domain"],
scope=spf_result["scope"],
result=spf_result["result"],
human_result=spf_result.get("human_result"),
)
index = "dmarc_aggregate"
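The DMARCbis-only fields above are read with `.get()` rather than indexing, so a report parsed from an RFC 7489 document (which has no such keys) yields `None` instead of raising `KeyError`. A minimal sketch of that behaviour, assuming a hypothetical helper `dmarcbis_policy_fields` and made-up sample policies that are not part of parsedmarc:

```python
from typing import Any, Optional


# Hypothetical helper (not in parsedmarc): collects the DMARCbis-only policy
# fields, returning None for any key the parsed report does not carry.
def dmarcbis_policy_fields(
    policy_published: dict[str, Any],
) -> dict[str, Optional[str]]:
    return {
        "np": policy_published.get("np"),
        "testing": policy_published.get("testing"),
        "discovery_method": policy_published.get("discovery_method"),
    }


# Illustrative inputs only; real parsed reports contain more keys.
rfc7489_policy = {"domain": "example.com", "p": "none", "sp": "none", "pct": "100"}
dmarcbis_policy = {
    "domain": "example.com",
    "p": "reject",
    "np": "reject",
    "testing": "n",
    "discovery_method": "treewalk",
}

legacy_fields = dmarcbis_policy_fields(rfc7489_policy)
new_fields = dmarcbis_policy_fields(dmarcbis_policy)
```

Required RFC 7489 keys such as `sp` and `pct` are still indexed directly in the diff, so a missing required key continues to surface as an error.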
@@ -547,8 +594,8 @@ def save_aggregate_report_to_elasticsearch(
raise ElasticsearchError("Elasticsearch error: {0}".format(e.__str__()))
def save_forensic_report_to_elasticsearch(
forensic_report: dict[str, Any],
def save_failure_report_to_elasticsearch(
failure_report: dict[str, Any],
index_suffix: Optional[Any] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
@@ -556,10 +603,10 @@ def save_forensic_report_to_elasticsearch(
number_of_replicas: int = 0,
):
"""
Saves a parsed DMARC forensic report to Elasticsearch
Saves a parsed DMARC failure report to Elasticsearch
Args:
forensic_report (dict): A parsed forensic report
failure_report (dict): A parsed failure report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily
@@ -572,26 +619,28 @@ def save_forensic_report_to_elasticsearch(
AlreadySaved
"""
logger.info("Saving forensic report to Elasticsearch")
forensic_report = forensic_report.copy()
logger.info("Saving failure report to Elasticsearch")
failure_report = failure_report.copy()
sample_date = None
if forensic_report["parsed_sample"]["date"] is not None:
sample_date = forensic_report["parsed_sample"]["date"]
if failure_report["parsed_sample"]["date"] is not None:
sample_date = failure_report["parsed_sample"]["date"]
sample_date = human_timestamp_to_datetime(sample_date)
original_headers = forensic_report["parsed_sample"]["headers"]
original_headers = failure_report["parsed_sample"]["headers"]
headers: dict[str, Any] = {}
for original_header in original_headers:
headers[original_header.lower()] = original_headers[original_header]
arrival_date = human_timestamp_to_datetime(forensic_report["arrival_date_utc"])
arrival_date = human_timestamp_to_datetime(failure_report["arrival_date_utc"])
arrival_date_epoch_milliseconds = int(arrival_date.timestamp() * 1000)
if index_suffix is not None:
search_index = "dmarc_forensic_{0}*".format(index_suffix)
search_index = "dmarc_failure_{0}*,dmarc_forensic_{0}*".format(index_suffix)
else:
search_index = "dmarc_forensic*"
search_index = "dmarc_failure*,dmarc_forensic*"
if index_prefix is not None:
search_index = "{0}{1}".format(index_prefix, search_index)
search_index = ",".join(
"{0}{1}".format(index_prefix, part) for part in search_index.split(",")
)
search = Search(index=search_index)
q = Q(dict(match=dict(arrival_date=arrival_date_epoch_milliseconds))) # pyright: ignore[reportArgumentType]
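The duplicate check now searches both the new and the legacy index names, so the prefix has to be applied to each comma-separated pattern; a single `"{0}{1}".format(index_prefix, search_index)` would prefix only the first one. The logic can be sketched in isolation as follows, assuming a hypothetical function name `build_search_index` (the diff performs these steps inline):

```python
from typing import Optional


# Hypothetical standalone version of the inline logic in the diff.
def build_search_index(
    index_suffix: Optional[str] = None,
    index_prefix: Optional[str] = None,
) -> str:
    # Cover both the new "failure" indexes and the legacy "forensic" ones.
    if index_suffix is not None:
        search_index = "dmarc_failure_{0}*,dmarc_forensic_{0}*".format(index_suffix)
    else:
        search_index = "dmarc_failure*,dmarc_forensic*"
    # Apply the prefix to every pattern, not just the first.
    if index_prefix is not None:
        search_index = ",".join(
            "{0}{1}".format(index_prefix, part) for part in search_index.split(",")
        )
    return search_index


plain = build_search_index()
prefixed = build_search_index(index_suffix="corp", index_prefix="prod_")
```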
@@ -632,67 +681,67 @@ def save_forensic_report_to_elasticsearch(
if len(existing) > 0:
raise AlreadySaved(
"A forensic sample to {0} from {1} "
"A failure sample to {0} from {1} "
"with a subject of {2} and arrival date of {3} "
"already exists in "
"Elasticsearch".format(
to_, from_, subject, forensic_report["arrival_date_utc"]
to_, from_, subject, failure_report["arrival_date_utc"]
)
)
parsed_sample = forensic_report["parsed_sample"]
sample = _ForensicSampleDoc(
raw=forensic_report["sample"],
parsed_sample = failure_report["parsed_sample"]
sample = _FailureSampleDoc(
raw=failure_report["sample"],
headers=headers,
headers_only=forensic_report["sample_headers_only"],
headers_only=failure_report["sample_headers_only"],
date=sample_date,
subject=forensic_report["parsed_sample"]["subject"],
subject=failure_report["parsed_sample"]["subject"],
filename_safe_subject=parsed_sample["filename_safe_subject"],
body=forensic_report["parsed_sample"]["body"],
body=failure_report["parsed_sample"]["body"],
)
for address in forensic_report["parsed_sample"]["to"]:
for address in failure_report["parsed_sample"]["to"]:
sample.add_to(display_name=address["display_name"], address=address["address"])
for address in forensic_report["parsed_sample"]["reply_to"]:
for address in failure_report["parsed_sample"]["reply_to"]:
sample.add_reply_to(
display_name=address["display_name"], address=address["address"]
)
for address in forensic_report["parsed_sample"]["cc"]:
for address in failure_report["parsed_sample"]["cc"]:
sample.add_cc(display_name=address["display_name"], address=address["address"])
for address in forensic_report["parsed_sample"]["bcc"]:
for address in failure_report["parsed_sample"]["bcc"]:
sample.add_bcc(display_name=address["display_name"], address=address["address"])
for attachment in forensic_report["parsed_sample"]["attachments"]:
for attachment in failure_report["parsed_sample"]["attachments"]:
sample.add_attachment(
filename=attachment["filename"],
content_type=attachment["mail_content_type"],
sha256=attachment["sha256"],
)
try:
forensic_doc = _ForensicReportDoc(
feedback_type=forensic_report["feedback_type"],
user_agent=forensic_report["user_agent"],
version=forensic_report["version"],
original_mail_from=forensic_report["original_mail_from"],
failure_doc = _FailureReportDoc(
feedback_type=failure_report["feedback_type"],
user_agent=failure_report["user_agent"],
version=failure_report["version"],
original_mail_from=failure_report["original_mail_from"],
arrival_date=arrival_date_epoch_milliseconds,
domain=forensic_report["reported_domain"],
original_envelope_id=forensic_report["original_envelope_id"],
authentication_results=forensic_report["authentication_results"],
delivery_results=forensic_report["delivery_result"],
source_ip_address=forensic_report["source"]["ip_address"],
source_country=forensic_report["source"]["country"],
source_reverse_dns=forensic_report["source"]["reverse_dns"],
source_base_domain=forensic_report["source"]["base_domain"],
source_asn=forensic_report["source"]["asn"],
source_as_name=forensic_report["source"]["as_name"],
source_as_domain=forensic_report["source"]["as_domain"],
authentication_mechanisms=forensic_report["authentication_mechanisms"],
auth_failure=forensic_report["auth_failure"],
dkim_domain=forensic_report["dkim_domain"],
original_rcpt_to=forensic_report["original_rcpt_to"],
domain=failure_report["reported_domain"],
original_envelope_id=failure_report["original_envelope_id"],
authentication_results=failure_report["authentication_results"],
delivery_results=failure_report["delivery_result"],
source_ip_address=failure_report["source"]["ip_address"],
source_country=failure_report["source"]["country"],
source_reverse_dns=failure_report["source"]["reverse_dns"],
source_base_domain=failure_report["source"]["base_domain"],
source_asn=failure_report["source"]["asn"],
source_as_name=failure_report["source"]["as_name"],
source_as_domain=failure_report["source"]["as_domain"],
authentication_mechanisms=failure_report["authentication_mechanisms"],
auth_failure=failure_report["auth_failure"],
dkim_domain=failure_report["dkim_domain"],
original_rcpt_to=failure_report["original_rcpt_to"],
sample=sample,
)
index = "dmarc_forensic"
index = "dmarc_failure"
if index_suffix:
index = "{0}_{1}".format(index, index_suffix)
if index_prefix:
@@ -706,14 +755,14 @@ def save_forensic_report_to_elasticsearch(
number_of_shards=number_of_shards, number_of_replicas=number_of_replicas
)
create_indexes([index], index_settings)
forensic_doc.meta.index = index # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
failure_doc.meta.index = index # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
try:
forensic_doc.save()
failure_doc.save()
except Exception as e:
raise ElasticsearchError("Elasticsearch error: {0}".format(e.__str__()))
except KeyError as e:
raise InvalidForensicReport(
"Forensic report missing required field: {0}".format(e.__str__())
raise InvalidFailureReport(
"Failure report missing required field: {0}".format(e.__str__())
)
@@ -867,3 +916,9 @@ def save_smtp_tls_report_to_elasticsearch(
smtp_tls_doc.save()
except Exception as e:
raise ElasticsearchError("Elasticsearch error: {0}".format(e.__str__()))
# Backward-compatible aliases
_ForensicSampleDoc = _FailureSampleDoc
_ForensicReportDoc = _FailureReportDoc
save_forensic_report_to_elasticsearch = save_failure_report_to_elasticsearch
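The aliases above are plain name bindings: the old name and the new name refer to the same object, so existing callers keep working without a wrapper. A minimal sketch with hypothetical function names (not the real parsedmarc functions):

```python
# Hypothetical stand-in for a renamed module-level function.
def save_failure_report(report: dict) -> str:
    return "saved {0}".format(report["reported_domain"])


# Backward-compatible alias: the old name is bound to the same function object.
save_forensic_report = save_failure_report

result = save_forensic_report({"reported_domain": "example.com"})
```

One consequence of this design is that introspection through the old name (for example `save_forensic_report.__name__`) reports the new name.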
+11 -5
@@ -9,10 +9,12 @@ from pygelf import GelfTcpHandler, GelfTlsHandler, GelfUdpHandler
from parsedmarc import (
parsed_aggregate_reports_to_csv_rows,
parsed_forensic_reports_to_csv_rows,
parsed_failure_reports_to_csv_rows,
parsed_smtp_tls_reports_to_csv_rows,
)
from parsedmarc.types import AggregateReport, ForensicReport, SMTPTLSReport
from typing import Any
from parsedmarc.types import AggregateReport, SMTPTLSReport
log_context_data = threading.local()
@@ -57,11 +59,11 @@ class GelfClient(object):
log_context_data.parsedmarc = None
def save_forensic_report_to_gelf(self, forensic_reports: list[ForensicReport]):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
def save_failure_report_to_gelf(self, failure_reports: list[dict[str, Any]]):
rows = parsed_failure_reports_to_csv_rows(failure_reports)
for row in rows:
log_context_data.parsedmarc = row
self.logger.info("parsedmarc forensic report")
self.logger.info("parsedmarc failure report")
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: SMTPTLSReport):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
@@ -73,3 +75,7 @@ class GelfClient(object):
"""Remove and close the GELF handler, releasing its connection."""
self.logger.removeHandler(self.handler)
self.handler.close()
# Backward-compatible aliases
GelfClient.save_forensic_report_to_gelf = GelfClient.save_failure_report_to_gelf
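For methods, the alias is assigned on the class after the class body, so every instance gains the old method name. A minimal sketch of the same pattern, using a hypothetical `ExampleClient` class rather than the real `GelfClient`:

```python
class ExampleClient:
    """Hypothetical client used only to illustrate the alias pattern."""

    def __init__(self):
        self.sent = []

    def save_failure_report(self, reports):
        # Record the reports and return how many were handled.
        self.sent.extend(reports)
        return len(reports)


# Backward-compatible alias: the old method name points at the new method.
ExampleClient.save_forensic_report = ExampleClient.save_failure_report

client = ExampleClient()
count = client.save_forensic_report([{"id": 1}, {"id": 2}])
```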
+17 -13
@@ -143,31 +143,31 @@ class KafkaClient(object):
except Exception as e:
raise KafkaError("Kafka error: {0}".format(e.__str__()))
def save_forensic_reports_to_kafka(
def save_failure_reports_to_kafka(
self,
forensic_reports: Union[dict[str, Any], list[dict[str, Any]]],
forensic_topic: str,
failure_reports: Union[dict[str, Any], list[dict[str, Any]]],
failure_topic: str,
):
"""
Saves forensic DMARC reports to Kafka, sends individual
Saves failure DMARC reports to Kafka, sends individual
records (slices) since Kafka requires messages to be <= 1MB
by default.
Args:
forensic_reports (list): A list of forensic report dicts
failure_reports (list): A list of failure report dicts
to save to Kafka
forensic_topic (str): The name of the Kafka topic
failure_topic (str): The name of the Kafka topic
"""
if isinstance(forensic_reports, dict):
forensic_reports = [forensic_reports]
if isinstance(failure_reports, dict):
failure_reports = [failure_reports]
if len(forensic_reports) < 1:
if len(failure_reports) < 1:
return
try:
logger.debug("Saving forensic reports to Kafka")
self.producer.send(forensic_topic, forensic_reports)
logger.debug("Saving failure reports to Kafka")
self.producer.send(failure_topic, failure_reports)
except UnknownTopicOrPartitionError:
raise KafkaError("Kafka error: Unknown topic or partition on broker")
except Exception as e:
@@ -188,7 +188,7 @@ class KafkaClient(object):
by default.
Args:
smtp_tls_reports (list): A list of forensic report dicts
smtp_tls_reports (list): A list of SMTP TLS report dicts
to save to Kafka
smtp_tls_topic (str): The name of the Kafka topic
@@ -200,7 +200,7 @@ class KafkaClient(object):
return
try:
logger.debug("Saving forensic reports to Kafka")
logger.debug("Saving SMTP TLS reports to Kafka")
self.producer.send(smtp_tls_topic, smtp_tls_reports)
except UnknownTopicOrPartitionError:
raise KafkaError("Kafka error: Unknown topic or partition on broker")
@@ -210,3 +210,7 @@ class KafkaClient(object):
self.producer.flush()
except Exception as e:
raise KafkaError("Kafka error: {0}".format(e.__str__()))
# Backward-compatible aliases
KafkaClient.save_forensic_reports_to_kafka = KafkaClient.save_failure_reports_to_kafka
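The Kafka save methods in this file accept either a single report dict or a list of them, wrap a lone dict in a list, and return early when the list is empty. That normalisation step can be sketched on its own, assuming a hypothetical helper name `normalize_reports` (the diff does this inline):

```python
from typing import Any, Union


# Hypothetical helper mirroring the inline isinstance check in the diff.
def normalize_reports(
    reports: Union[dict[str, Any], list[dict[str, Any]]],
) -> list[dict[str, Any]]:
    # A single report dict becomes a one-element list; lists pass through.
    if isinstance(reports, dict):
        return [reports]
    return reports


single = normalize_reports({"reported_domain": "example.com"})
empty = normalize_reports([])
```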
+18 -18
@@ -38,9 +38,9 @@ class LogAnalyticsConfig:
The Stream name where
the Aggregate DMARC reports
need to be pushed.
dcr_forensic_stream (str):
dcr_failure_stream (str):
The Stream name where
the Forensic DMARC reports
the Failure DMARC reports
need to be pushed.
dcr_smtp_tls_stream (str):
The Stream name where
@@ -56,7 +56,7 @@ class LogAnalyticsConfig:
dce: str,
dcr_immutable_id: str,
dcr_aggregate_stream: str,
dcr_forensic_stream: str,
dcr_failure_stream: str,
dcr_smtp_tls_stream: str,
):
self.client_id = client_id
@@ -65,7 +65,7 @@ class LogAnalyticsConfig:
self.dce = dce
self.dcr_immutable_id = dcr_immutable_id
self.dcr_aggregate_stream = dcr_aggregate_stream
self.dcr_forensic_stream = dcr_forensic_stream
self.dcr_failure_stream = dcr_failure_stream
self.dcr_smtp_tls_stream = dcr_smtp_tls_stream
@@ -84,7 +84,7 @@ class LogAnalyticsClient(object):
dce: str,
dcr_immutable_id: str,
dcr_aggregate_stream: str,
dcr_forensic_stream: str,
dcr_failure_stream: str,
dcr_smtp_tls_stream: str,
):
self.conf = LogAnalyticsConfig(
@@ -94,7 +94,7 @@ class LogAnalyticsClient(object):
dce=dce,
dcr_immutable_id=dcr_immutable_id,
dcr_aggregate_stream=dcr_aggregate_stream,
dcr_forensic_stream=dcr_forensic_stream,
dcr_failure_stream=dcr_failure_stream,
dcr_smtp_tls_stream=dcr_smtp_tls_stream,
)
if (
@@ -135,7 +135,7 @@ class LogAnalyticsClient(object):
self,
results: dict[str, Any],
save_aggregate: bool,
save_forensic: bool,
save_failure: bool,
save_smtp_tls: bool,
):
"""
@@ -146,13 +146,13 @@ class LogAnalyticsClient(object):
Args:
results (list):
The DMARC reports (Aggregate & Forensic)
The DMARC reports (Aggregate & Failure)
save_aggregate (bool):
Whether Aggregate reports can be saved into Log Analytics
save_forensic (bool):
Whether Forensic reports can be saved into Log Analytics
save_failure (bool):
Whether Failure reports can be saved into Log Analytics
save_smtp_tls (bool):
Whether Forensic reports can be saved into Log Analytics
Whether SMTP TLS reports can be saved into Log Analytics
"""
conf = self.conf
credential = ClientSecretCredential(
@@ -173,16 +173,16 @@ class LogAnalyticsClient(object):
)
logger.info("Successfully pushed aggregate reports.")
if (
results["forensic_reports"]
and conf.dcr_forensic_stream
and len(results["forensic_reports"]) > 0
and save_forensic
results["failure_reports"]
and conf.dcr_failure_stream
and len(results["failure_reports"]) > 0
and save_failure
):
logger.info("Publishing forensic reports.")
logger.info("Publishing failure reports.")
self.publish_json(
results["forensic_reports"], logs_client, conf.dcr_forensic_stream
results["failure_reports"], logs_client, conf.dcr_failure_stream
)
logger.info("Successfully pushed forensic reports.")
logger.info("Successfully pushed failure reports.")
if (
results["smtp_tls_reports"]
and conf.dcr_smtp_tls_stream
+121 -68
@@ -14,6 +14,7 @@ from opensearchpy import (
InnerDoc,
Integer,
Ip,
Keyword,
Nested,
Object,
Q,
@@ -24,7 +25,7 @@ from opensearchpy import (
)
from opensearchpy.helpers import reindex
from parsedmarc import InvalidForensicReport
from parsedmarc import InvalidFailureReport
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
@@ -46,18 +47,23 @@ class _PublishedPolicy(InnerDoc):
sp = Text()
pct = Integer()
fo = Text()
np = Keyword()
testing = Keyword()
discovery_method = Keyword()
class _DKIMResult(InnerDoc):
domain = Text()
selector = Text()
result = Text()
human_result = Text()
class _SPFResult(InnerDoc):
domain = Text()
scope = Text()
results = Text()
human_result = Text()
class _AggregateReportDoc(Document):
@@ -65,6 +71,7 @@ class _AggregateReportDoc(Document):
name = "dmarc_aggregate"
xml_schema = Text()
xml_namespace = Keyword()
org_name = Text()
org_email = Text()
org_extra_contact_info = Text()
@@ -96,17 +103,45 @@ class _AggregateReportDoc(Document):
envelope_to = Text()
dkim_results = Nested(_DKIMResult)
spf_results = Nested(_SPFResult)
np = Keyword()
testing = Keyword()
discovery_method = Keyword()
generator = Text()
def add_policy_override(self, type_: str, comment: str):
self.policy_overrides.append(_PolicyOverride(type=type_, comment=comment))
def add_dkim_result(self, domain: str, selector: str, result: _DKIMResult):
def add_dkim_result(
self,
domain: str,
selector: str,
result: _DKIMResult,
human_result: Optional[str] = None,
):
self.dkim_results.append(
_DKIMResult(domain=domain, selector=selector, result=result)
_DKIMResult(
domain=domain,
selector=selector,
result=result,
human_result=human_result,
)
)
def add_spf_result(self, domain: str, scope: str, result: _SPFResult):
self.spf_results.append(_SPFResult(domain=domain, scope=scope, result=result))
def add_spf_result(
self,
domain: str,
scope: str,
result: _SPFResult,
human_result: Optional[str] = None,
):
self.spf_results.append(
_SPFResult(
domain=domain,
scope=scope,
result=result,
human_result=human_result,
)
)
def save(self, **kwargs): # pyright: ignore[reportIncompatibleMethodOverride]
self.passed_dmarc = False
@@ -126,7 +161,7 @@ class _EmailAttachmentDoc(Document):
sha256 = Text()
class _ForensicSampleDoc(InnerDoc):
class _FailureSampleDoc(InnerDoc):
raw = Text()
headers = Object()
headers_only = Boolean()
@@ -163,9 +198,9 @@ class _ForensicSampleDoc(InnerDoc):
)
class _ForensicReportDoc(Document):
class _FailureReportDoc(Document):
class Index:
name = "dmarc_forensic"
name = "dmarc_failure"
feedback_type = Text()
user_agent = Text()
@@ -186,7 +221,7 @@ class _ForensicReportDoc(Document):
source_auth_failures = Text()
dkim_domain = Text()
original_rcpt_to = Text()
sample = Object(_ForensicSampleDoc)
sample = Object(_FailureSampleDoc)
class _SMTPTLSFailureDetailsDoc(InnerDoc):
@@ -366,20 +401,20 @@ def create_indexes(names: list[str], settings: Optional[dict[str, Any]] = None):
def migrate_indexes(
aggregate_indexes: Optional[list[str]] = None,
forensic_indexes: Optional[list[str]] = None,
failure_indexes: Optional[list[str]] = None,
):
"""
Updates index mappings
Args:
aggregate_indexes (list): A list of aggregate index names
forensic_indexes (list): A list of forensic index names
failure_indexes (list): A list of failure index names
"""
version = 2
if aggregate_indexes is None:
aggregate_indexes = []
if forensic_indexes is None:
forensic_indexes = []
if failure_indexes is None:
failure_indexes = []
for aggregate_index_name in aggregate_indexes:
if not Index(aggregate_index_name).exists():
continue
@@ -409,7 +444,7 @@ def migrate_indexes(
reindex(connections.get_connection(), aggregate_index_name, new_index_name)
Index(aggregate_index_name).delete()
for forensic_index in forensic_indexes:
for failure_index in failure_indexes:
pass
@@ -425,7 +460,7 @@ def save_aggregate_report_to_opensearch(
Saves a parsed DMARC aggregate report to OpenSearch
Args:
aggregate_report (dict): A parsed forensic report
aggregate_report (dict): A parsed aggregate report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily indexes
@@ -493,6 +528,9 @@ def save_aggregate_report_to_opensearch(
sp=aggregate_report["policy_published"]["sp"],
pct=aggregate_report["policy_published"]["pct"],
fo=aggregate_report["policy_published"]["fo"],
np=aggregate_report["policy_published"].get("np"),
testing=aggregate_report["policy_published"].get("testing"),
discovery_method=aggregate_report["policy_published"].get("discovery_method"),
)
for record in aggregate_report["records"]:
@@ -509,6 +547,7 @@ def save_aggregate_report_to_opensearch(
date_range = [aggregate_report["begin_date"], aggregate_report["end_date"]]
agg_doc = _AggregateReportDoc(
xml_schema=aggregate_report["xml_schema"],
xml_namespace=aggregate_report.get("xml_namespace"),
org_name=metadata["org_name"],
org_email=metadata["org_email"],
org_extra_contact_info=metadata["org_extra_contact_info"],
@@ -537,6 +576,12 @@ def save_aggregate_report_to_opensearch(
header_from=record["identifiers"]["header_from"],
envelope_from=record["identifiers"]["envelope_from"],
envelope_to=record["identifiers"]["envelope_to"],
np=aggregate_report["policy_published"].get("np"),
testing=aggregate_report["policy_published"].get("testing"),
discovery_method=aggregate_report["policy_published"].get(
"discovery_method"
),
generator=metadata.get("generator"),
)
for override in record["policy_evaluated"]["policy_override_reasons"]:
@@ -549,6 +594,7 @@ def save_aggregate_report_to_opensearch(
domain=dkim_result["domain"],
selector=dkim_result["selector"],
result=dkim_result["result"],
human_result=dkim_result.get("human_result"),
)
for spf_result in record["auth_results"]["spf"]:
@@ -556,6 +602,7 @@ def save_aggregate_report_to_opensearch(
domain=spf_result["domain"],
scope=spf_result["scope"],
result=spf_result["result"],
human_result=spf_result.get("human_result"),
)
index = "dmarc_aggregate"
@@ -577,8 +624,8 @@ def save_aggregate_report_to_opensearch(
raise OpenSearchError("OpenSearch error: {0}".format(e.__str__()))
def save_forensic_report_to_opensearch(
forensic_report: dict[str, Any],
def save_failure_report_to_opensearch(
failure_report: dict[str, Any],
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: bool = False,
@@ -586,10 +633,10 @@ def save_forensic_report_to_opensearch(
number_of_replicas: int = 0,
):
"""
Saves a parsed DMARC forensic report to OpenSearch
Saves a parsed DMARC failure report to OpenSearch
Args:
forensic_report (dict): A parsed forensic report
failure_report (dict): A parsed failure report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily
@@ -602,26 +649,28 @@ def save_forensic_report_to_opensearch(
AlreadySaved
"""
logger.info("Saving forensic report to OpenSearch")
forensic_report = forensic_report.copy()
logger.info("Saving failure report to OpenSearch")
failure_report = failure_report.copy()
sample_date = None
if forensic_report["parsed_sample"]["date"] is not None:
sample_date = forensic_report["parsed_sample"]["date"]
if failure_report["parsed_sample"]["date"] is not None:
sample_date = failure_report["parsed_sample"]["date"]
sample_date = human_timestamp_to_datetime(sample_date)
original_headers = forensic_report["parsed_sample"]["headers"]
original_headers = failure_report["parsed_sample"]["headers"]
headers: dict[str, Any] = {}
for original_header in original_headers:
headers[original_header.lower()] = original_headers[original_header]
arrival_date = human_timestamp_to_datetime(forensic_report["arrival_date_utc"])
arrival_date = human_timestamp_to_datetime(failure_report["arrival_date_utc"])
arrival_date_epoch_milliseconds = int(arrival_date.timestamp() * 1000)
if index_suffix is not None:
search_index = "dmarc_forensic_{0}*".format(index_suffix)
search_index = "dmarc_failure_{0}*,dmarc_forensic_{0}*".format(index_suffix)
else:
search_index = "dmarc_forensic*"
search_index = "dmarc_failure*,dmarc_forensic*"
if index_prefix is not None:
search_index = "{0}{1}".format(index_prefix, search_index)
search_index = ",".join(
"{0}{1}".format(index_prefix, part) for part in search_index.split(",")
)
search = Search(index=search_index)
q = Q(dict(match=dict(arrival_date=arrival_date_epoch_milliseconds)))
@@ -662,67 +711,65 @@ def save_forensic_report_to_opensearch(
if len(existing) > 0:
raise AlreadySaved(
"A forensic sample to {0} from {1} "
"A failure sample to {0} from {1} "
"with a subject of {2} and arrival date of {3} "
"already exists in "
"OpenSearch".format(
to_, from_, subject, forensic_report["arrival_date_utc"]
)
"OpenSearch".format(to_, from_, subject, failure_report["arrival_date_utc"])
)
parsed_sample = forensic_report["parsed_sample"]
sample = _ForensicSampleDoc(
raw=forensic_report["sample"],
parsed_sample = failure_report["parsed_sample"]
sample = _FailureSampleDoc(
raw=failure_report["sample"],
headers=headers,
headers_only=forensic_report["sample_headers_only"],
headers_only=failure_report["sample_headers_only"],
date=sample_date,
subject=forensic_report["parsed_sample"]["subject"],
subject=failure_report["parsed_sample"]["subject"],
filename_safe_subject=parsed_sample["filename_safe_subject"],
body=forensic_report["parsed_sample"]["body"],
body=failure_report["parsed_sample"]["body"],
)
for address in forensic_report["parsed_sample"]["to"]:
for address in failure_report["parsed_sample"]["to"]:
sample.add_to(display_name=address["display_name"], address=address["address"])
for address in forensic_report["parsed_sample"]["reply_to"]:
for address in failure_report["parsed_sample"]["reply_to"]:
sample.add_reply_to(
display_name=address["display_name"], address=address["address"]
)
for address in forensic_report["parsed_sample"]["cc"]:
for address in failure_report["parsed_sample"]["cc"]:
sample.add_cc(display_name=address["display_name"], address=address["address"])
for address in forensic_report["parsed_sample"]["bcc"]:
for address in failure_report["parsed_sample"]["bcc"]:
sample.add_bcc(display_name=address["display_name"], address=address["address"])
for attachment in forensic_report["parsed_sample"]["attachments"]:
for attachment in failure_report["parsed_sample"]["attachments"]:
sample.add_attachment(
filename=attachment["filename"],
content_type=attachment["mail_content_type"],
sha256=attachment["sha256"],
)
try:
forensic_doc = _ForensicReportDoc(
feedback_type=forensic_report["feedback_type"],
user_agent=forensic_report["user_agent"],
version=forensic_report["version"],
original_mail_from=forensic_report["original_mail_from"],
failure_doc = _FailureReportDoc(
feedback_type=failure_report["feedback_type"],
user_agent=failure_report["user_agent"],
version=failure_report["version"],
original_mail_from=failure_report["original_mail_from"],
arrival_date=arrival_date_epoch_milliseconds,
domain=forensic_report["reported_domain"],
original_envelope_id=forensic_report["original_envelope_id"],
authentication_results=forensic_report["authentication_results"],
delivery_results=forensic_report["delivery_result"],
source_ip_address=forensic_report["source"]["ip_address"],
source_country=forensic_report["source"]["country"],
source_reverse_dns=forensic_report["source"]["reverse_dns"],
source_base_domain=forensic_report["source"]["base_domain"],
source_asn=forensic_report["source"]["asn"],
source_as_name=forensic_report["source"]["as_name"],
source_as_domain=forensic_report["source"]["as_domain"],
authentication_mechanisms=forensic_report["authentication_mechanisms"],
auth_failure=forensic_report["auth_failure"],
dkim_domain=forensic_report["dkim_domain"],
original_rcpt_to=forensic_report["original_rcpt_to"],
domain=failure_report["reported_domain"],
original_envelope_id=failure_report["original_envelope_id"],
authentication_results=failure_report["authentication_results"],
delivery_results=failure_report["delivery_result"],
source_ip_address=failure_report["source"]["ip_address"],
source_country=failure_report["source"]["country"],
source_reverse_dns=failure_report["source"]["reverse_dns"],
source_base_domain=failure_report["source"]["base_domain"],
source_asn=failure_report["source"]["asn"],
source_as_name=failure_report["source"]["as_name"],
source_as_domain=failure_report["source"]["as_domain"],
authentication_mechanisms=failure_report["authentication_mechanisms"],
auth_failure=failure_report["auth_failure"],
dkim_domain=failure_report["dkim_domain"],
original_rcpt_to=failure_report["original_rcpt_to"],
sample=sample,
)
index = "dmarc_forensic"
index = "dmarc_failure"
if index_suffix:
index = "{0}_{1}".format(index, index_suffix)
if index_prefix:
@@ -736,14 +783,14 @@ def save_forensic_report_to_opensearch(
number_of_shards=number_of_shards, number_of_replicas=number_of_replicas
)
create_indexes([index], index_settings)
forensic_doc.meta.index = index
failure_doc.meta.index = index
try:
forensic_doc.save()
failure_doc.save()
except Exception as e:
raise OpenSearchError("OpenSearch error: {0}".format(e.__str__()))
except KeyError as e:
raise InvalidForensicReport(
"Forensic report missing required field: {0}".format(e.__str__())
raise InvalidFailureReport(
"Failure report missing required field: {0}".format(e.__str__())
)
@@ -897,3 +944,9 @@ def save_smtp_tls_report_to_opensearch(
smtp_tls_doc.save()
except Exception as e:
raise OpenSearchError("OpenSearch error: {0}".format(e.__str__()))
# Backward-compatible aliases
_ForensicSampleDoc = _FailureSampleDoc
_ForensicReportDoc = _FailureReportDoc
save_forensic_report_to_opensearch = save_failure_report_to_opensearch
+6 -2
@@ -56,8 +56,8 @@ class S3Client(object):
def save_aggregate_report_to_s3(self, report: dict[str, Any]):
self.save_report_to_s3(report, "aggregate")
def save_forensic_report_to_s3(self, report: dict[str, Any]):
self.save_report_to_s3(report, "forensic")
def save_failure_report_to_s3(self, report: dict[str, Any]):
self.save_report_to_s3(report, "failure")
def save_smtp_tls_report_to_s3(self, report: dict[str, Any]):
self.save_report_to_s3(report, "smtp_tls")
@@ -101,3 +101,7 @@ class S3Client(object):
self.s3.meta.client.close()
except Exception:
pass
# Backward-compatible aliases
S3Client.save_forensic_report_to_s3 = S3Client.save_failure_report_to_s3
+14 -10
@@ -139,28 +139,28 @@ class HECClient(object):
if response["code"] != 0:
raise SplunkError(response["text"])
def save_forensic_reports_to_splunk(
def save_failure_reports_to_splunk(
self,
forensic_reports: Union[list[dict[str, Any]], dict[str, Any]],
failure_reports: Union[list[dict[str, Any]], dict[str, Any]],
):
"""
Saves forensic DMARC reports to Splunk
Saves failure DMARC reports to Splunk
Args:
forensic_reports (list): A list of forensic report dictionaries
failure_reports (list): A list of failure report dictionaries
to save in Splunk
"""
logger.debug("Saving forensic reports to Splunk")
if isinstance(forensic_reports, dict):
forensic_reports = [forensic_reports]
logger.debug("Saving failure reports to Splunk")
if isinstance(failure_reports, dict):
failure_reports = [failure_reports]
if len(forensic_reports) < 1:
if len(failure_reports) < 1:
return
json_str = ""
for report in forensic_reports:
for report in failure_reports:
data = self._common_data.copy()
data["sourcetype"] = "dmarc:forensic"
data["sourcetype"] = "dmarc:failure"
timestamp = human_timestamp_to_unix_timestamp(report["arrival_date_utc"])
data["time"] = timestamp
data["event"] = report.copy()
@@ -220,3 +220,7 @@ class HECClient(object):
def close(self):
"""Close the underlying HTTP session."""
self.session.close()
# Backward-compatible aliases
HECClient.save_forensic_reports_to_splunk = HECClient.save_failure_reports_to_splunk
+7 -3
@@ -13,7 +13,7 @@ from typing import Any, Optional
from parsedmarc import (
parsed_aggregate_reports_to_csv_rows,
parsed_forensic_reports_to_csv_rows,
parsed_failure_reports_to_csv_rows,
parsed_smtp_tls_reports_to_csv_rows,
)
@@ -170,8 +170,8 @@ class SyslogClient(object):
for row in rows:
self.logger.info(json.dumps(row))
def save_forensic_report_to_syslog(self, forensic_reports: list[dict[str, Any]]):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
def save_failure_report_to_syslog(self, failure_reports: list[dict[str, Any]]):
rows = parsed_failure_reports_to_csv_rows(failure_reports)
for row in rows:
self.logger.info(json.dumps(row))
@@ -184,3 +184,7 @@ class SyslogClient(object):
"""Remove and close the syslog handler, releasing its socket."""
self.logger.removeHandler(self.log_handler)
self.log_handler.close()
# Backward-compatible aliases
SyslogClient.save_forensic_report_to_syslog = SyslogClient.save_failure_report_to_syslog
+25 -10
@@ -8,7 +8,7 @@ from typing import Any, Dict, List, Literal, Optional, TypedDict, Union
# For optional keys, use total=False TypedDicts.
ReportType = Literal["aggregate", "forensic", "smtp_tls"]
ReportType = Literal["aggregate", "failure", "smtp_tls"]
class AggregateReportMetadata(TypedDict):
@@ -21,6 +21,7 @@ class AggregateReportMetadata(TypedDict):
timespan_requires_normalization: bool
original_timespan_seconds: int
errors: List[str]
generator: Optional[str]
class AggregatePolicyPublished(TypedDict):
@@ -29,8 +30,11 @@ class AggregatePolicyPublished(TypedDict):
aspf: str
p: str
sp: str
pct: str
fo: str
pct: Optional[str]
fo: Optional[str]
np: Optional[str]
testing: Optional[str]
discovery_method: Optional[str]
class IPSourceInfo(TypedDict):
@@ -66,12 +70,14 @@ class AggregateAuthResultDKIM(TypedDict):
domain: str
result: str
selector: str
human_result: Optional[str]
class AggregateAuthResultSPF(TypedDict):
domain: str
result: str
scope: str
human_result: Optional[str]
class AggregateAuthResults(TypedDict):
@@ -100,6 +106,7 @@ class AggregateRecord(TypedDict):
class AggregateReport(TypedDict):
xml_schema: str
xml_namespace: Optional[str]
report_metadata: AggregateReportMetadata
policy_published: AggregatePolicyPublished
records: List[AggregateRecord]
@@ -122,7 +129,7 @@ ParsedEmail = TypedDict(
"ParsedEmail",
{
# This is a lightly-specified version of mailsuite/mailparser JSON.
# It focuses on the fields parsedmarc uses in forensic handling.
# It focuses on the fields parsedmarc uses in failure report handling.
"headers": Dict[str, Any],
"subject": Optional[str],
"filename_safe_subject": Optional[str],
@@ -141,7 +148,7 @@ ParsedEmail = TypedDict(
)
class ForensicReport(TypedDict):
class FailureReport(TypedDict):
feedback_type: Optional[str]
user_agent: Optional[str]
version: Optional[str]
@@ -162,6 +169,10 @@ class ForensicReport(TypedDict):
parsed_sample: ParsedEmail
# Backward-compatible alias
ForensicReport = FailureReport
class SMTPTLSFailureDetails(TypedDict):
result_type: str
failed_session_count: int
@@ -204,9 +215,13 @@ class AggregateParsedReport(TypedDict):
report: AggregateReport
class ForensicParsedReport(TypedDict):
report_type: Literal["forensic"]
report: ForensicReport
class FailureParsedReport(TypedDict):
report_type: Literal["failure"]
report: FailureReport
# Backward-compatible alias
ForensicParsedReport = FailureParsedReport
class SMTPTLSParsedReport(TypedDict):
@@ -214,10 +229,10 @@ class SMTPTLSParsedReport(TypedDict):
report: SMTPTLSReport
ParsedReport = Union[AggregateParsedReport, ForensicParsedReport, SMTPTLSParsedReport]
ParsedReport = Union[AggregateParsedReport, FailureParsedReport, SMTPTLSParsedReport]
class ParsingResults(TypedDict):
aggregate_reports: List[AggregateReport]
forensic_reports: List[ForensicReport]
failure_reports: List[FailureReport]
smtp_tls_reports: List[SMTPTLSReport]
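In the type changes above, `Optional[str]` on a `TypedDict` field means the key is still required but its value may be `None`. That matches the commit message's statement that RFC 7489 reports parse with `None` for the DMARCbis-only fields. The sketch below illustrates this with a trimmed, hypothetical `PolicyPublished` type and a hypothetical `make_policy` helper. It is not the project's parsing code.

```python
from typing import Any, Dict, Optional, TypedDict


class PolicyPublished(TypedDict):
    """Trimmed, hypothetical version of AggregatePolicyPublished."""

    domain: str
    p: str
    pct: Optional[str]               # RFC 7489 only
    np: Optional[str]                # DMARCbis only
    testing: Optional[str]           # DMARCbis only
    discovery_method: Optional[str]  # DMARCbis only


def make_policy(raw: Dict[str, Any]) -> PolicyPublished:
    """Build a policy dict; fields absent from the input become None."""
    return PolicyPublished(
        domain=raw["domain"],
        p=raw["p"],
        pct=raw.get("pct"),
        np=raw.get("np"),
        testing=raw.get("testing"),
        discovery_method=raw.get("discovery_method"),
    )


# An RFC 7489-style input carries pct but none of the DMARCbis fields.
legacy = make_policy({"domain": "example.com", "p": "none", "pct": "100"})

# A DMARCbis-style input carries the new fields and no pct.
bis = make_policy(
    {
        "domain": "example.com",
        "p": "reject",
        "np": "reject",
        "testing": "y",
        "discovery_method": "treewalk",
    }
)
```

Every key is present in both results, so downstream code such as CSV writers can index the fields directly and only needs to handle `None` values.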
+1 -1
@@ -57,7 +57,7 @@ _RETRYABLE_DNS_ERRORS = (
parenthesis_regex = re.compile(r"\s*\(.*\)\s*")
null_file = open(os.devnull, "w")
null_file = subprocess.DEVNULL
mailparser_logger = logging.getLogger("mailparser")
mailparser_logger.setLevel(logging.CRITICAL)
psl = publicsuffixlist.PublicSuffixList()
+11 -5
@@ -16,7 +16,7 @@ class WebhookClient(object):
def __init__(
self,
aggregate_url: str,
forensic_url: str,
failure_url: str,
smtp_tls_url: str,
timeout: Optional[int] = 60,
):
@@ -24,12 +24,12 @@ class WebhookClient(object):
Initializes the WebhookClient
Args:
aggregate_url (str): The aggregate report webhook url
forensic_url (str): The forensic report webhook url
failure_url (str): The failure report webhook url
smtp_tls_url (str): The smtp_tls report webhook url
timeout (int): The timeout to use when calling the webhooks
"""
self.aggregate_url = aggregate_url
self.forensic_url = forensic_url
self.failure_url = failure_url
self.smtp_tls_url = smtp_tls_url
self.timeout = timeout
self.session = requests.Session()
@@ -38,9 +38,9 @@ class WebhookClient(object):
"Content-Type": "application/json",
}
def save_forensic_report_to_webhook(self, report: str):
def save_failure_report_to_webhook(self, report: str):
try:
self._send_to_webhook(self.forensic_url, report)
self._send_to_webhook(self.failure_url, report)
except Exception as error_:
logger.error("Webhook Error: {0}".format(error_.__str__()))
@@ -67,3 +67,9 @@ class WebhookClient(object):
def close(self):
"""Close the underlying HTTP session."""
self.session.close()
# Backward-compatible aliases
WebhookClient.save_forensic_report_to_webhook = (
WebhookClient.save_failure_report_to_webhook
)
@@ -0,0 +1,77 @@
<?xml version="1.0"?>
<feedback>
<version>2.0</version>
<report_metadata>
<org_name>example.net</org_name>
<email>postmaster@example.net</email>
<report_id>dmarcbis-test-report-001</report_id>
<date_range>
<begin>1700000000</begin>
<end>1700086399</end>
</date_range>
</report_metadata>
<policy_published>
<domain>example.com</domain>
<adkim>s</adkim>
<aspf>s</aspf>
<p>reject</p>
<sp>quarantine</sp>
<np>reject</np>
<testing>y</testing>
<discovery_method>treewalk</discovery_method>
<fo>1</fo>
</policy_published>
<record>
<row>
<source_ip>198.51.100.1</source_ip>
<count>5</count>
<policy_evaluated>
<disposition>none</disposition>
<dkim>pass</dkim>
<spf>pass</spf>
</policy_evaluated>
</row>
<identifiers>
<envelope_from>example.com</envelope_from>
<header_from>example.com</header_from>
</identifiers>
<auth_results>
<dkim>
<domain>example.com</domain>
<selector>selector1</selector>
<result>pass</result>
</dkim>
<spf>
<domain>example.com</domain>
<scope>mfrom</scope>
<result>pass</result>
</spf>
</auth_results>
</record>
<record>
<row>
<source_ip>203.0.113.10</source_ip>
<count>2</count>
<policy_evaluated>
<disposition>reject</disposition>
<dkim>fail</dkim>
<spf>fail</spf>
<reason>
<type>other</type>
<comment>sender not authorized</comment>
</reason>
</policy_evaluated>
</row>
<identifiers>
<envelope_from>spoofed.example.com</envelope_from>
<header_from>example.com</header_from>
</identifiers>
<auth_results>
<spf>
<domain>spoofed.example.com</domain>
<scope>mfrom</scope>
<result>fail</result>
</spf>
</auth_results>
</record>
</feedback>
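To show how the new `policy_published` elements in a sample like the one above could be read, here is a minimal sketch using only the standard library's `xml.etree.ElementTree`. It works on a trimmed copy of the sample. The `read_policy` helper is hypothetical and is not how parsedmarc itself parses reports.

```python
import xml.etree.ElementTree as ET

# Trimmed copy of the sample's policy section (no XML namespace).
SAMPLE = """<?xml version="1.0"?>
<feedback>
  <version>2.0</version>
  <policy_published>
    <domain>example.com</domain>
    <p>reject</p>
    <sp>quarantine</sp>
    <np>reject</np>
    <testing>y</testing>
    <discovery_method>treewalk</discovery_method>
    <fo>1</fo>
  </policy_published>
</feedback>"""

POLICY_FIELDS = ("domain", "p", "sp", "np", "testing", "discovery_method", "pct", "fo")


def read_policy(xml_text: str) -> dict:
    """Return the policy fields; elements missing from the report map to None."""
    root = ET.fromstring(xml_text)
    policy = root.find("policy_published")
    if policy is None:
        raise ValueError("report has no policy_published element")
    # findtext() returns None when the child element does not exist.
    return {name: policy.findtext(name) for name in POLICY_FIELDS}


policy = read_policy(SAMPLE)
```

Here `pct` comes back as `None` because the sample omits it, while `np`, `testing`, and `discovery_method` carry the values from the report.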
+48
@@ -0,0 +1,48 @@
<feedback xmlns="urn:ietf:params:xml:ns:dmarc-2.0">
<version>1.0</version>
<report_metadata>
<org_name>Sample Reporter</org_name>
<email>report_sender@example-reporter.com</email>
<extra_contact_info>...</extra_contact_info>
<report_id>3v98abbp8ya9n3va8yr8oa3ya</report_id>
<date_range>
<begin>302832000</begin>
<end>302918399</end>
</date_range>
<generator>Example DMARC Aggregate Reporter v1.2</generator>
</report_metadata>
<policy_published>
<domain>example.com</domain>
<p>quarantine</p>
<sp>none</sp>
<np>none</np>
<testing>n</testing>
<discovery_method>treewalk</discovery_method>
</policy_published>
<record>
<row>
<source_ip>192.0.2.123</source_ip>
<count>123</count>
<policy_evaluated>
<disposition>pass</disposition>
<dkim>pass</dkim>
<spf>fail</spf>
</policy_evaluated>
</row>
<identifiers>
<envelope_from>example.com</envelope_from>
<header_from>example.com</header_from>
</identifiers>
<auth_results>
<dkim>
<domain>example.com</domain>
<result>pass</result>
<selector>abc123</selector>
</dkim>
<spf>
<domain>example.com</domain>
<result>fail</result>
</spf>
</auth_results>
</record>
</feedback>
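Unlike the previous sample, this one declares a default XML namespace on `<feedback>`, so unqualified element lookups would not match. The sketch below shows one way to cope, assuming it is acceptable to compare local element names and ignore the namespace. It uses the standard library only, and the helper names (`local_name`, `namespace_of`, `find_child`, `child_text`) are hypothetical.

```python
import xml.etree.ElementTree as ET

# Trimmed copy of the namespaced sample.
NAMESPACED = """<feedback xmlns="urn:ietf:params:xml:ns:dmarc-2.0">
  <version>1.0</version>
  <report_metadata>
    <generator>Example DMARC Aggregate Reporter v1.2</generator>
  </report_metadata>
  <policy_published>
    <domain>example.com</domain>
    <p>quarantine</p>
    <testing>n</testing>
    <discovery_method>treewalk</discovery_method>
  </policy_published>
</feedback>"""


def local_name(tag: str) -> str:
    """Strip an ElementTree '{namespace}' prefix, if present."""
    return tag.rsplit("}", 1)[-1]


def namespace_of(tag: str):
    """Return the namespace URI from a '{namespace}name' tag, or None."""
    if tag.startswith("{"):
        return tag[1:].split("}", 1)[0]
    return None


def find_child(parent, name: str):
    """Find the first direct child with the given local name."""
    for child in parent:
        if local_name(child.tag) == name:
            return child
    return None


def child_text(parent, name: str):
    """Return the text of a direct child, or None if it is absent."""
    child = find_child(parent, name)
    return child.text if child is not None else None


root = ET.fromstring(NAMESPACED)
xml_namespace = namespace_of(root.tag)
generator = child_text(find_child(root, "report_metadata"), "generator")
policy = find_child(root, "policy_published")
testing = child_text(policy, "testing")
np_policy = child_text(policy, "np")  # absent in this trimmed copy
```

The same helpers work on a report without a namespace, in which case `namespace_of` returns `None`.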
+2796 -73
File diff suppressed because it is too large