Compare commits


10 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
32e85399bf Fix _reload_requested tight-loop: reset flag before reload to capture concurrent SIGHUPs
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/879d0bb1-9037-41f7-bc89-f59611956d2e
2026-03-20 21:30:35 +00:00
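The fix in `32e85399bf` is the classic signal-flag ordering: clear the flag *before* acting on it, so a SIGHUP that lands while the reload is running re-arms the flag instead of being lost. A minimal sketch of that pattern, assuming a polling watch loop and a `reload_config()` helper (only `_reload_requested` and the SIGHUP handling are named in the commit message):

```python
import signal
import time

_reload_requested = False  # flag name from the commit message; the rest is assumed


def _handle_sighup(signum, frame):
    global _reload_requested
    _reload_requested = True


def reload_config():
    """Stand-in for re-reading configuration (hypothetical helper)."""


def watch_loop():
    global _reload_requested
    signal.signal(signal.SIGHUP, _handle_sighup)
    while True:
        if _reload_requested:
            # Reset *before* reloading: a SIGHUP delivered while
            # reload_config() runs sets the flag again and is handled on
            # the next pass. Resetting after the reload would drop that
            # concurrent signal, and never resetting would re-reload in a
            # tight loop.
            _reload_requested = False
            reload_config()
        time.sleep(1)  # stand-in for one batch of mailbox work
```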
copilot-swe-agent[bot]
c3876bb9d9 Initial plan 2026-03-20 21:26:34 +00:00
Copilot
a05eb0807a Best-effort initialization for optional output clients in watch mode (#701)
* Initial plan

* Wrap optional output client init in try/except for best-effort initialization

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/59241d4e-1b05-4a92-b2d2-e6d13d10a4fd

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2026-03-20 17:18:08 -04:00
Sean Whalen
6fceb3e2ce Update parsedmarc/cli.py
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-20 17:07:20 -04:00
Copilot
510d5d05a9 SIGHUP-based configuration reload: address review feedback (#700)
* Initial plan

* Address review feedback: kafka_ssl, duplicate silent, exception chain, log file reload, should_reload timing

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/a8a43c55-23fa-4471-abe6-7ac966f381f9

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2026-03-20 16:57:00 -04:00
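Among the review items above, "exception chain" refers to preserving the original traceback when wrapping a low-level error in a configuration error. A hedged sketch of the idiom (the `ConfigurationError` name comes from the #699 commit message; the parsing body is illustrative, though `interpolation=None` matches the 9.5.2 changelog entry further down):

```python
import configparser


class ConfigurationError(ValueError):
    """Assumed shape of the error type named in the commit messages."""


def load_config_or_raise(path: str) -> configparser.ConfigParser:
    parser = configparser.ConfigParser(interpolation=None)
    try:
        with open(path) as config_file:
            parser.read_file(config_file)
    except (OSError, configparser.Error) as error:
        # "raise ... from error" chains the original exception, so the
        # root cause stays visible in tracebacks instead of being
        # swallowed by the wrapper.
        raise ConfigurationError(f"Cannot load {path}: {error}") from error
    return parser
```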
Copilot
3445438684 [WIP] SIGHUP-based configuration reload for watch mode (#699)
* Initial plan

* Fix review comments: ConfigurationError wrapping, duplicate parse args, bool parsing, Kafka required topics, should_reload kwarg, SIGHUP test skips

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/0779003c-ccbe-4d76-9748-801dbc238b96

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2026-03-20 15:54:40 -04:00
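The "bool parsing" item matters because environment variable values always arrive as strings, and `bool("False")` is truthy in Python. A sketch of the kind of tolerant parser such a fix implies (the accepted spellings are an assumption):

```python
def str_to_bool(value) -> bool:
    """Parse common boolean spellings from INI/env values (assumed set)."""
    if isinstance(value, bool):
        return value
    normalized = str(value).strip().lower()
    if normalized in ("1", "true", "yes", "on"):
        return True
    if normalized in ("0", "false", "no", "off"):
        return False
    raise ValueError(f"Cannot interpret {value!r} as a boolean")
```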
Copilot
17defb75b0 [WIP] SIGHUP-based configuration reload for watch mode (#698)
* Initial plan

* Fix reload state consistency, resource leaks, stale opts; add tests

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/3c2e0bb9-7e2d-4efa-aef6-d2b98478b921

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2026-03-20 15:40:44 -04:00
Sean Whalen
893d0a4f03 Update parsedmarc/cli.py
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-20 15:28:38 -04:00
Sean Whalen
58e07140a8 Update parsedmarc/cli.py
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-20 15:27:22 -04:00
Sean Whalen
dfdffe4947 Enhance mailbox connection watch method to support reload functionality
- Updated the `watch` method in `GmailConnection`, `MSGraphConnection`, `IMAPConnection`, `MaildirConnection`, and the abstract `MailboxConnection` class to accept an optional `should_reload` parameter. This allows the method to check if a reload is necessary and exit the loop if so.
- Modified related tests to accommodate the new method signature.
- Changed logger calls from `critical` to `error` for consistency in logging severity.
- Added a new settings file for Claude with specific permissions for testing and code checks.
2026-03-20 15:00:21 -04:00
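The shape of that change, sketched as a minimal polling `watch` loop: the optional `should_reload` parameter is from the commit message, while the loop body and the other parameter names are assumptions (the real IMAP/Graph/Gmail/Maildir implementations differ).

```python
import time
from typing import Callable, Optional


def watch(
    process_batch: Callable[[], None],
    check_timeout: int = 30,
    should_reload: Optional[Callable[[], bool]] = None,
) -> None:
    while True:
        # Exit between batches when a reload is pending, so a SIGHUP
        # never triggers a new email batch mid-reload; the caller
        # re-enters watch() after rebuilding its configuration.
        if should_reload is not None and should_reload():
            return
        process_batch()
        time.sleep(check_timeout)
```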
46 changed files with 782 additions and 4912 deletions


@@ -1,15 +1,13 @@
{
"permissions": {
"allow": [
"Bash(git fetch:*)",
"Bash(python -c \"import py_compile; py_compile.compile\\(''parsedmarc/cli.py'', doraise=True\\)\")",
"Bash(ruff check:*)",
"Bash(ruff format:*)",
"Bash(GITHUB_ACTIONS=true pytest --cov tests.py)",
"Bash(ls tests*)",
"Bash(GITHUB_ACTIONS=true python -m pytest --cov tests.py -x)",
"Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)",
"Bash(python -m pytest tests.py --no-header -q)"
"Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)"
],
"additionalDirectories": [
"/tmp"


@@ -78,10 +78,7 @@ jobs:
run: |
pip install -e .
parsedmarc --debug -c ci.ini samples/aggregate/*
parsedmarc --debug -c ci.ini samples/failure/*
- name: Test building packages
run: |
hatch build
parsedmarc --debug -c ci.ini samples/forensic/*
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v5
with:


@@ -52,7 +52,6 @@
"geoipupdate",
"Geolite",
"geolocation",
"getuid",
"githubpages",
"Grafana",
"hostnames",
@@ -76,7 +75,6 @@
"LISTSERV",
"loganalytics",
"lxml",
"Maildir",
"mailparser",
"mailrelay",
"mailsuite",


@@ -4,7 +4,7 @@ This file provides guidance to AI agents when working with code in this reposito
## Project Overview
parsedmarc is a Python module and CLI utility for parsing DMARC aggregate (RUA), failure/forensic (RUF), and SMTP TLS reports. It supports both RFC 7489 and DMARCbis (draft-ietf-dmarc-dmarcbis-41, draft-ietf-dmarc-aggregate-reporting-32, draft-ietf-dmarc-failure-reporting-24) report formats. It reads reports from IMAP, Microsoft Graph, Gmail API, Maildir, mbox files, or direct file paths, and outputs to JSON/CSV, Elasticsearch, OpenSearch, Splunk, Kafka, S3, Azure Log Analytics, syslog, or webhooks.
parsedmarc is a Python module and CLI utility for parsing DMARC aggregate (RUA), forensic (RUF), and SMTP TLS reports. It reads reports from IMAP, Microsoft Graph, Gmail API, Maildir, mbox files, or direct file paths, and outputs to JSON/CSV, Elasticsearch, OpenSearch, Splunk, Kafka, S3, Azure Log Analytics, syslog, or webhooks.
## Common Commands
@@ -24,7 +24,7 @@ ruff format .
# Test CLI with sample reports
parsedmarc --debug -c ci.ini samples/aggregate/*
parsedmarc --debug -c ci.ini samples/failure/*
parsedmarc --debug -c ci.ini samples/forensic/*
# Build docs
cd docs && make html
@@ -41,24 +41,16 @@ To skip DNS lookups during testing, set `GITHUB_ACTIONS=true`.
### Key modules
- `parsedmarc/__init__.py` — Core parsing logic. Main functions: `parse_report_file()`, `parse_report_email()`, `parse_aggregate_report_xml()`, `parse_failure_report()`, `parse_smtp_tls_report_json()`, `get_dmarc_reports_from_mailbox()`, `watch_inbox()`. Legacy aliases (`parse_forensic_report`, etc.) are preserved for backward compatibility.
- `parsedmarc/cli.py` — CLI entry point (`_main`), config file parsing (`_load_config` + `_parse_config`), output orchestration. Supports configuration via INI files, `PARSEDMARC_{SECTION}_{KEY}` environment variables, or both (env vars override file values). Accepts both old (`save_forensic`, `forensic_topic`) and new (`save_failure`, `failure_topic`) config keys.
- `parsedmarc/types.py` — TypedDict definitions for all report types (`AggregateReport`, `FailureReport`, `SMTPTLSReport`, `ParsingResults`). Legacy alias `ForensicReport = FailureReport` preserved.
- `parsedmarc/__init__.py` — Core parsing logic. Main functions: `parse_report_file()`, `parse_report_email()`, `parse_aggregate_report_xml()`, `parse_forensic_report()`, `parse_smtp_tls_report_json()`, `get_dmarc_reports_from_mailbox()`, `watch_inbox()`
- `parsedmarc/cli.py` — CLI entry point (`_main`), config file parsing, output orchestration
- `parsedmarc/types.py` — TypedDict definitions for all report types (`AggregateReport`, `ForensicReport`, `SMTPTLSReport`, `ParsingResults`)
- `parsedmarc/utils.py` — IP/DNS/GeoIP enrichment, base64 decoding, compression handling
- `parsedmarc/mail/` — Polymorphic mail connections: `IMAPConnection`, `GmailConnection`, `MSGraphConnection`, `MaildirConnection`
- `parsedmarc/{elastic,opensearch,splunk,kafkaclient,loganalytics,syslog,s3,webhook,gelf}.py` — Output integrations
### Report type system
`ReportType = Literal["aggregate", "failure", "smtp_tls"]`. Exception hierarchy: `ParserError``InvalidDMARCReport``InvalidAggregateReport`/`InvalidFailureReport`, and `InvalidSMTPTLSReport`. Legacy alias `InvalidForensicReport = InvalidFailureReport` preserved.
### DMARCbis support
Aggregate reports support both RFC 7489 and DMARCbis formats. DMARCbis adds fields: `np` (non-existent subdomain policy), `testing` (replaces `pct`), `discovery_method` (`psl`/`treewalk`), `generator` (report metadata), and `human_result` (DKIM/SPF auth results). `pct` and `fo` default to `None` when absent (DMARCbis drops these). Namespaced XML is handled automatically.
### Configuration
Config priority: CLI args > env vars > config file > defaults. Env var naming: `PARSEDMARC_{SECTION}_{KEY}` (e.g. `PARSEDMARC_IMAP_PASSWORD`). Section names with underscores use longest-prefix matching (`PARSEDMARC_SPLUNK_HEC_TOKEN``[splunk_hec] token`). Some INI keys have short aliases for env var friendliness (e.g. `[maildir] create` for `maildir_create`). File path values are expanded via `os.path.expanduser`/`os.path.expandvars`. Config can be loaded purely from env vars with no file (`PARSEDMARC_CONFIG_FILE` sets the file path).
`ReportType = Literal["aggregate", "forensic", "smtp_tls"]`. Exception hierarchy: `ParserError``InvalidDMARCReport``InvalidAggregateReport`/`InvalidForensicReport`, and `InvalidSMTPTLSReport`.
### Caching
@@ -70,6 +62,3 @@ IP address info cached for 4 hours, seen aggregate report IDs cached for 1 hour
- TypedDict for structured data, type hints throughout
- Python ≥3.10 required
- Tests are in a single `tests.py` file using unittest; sample reports live in `samples/`
- File path config values must be wrapped with `_expand_path()` in `cli.py`
- Maildir UID checks are intentionally relaxed (warn, don't crash) for Docker compatibility
- Token file writes must create parent directories before opening for write
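A minimal sketch of the longest-prefix env var mapping described above, assuming a known section list (the real logic lives in `_load_config`/`_parse_config` in `cli.py` and may differ):

```python
import os

KNOWN_SECTIONS = [
    "general", "mailbox", "imap", "msgraph", "elasticsearch", "opensearch",
    "splunk_hec", "kafka", "smtp", "s3", "syslog", "gmail_api", "maildir",
    "log_analytics", "gelf", "webhook",
]


def env_overrides(environ=os.environ):
    """Map PARSEDMARC_{SECTION}_{KEY} variables to (section, key) pairs."""
    overrides = {}
    for name, value in environ.items():
        if not name.startswith("PARSEDMARC_"):
            continue
        rest = name[len("PARSEDMARC_"):].lower()
        # Longest-prefix match, so PARSEDMARC_SPLUNK_HEC_TOKEN binds to
        # [splunk_hec] token rather than a nonexistent [splunk] section.
        for section in sorted(KNOWN_SECTIONS, key=len, reverse=True):
            if rest.startswith(section + "_"):
                overrides[(section, rest[len(section) + 1:])] = value
                break
    return overrides
```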


@@ -1,133 +1,23 @@
# Changelog
## 10.0.0
### Enhancements
#### Support for DMARCbis reports
New fields from the XSD schema, added to types, parsing, CSV output, and Elasticsearch/OpenSearch mappings:
- `np` — non-existent subdomain policy (`none`/`quarantine`/`reject`)
- `testing` — testing mode flag (`n`/`y`), replaces RFC7489 `pct`
- `discovery_method` — policy discovery method (`psl`/`treewalk`)
- `generator` — report generator software identifier (metadata)
- `human_result` — optional descriptive text on DKIM/SPF auth results
Backwards compatibility to RFC7489 is maintained.
### Breaking changes
#### Forensic reports have been renamed to failure reports
Forensic reports have been renamed to failure reports throughout the project to reflect the proper naming of the reports since RFC7489.
- **Core**: `types.py`, `__init__.py``ForensicReport``FailureReport`, `parse_forensic_report``parse_failure_report`, report type `"failure"`
- **Output modules**: `elastic.py`, `opensearch.py`, `splunk.py`, `kafkaclient.py`, `syslog.py`, `gelf.py`, `webhook.py`, `loganalytics.py`, `s3.py`
- **CLI**: `cli.py` — args, config keys, index names (`dmarc_failure`)
- **Docs & dashboards**: all markdown, Grafana JSON, Kibana NDJSON, Splunk XML
##### Backward compatibility
- Old function/type names preserved as aliases: `parse_forensic_report = parse_failure_report`, `ForensicReport = FailureReport`, etc.
- CLI config accepts both old (`save_forensic`, `forensic_topic`) and new keys (`save_failure`, `failure_topic`)
- RFC 7489 reports parse with `None` for DMARCbis-only fields
- **Updated dashboards with queries are backward compatible**: queries match data indexed under both old (`dmarc_forensic*` / `dmarc:forensic`) and new (`dmarc_failure*` / `dmarc:failure`) names, so dashboards show data from before and after the rename:
- **Kibana**: Index pattern uses `dmarc_f*` to match both `dmarc_forensic*` and `dmarc_failure*`
- **Splunk**: Base search queries `(sourcetype="dmarc:failure" OR sourcetype="dmarc:forensic")`
- **Elasticsearch/OpenSearch**: Duplicate-check searches query across both `dmarc_failure*` and `dmarc_forensic*` index patterns
## 9.5.4
### Fixed
- Maildir `fetch_messages` now respects the `reports_folder` argument. Previously it always read from the top-level Maildir, ignoring the configured reports folder. `fetch_message`, `delete_message`, and `move_message` now also operate on the correct active folder.
- Config key aliases for env var compatibility: `[maildir] create` and `path` are now accepted as aliases for `maildir_create` and `maildir_path`, and `[msgraph] url` for `graph_url`. This allows natural env var names like `PARSEDMARC_MAILDIR_CREATE` to work without the redundant `PARSEDMARC_MAILDIR_MAILDIR_CREATE`.
## 9.5.3
### Fixed
- Fixed `FileNotFoundError` when using Maildir with Docker volume mounts. Python's `mailbox.Maildir(create=True)` only creates `cur/new/tmp` subdirectories when the top-level directory doesn't exist; Docker volume mounts pre-create the directory as empty, skipping subdirectory creation. parsedmarc now explicitly creates the subdirectories when `maildir_create` is enabled.
- Maildir UID mismatch no longer crashes the process. In Docker containers where volume ownership differs from the container UID, parsedmarc now logs a warning instead of raising an exception. Also handles `os.setuid` failures gracefully in containers without `CAP_SETUID`.
- Token file writes (MS Graph and Gmail) now create parent directories automatically, preventing `FileNotFoundError` when the token path points to a directory that doesn't yet exist.
- File paths from config (`token_file`, `credentials_file`, `cert_path`, `log_file`, `output`, `ip_db_path`, `maildir_path`, syslog cert paths, etc.) now expand `~` and `$VAR` references via `os.path.expanduser`/`os.path.expandvars`.
## 9.5.2
### Fixed
- Fixed `ValueError: invalid interpolation syntax` when config values (from env vars or INI files) contain `%` characters, such as in passwords. Disabled ConfigParser's `%`-based string interpolation.
## 9.5.1
### Changes
- Correct ISO format for MSGraphConnection timestamps (PR #706)
## 9.5.0
### Added
- Environment variable configuration support: any config option can now be set via `PARSEDMARC_{SECTION}_{KEY}` environment variables (e.g. `PARSEDMARC_IMAP_PASSWORD`, `PARSEDMARC_SPLUNK_HEC_TOKEN`). Environment variables override config file values but are overridden by CLI arguments.
- `PARSEDMARC_CONFIG_FILE` environment variable to specify the config file path without the `-c` flag.
- Env-only mode: parsedmarc can now run without a config file when `PARSEDMARC_*` environment variables are set, enabling fully file-less Docker deployments.
- Explicit read permission check on config file, giving a clear error message when the container UID cannot read the file (e.g. `chmod 600` with a UID mismatch).
## 9.4.0
### Added
- Extracted `load_reverse_dns_map()` utility function in `utils.py` for loading the reverse DNS map independently of individual IP lookups.
- SIGHUP reload now re-downloads/reloads the reverse DNS map, so changes take effect without restarting.
- Add premade OpenSearch index patterns, visualizations, and dashboards
### Changed
- When `index_prefix_domain_map` is configured, SMTP TLS reports for domains not in the map are now silently dropped instead of being output. Unlike DMARC, TLS-RPT has no DNS authorization records, so this filtering prevents processing reports for unrelated domains.
- Bump OpenSearch support to `< 4`
### Fixed
- Fixed `get_index_prefix` using wrong key (`domain` instead of `policy_domain`) for SMTP TLS reports, which prevented domain map matching from working for TLS reports.
- Domain matching in `get_index_prefix` now lowercases the domain for case-insensitive comparison.
## 9.3.1
### Breaking changes
- Elasticsearch and OpenSearch now verify SSL certificates by default when `ssl = True`, even without a `cert_path`
- Added `skip_certificate_verification` option to the `elasticsearch` and `opensearch` configuration sections for consistency with `splunk_hec`
### Fixed
- Splunk HEC `skip_certificate_verification` now works correctly
- SMTP TLS reports no longer fail when saving to multiple output targets (e.g. Elasticsearch and OpenSearch) due to in-place mutation of the report dict
- Output client initialization errors now identify which module failed (e.g. "OpenSearch: ConnectionError..." instead of generic "Output client error")
## 9.3.0
### Added
- SIGHUP-based configuration reload for watch mode — update output destinations, DNS/GeoIP settings, processing flags, and log level without restarting the service or interrupting in-progress report processing.
- Use `systemctl reload parsedmarc` when running under `systemd`.
- On a successful reload, old output clients are closed and recreated.
- On a failed reload, the previous configuration remains fully active.
- `close()` methods on `GelfClient`, `KafkaClient`, `SyslogClient`, `WebhookClient`, HECClient, and `S3Client` for clean resource teardown on reload.
- `config_reloading` parameter on all `MailboxConnection.watch()` implementations and `watch_inbox()` to ensure SIGHUP never triggers a new email batch mid-reload.
- Elasticsearch and OpenSearch connections are now tracked and cleaned up on reload via `_close_output_clients()`.
- Extracted `_parse_config_file()` and `_init_output_clients()` from `_main()` in `cli.py` to support config reload and reduce code duplication.
### Fixed
- `get_index_prefix()` crashed on failure reports with `TypeError` due to `report()` instead of `report[]` dict access.
- Missing `exit(1)` after IMAP user/password validation failure allowed execution to continue with `None` credentials.
- SIGHUP-based configuration reload for watch mode — update output
destinations, DNS/GeoIP settings, processing flags, and log level
without restarting the service or interrupting in-progress report
processing. Use `systemctl reload parsedmarc` when running under
systemd.
- Extracted `_parse_config_file()` and `_init_output_clients()` from
`_main()` in `cli.py` to support config reload and reduce code
duplication.
## 9.2.1
### Added
- Better checking of `msgraph` configuration (PR #695)
- Better checking of `msconfig` configuration (PR #695)
### Changed
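The 9.3.0 reload semantics amount to a swap-on-success transaction: build everything from the new configuration first, and replace the old clients only if every step succeeded. A hedged sketch, with the parse/init/close steps passed in as callables (in parsedmarc these correspond to the `_parse_config_file`, `_init_output_clients`, and `_close_output_clients` helpers named above, whose signatures are assumptions here):

```python
import logging

logger = logging.getLogger("parsedmarc")


def reload_clients(config_path, parse_config, init_clients, close_clients,
                   current_opts, current_clients):
    try:
        new_opts = parse_config(config_path)
        new_clients = init_clients(new_opts)
    except Exception as error:
        # Abort the entire reload: nothing has been replaced yet, so the
        # previous configuration and its clients remain fully active.
        logger.error("Reload failed, keeping old configuration: %s", error)
        return current_opts, current_clients
    # Only after the new clients exist are the old ones torn down.
    close_clients(current_clients)
    return new_opts, new_clients
```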


@@ -1,5 +1,3 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
# CLAUDE.md
@AGENTS.md


@@ -15,7 +15,7 @@ services:
condition: service_healthy
opensearch-dashboards:
image: opensearchproject/opensearch-dashboards:3
image: opensearchproject/opensearch-dashboards:2
environment:
- OPENSEARCH_HOSTS=["https://opensearch:9200"]
ports:
@@ -27,7 +27,7 @@ services:
grafana:
image: grafana/grafana:latest
environment:
- GRAFANA_PASSWORD=${GRAFANA_PASSWORD}
- GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
- GF_INSTALL_PLUGINS=grafana-piechart-panel,grafana-worldmap-panel
ports:
- "127.0.0.1:3000:3000"
@@ -41,7 +41,5 @@ services:
- SPLUNK_START_ARGS=--accept-license
- "SPLUNK_GENERAL_TERMS=--accept-sgt-current-at-splunk-com"
- SPLUNK_PASSWORD=${SPLUNK_PASSWORD}
- SPLUNK_HEC_TOKEN=${SPLUNK_HEC_TOKEN}
ports:
- "127.0.0.1:8000:8000"
- "127.0.0.1:8088:8088"


@@ -214,7 +214,7 @@ Kibana index patterns with versions that match the upgraded indexes:
1. Log in to Kibana, and click on Management
2. Under Kibana, click on Saved Objects
3. Check the checkboxes for the `dmarc_aggregate` and `dmarc_failure`
3. Check the checkboxes for the `dmarc_aggregate` and `dmarc_forensic`
index patterns
4. Click Delete
5. Click Delete on the confirmation message


@@ -2,7 +2,7 @@
[general]
save_aggregate = True
save_failure = True
save_forensic = True
[imap]
host = imap.example.com


@@ -34,7 +34,7 @@ and Valimail.
## Features
- Parses draft and 1.0 standard aggregate/rua DMARC reports
- Parses failure/ruf DMARC reports
- Parses forensic/failure/ruf DMARC reports
- Parses reports from SMTP TLS Reporting
- Can parse reports from an inbox over IMAP, Microsoft Graph, or Gmail API
- Transparently handles gzip or zip compressed reports


@@ -74,14 +74,14 @@ the DMARC Summary dashboard. To view failures only, use the pie chart.
Any other filters work the same way. You can also add your own custom temporary
filters by clicking on Add Filter at the upper right of the page.
## DMARC Failure Samples
## DMARC Forensic Samples
The DMARC Failure Samples dashboard contains information on DMARC failure
reports (also known as ruf reports). These reports contain
The DMARC Forensic Samples dashboard contains information on DMARC forensic
reports (also known as failure reports or ruf reports). These reports contain
samples of emails that have failed to pass DMARC.
:::{note}
Most recipients do not send failure/ruf reports at all to avoid
Most recipients do not send forensic/failure/ruf reports at all to avoid
privacy leaks. Some recipients (notably Chinese webmail services) will only
supply the headers of sample emails. Very few provide the entire email.
:::


@@ -96,12 +96,12 @@ draft,acme.com,noreply-dmarc-support@acme.com,http://acme.com/dmarc/support,9391
```
## Sample failure report output
## Sample forensic report output
Thanks to GitHub user [xennn](https://github.com/xennn) for the anonymized
[failure report email sample](<https://github.com/domainaware/parsedmarc/raw/master/samples/failure/DMARC%20Failure%20Report%20for%20domain.de%20(mail-from%3Dsharepoint%40domain.de%2C%20ip%3D10.10.10.10).eml>).
[forensic report email sample](<https://github.com/domainaware/parsedmarc/raw/master/samples/forensic/DMARC%20Failure%20Report%20for%20domain.de%20(mail-from%3Dsharepoint%40domain.de%2C%20ip%3D10.10.10.10).eml>).
### JSON failure report
### JSON forensic report
```json
{
@@ -190,7 +190,7 @@ Thanks to GitHub user [xennn](https://github.com/xennn) for the anonymized
}
```
### CSV failure report
### CSV forensic report
```text
feedback_type,user_agent,version,original_envelope_id,original_mail_from,original_rcpt_to,arrival_date,arrival_date_utc,subject,message_id,authentication_results,dkim_domain,source_ip_address,source_country,source_reverse_dns,source_base_domain,delivery_result,auth_failure,reported_domain,authentication_mechanisms,sample_headers_only


@@ -1,10 +1,10 @@
# Splunk
Starting in version 4.3.0 `parsedmarc` supports sending aggregate and/or
failure DMARC data to a Splunk [HTTP Event collector (HEC)].
forensic DMARC data to a Splunk [HTTP Event collector (HEC)].
The project repository contains [XML files] for premade Splunk
dashboards for aggregate and failure DMARC reports.
dashboards for aggregate and forensic DMARC reports.
Copy and paste the contents of each file into a separate Splunk
dashboard XML editor.


@@ -4,9 +4,9 @@
```text
usage: parsedmarc [-h] [-c CONFIG_FILE] [--strip-attachment-payloads] [-o OUTPUT]
[--aggregate-json-filename AGGREGATE_JSON_FILENAME] [--failure-json-filename FAILURE_JSON_FILENAME]
[--aggregate-json-filename AGGREGATE_JSON_FILENAME] [--forensic-json-filename FORENSIC_JSON_FILENAME]
[--smtp-tls-json-filename SMTP_TLS_JSON_FILENAME] [--aggregate-csv-filename AGGREGATE_CSV_FILENAME]
[--failure-csv-filename FAILURE_CSV_FILENAME] [--smtp-tls-csv-filename SMTP_TLS_CSV_FILENAME]
[--forensic-csv-filename FORENSIC_CSV_FILENAME] [--smtp-tls-csv-filename SMTP_TLS_CSV_FILENAME]
[-n NAMESERVERS [NAMESERVERS ...]] [-t DNS_TIMEOUT] [--offline] [-s] [-w] [--verbose] [--debug]
[--log-file LOG_FILE] [--no-prettify-json] [-v]
[file_path ...]
@@ -14,26 +14,26 @@ usage: parsedmarc [-h] [-c CONFIG_FILE] [--strip-attachment-payloads] [-o OUTPUT
Parses DMARC reports
positional arguments:
  file_path one or more paths to aggregate or failure report files, emails, or mbox files
  file_path one or more paths to aggregate or forensic report files, emails, or mbox files
options:
-h, --help show this help message and exit
-c CONFIG_FILE, --config-file CONFIG_FILE
a path to a configuration file (--silent implied)
--strip-attachment-payloads
remove attachment payloads from failure report output
remove attachment payloads from forensic report output
-o OUTPUT, --output OUTPUT
write output files to the given directory
--aggregate-json-filename AGGREGATE_JSON_FILENAME
filename for the aggregate JSON output file
--failure-json-filename FAILURE_JSON_FILENAME
filename for the failure JSON output file
--forensic-json-filename FORENSIC_JSON_FILENAME
filename for the forensic JSON output file
--smtp-tls-json-filename SMTP_TLS_JSON_FILENAME
filename for the SMTP TLS JSON output file
--aggregate-csv-filename AGGREGATE_CSV_FILENAME
filename for the aggregate CSV output file
--failure-csv-filename FAILURE_CSV_FILENAME
filename for the failure CSV output file
--forensic-csv-filename FORENSIC_CSV_FILENAME
filename for the forensic CSV output file
--smtp-tls-csv-filename SMTP_TLS_CSV_FILENAME
filename for the SMTP TLS CSV output file
-n NAMESERVERS [NAMESERVERS ...], --nameservers NAMESERVERS [NAMESERVERS ...]
@@ -70,7 +70,7 @@ For example
[general]
save_aggregate = True
save_failure = True
save_forensic = True
[imap]
host = imap.example.com
@@ -109,7 +109,7 @@ mode = tcp
[webhook]
aggregate_url = https://aggregate_url.example.com
failure_url = https://failure_url.example.com
forensic_url = https://forensic_url.example.com
smtp_tls_url = https://smtp_tls_url.example.com
timeout = 60
```
@@ -119,7 +119,7 @@ The full set of configuration options are:
- `general`
- `save_aggregate` - bool: Save aggregate report data to
Elasticsearch, Splunk and/or S3
- `save_failure` - bool: Save failure report data to
- `save_forensic` - bool: Save forensic report data to
Elasticsearch, Splunk and/or S3
- `save_smtp_tls` - bool: Save SMTP TLS report data to
Elasticsearch, Splunk and/or S3
@@ -130,7 +130,7 @@ The full set of configuration options are:
- `output` - str: Directory to place JSON and CSV files in. This is required if you set either of the JSON output file options.
- `aggregate_json_filename` - str: filename for the aggregate
JSON output file
- `failure_json_filename` - str: filename for the failure
- `forensic_json_filename` - str: filename for the forensic
JSON output file
- `ip_db_path` - str: An optional custom path to a MMDB file
from MaxMind or DBIP
@@ -273,8 +273,6 @@ The full set of configuration options are:
(Default: `True`)
- `timeout` - float: Timeout in seconds (Default: 60)
- `cert_path` - str: Path to a trusted certificates
- `skip_certificate_verification` - bool: Skip certificate
verification (not recommended)
- `index_suffix` - str: A suffix to apply to the index names
- `index_prefix` - str: A prefix to apply to the index names
- `monthly_indexes` - bool: Use monthly indexes instead of daily indexes
@@ -302,8 +300,6 @@ The full set of configuration options are:
(Default: `True`)
- `timeout` - float: Timeout in seconds (Default: 60)
- `cert_path` - str: Path to a trusted certificates
- `skip_certificate_verification` - bool: Skip certificate
verification (not recommended)
- `index_suffix` - str: A suffix to apply to the index names
- `index_prefix` - str: A prefix to apply to the index names
- `monthly_indexes` - bool: Use monthly indexes instead of daily indexes
@@ -325,7 +321,7 @@ The full set of configuration options are:
- `skip_certificate_verification` - bool: Skip certificate
verification (not recommended)
- `aggregate_topic` - str: The Kafka topic for aggregate reports
- `failure_topic` - str: The Kafka topic for failure reports
- `forensic_topic` - str: The Kafka topic for forensic reports
- `smtp`
- `host` - str: The SMTP hostname
- `port` - int: The SMTP port (Default: `25`)
@@ -443,7 +439,7 @@ The full set of configuration options are:
- `dce` - str: The Data Collection Endpoint (DCE). Example: `https://{DCE-NAME}.{REGION}.ingest.monitor.azure.com`.
- `dcr_immutable_id` - str: The immutable ID of the Data Collection Rule (DCR)
- `dcr_aggregate_stream` - str: The stream name for aggregate reports in the DCR
- `dcr_failure_stream` - str: The stream name for the failure reports in the DCR
- `dcr_forensic_stream` - str: The stream name for the forensic reports in the DCR
- `dcr_smtp_tls_stream` - str: The stream name for the SMTP TLS reports in the DCR
:::{note}
@@ -460,7 +456,7 @@ The full set of configuration options are:
- `webhook` - Post the individual reports to a webhook url with the report as the JSON body
- `aggregate_url` - str: URL of the webhook which should receive the aggregate reports
- `failure_url` - str: URL of the webhook which should receive the failure reports
- `forensic_url` - str: URL of the webhook which should receive the forensic reports
- `smtp_tls_url` - str: URL of the webhook which should receive the smtp_tls reports
- `timeout` - int: Interval in which the webhook call should timeout
@@ -475,26 +471,26 @@ blocks DNS requests to outside resolvers.
:::
:::{note}
`save_aggregate` and `save_failure` are separate options
because you may not want to save failure reports
(formerly known as forensic reports) to your Elasticsearch instance,
`save_aggregate` and `save_forensic` are separate options
because you may not want to save forensic reports
(also known as failure reports) to your Elasticsearch instance,
particularly if you are in a highly-regulated industry that
handles sensitive data, such as healthcare or finance. If your
legitimate outgoing email fails DMARC, it is possible
that email may appear later in a failure report.
that email may appear later in a forensic report.
Failure reports contain the original headers of an email that
Forensic reports contain the original headers of an email that
failed a DMARC check, and sometimes may also include the
full message body, depending on the policy of the reporting
organization.
Most reporting organizations do not send failure reports of any
Most reporting organizations do not send forensic reports of any
kind for privacy reasons. While aggregate DMARC reports are sent
at least daily, it is normal to receive very few failure reports.
at least daily, it is normal to receive very few forensic reports.
An alternative approach is to still collect failure/ruf
An alternative approach is to still collect forensic/failure/ruf
reports in your DMARC inbox, but run `parsedmarc` with
```save_failure = True``` manually on a separate IMAP folder (using
```save_forensic = True``` manually on a separate IMAP folder (using
the ```reports_folder``` option), after you have manually moved
known samples you want to save to that folder
(e.g. malicious samples and non-sensitive legitimate samples).
@@ -531,96 +527,6 @@ PUT _cluster/settings
Increasing this value increases resource usage.
:::
## Environment variable configuration
Any configuration option can be set via environment variables using the
naming convention `PARSEDMARC_{SECTION}_{KEY}` (uppercase). This is
especially useful for Docker deployments where file permissions make it
difficult to use config files for secrets.
**Priority order:** CLI arguments > environment variables > config file > defaults
### Examples
```bash
# Set IMAP credentials via env vars
export PARSEDMARC_IMAP_HOST=imap.example.com
export PARSEDMARC_IMAP_USER=dmarc@example.com
export PARSEDMARC_IMAP_PASSWORD=secret
# Elasticsearch
export PARSEDMARC_ELASTICSEARCH_HOSTS=http://localhost:9200
export PARSEDMARC_ELASTICSEARCH_SSL=false
# Splunk HEC (note: section name splunk_hec becomes SPLUNK_HEC)
export PARSEDMARC_SPLUNK_HEC_URL=https://splunk.example.com
export PARSEDMARC_SPLUNK_HEC_TOKEN=my-hec-token
export PARSEDMARC_SPLUNK_HEC_INDEX=email
# General settings
export PARSEDMARC_GENERAL_SAVE_AGGREGATE=true
export PARSEDMARC_GENERAL_DEBUG=true
```
### Specifying the config file via environment variable
```bash
export PARSEDMARC_CONFIG_FILE=/etc/parsedmarc.ini
parsedmarc
```
### Running without a config file (env-only mode)
When no config file is given (neither `-c` flag nor `PARSEDMARC_CONFIG_FILE`),
parsedmarc will still pick up any `PARSEDMARC_*` environment variables. This
enables fully file-less deployments:
```bash
export PARSEDMARC_GENERAL_SAVE_AGGREGATE=true
export PARSEDMARC_GENERAL_OFFLINE=true
export PARSEDMARC_ELASTICSEARCH_HOSTS=http://elasticsearch:9200
parsedmarc /path/to/reports/*
```
### Docker Compose example
```yaml
services:
parsedmarc:
image: parsedmarc:latest
environment:
PARSEDMARC_IMAP_HOST: imap.example.com
PARSEDMARC_IMAP_USER: dmarc@example.com
PARSEDMARC_IMAP_PASSWORD: ${IMAP_PASSWORD}
PARSEDMARC_MAILBOX_WATCH: "true"
PARSEDMARC_ELASTICSEARCH_HOSTS: http://elasticsearch:9200
PARSEDMARC_GENERAL_SAVE_AGGREGATE: "true"
PARSEDMARC_GENERAL_SAVE_FORENSIC: "true"
```
### Section name mapping
For sections with underscores in the name, the full section name is used:
| Section | Env var prefix |
|------------------|-------------------------------|
| `general` | `PARSEDMARC_GENERAL_` |
| `mailbox` | `PARSEDMARC_MAILBOX_` |
| `imap` | `PARSEDMARC_IMAP_` |
| `msgraph` | `PARSEDMARC_MSGRAPH_` |
| `elasticsearch` | `PARSEDMARC_ELASTICSEARCH_` |
| `opensearch` | `PARSEDMARC_OPENSEARCH_` |
| `splunk_hec` | `PARSEDMARC_SPLUNK_HEC_` |
| `kafka` | `PARSEDMARC_KAFKA_` |
| `smtp` | `PARSEDMARC_SMTP_` |
| `s3` | `PARSEDMARC_S3_` |
| `syslog` | `PARSEDMARC_SYSLOG_` |
| `gmail_api` | `PARSEDMARC_GMAIL_API_` |
| `maildir` | `PARSEDMARC_MAILDIR_` |
| `log_analytics` | `PARSEDMARC_LOG_ANALYTICS_` |
| `gelf` | `PARSEDMARC_GELF_` |
| `webhook` | `PARSEDMARC_WEBHOOK_` |
## Performance tuning
For large mailbox imports or backfills, parsedmarc can consume a noticeable amount
@@ -760,15 +666,8 @@ Mailbox connection settings (IMAP host/credentials, Microsoft Graph,
Gmail API, Maildir path) are **not** reloaded — changing those still
requires a full restart.
On a **successful** reload, existing output client connections are
closed and new ones are created from the updated configuration. The
service then resumes watching with the new settings.
If the new configuration file contains errors (missing required
settings, unreachable output destinations, etc.), the **entire reload
is aborted** — no output clients are replaced and the previous
configuration remains fully active. This means a typo in one section
will not take down an otherwise working setup. Check the logs for
If the new configuration file contains errors, the reload is aborted
and the previous configuration remains active. Check the logs for
details:
```bash


@@ -83,7 +83,7 @@
"id": 28,
"panels": [
{
"content": "# DMARC Summary\r\nAs the name suggests, this dashboard is the best place to start reviewing your aggregate DMARC data.\r\n\r\nAcross the top of the dashboard, three pie charts display the percentage of alignment pass/fail for SPF, DKIM, and DMARC. Clicking on any chart segment will filter for that value.\r\n\r\n***Note***\r\nMessages should not be considered malicious just because they failed to pass DMARC; especially if you have just started collecting data. It may be a legitimate service that needs SPF and DKIM configured correctly.\r\n\r\nStart by filtering the results to only show failed DKIM alignment. While DMARC passes if a message passes SPF or DKIM alignment, only DKIM alignment remains valid when a message is forwarded without changing the from address, which is often caused by a mailbox forwarding rule. This is because DKIM signatures are part of the message headers, whereas SPF relies on SMTP session headers.\r\n\r\nUnderneath the pie charts. you can see graphs of DMARC passage and message disposition over time.\r\n\r\nUnder the graphs you will find the most useful data tables on the dashboard. On the left, there is a list of organizations that are sending you DMARC reports. In the center, there is a list of sending servers grouped by the base domain in their reverse DNS. On the right, there is a list of email from domains, sorted by message volume.\r\n\r\nBy hovering your mouse over a data table value and using the magnifying glass icons, you can filter on or filter out different values. Start by looking at the Message Sources by Reverse DNS table. Find a sender that you recognize, such as an email marketing service, hover over it, and click on the plus (+) magnifying glass icon, to add a filter that only shows results for that sender. Now, look at the Message From Header table to the right. That shows you the domains that a sender is sending as, which might tell you which brand/business is using a particular service. With that information, you can contact them and have them set up DKIM.\r\n\r\n***Note***\r\nIf you have a lot of B2C customers, you may see a high volume of emails as your domains coming from consumer email services, such as Google/Gmail and Yahoo! This occurs when customers have mailbox rules in place that forward emails from an old account to a new account, which is why DKIM authentication is so important, as mentioned earlier. Similar patterns may be observed with businesses who send from reverse DNS addressees of parent, subsidiary, and outdated brands.\r\n\r\n***Note***\r\nYou can add your own custom temporary filters by clicking on Add Filter at the upper right of the page.\r\n\r\n# DMARC Failure Samples\r\nThe DMARC Failure Samples section contains information on DMARC failure reports (also known as ruf reports). These reports contain samples of emails that have failed to pass DMARC.\r\n\r\n***Note***\r\nMost recipients do not send failure/ruf reports at all to avoid privacy leaks. Some recipients (notably Chinese webmail services) will only supply the headers of sample emails. 
Very few provide the entire email.\r\n\r\n# DMARC Alignment Guide\r\nDMARC ensures that SPF and DKIM authentication mechanisms actually authenticate against the same domain that the end user sees.\r\n\r\nA message passes a DMARC check by passing DKIM or SPF, **as long as the related indicators are also in alignment.**\r\n\r\n| \t| DKIM \t| SPF \t|\r\n|-----------\t|--------------------------------------------------------------------------------------------------------------------------------------------------\t|----------------------------------------------------------------------------------------------------------------\t|\r\n| **Passing** \t| The signature in the DKIM header is validated using a public key that is published as a DNS record of the domain name specified in the signature \t| The mail server's IP address is listed in the SPF record of the domain in the SMTP envelope's mail from header \t|\r\n| **Alignment** \t| The signing domain aligns with the domain in the message's from header \t| The domain in the SMTP envelope's mail from header aligns with the domain in the message's from header \t|\r\n\r\n\r\n# Further Reading\r\n[Demystifying DMARC: A guide to preventing email spoofing](https://seanthegeek.net/459/demystifying-dmarc/amp/)\r\n\r\n[DMARC Manual](https://menainfosec.com/wp-content/uploads/2017/12/DMARC_Service_Manual.pdf)\r\n\r\n[What is “External Destination Verification”?](https://dmarcian.com/what-is-external-destination-verification/)",
"content": "# DMARC Summary\r\nAs the name suggests, this dashboard is the best place to start reviewing your aggregate DMARC data.\r\n\r\nAcross the top of the dashboard, three pie charts display the percentage of alignment pass/fail for SPF, DKIM, and DMARC. Clicking on any chart segment will filter for that value.\r\n\r\n***Note***\r\nMessages should not be considered malicious just because they failed to pass DMARC; especially if you have just started collecting data. It may be a legitimate service that needs SPF and DKIM configured correctly.\r\n\r\nStart by filtering the results to only show failed DKIM alignment. While DMARC passes if a message passes SPF or DKIM alignment, only DKIM alignment remains valid when a message is forwarded without changing the from address, which is often caused by a mailbox forwarding rule. This is because DKIM signatures are part of the message headers, whereas SPF relies on SMTP session headers.\r\n\r\nUnderneath the pie charts. you can see graphs of DMARC passage and message disposition over time.\r\n\r\nUnder the graphs you will find the most useful data tables on the dashboard. On the left, there is a list of organizations that are sending you DMARC reports. In the center, there is a list of sending servers grouped by the base domain in their reverse DNS. On the right, there is a list of email from domains, sorted by message volume.\r\n\r\nBy hovering your mouse over a data table value and using the magnifying glass icons, you can filter on or filter out different values. Start by looking at the Message Sources by Reverse DNS table. Find a sender that you recognize, such as an email marketing service, hover over it, and click on the plus (+) magnifying glass icon, to add a filter that only shows results for that sender. Now, look at the Message From Header table to the right. That shows you the domains that a sender is sending as, which might tell you which brand/business is using a particular service. With that information, you can contact them and have them set up DKIM.\r\n\r\n***Note***\r\nIf you have a lot of B2C customers, you may see a high volume of emails as your domains coming from consumer email services, such as Google/Gmail and Yahoo! This occurs when customers have mailbox rules in place that forward emails from an old account to a new account, which is why DKIM authentication is so important, as mentioned earlier. Similar patterns may be observed with businesses who send from reverse DNS addressees of parent, subsidiary, and outdated brands.\r\n\r\n***Note***\r\nYou can add your own custom temporary filters by clicking on Add Filter at the upper right of the page.\r\n\r\n# DMARC Forensic Samples\r\nThe DMARC Forensic Samples section contains information on DMARC forensic reports (also known as failure reports or ruf reports). These reports contain samples of emails that have failed to pass DMARC.\r\n\r\n***Note***\r\nMost recipients do not send forensic/failure/ruf reports at all to avoid privacy leaks. Some recipients (notably Chinese webmail services) will only supply the headers of sample emails. 
Very few provide the entire email.\r\n\r\n# DMARC Alignment Guide\r\nDMARC ensures that SPF and DKIM authentication mechanisms actually authenticate against the same domain that the end user sees.\r\n\r\nA message passes a DMARC check by passing DKIM or SPF, **as long as the related indicators are also in alignment.**\r\n\r\n| \t| DKIM \t| SPF \t|\r\n|-----------\t|--------------------------------------------------------------------------------------------------------------------------------------------------\t|----------------------------------------------------------------------------------------------------------------\t|\r\n| **Passing** \t| The signature in the DKIM header is validated using a public key that is published as a DNS record of the domain name specified in the signature \t| The mail server's IP address is listed in the SPF record of the domain in the SMTP envelope's mail from header \t|\r\n| **Alignment** \t| The signing domain aligns with the domain in the message's from header \t| The domain in the SMTP envelope's mail from header aligns with the domain in the message's from header \t|\r\n\r\n\r\n# Further Reading\r\n[Demystifying DMARC: A guide to preventing email spoofing](https://seanthegeek.net/459/demystifying-dmarc/amp/)\r\n\r\n[DMARC Manual](https://menainfosec.com/wp-content/uploads/2017/12/DMARC_Service_Manual.pdf)\r\n\r\n[What is “External Destination Verification”?](https://dmarcian.com/what-is-external-destination-verification/)",
"datasource": null,
"fieldConfig": {
"defaults": {
@@ -101,7 +101,7 @@
"links": [],
"mode": "markdown",
"options": {
"content": "# DMARC Summary\r\nAs the name suggests, this dashboard is the best place to start reviewing your aggregate DMARC data.\r\n\r\nAcross the top of the dashboard, three pie charts display the percentage of alignment pass/fail for SPF, DKIM, and DMARC. Clicking on any chart segment will filter for that value.\r\n\r\n***Note***\r\nMessages should not be considered malicious just because they failed to pass DMARC; especially if you have just started collecting data. It may be a legitimate service that needs SPF and DKIM configured correctly.\r\n\r\nStart by filtering the results to only show failed DKIM alignment. While DMARC passes if a message passes SPF or DKIM alignment, only DKIM alignment remains valid when a message is forwarded without changing the from address, which is often caused by a mailbox forwarding rule. This is because DKIM signatures are part of the message headers, whereas SPF relies on SMTP session headers.\r\n\r\nUnderneath the pie charts. you can see graphs of DMARC passage and message disposition over time.\r\n\r\nUnder the graphs you will find the most useful data tables on the dashboard. On the left, there is a list of organizations that are sending you DMARC reports. In the center, there is a list of sending servers grouped by the base domain in their reverse DNS. On the right, there is a list of email from domains, sorted by message volume.\r\n\r\nBy hovering your mouse over a data table value and using the magnifying glass icons, you can filter on or filter out different values. Start by looking at the Message Sources by Reverse DNS table. Find a sender that you recognize, such as an email marketing service, hover over it, and click on the plus (+) magnifying glass icon, to add a filter that only shows results for that sender. Now, look at the Message From Header table to the right. That shows you the domains that a sender is sending as, which might tell you which brand/business is using a particular service. With that information, you can contact them and have them set up DKIM.\r\n\r\n***Note***\r\nIf you have a lot of B2C customers, you may see a high volume of emails as your domains coming from consumer email services, such as Google/Gmail and Yahoo! This occurs when customers have mailbox rules in place that forward emails from an old account to a new account, which is why DKIM authentication is so important, as mentioned earlier. Similar patterns may be observed with businesses who send from reverse DNS addressees of parent, subsidiary, and outdated brands.\r\n\r\n***Note***\r\nYou can add your own custom temporary filters by clicking on Add Filter at the upper right of the page.\r\n\r\n# DMARC Failure Samples\r\nThe DMARC Failure Samples section contains information on DMARC failure reports (also known as ruf reports). These reports contain samples of emails that have failed to pass DMARC.\r\n\r\n***Note***\r\nMost recipients do not send failure/ruf reports at all to avoid privacy leaks. Some recipients (notably Chinese webmail services) will only supply the headers of sample emails. 
Very few provide the entire email.\r\n\r\n# DMARC Alignment Guide\r\nDMARC ensures that SPF and DKIM authentication mechanisms actually authenticate against the same domain that the end user sees.\r\n\r\nA message passes a DMARC check by passing DKIM or SPF, **as long as the related indicators are also in alignment.**\r\n\r\n| \t| DKIM \t| SPF \t|\r\n|-----------\t|--------------------------------------------------------------------------------------------------------------------------------------------------\t|----------------------------------------------------------------------------------------------------------------\t|\r\n| **Passing** \t| The signature in the DKIM header is validated using a public key that is published as a DNS record of the domain name specified in the signature \t| The mail server's IP address is listed in the SPF record of the domain in the SMTP envelope's mail from header \t|\r\n| **Alignment** \t| The signing domain aligns with the domain in the message's from header \t| The domain in the SMTP envelope's mail from header aligns with the domain in the message's from header \t|\r\n\r\n\r\n# Further Reading\r\n[Demystifying DMARC: A guide to preventing email spoofing](https://seanthegeek.net/459/demystifying-dmarc/amp/)\r\n\r\n[DMARC Manual](https://menainfosec.com/wp-content/uploads/2017/12/DMARC_Service_Manual.pdf)\r\n\r\n[What is “External Destination Verification”?](https://dmarcian.com/what-is-external-destination-verification/)",
"content": "# DMARC Summary\r\nAs the name suggests, this dashboard is the best place to start reviewing your aggregate DMARC data.\r\n\r\nAcross the top of the dashboard, three pie charts display the percentage of alignment pass/fail for SPF, DKIM, and DMARC. Clicking on any chart segment will filter for that value.\r\n\r\n***Note***\r\nMessages should not be considered malicious just because they failed to pass DMARC; especially if you have just started collecting data. It may be a legitimate service that needs SPF and DKIM configured correctly.\r\n\r\nStart by filtering the results to only show failed DKIM alignment. While DMARC passes if a message passes SPF or DKIM alignment, only DKIM alignment remains valid when a message is forwarded without changing the from address, which is often caused by a mailbox forwarding rule. This is because DKIM signatures are part of the message headers, whereas SPF relies on SMTP session headers.\r\n\r\nUnderneath the pie charts. you can see graphs of DMARC passage and message disposition over time.\r\n\r\nUnder the graphs you will find the most useful data tables on the dashboard. On the left, there is a list of organizations that are sending you DMARC reports. In the center, there is a list of sending servers grouped by the base domain in their reverse DNS. On the right, there is a list of email from domains, sorted by message volume.\r\n\r\nBy hovering your mouse over a data table value and using the magnifying glass icons, you can filter on or filter out different values. Start by looking at the Message Sources by Reverse DNS table. Find a sender that you recognize, such as an email marketing service, hover over it, and click on the plus (+) magnifying glass icon, to add a filter that only shows results for that sender. Now, look at the Message From Header table to the right. That shows you the domains that a sender is sending as, which might tell you which brand/business is using a particular service. With that information, you can contact them and have them set up DKIM.\r\n\r\n***Note***\r\nIf you have a lot of B2C customers, you may see a high volume of emails as your domains coming from consumer email services, such as Google/Gmail and Yahoo! This occurs when customers have mailbox rules in place that forward emails from an old account to a new account, which is why DKIM authentication is so important, as mentioned earlier. Similar patterns may be observed with businesses who send from reverse DNS addressees of parent, subsidiary, and outdated brands.\r\n\r\n***Note***\r\nYou can add your own custom temporary filters by clicking on Add Filter at the upper right of the page.\r\n\r\n# DMARC Forensic Samples\r\nThe DMARC Forensic Samples section contains information on DMARC forensic reports (also known as failure reports or ruf reports). These reports contain samples of emails that have failed to pass DMARC.\r\n\r\n***Note***\r\nMost recipients do not send forensic/failure/ruf reports at all to avoid privacy leaks. Some recipients (notably Chinese webmail services) will only supply the headers of sample emails. 
Very few provide the entire email.\r\n\r\n# DMARC Alignment Guide\r\nDMARC ensures that SPF and DKIM authentication mechanisms actually authenticate against the same domain that the end user sees.\r\n\r\nA message passes a DMARC check by passing DKIM or SPF, **as long as the related indicators are also in alignment.**\r\n\r\n| \t| DKIM \t| SPF \t|\r\n|-----------\t|--------------------------------------------------------------------------------------------------------------------------------------------------\t|----------------------------------------------------------------------------------------------------------------\t|\r\n| **Passing** \t| The signature in the DKIM header is validated using a public key that is published as a DNS record of the domain name specified in the signature \t| The mail server's IP address is listed in the SPF record of the domain in the SMTP envelope's mail from header \t|\r\n| **Alignment** \t| The signing domain aligns with the domain in the message's from header \t| The domain in the SMTP envelope's mail from header aligns with the domain in the message's from header \t|\r\n\r\n\r\n# Further Reading\r\n[Demystifying DMARC: A guide to preventing email spoofing](https://seanthegeek.net/459/demystifying-dmarc/amp/)\r\n\r\n[DMARC Manual](https://menainfosec.com/wp-content/uploads/2017/12/DMARC_Service_Manual.pdf)\r\n\r\n[What is “External Destination Verification”?](https://dmarcian.com/what-is-external-destination-verification/)",
"mode": "markdown"
},
"pluginVersion": "7.1.0",

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long


@@ -48,8 +48,7 @@ from parsedmarc.mail import (
)
from parsedmarc.types import (
AggregateReport,
FailureReport,
ForensicReport as ForensicReport,
ForensicReport,
ParsedReport,
ParsingResults,
SMTPTLSReport,
@@ -74,7 +73,6 @@ text_report_regex = re.compile(r"\s*([a-zA-Z\s]+):\s(.+)", re.MULTILINE)
MAGIC_ZIP = b"\x50\x4b\x03\x04"
MAGIC_GZIP = b"\x1f\x8b"
MAGIC_XML = b"\x3c\x3f\x78\x6d\x6c\x20"
MAGIC_XML_TAG = b"\x3c" # '<' - XML starting with an element tag (no declaration)
MAGIC_JSON = b"\x7b"
EMAIL_SAMPLE_CONTENT_TYPES = (
@@ -109,12 +107,8 @@ class InvalidAggregateReport(InvalidDMARCReport):
"""Raised when an invalid DMARC aggregate report is encountered"""
class InvalidFailureReport(InvalidDMARCReport):
"""Raised when an invalid DMARC failure report is encountered"""
# Backward-compatible alias
InvalidForensicReport = InvalidFailureReport
class InvalidForensicReport(InvalidDMARCReport):
"""Raised when an invalid DMARC forensic report is encountered"""
def _bucket_interval_by_day(
@@ -354,6 +348,8 @@ def _parse_report_record(
}
if "disposition" in policy_evaluated:
new_policy_evaluated["disposition"] = policy_evaluated["disposition"]
if new_policy_evaluated["disposition"].strip().lower() == "pass":
new_policy_evaluated["disposition"] = "none"
if "dkim" in policy_evaluated:
new_policy_evaluated["dkim"] = policy_evaluated["dkim"]
if "spf" in policy_evaluated:
@@ -414,7 +410,6 @@ def _parse_report_record(
new_result["result"] = result["result"]
else:
new_result["result"] = "none"
new_result["human_result"] = result.get("human_result", None)
new_record["auth_results"]["dkim"].append(new_result)
if not isinstance(auth_results["spf"], list):
@@ -430,7 +425,6 @@ def _parse_report_record(
new_result["result"] = result["result"]
else:
new_result["result"] = "none"
new_result["human_result"] = result.get("human_result", None)
new_record["auth_results"]["spf"].append(new_result)
if "envelope_from" not in new_record["identifiers"]:
@@ -783,10 +777,6 @@ def parse_aggregate_report_xml(
else:
errors = report["report_metadata"]["error"]
new_report_metadata["errors"] = errors
generator = None
if "generator" in report_metadata:
generator = report_metadata["generator"]
new_report_metadata["generator"] = generator
new_report["report_metadata"] = new_report_metadata
records = []
policy_published = report["policy_published"]
@@ -810,39 +800,16 @@ def parse_aggregate_report_xml(
if policy_published["sp"] is not None:
sp = policy_published["sp"]
new_policy_published["sp"] = sp
pct = None
pct = "100"
if "pct" in policy_published:
if policy_published["pct"] is not None:
pct = policy_published["pct"]
new_policy_published["pct"] = pct
fo = None
fo = "0"
if "fo" in policy_published:
if policy_published["fo"] is not None:
fo = policy_published["fo"]
new_policy_published["fo"] = fo
np_ = None
if "np" in policy_published:
if policy_published["np"] is not None:
np_ = policy_published["np"]
if np_ not in ("none", "quarantine", "reject"):
logger.warning("Invalid np value: {0}".format(np_))
new_policy_published["np"] = np_
testing = None
if "testing" in policy_published:
if policy_published["testing"] is not None:
testing = policy_published["testing"]
if testing not in ("n", "y"):
logger.warning("Invalid testing value: {0}".format(testing))
new_policy_published["testing"] = testing
discovery_method = None
if "discovery_method" in policy_published:
if policy_published["discovery_method"] is not None:
discovery_method = policy_published["discovery_method"]
if discovery_method not in ("psl", "treewalk"):
logger.warning(
"Invalid discovery_method value: {0}".format(discovery_method)
)
new_policy_published["discovery_method"] = discovery_method
new_report["policy_published"] = new_policy_published
if type(report["record"]) is list:
@@ -975,7 +942,6 @@ def extract_report(content: Union[bytes, str, BinaryIO]) -> str:
)
elif (
header[: len(MAGIC_XML)] == MAGIC_XML
or header[: len(MAGIC_XML_TAG)] == MAGIC_XML_TAG
or header[: len(MAGIC_JSON)] == MAGIC_JSON
):
report = file_object.read().decode(errors="ignore")
@@ -1101,9 +1067,6 @@ def parsed_aggregate_reports_to_csv_rows(
sp = report["policy_published"]["sp"]
pct = report["policy_published"]["pct"]
fo = report["policy_published"]["fo"]
np_ = report["policy_published"].get("np", None)
testing = report["policy_published"].get("testing", None)
discovery_method = report["policy_published"].get("discovery_method", None)
report_dict: dict[str, Any] = dict(
xml_schema=xml_schema,
@@ -1120,11 +1083,8 @@ def parsed_aggregate_reports_to_csv_rows(
aspf=aspf,
p=p,
sp=sp,
np=np_,
pct=pct,
fo=fo,
testing=testing,
discovery_method=discovery_method,
)
for record in report["records"]:
@@ -1220,11 +1180,8 @@ def parsed_aggregate_reports_to_csv(
"aspf",
"p",
"sp",
"np",
"pct",
"fo",
"testing",
"discovery_method",
"source_ip_address",
"source_country",
"source_reverse_dns",
@@ -1262,7 +1219,7 @@ def parsed_aggregate_reports_to_csv(
return csv_file_object.getvalue()
def parse_failure_report(
def parse_forensic_report(
feedback_report: str,
sample: str,
msg_date: datetime,
@@ -1275,9 +1232,9 @@ def parse_failure_report(
nameservers: Optional[list[str]] = None,
dns_timeout: float = 2.0,
strip_attachment_payloads: bool = False,
) -> FailureReport:
) -> ForensicReport:
"""
Converts a DMARC failure report and sample to a dict
Converts a DMARC forensic report and sample to a dict
Args:
feedback_report (str): A message's feedback report as a string
@@ -1292,7 +1249,7 @@ def parse_failure_report(
(Cloudflare's public DNS resolvers by default)
dns_timeout (float): Sets the DNS timeout in seconds
strip_attachment_payloads (bool): Remove attachment payloads from
failure report results
forensic report results
Returns:
dict: A parsed report and sample
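Editor's note: a hedged usage sketch of this function (the feedback_report and sample strings are assumed to be loaded from a report email; the top-level import path follows the module-level API shown in this compare):

    from datetime import datetime, timezone
    from parsedmarc import parse_failure_report  # parse_forensic_report on the other side

    parsed = parse_failure_report(
        feedback_report,             # str: the message/feedback-report part (assumed loaded)
        sample,                      # str: the original message sample
        datetime.now(timezone.utc),  # msg_date, used when Arrival-Date is absent
        strip_attachment_payloads=True,
    )
    print(parsed["reported_domain"])  # field name per the document mappings in this compare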
@@ -1308,7 +1265,7 @@ def parse_failure_report(
if "arrival_date" not in parsed_report:
if msg_date is None:
raise InvalidFailureReport("Failure sample is not a valid email")
raise InvalidForensicReport("Forensic sample is not a valid email")
parsed_report["arrival_date"] = msg_date.isoformat()
if "version" not in parsed_report:
@@ -1394,27 +1351,27 @@ def parse_failure_report(
parsed_report["sample"] = sample
parsed_report["parsed_sample"] = parsed_sample
return cast(FailureReport, parsed_report)
return cast(ForensicReport, parsed_report)
except KeyError as error:
raise InvalidFailureReport("Missing value: {0}".format(error.__str__()))
raise InvalidForensicReport("Missing value: {0}".format(error.__str__()))
except Exception as error:
raise InvalidFailureReport("Unexpected error: {0}".format(error.__str__()))
raise InvalidForensicReport("Unexpected error: {0}".format(error.__str__()))
def parsed_failure_reports_to_csv_rows(
reports: Union[FailureReport, list[FailureReport]],
def parsed_forensic_reports_to_csv_rows(
reports: Union[ForensicReport, list[ForensicReport]],
) -> list[dict[str, Any]]:
"""
Converts one or more parsed failure reports to a list of dicts in flat CSV
Converts one or more parsed forensic reports to a list of dicts in flat CSV
format
Args:
reports: A parsed failure report or list of parsed failure reports
reports: A parsed forensic report or list of parsed forensic reports
Returns:
list: Parsed failure report data as a list of dicts in flat CSV format
list: Parsed forensic report data as a list of dicts in flat CSV format
"""
if isinstance(reports, dict):
reports = [reports]
@@ -1441,18 +1398,18 @@ def parsed_failure_reports_to_csv_rows(
return rows
def parsed_failure_reports_to_csv(
reports: Union[FailureReport, list[FailureReport]],
def parsed_forensic_reports_to_csv(
reports: Union[ForensicReport, list[ForensicReport]],
) -> str:
"""
Converts one or more parsed failure reports to flat CSV format, including
Converts one or more parsed forensic reports to flat CSV format, including
headers
Args:
reports: A parsed failure report or list of parsed failure reports
reports: A parsed forensic report or list of parsed forensic reports
Returns:
str: Parsed failure report data in flat CSV format, including headers
str: Parsed forensic report data in flat CSV format, including headers
"""
fields = [
"feedback_type",
@@ -1484,7 +1441,7 @@ def parsed_failure_reports_to_csv(
csv_writer = DictWriter(csv_file, fieldnames=fields)
csv_writer.writeheader()
rows = parsed_failure_reports_to_csv_rows(reports)
rows = parsed_forensic_reports_to_csv_rows(reports)
for row in rows:
new_row: dict[str, Any] = {}
@@ -1522,13 +1479,13 @@ def parse_report_email(
nameservers (list): A list of one or more nameservers to use
dns_timeout (float): Sets the DNS timeout in seconds
strip_attachment_payloads (bool): Remove attachment payloads from
failure report results
forensic report results
keep_alive (callable): keep alive function
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
Returns:
dict:
* ``report_type``: ``aggregate`` or ``failure``
* ``report_type``: ``aggregate`` or ``forensic``
* ``report``: The parsed report
"""
result: Optional[ParsedReport] = None
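Editor's note: given the report_type/report shape documented above, downstream handling branches on the type key; a sketch (handler names are illustrative):

    result = parse_report_email(raw_email)  # raw_email assumed read from disk or a mailbox
    if result["report_type"] == "aggregate":
        handle_aggregate(result["report"])
    elif result["report_type"] in ("failure", "forensic"):  # key differs across this compare
        handle_failure(result["report"])
    elif result["report_type"] == "smtp_tls":
        handle_smtp_tls(result["report"])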
@@ -1671,7 +1628,7 @@ def parse_report_email(
if feedback_report and sample:
try:
failure_report = parse_failure_report(
forensic_report = parse_forensic_report(
feedback_report,
sample,
msg_date,
@@ -1684,17 +1641,17 @@ def parse_report_email(
dns_timeout=dns_timeout,
strip_attachment_payloads=strip_attachment_payloads,
)
except InvalidFailureReport as e:
except InvalidForensicReport as e:
error = (
'Message with subject "{0}" '
"is not a valid "
"failure DMARC report: {1}".format(subject, e)
"forensic DMARC report: {1}".format(subject, e)
)
raise InvalidFailureReport(error)
raise InvalidForensicReport(error)
except Exception as e:
raise InvalidFailureReport(e.__str__())
raise InvalidForensicReport(e.__str__())
result = {"report_type": "failure", "report": failure_report}
result = {"report_type": "forensic", "report": forensic_report}
return result
if result is None:
@@ -1718,7 +1675,7 @@ def parse_report_file(
keep_alive: Optional[Callable] = None,
normalize_timespan_threshold_hours: float = 24,
) -> ParsedReport:
"""Parses a DMARC aggregate or failure file at the given path, a
"""Parses a DMARC aggregate or forensic file at the given path, a
file-like object, or bytes
Args:
@@ -1728,7 +1685,7 @@ def parse_report_file(
(Cloudflare's public DNS resolvers by default)
dns_timeout (float): Sets the DNS timeout in seconds
strip_attachment_payloads (bool): Remove attachment payloads from
failure report results
forensic report results
ip_db_path (str): Path to a MMDB file from MaxMind or DBIP
always_use_local_files (bool): Do not download files
reverse_dns_map_path (str): Path to a reverse DNS map
@@ -1819,7 +1776,7 @@ def get_dmarc_reports_from_mbox(
(Cloudflare's public DNS resolvers by default)
dns_timeout (float): Sets the DNS timeout in seconds
strip_attachment_payloads (bool): Remove attachment payloads from
failure report results
forensic report results
always_use_local_files (bool): Do not download files
reverse_dns_map_path (str): Path to a reverse DNS map file
reverse_dns_map_url (str): URL to a reverse DNS map file
@@ -1828,11 +1785,11 @@ def get_dmarc_reports_from_mbox(
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
Returns:
dict: Lists of ``aggregate_reports``, ``failure_reports``, and ``smtp_tls_reports``
dict: Lists of ``aggregate_reports``, ``forensic_reports``, and ``smtp_tls_reports``
"""
aggregate_reports: list[AggregateReport] = []
failure_reports: list[FailureReport] = []
forensic_reports: list[ForensicReport] = []
smtp_tls_reports: list[SMTPTLSReport] = []
try:
mbox = mailbox.mbox(input_)
@@ -1869,8 +1826,8 @@ def get_dmarc_reports_from_mbox(
"Skipping duplicate aggregate report "
f"from {report_org} with ID: {report_id}"
)
elif parsed_email["report_type"] == "failure":
failure_reports.append(parsed_email["report"])
elif parsed_email["report_type"] == "forensic":
forensic_reports.append(parsed_email["report"])
elif parsed_email["report_type"] == "smtp_tls":
smtp_tls_reports.append(parsed_email["report"])
except InvalidDMARCReport as error:
@@ -1879,7 +1836,7 @@ def get_dmarc_reports_from_mbox(
raise InvalidDMARCReport("Mailbox {0} does not exist".format(input_))
return {
"aggregate_reports": aggregate_reports,
"failure_reports": failure_reports,
"forensic_reports": forensic_reports,
"smtp_tls_reports": smtp_tls_reports,
}
@@ -1922,7 +1879,7 @@ def get_dmarc_reports_from_mailbox(
nameservers (list): A list of DNS nameservers to query
dns_timeout (float): Set the DNS query timeout
strip_attachment_payloads (bool): Remove attachment payloads from
failure report results
forensic report results
results (dict): Results from the previous run
batch_size (int): Number of messages to read and process before saving
(use 0 for no limit)
@@ -1933,7 +1890,7 @@ def get_dmarc_reports_from_mailbox(
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
Returns:
dict: Lists of ``aggregate_reports``, ``failure_reports``, and ``smtp_tls_reports``
dict: Lists of ``aggregate_reports``, ``forensic_reports``, and ``smtp_tls_reports``
"""
if delete and test:
raise ValueError("delete and test options are mutually exclusive")
@@ -1945,25 +1902,25 @@ def get_dmarc_reports_from_mailbox(
current_time: Optional[Union[datetime, date, str]] = None
aggregate_reports: list[AggregateReport] = []
failure_reports: list[FailureReport] = []
forensic_reports: list[ForensicReport] = []
smtp_tls_reports: list[SMTPTLSReport] = []
aggregate_report_msg_uids = []
failure_report_msg_uids = []
forensic_report_msg_uids = []
smtp_tls_msg_uids = []
aggregate_reports_folder = "{0}/Aggregate".format(archive_folder)
failure_reports_folder = "{0}/Forensic".format(archive_folder)
forensic_reports_folder = "{0}/Forensic".format(archive_folder)
smtp_tls_reports_folder = "{0}/SMTP-TLS".format(archive_folder)
invalid_reports_folder = "{0}/Invalid".format(archive_folder)
if results:
aggregate_reports = results["aggregate_reports"].copy()
failure_reports = results["failure_reports"].copy()
forensic_reports = results["forensic_reports"].copy()
smtp_tls_reports = results["smtp_tls_reports"].copy()
if not test and create_folders:
connection.create_folder(archive_folder)
connection.create_folder(aggregate_reports_folder)
connection.create_folder(failure_reports_folder)
connection.create_folder(forensic_reports_folder)
connection.create_folder(smtp_tls_reports_folder)
connection.create_folder(invalid_reports_folder)
@@ -1998,7 +1955,9 @@ def get_dmarc_reports_from_mailbox(
)
current_time = datetime.now(timezone.utc).strftime("%d-%b-%Y")
elif isinstance(connection, MSGraphConnection):
since = (datetime.now(timezone.utc) - timedelta(minutes=_since)).isoformat()
since = (
datetime.now(timezone.utc) - timedelta(minutes=_since)
).isoformat() + "Z"
current_time = datetime.now(timezone.utc).isoformat() + "Z"
elif isinstance(connection, GmailConnection):
since = (datetime.now(timezone.utc) - timedelta(minutes=_since)).strftime(
@@ -2065,9 +2024,9 @@ def get_dmarc_reports_from_mailbox(
f"Skipping duplicate aggregate report with ID: {report_id}"
)
aggregate_report_msg_uids.append(message_id)
elif parsed_email["report_type"] == "failure":
failure_reports.append(parsed_email["report"])
failure_report_msg_uids.append(message_id)
elif parsed_email["report_type"] == "forensic":
forensic_reports.append(parsed_email["report"])
forensic_report_msg_uids.append(message_id)
elif parsed_email["report_type"] == "smtp_tls":
smtp_tls_reports.append(parsed_email["report"])
smtp_tls_msg_uids.append(message_id)
@@ -2094,7 +2053,7 @@ def get_dmarc_reports_from_mailbox(
if not test:
if delete:
processed_messages = (
aggregate_report_msg_uids + failure_report_msg_uids + smtp_tls_msg_uids
aggregate_report_msg_uids + forensic_report_msg_uids + smtp_tls_msg_uids
)
number_of_processed_msgs = len(processed_messages)
@@ -2134,24 +2093,24 @@ def get_dmarc_reports_from_mailbox(
message = "Error moving message UID"
e = "{0} {1}: {2}".format(message, msg_uid, e)
logger.error("Mailbox error: {0}".format(e))
if len(failure_report_msg_uids) > 0:
message = "Moving failure report messages from"
if len(forensic_report_msg_uids) > 0:
message = "Moving forensic report messages from"
logger.debug(
"{0} {1} to {2}".format(
message, reports_folder, failure_reports_folder
message, reports_folder, forensic_reports_folder
)
)
number_of_failure_msgs = len(failure_report_msg_uids)
for i in range(number_of_failure_msgs):
msg_uid = failure_report_msg_uids[i]
number_of_forensic_msgs = len(forensic_report_msg_uids)
for i in range(number_of_forensic_msgs):
msg_uid = forensic_report_msg_uids[i]
message = "Moving message"
logger.debug(
"{0} {1} of {2}: UID {3}".format(
message, i + 1, number_of_failure_msgs, msg_uid
message, i + 1, number_of_forensic_msgs, msg_uid
)
)
try:
connection.move_message(msg_uid, failure_reports_folder)
connection.move_message(msg_uid, forensic_reports_folder)
except Exception as e:
e = "Error moving message UID {0}: {1}".format(msg_uid, e)
logger.error("Mailbox error: {0}".format(e))
@@ -2178,7 +2137,7 @@ def get_dmarc_reports_from_mailbox(
logger.error("Mailbox error: {0}".format(e))
results = {
"aggregate_reports": aggregate_reports,
"failure_reports": failure_reports,
"forensic_reports": forensic_reports,
"smtp_tls_reports": smtp_tls_reports,
}
@@ -2236,7 +2195,7 @@ def watch_inbox(
batch_size: int = 10,
since: Optional[Union[datetime, date, str]] = None,
normalize_timespan_threshold_hours: float = 24,
config_reloading: Optional[Callable] = None,
should_reload: Optional[Callable] = None,
):
"""
Watches the mailbox for new messages and
@@ -2260,11 +2219,11 @@ def watch_inbox(
(Cloudflare's public DNS resolvers by default)
dns_timeout (float): Set the DNS query timeout
strip_attachment_payloads (bool): Replace attachment payloads in
failure report samples with None
forensic report samples with None
batch_size (int): Number of messages to read and process before saving
since: Search for messages since certain time
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
config_reloading: Optional callable that returns True when a config
should_reload: Optional callable that returns True when a config
reload has been requested (e.g. via SIGHUP)
"""
@@ -2294,8 +2253,8 @@ def watch_inbox(
"check_callback": check_callback,
"check_timeout": check_timeout,
}
if config_reloading is not None:
watch_kwargs["config_reloading"] = config_reloading
if should_reload is not None:
watch_kwargs["should_reload"] = should_reload
mailbox_connection.watch(**watch_kwargs)
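Editor's note: since should_reload is only forwarded when provided (above), a caller opts in explicitly. A minimal sketch of a SIGHUP-driven callable, using a clear-on-read flag (this design is illustrative, not the shipped implementation):

    import signal
    import threading

    _reload_flag = threading.Event()

    def _handle_sighup(signum, frame):
        _reload_flag.set()

    signal.signal(signal.SIGHUP, _handle_sighup)

    def should_reload() -> bool:
        # Clear on read, so a SIGHUP arriving while a reload is in progress
        # raises the flag again instead of being lost
        if _reload_flag.is_set():
            _reload_flag.clear()
            return True
        return False

    watch_inbox(..., should_reload=should_reload)  # remaining arguments elided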
@@ -2304,7 +2263,7 @@ def append_json(
filename: str,
reports: Union[
Sequence[AggregateReport],
Sequence[FailureReport],
Sequence[ForensicReport],
Sequence[SMTPTLSReport],
],
) -> None:
@@ -2347,10 +2306,10 @@ def save_output(
*,
output_directory: str = "output",
aggregate_json_filename: str = "aggregate.json",
failure_json_filename: str = "failure.json",
forensic_json_filename: str = "forensic.json",
smtp_tls_json_filename: str = "smtp_tls.json",
aggregate_csv_filename: str = "aggregate.csv",
failure_csv_filename: str = "failure.csv",
forensic_csv_filename: str = "forensic.csv",
smtp_tls_csv_filename: str = "smtp_tls.csv",
):
"""
@@ -2360,15 +2319,15 @@ def save_output(
results: Parsing results
output_directory (str): The path to the directory to save in
aggregate_json_filename (str): Filename for the aggregate JSON file
failure_json_filename (str): Filename for the failure JSON file
forensic_json_filename (str): Filename for the forensic JSON file
smtp_tls_json_filename (str): Filename for the SMTP TLS JSON file
aggregate_csv_filename (str): Filename for the aggregate CSV file
failure_csv_filename (str): Filename for the failure CSV file
forensic_csv_filename (str): Filename for the forensic CSV file
smtp_tls_csv_filename (str): Filename for the SMTP TLS CSV file
"""
aggregate_reports = results["aggregate_reports"]
failure_reports = results["failure_reports"]
forensic_reports = results["forensic_reports"]
smtp_tls_reports = results["smtp_tls_reports"]
output_directory = os.path.expanduser(output_directory)
@@ -2387,11 +2346,13 @@ def save_output(
parsed_aggregate_reports_to_csv(aggregate_reports),
)
append_json(os.path.join(output_directory, failure_json_filename), failure_reports)
append_json(
os.path.join(output_directory, forensic_json_filename), forensic_reports
)
append_csv(
os.path.join(output_directory, failure_csv_filename),
parsed_failure_reports_to_csv(failure_reports),
os.path.join(output_directory, forensic_csv_filename),
parsed_forensic_reports_to_csv(forensic_reports),
)
append_json(
@@ -2408,10 +2369,10 @@ def save_output(
os.makedirs(samples_directory)
sample_filenames = []
for failure_report in failure_reports:
sample = failure_report["sample"]
for forensic_report in forensic_reports:
sample = forensic_report["sample"]
message_count = 0
parsed_sample = failure_report["parsed_sample"]
parsed_sample = forensic_report["parsed_sample"]
subject = (
parsed_sample.get("filename_safe_subject")
or parsed_sample.get("subject")
@@ -2545,9 +2506,3 @@ def email_results(
attachments=attachments,
plain_message=message,
)
# Backward-compatible aliases
parse_forensic_report = parse_failure_report
parsed_forensic_reports_to_csv_rows = parsed_failure_reports_to_csv_rows
parsed_forensic_reports_to_csv = parsed_failure_reports_to_csv

File diff suppressed because it is too large

View File

@@ -1,3 +1,3 @@
__version__ = "10.0.0"
__version__ = "9.2.1"
USER_AGENT = f"parsedmarc/{__version__}"

View File

@@ -13,7 +13,6 @@ from elasticsearch_dsl import (
InnerDoc,
Integer,
Ip,
Keyword,
Nested,
Object,
Search,
@@ -22,7 +21,7 @@ from elasticsearch_dsl import (
)
from elasticsearch_dsl.search import Q
from parsedmarc import InvalidFailureReport
from parsedmarc import InvalidForensicReport
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
@@ -44,23 +43,18 @@ class _PublishedPolicy(InnerDoc):
sp = Text()
pct = Integer()
fo = Text()
np = Keyword()
testing = Keyword()
discovery_method = Keyword()
class _DKIMResult(InnerDoc):
domain = Text()
selector = Text()
result = Text()
human_result = Text()
class _SPFResult(InnerDoc):
domain = Text()
scope = Text()
results = Text()
human_result = Text()
class _AggregateReportDoc(Document):
@@ -96,45 +90,17 @@ class _AggregateReportDoc(Document):
envelope_to = Text()
dkim_results = Nested(_DKIMResult)
spf_results = Nested(_SPFResult)
np = Keyword()
testing = Keyword()
discovery_method = Keyword()
generator = Text()
def add_policy_override(self, type_: str, comment: str):
self.policy_overrides.append(_PolicyOverride(type=type_, comment=comment)) # pyright: ignore[reportCallIssue]
def add_dkim_result(
self,
domain: str,
selector: str,
result: _DKIMResult,
human_result: str = None,
):
def add_dkim_result(self, domain: str, selector: str, result: _DKIMResult):
self.dkim_results.append(
_DKIMResult(
domain=domain,
selector=selector,
result=result,
human_result=human_result,
)
_DKIMResult(domain=domain, selector=selector, result=result)
) # pyright: ignore[reportCallIssue]
def add_spf_result(
self,
domain: str,
scope: str,
result: _SPFResult,
human_result: str = None,
):
self.spf_results.append(
_SPFResult(
domain=domain,
scope=scope,
result=result,
human_result=human_result,
)
) # pyright: ignore[reportCallIssue]
def add_spf_result(self, domain: str, scope: str, result: _SPFResult):
self.spf_results.append(_SPFResult(domain=domain, scope=scope, result=result)) # pyright: ignore[reportCallIssue]
def save(self, **kwargs): # pyright: ignore[reportIncompatibleMethodOverride]
self.passed_dmarc = False
@@ -154,7 +120,7 @@ class _EmailAttachmentDoc(Document):
sha256 = Text()
class _FailureSampleDoc(InnerDoc):
class _ForensicSampleDoc(InnerDoc):
raw = Text()
headers = Object()
headers_only = Boolean()
@@ -191,9 +157,9 @@ class _FailureSampleDoc(InnerDoc):
) # pyright: ignore[reportCallIssue]
class _FailureReportDoc(Document):
class _ForensicReportDoc(Document):
class Index:
name = "dmarc_failure"
name = "dmarc_forensic"
feedback_type = Text()
user_agent = Text()
@@ -211,7 +177,7 @@ class _FailureReportDoc(Document):
source_auth_failures = Text()
dkim_domain = Text()
original_rcpt_to = Text()
sample = Object(_FailureSampleDoc)
sample = Object(_ForensicSampleDoc)
class _SMTPTLSFailureDetailsDoc(InnerDoc):
@@ -302,7 +268,6 @@ def set_hosts(
*,
use_ssl: bool = False,
ssl_cert_path: Optional[str] = None,
skip_certificate_verification: bool = False,
username: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
@@ -315,7 +280,6 @@ def set_hosts(
hosts (str | list[str]): A single hostname or URL, or list of hostnames or URLs
use_ssl (bool): Use an HTTPS connection to the server
ssl_cert_path (str): Path to the certificate chain
skip_certificate_verification (bool): Skip certificate verification
username (str): The username to use for authentication
password (str): The password to use for authentication
api_key (str): The Base64 encoded API key to use for authentication
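Editor's note: a usage sketch for set_hosts given the parameters documented above (host and credentials are placeholders; the module path parsedmarc.elastic is an assumption):

    from parsedmarc import elastic

    elastic.set_hosts(
        "https://elasticsearch.example.com:9200",
        use_ssl=True,
        ssl_cert_path="/etc/ssl/certs/ca.pem",  # enables certificate verification
        username="dmarc",
        password="secret",
    )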
@@ -327,11 +291,10 @@ def set_hosts(
if use_ssl:
conn_params["use_ssl"] = True
if ssl_cert_path:
conn_params["ca_certs"] = ssl_cert_path
if skip_certificate_verification:
conn_params["verify_certs"] = False
else:
conn_params["verify_certs"] = True
conn_params["ca_certs"] = ssl_cert_path
else:
conn_params["verify_certs"] = False
if username and password:
conn_params["http_auth"] = username + ":" + password
if api_key:
@@ -364,20 +327,20 @@ def create_indexes(names: list[str], settings: Optional[dict[str, Any]] = None):
def migrate_indexes(
aggregate_indexes: Optional[list[str]] = None,
failure_indexes: Optional[list[str]] = None,
forensic_indexes: Optional[list[str]] = None,
):
"""
Updates index mappings
Args:
aggregate_indexes (list): A list of aggregate index names
failure_indexes (list): A list of failure index names
forensic_indexes (list): A list of forensic index names
"""
version = 2
if aggregate_indexes is None:
aggregate_indexes = []
if failure_indexes is None:
failure_indexes = []
if forensic_indexes is None:
forensic_indexes = []
for aggregate_index_name in aggregate_indexes:
if not Index(aggregate_index_name).exists():
continue
@@ -407,7 +370,7 @@ def migrate_indexes(
reindex(connections.get_connection(), aggregate_index_name, new_index_name) # pyright: ignore[reportArgumentType]
Index(aggregate_index_name).delete()
for failure_index in failure_indexes:
for forensic_index in forensic_indexes:
pass
@@ -423,7 +386,7 @@ def save_aggregate_report_to_elasticsearch(
Saves a parsed DMARC aggregate report to Elasticsearch
Args:
aggregate_report (dict): A parsed aggregate report
aggregate_report (dict): A parsed forensic report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily indexes
@@ -491,9 +454,6 @@ def save_aggregate_report_to_elasticsearch(
sp=aggregate_report["policy_published"]["sp"],
pct=aggregate_report["policy_published"]["pct"],
fo=aggregate_report["policy_published"]["fo"],
np=aggregate_report["policy_published"].get("np"),
testing=aggregate_report["policy_published"].get("testing"),
discovery_method=aggregate_report["policy_published"].get("discovery_method"),
)
for record in aggregate_report["records"]:
@@ -535,12 +495,6 @@ def save_aggregate_report_to_elasticsearch(
header_from=record["identifiers"]["header_from"],
envelope_from=record["identifiers"]["envelope_from"],
envelope_to=record["identifiers"]["envelope_to"],
np=aggregate_report["policy_published"].get("np"),
testing=aggregate_report["policy_published"].get("testing"),
discovery_method=aggregate_report["policy_published"].get(
"discovery_method"
),
generator=metadata.get("generator"),
)
for override in record["policy_evaluated"]["policy_override_reasons"]:
@@ -553,7 +507,6 @@ def save_aggregate_report_to_elasticsearch(
domain=dkim_result["domain"],
selector=dkim_result["selector"],
result=dkim_result["result"],
human_result=dkim_result.get("human_result"),
)
for spf_result in record["auth_results"]["spf"]:
@@ -561,7 +514,6 @@ def save_aggregate_report_to_elasticsearch(
domain=spf_result["domain"],
scope=spf_result["scope"],
result=spf_result["result"],
human_result=spf_result.get("human_result"),
)
index = "dmarc_aggregate"
@@ -583,8 +535,8 @@ def save_aggregate_report_to_elasticsearch(
raise ElasticsearchError("Elasticsearch error: {0}".format(e.__str__()))
def save_failure_report_to_elasticsearch(
failure_report: dict[str, Any],
def save_forensic_report_to_elasticsearch(
forensic_report: dict[str, Any],
index_suffix: Optional[Any] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
@@ -592,10 +544,10 @@ def save_failure_report_to_elasticsearch(
number_of_replicas: int = 0,
):
"""
Saves a parsed DMARC failure report to Elasticsearch
Saves a parsed DMARC forensic report to Elasticsearch
Args:
failure_report (dict): A parsed failure report
forensic_report (dict): A parsed forensic report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily
@@ -608,28 +560,26 @@ def save_failure_report_to_elasticsearch(
AlreadySaved
"""
logger.info("Saving failure report to Elasticsearch")
failure_report = failure_report.copy()
logger.info("Saving forensic report to Elasticsearch")
forensic_report = forensic_report.copy()
sample_date = None
if failure_report["parsed_sample"]["date"] is not None:
sample_date = failure_report["parsed_sample"]["date"]
if forensic_report["parsed_sample"]["date"] is not None:
sample_date = forensic_report["parsed_sample"]["date"]
sample_date = human_timestamp_to_datetime(sample_date)
original_headers = failure_report["parsed_sample"]["headers"]
original_headers = forensic_report["parsed_sample"]["headers"]
headers: dict[str, Any] = {}
for original_header in original_headers:
headers[original_header.lower()] = original_headers[original_header]
arrival_date = human_timestamp_to_datetime(failure_report["arrival_date_utc"])
arrival_date = human_timestamp_to_datetime(forensic_report["arrival_date_utc"])
arrival_date_epoch_milliseconds = int(arrival_date.timestamp() * 1000)
if index_suffix is not None:
search_index = "dmarc_failure_{0}*,dmarc_forensic_{0}*".format(index_suffix)
search_index = "dmarc_forensic_{0}*".format(index_suffix)
else:
search_index = "dmarc_failure*,dmarc_forensic*"
search_index = "dmarc_forensic*"
if index_prefix is not None:
search_index = ",".join(
"{0}{1}".format(index_prefix, part) for part in search_index.split(",")
)
search_index = "{0}{1}".format(index_prefix, search_index)
search = Search(index=search_index)
q = Q(dict(match=dict(arrival_date=arrival_date_epoch_milliseconds))) # pyright: ignore[reportArgumentType]
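Editor's note: a worked example of the two index-pattern constructions above (prefix/suffix values are hypothetical):

    index_suffix, index_prefix = "corp", "dev_"

    # Dual-pattern variant: prefix each comma-separated pattern
    patterns = "dmarc_failure_{0}*,dmarc_forensic_{0}*".format(index_suffix)
    patterns = ",".join("{0}{1}".format(index_prefix, p) for p in patterns.split(","))
    assert patterns == "dev_dmarc_failure_corp*,dev_dmarc_forensic_corp*"

    # Single-pattern variant: prefix the whole string at once
    legacy = "{0}{1}".format(index_prefix, "dmarc_forensic_{0}*".format(index_suffix))
    assert legacy == "dev_dmarc_forensic_corp*"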
@@ -670,64 +620,64 @@ def save_failure_report_to_elasticsearch(
if len(existing) > 0:
raise AlreadySaved(
"A failure sample to {0} from {1} "
"A forensic sample to {0} from {1} "
"with a subject of {2} and arrival date of {3} "
"already exists in "
"Elasticsearch".format(
to_, from_, subject, failure_report["arrival_date_utc"]
to_, from_, subject, forensic_report["arrival_date_utc"]
)
)
parsed_sample = failure_report["parsed_sample"]
sample = _FailureSampleDoc(
raw=failure_report["sample"],
parsed_sample = forensic_report["parsed_sample"]
sample = _ForensicSampleDoc(
raw=forensic_report["sample"],
headers=headers,
headers_only=failure_report["sample_headers_only"],
headers_only=forensic_report["sample_headers_only"],
date=sample_date,
subject=failure_report["parsed_sample"]["subject"],
subject=forensic_report["parsed_sample"]["subject"],
filename_safe_subject=parsed_sample["filename_safe_subject"],
body=failure_report["parsed_sample"]["body"],
body=forensic_report["parsed_sample"]["body"],
)
for address in failure_report["parsed_sample"]["to"]:
for address in forensic_report["parsed_sample"]["to"]:
sample.add_to(display_name=address["display_name"], address=address["address"])
for address in failure_report["parsed_sample"]["reply_to"]:
for address in forensic_report["parsed_sample"]["reply_to"]:
sample.add_reply_to(
display_name=address["display_name"], address=address["address"]
)
for address in failure_report["parsed_sample"]["cc"]:
for address in forensic_report["parsed_sample"]["cc"]:
sample.add_cc(display_name=address["display_name"], address=address["address"])
for address in failure_report["parsed_sample"]["bcc"]:
for address in forensic_report["parsed_sample"]["bcc"]:
sample.add_bcc(display_name=address["display_name"], address=address["address"])
for attachment in failure_report["parsed_sample"]["attachments"]:
for attachment in forensic_report["parsed_sample"]["attachments"]:
sample.add_attachment(
filename=attachment["filename"],
content_type=attachment["mail_content_type"],
sha256=attachment["sha256"],
)
try:
failure_doc = _FailureReportDoc(
feedback_type=failure_report["feedback_type"],
user_agent=failure_report["user_agent"],
version=failure_report["version"],
original_mail_from=failure_report["original_mail_from"],
forensic_doc = _ForensicReportDoc(
feedback_type=forensic_report["feedback_type"],
user_agent=forensic_report["user_agent"],
version=forensic_report["version"],
original_mail_from=forensic_report["original_mail_from"],
arrival_date=arrival_date_epoch_milliseconds,
domain=failure_report["reported_domain"],
original_envelope_id=failure_report["original_envelope_id"],
authentication_results=failure_report["authentication_results"],
delivery_results=failure_report["delivery_result"],
source_ip_address=failure_report["source"]["ip_address"],
source_country=failure_report["source"]["country"],
source_reverse_dns=failure_report["source"]["reverse_dns"],
source_base_domain=failure_report["source"]["base_domain"],
authentication_mechanisms=failure_report["authentication_mechanisms"],
auth_failure=failure_report["auth_failure"],
dkim_domain=failure_report["dkim_domain"],
original_rcpt_to=failure_report["original_rcpt_to"],
domain=forensic_report["reported_domain"],
original_envelope_id=forensic_report["original_envelope_id"],
authentication_results=forensic_report["authentication_results"],
delivery_results=forensic_report["delivery_result"],
source_ip_address=forensic_report["source"]["ip_address"],
source_country=forensic_report["source"]["country"],
source_reverse_dns=forensic_report["source"]["reverse_dns"],
source_base_domain=forensic_report["source"]["base_domain"],
authentication_mechanisms=forensic_report["authentication_mechanisms"],
auth_failure=forensic_report["auth_failure"],
dkim_domain=forensic_report["dkim_domain"],
original_rcpt_to=forensic_report["original_rcpt_to"],
sample=sample,
)
index = "dmarc_failure"
index = "dmarc_forensic"
if index_suffix:
index = "{0}_{1}".format(index, index_suffix)
if index_prefix:
@@ -741,14 +691,14 @@ def save_failure_report_to_elasticsearch(
number_of_shards=number_of_shards, number_of_replicas=number_of_replicas
)
create_indexes([index], index_settings)
failure_doc.meta.index = index # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
forensic_doc.meta.index = index # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
try:
failure_doc.save()
forensic_doc.save()
except Exception as e:
raise ElasticsearchError("Elasticsearch error: {0}".format(e.__str__()))
except KeyError as e:
raise InvalidFailureReport(
"Failure report missing required field: {0}".format(e.__str__())
raise InvalidForensicReport(
"Forensic report missing required field: {0}".format(e.__str__())
)
@@ -785,7 +735,6 @@ def save_smtp_tls_report_to_elasticsearch(
index_date = begin_date.strftime("%Y-%m")
else:
index_date = begin_date.strftime("%Y-%m-%d")
report = report.copy()
report["begin_date"] = begin_date
report["end_date"] = end_date
@@ -902,9 +851,3 @@ def save_smtp_tls_report_to_elasticsearch(
smtp_tls_doc.save()
except Exception as e:
raise ElasticsearchError("Elasticsearch error: {0}".format(e.__str__()))
# Backward-compatible aliases
_ForensicSampleDoc = _FailureSampleDoc
_ForensicReportDoc = _FailureReportDoc
save_forensic_report_to_elasticsearch = save_failure_report_to_elasticsearch

View File

@@ -3,18 +3,17 @@
from __future__ import annotations
import logging
import logging.handlers
import threading
from typing import Any
from pygelf import GelfTcpHandler, GelfTlsHandler, GelfUdpHandler
from parsedmarc import (
parsed_aggregate_reports_to_csv_rows,
parsed_failure_reports_to_csv_rows,
parsed_forensic_reports_to_csv_rows,
parsed_smtp_tls_reports_to_csv_rows,
)
from typing import Any
from parsedmarc.types import AggregateReport, SMTPTLSReport
log_context_data = threading.local()
@@ -38,7 +37,7 @@ class GelfClient(object):
"""
self.host = host
self.port = port
self.logger = logging.getLogger("parsedmarc_gelf")
self.logger = logging.getLogger("parsedmarc_syslog")
self.logger.setLevel(logging.INFO)
self.logger.addFilter(ContextFilter())
self.gelf_mode = {
@@ -51,7 +50,7 @@ class GelfClient(object):
)
self.logger.addHandler(self.handler)
def save_aggregate_report_to_gelf(self, aggregate_reports: list[AggregateReport]):
def save_aggregate_report_to_gelf(self, aggregate_reports: list[dict[str, Any]]):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows:
log_context_data.parsedmarc = row
@@ -59,13 +58,13 @@ class GelfClient(object):
log_context_data.parsedmarc = None
def save_failure_report_to_gelf(self, failure_reports: list[dict[str, Any]]):
rows = parsed_failure_reports_to_csv_rows(failure_reports)
def save_forensic_report_to_gelf(self, forensic_reports: list[dict[str, Any]]):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows:
log_context_data.parsedmarc = row
self.logger.info("parsedmarc failure report")
self.logger.info("parsedmarc forensic report")
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: SMTPTLSReport):
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: dict[str, Any]):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
for row in rows:
log_context_data.parsedmarc = row
@@ -75,7 +74,3 @@ class GelfClient(object):
"""Remove and close the GELF handler, releasing its connection."""
self.logger.removeHandler(self.handler)
self.handler.close()
# Backward-compatible aliases
GelfClient.save_forensic_report_to_gelf = GelfClient.save_failure_report_to_gelf
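Editor's note: a hedged usage sketch of GelfClient (the mode keyword is inferred from self.gelf_mode above and is an assumption, as are the host and port values):

    client = GelfClient(host="graylog.example.com", port=12201, mode="udp")
    client.save_aggregate_report_to_gelf(aggregate_reports)  # list of parsed aggregate reports
    client.save_failure_report_to_gelf(failure_reports)      # or the save_forensic_* alias above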

View File

@@ -143,31 +143,31 @@ class KafkaClient(object):
except Exception as e:
raise KafkaError("Kafka error: {0}".format(e.__str__()))
def save_failure_reports_to_kafka(
def save_forensic_reports_to_kafka(
self,
failure_reports: Union[dict[str, Any], list[dict[str, Any]]],
failure_topic: str,
forensic_reports: Union[dict[str, Any], list[dict[str, Any]]],
forensic_topic: str,
):
"""
Saves failure DMARC reports to Kafka, sends individual
Saves forensic DMARC reports to Kafka, sends individual
records (slices) since Kafka requires messages to be <= 1MB
by default.
Args:
failure_reports (list): A list of failure report dicts
forensic_reports (list): A list of forensic report dicts
to save to Kafka
failure_topic (str): The name of the Kafka topic
forensic_topic (str): The name of the Kafka topic
"""
if isinstance(failure_reports, dict):
failure_reports = [failure_reports]
if isinstance(forensic_reports, dict):
forensic_reports = [forensic_reports]
if len(failure_reports) < 1:
if len(forensic_reports) < 1:
return
try:
logger.debug("Saving failure reports to Kafka")
self.producer.send(failure_topic, failure_reports)
logger.debug("Saving forensic reports to Kafka")
self.producer.send(forensic_topic, forensic_reports)
except UnknownTopicOrPartitionError:
raise KafkaError("Kafka error: Unknown topic or partition on broker")
except Exception as e:
@@ -188,7 +188,7 @@ class KafkaClient(object):
by default.
Args:
smtp_tls_reports (list): A list of SMTP TLS report dicts
smtp_tls_reports (list): A list of forensic report dicts
to save to Kafka
smtp_tls_topic (str): The name of the Kafka topic
@@ -200,7 +200,7 @@ class KafkaClient(object):
return
try:
logger.debug("Saving SMTP TLS reports to Kafka")
logger.debug("Saving forensic reports to Kafka")
self.producer.send(smtp_tls_topic, smtp_tls_reports)
except UnknownTopicOrPartitionError:
raise KafkaError("Kafka error: Unknown topic or partition on broker")
@@ -210,7 +210,3 @@ class KafkaClient(object):
self.producer.flush()
except Exception as e:
raise KafkaError("Kafka error: {0}".format(e.__str__()))
# Backward-compatible aliases
KafkaClient.save_forensic_reports_to_kafka = KafkaClient.save_failure_reports_to_kafka
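Editor's note: given the <= 1MB note in the docstring above, a usage sketch (the topic name is hypothetical, and the KafkaClient constructor is outside this hunk):

    failure_report = {"feedback_type": "auth-failure"}  # hypothetical parsed report
    # A bare dict is wrapped into a list before sending, per the isinstance check above
    client.save_failure_reports_to_kafka(failure_report, "dmarc_failure")
    # The alias above keeps 9.x-era callers working:
    client.save_forensic_reports_to_kafka(failure_report, "dmarc_failure")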

View File

@@ -38,9 +38,9 @@ class LogAnalyticsConfig:
The Stream name where
the Aggregate DMARC reports
need to be pushed.
dcr_failure_stream (str):
dcr_forensic_stream (str):
The Stream name where
the Failure DMARC reports
the Forensic DMARC reports
need to be pushed.
dcr_smtp_tls_stream (str):
The Stream name where
@@ -56,7 +56,7 @@ class LogAnalyticsConfig:
dce: str,
dcr_immutable_id: str,
dcr_aggregate_stream: str,
dcr_failure_stream: str,
dcr_forensic_stream: str,
dcr_smtp_tls_stream: str,
):
self.client_id = client_id
@@ -65,7 +65,7 @@ class LogAnalyticsConfig:
self.dce = dce
self.dcr_immutable_id = dcr_immutable_id
self.dcr_aggregate_stream = dcr_aggregate_stream
self.dcr_failure_stream = dcr_failure_stream
self.dcr_forensic_stream = dcr_forensic_stream
self.dcr_smtp_tls_stream = dcr_smtp_tls_stream
@@ -84,7 +84,7 @@ class LogAnalyticsClient(object):
dce: str,
dcr_immutable_id: str,
dcr_aggregate_stream: str,
dcr_failure_stream: str,
dcr_forensic_stream: str,
dcr_smtp_tls_stream: str,
):
self.conf = LogAnalyticsConfig(
@@ -94,7 +94,7 @@ class LogAnalyticsClient(object):
dce=dce,
dcr_immutable_id=dcr_immutable_id,
dcr_aggregate_stream=dcr_aggregate_stream,
dcr_failure_stream=dcr_failure_stream,
dcr_forensic_stream=dcr_forensic_stream,
dcr_smtp_tls_stream=dcr_smtp_tls_stream,
)
if (
@@ -135,7 +135,7 @@ class LogAnalyticsClient(object):
self,
results: dict[str, Any],
save_aggregate: bool,
save_failure: bool,
save_forensic: bool,
save_smtp_tls: bool,
):
"""
@@ -146,13 +146,13 @@ class LogAnalyticsClient(object):
Args:
results (list):
The DMARC reports (Aggregate & Failure)
The DMARC reports (Aggregate & Forensic)
save_aggregate (bool):
Whether Aggregate reports can be saved into Log Analytics
save_failure (bool):
Whether Failure reports can be saved into Log Analytics
save_forensic (bool):
Whether Forensic reports can be saved into Log Analytics
save_smtp_tls (bool):
Whether Failure reports can be saved into Log Analytics
Whether Forensic reports can be saved into Log Analytics
"""
conf = self.conf
credential = ClientSecretCredential(
@@ -173,16 +173,16 @@ class LogAnalyticsClient(object):
)
logger.info("Successfully pushed aggregate reports.")
if (
results["failure_reports"]
and conf.dcr_failure_stream
and len(results["failure_reports"]) > 0
and save_failure
results["forensic_reports"]
and conf.dcr_forensic_stream
and len(results["forensic_reports"]) > 0
and save_forensic
):
logger.info("Publishing failure reports.")
logger.info("Publishing forensic reports.")
self.publish_json(
results["failure_reports"], logs_client, conf.dcr_failure_stream
results["forensic_reports"], logs_client, conf.dcr_forensic_stream
)
logger.info("Successfully pushed failure reports.")
logger.info("Successfully pushed forensic reports.")
if (
results["smtp_tls_reports"]
and conf.dcr_smtp_tls_stream

View File

@@ -55,7 +55,6 @@ def _get_creds(
flow = InstalledAppFlow.from_client_secrets_file(credentials_file, scopes)
creds = flow.run_local_server(open_browser=False, oauth2_port=oauth2_port)
# Save the credentials for the next run
Path(token_file).parent.mkdir(parents=True, exist_ok=True)
with Path(token_file).open("w") as token:
token.write(creds.to_json())
return creds
@@ -176,13 +175,13 @@ class GmailConnection(MailboxConnection):
# Not needed
pass
def watch(self, check_callback, check_timeout, config_reloading=None):
def watch(self, check_callback, check_timeout, should_reload=None):
"""Checks the mailbox for new messages every n seconds"""
while True:
if config_reloading and config_reloading():
if should_reload and should_reload():
return
sleep(check_timeout)
if config_reloading and config_reloading():
if should_reload and should_reload():
return
check_callback(self)

View File

@@ -56,7 +56,6 @@ def _load_token(token_path: Path) -> Optional[str]:
def _cache_auth_record(record: AuthenticationRecord, token_path: Path):
token = record.serialize()
token_path.parent.mkdir(parents=True, exist_ok=True)
with token_path.open("w") as token_file:
token_file.write(token)
@@ -279,14 +278,12 @@ class MSGraphConnection(MailboxConnection):
# Not needed
pass
def watch(self, check_callback, check_timeout, config_reloading=None):
def watch(self, check_callback, check_timeout, should_reload=None):
"""Checks the mailbox for new messages every n seconds"""
while True:
if config_reloading and config_reloading():
if should_reload and should_reload():
return
sleep(check_timeout)
if config_reloading and config_reloading():
return
check_callback(self)
@lru_cache(maxsize=10)

View File

@@ -81,7 +81,7 @@ class IMAPConnection(MailboxConnection):
def keepalive(self):
self._client.noop()
def watch(self, check_callback, check_timeout, config_reloading=None):
def watch(self, check_callback, check_timeout, should_reload=None):
"""
Use an IDLE IMAP connection to parse incoming emails,
and pass the results to a callback function
@@ -94,8 +94,6 @@ class IMAPConnection(MailboxConnection):
check_callback(self)
while True:
if config_reloading and config_reloading():
return
try:
IMAPClient(
host=self._client.host,
@@ -113,5 +111,5 @@ class IMAPConnection(MailboxConnection):
except Exception as e:
logger.warning("IMAP connection error. {0}. Reconnecting...".format(e))
sleep(check_timeout)
if config_reloading and config_reloading():
if should_reload and should_reload():
return

View File

@@ -28,5 +28,5 @@ class MailboxConnection(ABC):
def keepalive(self):
raise NotImplementedError
def watch(self, check_callback, check_timeout, config_reloading=None):
def watch(self, check_callback, check_timeout, should_reload=None):
raise NotImplementedError
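Editor's note: the concrete connections in this compare all implement this contract with the same shape; a minimal conforming sketch (an illustrative subclass, omitting the other abstract methods such as fetch_messages):

    from time import sleep

    class PollingConnection(MailboxConnection):
        def keepalive(self):
            pass

        def watch(self, check_callback, check_timeout, should_reload=None):
            while True:
                # Check before and after sleeping so a pending reload request
                # is honored without waiting out a full polling interval
                if should_reload and should_reload():
                    return
                sleep(check_timeout)
                if should_reload and should_reload():
                    return
                check_callback(self)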

View File

@@ -19,54 +19,29 @@ class MaildirConnection(MailboxConnection):
):
self._maildir_path = maildir_path
self._maildir_create = maildir_create
try:
maildir_owner = os.stat(maildir_path).st_uid
except OSError:
maildir_owner = None
current_uid = os.getuid()
if maildir_owner is not None and current_uid != maildir_owner:
if current_uid == 0:
try:
logger.warning(
"Switching uid to {} to access Maildir".format(maildir_owner)
)
os.setuid(maildir_owner)
except OSError as e:
logger.warning(
"Failed to switch uid to {}: {}".format(maildir_owner, e)
)
else:
maildir_owner = os.stat(maildir_path).st_uid
if os.getuid() != maildir_owner:
if os.getuid() == 0:
logger.warning(
"Runtime uid {} differs from maildir {} owner {}. "
"Access may fail if permissions are insufficient.".format(
current_uid, maildir_path, maildir_owner
)
"Switching uid to {} to access Maildir".format(maildir_owner)
)
if maildir_create:
for subdir in ("cur", "new", "tmp"):
os.makedirs(os.path.join(maildir_path, subdir), exist_ok=True)
os.setuid(maildir_owner)
else:
ex = "runtime uid {} differ from maildir {} owner {}".format(
os.getuid(), maildir_path, maildir_owner
)
raise Exception(ex)
self._client = mailbox.Maildir(maildir_path, create=maildir_create)
self._active_folder: mailbox.Maildir = self._client
self._subfolder_client: Dict[str, mailbox.Maildir] = {}
def _get_folder(self, folder_name: str) -> mailbox.Maildir:
"""Return a cached subfolder handle, creating it if needed."""
if folder_name not in self._subfolder_client:
self._subfolder_client[folder_name] = self._client.add_folder(folder_name)
return self._subfolder_client[folder_name]
def create_folder(self, folder_name: str):
self._get_folder(folder_name)
self._subfolder_client[folder_name] = self._client.add_folder(folder_name)
def fetch_messages(self, reports_folder: str, **kwargs):
if reports_folder and reports_folder != "INBOX":
self._active_folder = self._get_folder(reports_folder)
else:
self._active_folder = self._client
return self._active_folder.keys()
return self._client.keys()
def fetch_message(self, message_id: str) -> str:
msg = self._active_folder.get(message_id)
msg = self._client.get(message_id)
if msg is not None:
msg = msg.as_string()
if msg is not None:
@@ -74,27 +49,26 @@ class MaildirConnection(MailboxConnection):
return ""
def delete_message(self, message_id: str):
self._active_folder.remove(message_id)
self._client.remove(message_id)
def move_message(self, message_id: str, folder_name: str):
message_data = self._active_folder.get(message_id)
message_data = self._client.get(message_id)
if message_data is None:
return
dest = self._get_folder(folder_name)
dest.add(message_data)
self._active_folder.remove(message_id)
if folder_name not in self._subfolder_client:
self._subfolder_client[folder_name] = self._client.add_folder(folder_name)
self._subfolder_client[folder_name].add(message_data)
self._client.remove(message_id)
def keepalive(self):
return
def watch(self, check_callback, check_timeout, config_reloading=None):
def watch(self, check_callback, check_timeout, should_reload=None):
while True:
if config_reloading and config_reloading():
return
try:
check_callback(self)
except Exception as e:
logger.warning("Maildir init error. {0}".format(e))
if config_reloading and config_reloading():
if should_reload and should_reload():
return
sleep(check_timeout)

View File

@@ -14,7 +14,6 @@ from opensearchpy import (
InnerDoc,
Integer,
Ip,
Keyword,
Nested,
Object,
Q,
@@ -25,7 +24,7 @@ from opensearchpy import (
)
from opensearchpy.helpers import reindex
from parsedmarc import InvalidFailureReport
from parsedmarc import InvalidForensicReport
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
@@ -47,23 +46,18 @@ class _PublishedPolicy(InnerDoc):
sp = Text()
pct = Integer()
fo = Text()
np = Keyword()
testing = Keyword()
discovery_method = Keyword()
class _DKIMResult(InnerDoc):
domain = Text()
selector = Text()
result = Text()
human_result = Text()
class _SPFResult(InnerDoc):
domain = Text()
scope = Text()
results = Text()
human_result = Text()
class _AggregateReportDoc(Document):
@@ -99,45 +93,17 @@ class _AggregateReportDoc(Document):
envelope_to = Text()
dkim_results = Nested(_DKIMResult)
spf_results = Nested(_SPFResult)
np = Keyword()
testing = Keyword()
discovery_method = Keyword()
generator = Text()
def add_policy_override(self, type_: str, comment: str):
self.policy_overrides.append(_PolicyOverride(type=type_, comment=comment))
def add_dkim_result(
self,
domain: str,
selector: str,
result: _DKIMResult,
human_result: str = None,
):
def add_dkim_result(self, domain: str, selector: str, result: _DKIMResult):
self.dkim_results.append(
_DKIMResult(
domain=domain,
selector=selector,
result=result,
human_result=human_result,
)
_DKIMResult(domain=domain, selector=selector, result=result)
)
def add_spf_result(
self,
domain: str,
scope: str,
result: _SPFResult,
human_result: str = None,
):
self.spf_results.append(
_SPFResult(
domain=domain,
scope=scope,
result=result,
human_result=human_result,
)
)
def add_spf_result(self, domain: str, scope: str, result: _SPFResult):
self.spf_results.append(_SPFResult(domain=domain, scope=scope, result=result))
def save(self, **kwargs): # pyright: ignore[reportIncompatibleMethodOverride]
self.passed_dmarc = False
@@ -157,7 +123,7 @@ class _EmailAttachmentDoc(Document):
sha256 = Text()
class _FailureSampleDoc(InnerDoc):
class _ForensicSampleDoc(InnerDoc):
raw = Text()
headers = Object()
headers_only = Boolean()
@@ -194,9 +160,9 @@ class _FailureSampleDoc(InnerDoc):
)
class _FailureReportDoc(Document):
class _ForensicReportDoc(Document):
class Index:
name = "dmarc_failure"
name = "dmarc_forensic"
feedback_type = Text()
user_agent = Text()
@@ -214,7 +180,7 @@ class _FailureReportDoc(Document):
source_auth_failures = Text()
dkim_domain = Text()
original_rcpt_to = Text()
sample = Object(_FailureSampleDoc)
sample = Object(_ForensicSampleDoc)
class _SMTPTLSFailureDetailsDoc(InnerDoc):
@@ -305,7 +271,6 @@ def set_hosts(
*,
use_ssl: Optional[bool] = False,
ssl_cert_path: Optional[str] = None,
skip_certificate_verification: bool = False,
username: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
@@ -321,7 +286,6 @@ def set_hosts(
hosts (str|list[str]): A single hostname or URL, or list of hostnames or URLs
use_ssl (bool): Use an HTTPS connection to the server
ssl_cert_path (str): Path to the certificate chain
skip_certificate_verification (bool): Skip certificate verification
username (str): The username to use for authentication
password (str): The password to use for authentication
api_key (str): The Base64 encoded API key to use for authentication
@@ -336,11 +300,10 @@ def set_hosts(
if use_ssl:
conn_params["use_ssl"] = True
if ssl_cert_path:
conn_params["ca_certs"] = ssl_cert_path
if skip_certificate_verification:
conn_params["verify_certs"] = False
else:
conn_params["verify_certs"] = True
conn_params["ca_certs"] = ssl_cert_path
else:
conn_params["verify_certs"] = False
normalized_auth_type = (auth_type or "basic").strip().lower()
if normalized_auth_type == "awssigv4":
if not aws_region:
@@ -393,20 +356,20 @@ def create_indexes(names: list[str], settings: Optional[dict[str, Any]] = None):
def migrate_indexes(
aggregate_indexes: Optional[list[str]] = None,
failure_indexes: Optional[list[str]] = None,
forensic_indexes: Optional[list[str]] = None,
):
"""
Updates index mappings
Args:
aggregate_indexes (list): A list of aggregate index names
failure_indexes (list): A list of failure index names
forensic_indexes (list): A list of forensic index names
"""
version = 2
if aggregate_indexes is None:
aggregate_indexes = []
if failure_indexes is None:
failure_indexes = []
if forensic_indexes is None:
forensic_indexes = []
for aggregate_index_name in aggregate_indexes:
if not Index(aggregate_index_name).exists():
continue
@@ -436,7 +399,7 @@ def migrate_indexes(
reindex(connections.get_connection(), aggregate_index_name, new_index_name)
Index(aggregate_index_name).delete()
for failure_index in failure_indexes:
for forensic_index in forensic_indexes:
pass
@@ -452,7 +415,7 @@ def save_aggregate_report_to_opensearch(
Saves a parsed DMARC aggregate report to OpenSearch
Args:
aggregate_report (dict): A parsed aggregate report
aggregate_report (dict): A parsed forensic report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily indexes
@@ -520,9 +483,6 @@ def save_aggregate_report_to_opensearch(
sp=aggregate_report["policy_published"]["sp"],
pct=aggregate_report["policy_published"]["pct"],
fo=aggregate_report["policy_published"]["fo"],
np=aggregate_report["policy_published"].get("np"),
testing=aggregate_report["policy_published"].get("testing"),
discovery_method=aggregate_report["policy_published"].get("discovery_method"),
)
for record in aggregate_report["records"]:
@@ -564,12 +524,6 @@ def save_aggregate_report_to_opensearch(
header_from=record["identifiers"]["header_from"],
envelope_from=record["identifiers"]["envelope_from"],
envelope_to=record["identifiers"]["envelope_to"],
np=aggregate_report["policy_published"].get("np"),
testing=aggregate_report["policy_published"].get("testing"),
discovery_method=aggregate_report["policy_published"].get(
"discovery_method"
),
generator=metadata.get("generator"),
)
for override in record["policy_evaluated"]["policy_override_reasons"]:
@@ -582,7 +536,6 @@ def save_aggregate_report_to_opensearch(
domain=dkim_result["domain"],
selector=dkim_result["selector"],
result=dkim_result["result"],
human_result=dkim_result.get("human_result"),
)
for spf_result in record["auth_results"]["spf"]:
@@ -590,7 +543,6 @@ def save_aggregate_report_to_opensearch(
domain=spf_result["domain"],
scope=spf_result["scope"],
result=spf_result["result"],
human_result=spf_result.get("human_result"),
)
index = "dmarc_aggregate"
@@ -612,8 +564,8 @@ def save_aggregate_report_to_opensearch(
raise OpenSearchError("OpenSearch error: {0}".format(e.__str__()))
def save_failure_report_to_opensearch(
failure_report: dict[str, Any],
def save_forensic_report_to_opensearch(
forensic_report: dict[str, Any],
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: bool = False,
@@ -621,10 +573,10 @@ def save_failure_report_to_opensearch(
number_of_replicas: int = 0,
):
"""
Saves a parsed DMARC failure report to OpenSearch
Saves a parsed DMARC forensic report to OpenSearch
Args:
failure_report (dict): A parsed failure report
forensic_report (dict): A parsed forensic report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily
@@ -637,28 +589,26 @@ def save_failure_report_to_opensearch(
AlreadySaved
"""
logger.info("Saving failure report to OpenSearch")
failure_report = failure_report.copy()
logger.info("Saving forensic report to OpenSearch")
forensic_report = forensic_report.copy()
sample_date = None
if failure_report["parsed_sample"]["date"] is not None:
sample_date = failure_report["parsed_sample"]["date"]
if forensic_report["parsed_sample"]["date"] is not None:
sample_date = forensic_report["parsed_sample"]["date"]
sample_date = human_timestamp_to_datetime(sample_date)
original_headers = failure_report["parsed_sample"]["headers"]
original_headers = forensic_report["parsed_sample"]["headers"]
headers: dict[str, Any] = {}
for original_header in original_headers:
headers[original_header.lower()] = original_headers[original_header]
arrival_date = human_timestamp_to_datetime(failure_report["arrival_date_utc"])
arrival_date = human_timestamp_to_datetime(forensic_report["arrival_date_utc"])
arrival_date_epoch_milliseconds = int(arrival_date.timestamp() * 1000)
if index_suffix is not None:
search_index = "dmarc_failure_{0}*,dmarc_forensic_{0}*".format(index_suffix)
search_index = "dmarc_forensic_{0}*".format(index_suffix)
else:
search_index = "dmarc_failure*,dmarc_forensic*"
search_index = "dmarc_forensic*"
if index_prefix is not None:
search_index = ",".join(
"{0}{1}".format(index_prefix, part) for part in search_index.split(",")
)
search_index = "{0}{1}".format(index_prefix, search_index)
search = Search(index=search_index)
q = Q(dict(match=dict(arrival_date=arrival_date_epoch_milliseconds)))
@@ -699,62 +649,64 @@ def save_failure_report_to_opensearch(
if len(existing) > 0:
raise AlreadySaved(
"A failure sample to {0} from {1} "
"A forensic sample to {0} from {1} "
"with a subject of {2} and arrival date of {3} "
"already exists in "
"OpenSearch".format(to_, from_, subject, failure_report["arrival_date_utc"])
"OpenSearch".format(
to_, from_, subject, forensic_report["arrival_date_utc"]
)
)
parsed_sample = failure_report["parsed_sample"]
sample = _FailureSampleDoc(
raw=failure_report["sample"],
parsed_sample = forensic_report["parsed_sample"]
sample = _ForensicSampleDoc(
raw=forensic_report["sample"],
headers=headers,
headers_only=failure_report["sample_headers_only"],
headers_only=forensic_report["sample_headers_only"],
date=sample_date,
subject=failure_report["parsed_sample"]["subject"],
subject=forensic_report["parsed_sample"]["subject"],
filename_safe_subject=parsed_sample["filename_safe_subject"],
body=failure_report["parsed_sample"]["body"],
body=forensic_report["parsed_sample"]["body"],
)
for address in failure_report["parsed_sample"]["to"]:
for address in forensic_report["parsed_sample"]["to"]:
sample.add_to(display_name=address["display_name"], address=address["address"])
for address in failure_report["parsed_sample"]["reply_to"]:
for address in forensic_report["parsed_sample"]["reply_to"]:
sample.add_reply_to(
display_name=address["display_name"], address=address["address"]
)
for address in failure_report["parsed_sample"]["cc"]:
for address in forensic_report["parsed_sample"]["cc"]:
sample.add_cc(display_name=address["display_name"], address=address["address"])
for address in failure_report["parsed_sample"]["bcc"]:
for address in forensic_report["parsed_sample"]["bcc"]:
sample.add_bcc(display_name=address["display_name"], address=address["address"])
for attachment in failure_report["parsed_sample"]["attachments"]:
for attachment in forensic_report["parsed_sample"]["attachments"]:
sample.add_attachment(
filename=attachment["filename"],
content_type=attachment["mail_content_type"],
sha256=attachment["sha256"],
)
try:
- failure_doc = _FailureReportDoc(
- feedback_type=failure_report["feedback_type"],
- user_agent=failure_report["user_agent"],
- version=failure_report["version"],
- original_mail_from=failure_report["original_mail_from"],
+ forensic_doc = _ForensicReportDoc(
+ feedback_type=forensic_report["feedback_type"],
+ user_agent=forensic_report["user_agent"],
+ version=forensic_report["version"],
+ original_mail_from=forensic_report["original_mail_from"],
arrival_date=arrival_date_epoch_milliseconds,
domain=failure_report["reported_domain"],
original_envelope_id=failure_report["original_envelope_id"],
authentication_results=failure_report["authentication_results"],
delivery_results=failure_report["delivery_result"],
source_ip_address=failure_report["source"]["ip_address"],
source_country=failure_report["source"]["country"],
source_reverse_dns=failure_report["source"]["reverse_dns"],
source_base_domain=failure_report["source"]["base_domain"],
authentication_mechanisms=failure_report["authentication_mechanisms"],
auth_failure=failure_report["auth_failure"],
dkim_domain=failure_report["dkim_domain"],
original_rcpt_to=failure_report["original_rcpt_to"],
domain=forensic_report["reported_domain"],
original_envelope_id=forensic_report["original_envelope_id"],
authentication_results=forensic_report["authentication_results"],
delivery_results=forensic_report["delivery_result"],
source_ip_address=forensic_report["source"]["ip_address"],
source_country=forensic_report["source"]["country"],
source_reverse_dns=forensic_report["source"]["reverse_dns"],
source_base_domain=forensic_report["source"]["base_domain"],
authentication_mechanisms=forensic_report["authentication_mechanisms"],
auth_failure=forensic_report["auth_failure"],
dkim_domain=forensic_report["dkim_domain"],
original_rcpt_to=forensic_report["original_rcpt_to"],
sample=sample,
)
index = "dmarc_failure"
index = "dmarc_forensic"
if index_suffix:
index = "{0}_{1}".format(index, index_suffix)
if index_prefix:
@@ -768,14 +720,14 @@ def save_failure_report_to_opensearch(
number_of_shards=number_of_shards, number_of_replicas=number_of_replicas
)
create_indexes([index], index_settings)
- failure_doc.meta.index = index
+ forensic_doc.meta.index = index
try:
- failure_doc.save()
+ forensic_doc.save()
except Exception as e:
raise OpenSearchError("OpenSearch error: {0}".format(e.__str__()))
except KeyError as e:
- raise InvalidFailureReport(
- "Failure report missing required field: {0}".format(e.__str__())
+ raise InvalidForensicReport(
+ "Forensic report missing required field: {0}".format(e.__str__())
)
@@ -812,7 +764,6 @@ def save_smtp_tls_report_to_opensearch(
index_date = begin_date.strftime("%Y-%m")
else:
index_date = begin_date.strftime("%Y-%m-%d")
- report = report.copy()
report["begin_date"] = begin_date
report["end_date"] = end_date
@@ -929,9 +880,3 @@ def save_smtp_tls_report_to_opensearch(
smtp_tls_doc.save()
except Exception as e:
raise OpenSearchError("OpenSearch error: {0}".format(e.__str__()))
- # Backward-compatible aliases
- _ForensicSampleDoc = _FailureSampleDoc
- _ForensicReportDoc = _FailureReportDoc
- save_forensic_report_to_opensearch = save_failure_report_to_opensearch
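With the aliases above removed, the forensic-named function is again the only entry point. A minimal caller sketch under that assumption; the set_hosts() call, the import locations, and the report variable are illustrative, not taken from this diff:

    from parsedmarc import opensearch
    from parsedmarc.opensearch import AlreadySaved, OpenSearchError  # import locations assumed

    opensearch.set_hosts(["https://localhost:9200"])  # assumed setup helper

    try:
        # forensic_report: an already-parsed report dict shaped like the
        # ForensicReport TypedDict defined later in this diff
        opensearch.save_forensic_report_to_opensearch(forensic_report)
    except AlreadySaved:
        pass  # duplicate detected by the arrival_date match query above
    except OpenSearchError as e:
        print("OpenSearch error:", e)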


@@ -56,8 +56,8 @@ class S3Client(object):
def save_aggregate_report_to_s3(self, report: dict[str, Any]):
self.save_report_to_s3(report, "aggregate")
- def save_failure_report_to_s3(self, report: dict[str, Any]):
- self.save_report_to_s3(report, "failure")
+ def save_forensic_report_to_s3(self, report: dict[str, Any]):
+ self.save_report_to_s3(report, "forensic")
def save_smtp_tls_report_to_s3(self, report: dict[str, Any]):
self.save_report_to_s3(report, "smtp_tls")
@@ -93,15 +93,3 @@ class S3Client(object):
self.bucket.put_object(
Body=json.dumps(report), Key=object_path, Metadata=object_metadata
)
- def close(self):
- """Clean up the boto3 resource."""
- try:
- if self.s3.meta is not None:
- self.s3.meta.client.close()
- except Exception:
- pass
- # Backward-compatible aliases
- S3Client.save_forensic_report_to_s3 = S3Client.save_failure_report_to_s3
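The S3 client follows the same rename, and its close() helper goes away with the alias. A usage sketch; the constructor argument names here are assumptions, since they are not shown in this hunk:

    from parsedmarc.s3 import S3Client  # module path assumed

    client = S3Client(
        bucket_name="dmarc-reports",    # assumed argument names
        bucket_path="parsedmarc",
        region_name="us-east-1",
        endpoint_url=None,
        access_key_id=None,
        secret_access_key=None,
    )
    # report: a parsed forensic report dict
    client.save_forensic_report_to_s3(report)  # was save_failure_report_to_s3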


@@ -58,7 +58,7 @@ class HECClient(object):
self.source = source
self.session = requests.Session()
self.timeout = timeout
- self.verify = verify
+ self.session.verify = verify
self._common_data: dict[str, Union[str, int, float, dict]] = dict(
host=self.host, source=self.source, index=self.index
)
@@ -124,51 +124,47 @@ class HECClient(object):
data["event"] = new_report.copy()
json_str += "{0}\n".format(json.dumps(data))
- if not self.verify:
+ if not self.session.verify:
logger.debug("Skipping certificate verification for Splunk HEC")
try:
- response = self.session.post(
- self.url, data=json_str, verify=self.verify, timeout=self.timeout
- )
+ response = self.session.post(self.url, data=json_str, timeout=self.timeout)
response = response.json()
except Exception as e:
raise SplunkError(e.__str__())
if response["code"] != 0:
raise SplunkError(response["text"])
- def save_failure_reports_to_splunk(
+ def save_forensic_reports_to_splunk(
self,
- failure_reports: Union[list[dict[str, Any]], dict[str, Any]],
+ forensic_reports: Union[list[dict[str, Any]], dict[str, Any]],
):
"""
- Saves failure DMARC reports to Splunk
+ Saves forensic DMARC reports to Splunk
Args:
- failure_reports (list): A list of failure report dictionaries
+ forensic_reports (list): A list of forensic report dictionaries
to save in Splunk
"""
logger.debug("Saving failure reports to Splunk")
if isinstance(failure_reports, dict):
failure_reports = [failure_reports]
logger.debug("Saving forensic reports to Splunk")
if isinstance(forensic_reports, dict):
forensic_reports = [forensic_reports]
- if len(failure_reports) < 1:
+ if len(forensic_reports) < 1:
return
json_str = ""
- for report in failure_reports:
+ for report in forensic_reports:
data = self._common_data.copy()
data["sourcetype"] = "dmarc:failure"
data["sourcetype"] = "dmarc:forensic"
timestamp = human_timestamp_to_unix_timestamp(report["arrival_date_utc"])
data["time"] = timestamp
data["event"] = report.copy()
json_str += "{0}\n".format(json.dumps(data))
- if not self.verify:
+ if not self.session.verify:
logger.debug("Skipping certificate verification for Splunk HEC")
try:
- response = self.session.post(
- self.url, data=json_str, verify=self.verify, timeout=self.timeout
- )
+ response = self.session.post(self.url, data=json_str, timeout=self.timeout)
response = response.json()
except Exception as e:
raise SplunkError(e.__str__())
@@ -202,22 +198,12 @@ class HECClient(object):
data["event"] = report.copy()
json_str += "{0}\n".format(json.dumps(data))
- if not self.verify:
+ if not self.session.verify:
logger.debug("Skipping certificate verification for Splunk HEC")
try:
- response = self.session.post(
- self.url, data=json_str, verify=self.verify, timeout=self.timeout
- )
+ response = self.session.post(self.url, data=json_str, timeout=self.timeout)
response = response.json()
except Exception as e:
raise SplunkError(e.__str__())
if response["code"] != 0:
raise SplunkError(response["text"])
- def close(self):
- """Close the underlying HTTP session."""
- self.session.close()
- # Backward-compatible aliases
- HECClient.save_forensic_reports_to_splunk = HECClient.save_failure_reports_to_splunk
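Dropping the per-call verify= argument is behavior-preserving: requests falls back to Session.verify whenever a request does not pass its own value, so setting it once on the session covers every HEC post. A standalone illustration of that requests behavior (the URL is a placeholder):

    import requests

    session = requests.Session()
    session.verify = False  # applies to every request sent through the session

    # equivalent to passing verify=False on each individual call
    response = session.post(
        "https://splunk.example.com:8088/services/collector",
        data="{}",
        timeout=60,
    )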


@@ -13,7 +13,7 @@ from typing import Any, Optional
from parsedmarc import (
parsed_aggregate_reports_to_csv_rows,
- parsed_failure_reports_to_csv_rows,
+ parsed_forensic_reports_to_csv_rows,
parsed_smtp_tls_reports_to_csv_rows,
)
@@ -170,8 +170,8 @@ class SyslogClient(object):
for row in rows:
self.logger.info(json.dumps(row))
- def save_failure_report_to_syslog(self, failure_reports: list[dict[str, Any]]):
- rows = parsed_failure_reports_to_csv_rows(failure_reports)
+ def save_forensic_report_to_syslog(self, forensic_reports: list[dict[str, Any]]):
+ rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows:
self.logger.info(json.dumps(row))
@@ -184,7 +184,3 @@ class SyslogClient(object):
"""Remove and close the syslog handler, releasing its socket."""
self.logger.removeHandler(self.log_handler)
self.log_handler.close()
- # Backward-compatible aliases
- SyslogClient.save_forensic_report_to_syslog = SyslogClient.save_failure_report_to_syslog
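Only the names change for the syslog output; reports are still flattened to CSV rows and emitted as one JSON object per line. A short sketch, assuming the SyslogClient constructor takes a server name and port:

    from parsedmarc.syslog import SyslogClient  # module path assumed

    client = SyslogClient(server_name="localhost", server_port=514)
    # forensic_reports: a list of parsed forensic report dicts
    client.save_forensic_report_to_syslog(forensic_reports)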


@@ -8,7 +8,7 @@ from typing import Any, Dict, List, Literal, Optional, TypedDict, Union
# For optional keys, use total=False TypedDicts.
ReportType = Literal["aggregate", "failure", "smtp_tls"]
ReportType = Literal["aggregate", "forensic", "smtp_tls"]
class AggregateReportMetadata(TypedDict):
@@ -21,7 +21,6 @@ class AggregateReportMetadata(TypedDict):
timespan_requires_normalization: bool
original_timespan_seconds: int
errors: List[str]
- generator: Optional[str]
class AggregatePolicyPublished(TypedDict):
@@ -30,11 +29,8 @@ class AggregatePolicyPublished(TypedDict):
aspf: str
p: str
sp: str
- pct: Optional[str]
- fo: Optional[str]
- np: Optional[str]
- testing: Optional[str]
- discovery_method: Optional[str]
+ pct: str
+ fo: str
class IPSourceInfo(TypedDict):
@@ -67,14 +63,12 @@ class AggregateAuthResultDKIM(TypedDict):
domain: str
result: str
selector: str
- human_result: Optional[str]
class AggregateAuthResultSPF(TypedDict):
domain: str
result: str
scope: str
- human_result: Optional[str]
class AggregateAuthResults(TypedDict):
@@ -125,7 +119,7 @@ ParsedEmail = TypedDict(
"ParsedEmail",
{
# This is a lightly-specified version of mailsuite/mailparser JSON.
- # It focuses on the fields parsedmarc uses in failure report handling.
+ # It focuses on the fields parsedmarc uses in forensic handling.
"headers": Dict[str, Any],
"subject": Optional[str],
"filename_safe_subject": Optional[str],
@@ -144,7 +138,7 @@ ParsedEmail = TypedDict(
)
- class FailureReport(TypedDict):
+ class ForensicReport(TypedDict):
feedback_type: Optional[str]
user_agent: Optional[str]
version: Optional[str]
@@ -165,10 +159,6 @@ class FailureReport(TypedDict):
parsed_sample: ParsedEmail
- # Backward-compatible alias
- ForensicReport = FailureReport
class SMTPTLSFailureDetails(TypedDict):
result_type: str
failed_session_count: int
@@ -211,13 +201,9 @@ class AggregateParsedReport(TypedDict):
report: AggregateReport
- class FailureParsedReport(TypedDict):
- report_type: Literal["failure"]
- report: FailureReport
- # Backward-compatible alias
- ForensicParsedReport = FailureParsedReport
+ class ForensicParsedReport(TypedDict):
+ report_type: Literal["forensic"]
+ report: ForensicReport
class SMTPTLSParsedReport(TypedDict):
@@ -225,10 +211,10 @@ class SMTPTLSParsedReport(TypedDict):
report: SMTPTLSReport
- ParsedReport = Union[AggregateParsedReport, FailureParsedReport, SMTPTLSParsedReport]
+ ParsedReport = Union[AggregateParsedReport, ForensicParsedReport, SMTPTLSParsedReport]
class ParsingResults(TypedDict):
aggregate_reports: List[AggregateReport]
- failure_reports: List[FailureReport]
+ forensic_reports: List[ForensicReport]
smtp_tls_reports: List[SMTPTLSReport]
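Since report_type is a Literal tag, ParsedReport behaves as a discriminated union: checking the tag tells both the reader and a type checker which report shape is inside. A consumer sketch using the names above; the import path is an assumption:

    from parsedmarc.types import ParsedReport  # module path assumed

    def route(parsed: ParsedReport) -> None:
        # The Literal tag narrows the union, member by member
        if parsed["report_type"] == "aggregate":
            print("aggregate report")
        elif parsed["report_type"] == "forensic":  # tag was "failure" before
            print("forensic sample from", parsed["report"]["original_mail_from"])
        else:  # "smtp_tls"
            print("smtp_tls report")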


@@ -45,7 +45,7 @@ from parsedmarc.log import logger
parenthesis_regex = re.compile(r"\s*\(.*\)\s*")
- null_file = subprocess.DEVNULL
+ null_file = open(os.devnull, "w")
mailparser_logger = logging.getLogger("mailparser")
mailparser_logger.setLevel(logging.CRITICAL)
psl = publicsuffixlist.PublicSuffixList()
@@ -335,76 +335,6 @@ def get_ip_address_country(
return country
- def load_reverse_dns_map(
- reverse_dns_map: ReverseDNSMap,
- *,
- always_use_local_file: bool = False,
- local_file_path: Optional[str] = None,
- url: Optional[str] = None,
- offline: bool = False,
- ) -> None:
- """
- Loads the reverse DNS map from a URL or local file.
- Clears and repopulates the given map dict in place. If the map is
- fetched from a URL, that is tried first; on failure (or if offline/local
- mode is selected) the bundled CSV is used as a fallback.
- Args:
- reverse_dns_map (dict): The map dict to populate (modified in place)
- always_use_local_file (bool): Always use a local map file
- local_file_path (str): Path to a local map file
- url (str): URL to a reverse DNS map
- offline (bool): Use the built-in copy of the reverse DNS map
- """
- if url is None:
- url = (
- "https://raw.githubusercontent.com/domainaware"
- "/parsedmarc/master/parsedmarc/"
- "resources/maps/base_reverse_dns_map.csv"
- )
- reverse_dns_map.clear()
- def load_csv(_csv_file):
- reader = csv.DictReader(_csv_file)
- for row in reader:
- key = row["base_reverse_dns"].lower().strip()
- reverse_dns_map[key] = {
- "name": row["name"].strip(),
- "type": row["type"].strip(),
- }
- csv_file = io.StringIO()
- if not (offline or always_use_local_file):
- try:
- logger.debug(f"Trying to fetch reverse DNS map from {url}...")
- headers = {"User-Agent": USER_AGENT}
- response = requests.get(url, headers=headers)
- response.raise_for_status()
- csv_file.write(response.text)
- csv_file.seek(0)
- load_csv(csv_file)
- except requests.exceptions.RequestException as e:
- logger.warning(f"Failed to fetch reverse DNS map: {e}")
- except Exception:
- logger.warning("Not a valid CSV file")
- csv_file.seek(0)
- logging.debug("Response body:")
- logger.debug(csv_file.read())
- if len(reverse_dns_map) == 0:
- logger.info("Loading included reverse DNS map...")
- path = str(
- files(parsedmarc.resources.maps).joinpath("base_reverse_dns_map.csv")
- )
- if local_file_path is not None:
- path = local_file_path
- with open(path) as csv_file:
- load_csv(csv_file)
def get_service_from_reverse_dns_base_domain(
base_domain,
*,
@@ -431,21 +361,55 @@ def get_service_from_reverse_dns_base_domain(
"""
base_domain = base_domain.lower().strip()
if url is None:
url = (
"https://raw.githubusercontent.com/domainaware"
"/parsedmarc/master/parsedmarc/"
"resources/maps/base_reverse_dns_map.csv"
)
reverse_dns_map_value: ReverseDNSMap
if reverse_dns_map is None:
reverse_dns_map_value = {}
else:
reverse_dns_map_value = reverse_dns_map
- if len(reverse_dns_map_value) == 0:
- load_reverse_dns_map(
- reverse_dns_map_value,
- always_use_local_file=always_use_local_file,
- local_file_path=local_file_path,
- url=url,
- offline=offline,
- )
+ def load_csv(_csv_file):
+ reader = csv.DictReader(_csv_file)
+ for row in reader:
+ key = row["base_reverse_dns"].lower().strip()
+ reverse_dns_map_value[key] = {
+ "name": row["name"],
+ "type": row["type"],
+ }
+ csv_file = io.StringIO()
+ if not (offline or always_use_local_file) and len(reverse_dns_map_value) == 0:
+ try:
+ logger.debug(f"Trying to fetch reverse DNS map from {url}...")
+ headers = {"User-Agent": USER_AGENT}
+ response = requests.get(url, headers=headers)
+ response.raise_for_status()
+ csv_file.write(response.text)
+ csv_file.seek(0)
+ load_csv(csv_file)
+ except requests.exceptions.RequestException as e:
+ logger.warning(f"Failed to fetch reverse DNS map: {e}")
+ except Exception:
+ logger.warning("Not a valid CSV file")
+ csv_file.seek(0)
+ logging.debug("Response body:")
+ logger.debug(csv_file.read())
+ if len(reverse_dns_map_value) == 0:
+ logger.info("Loading included reverse DNS map...")
+ path = str(
+ files(parsedmarc.resources.maps).joinpath("base_reverse_dns_map.csv")
+ )
+ if local_file_path is not None:
+ path = local_file_path
+ with open(path) as csv_file:
+ load_csv(csv_file)
service: ReverseDNSService
try:
service = reverse_dns_map_value[base_domain]
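In both versions the map is only loaded when the passed dict is empty, so callers that share one dict across lookups pay the download (or bundled-CSV read) once. A sketch of that reuse pattern, consistent with the guards above; the import path and example domain are assumptions:

    from parsedmarc.utils import get_service_from_reverse_dns_base_domain  # path assumed

    shared_map = {}  # filled on the first lookup, reused afterwards

    service = get_service_from_reverse_dns_base_domain(
        "amazonses.com",              # illustrative base domain
        reverse_dns_map=shared_map,
        offline=True,                 # use the bundled CSV, skip the download
    )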


@@ -16,7 +16,7 @@ class WebhookClient(object):
def __init__(
self,
aggregate_url: str,
- failure_url: str,
+ forensic_url: str,
smtp_tls_url: str,
timeout: Optional[int] = 60,
):
@@ -24,12 +24,12 @@ class WebhookClient(object):
Initializes the WebhookClient
Args:
aggregate_url (str): The aggregate report webhook url
- failure_url (str): The failure report webhook url
+ forensic_url (str): The forensic report webhook url
smtp_tls_url (str): The smtp_tls report webhook url
timeout (int): The timeout to use when calling the webhooks
"""
self.aggregate_url = aggregate_url
- self.failure_url = failure_url
+ self.forensic_url = forensic_url
self.smtp_tls_url = smtp_tls_url
self.timeout = timeout
self.session = requests.Session()
@@ -38,9 +38,9 @@ class WebhookClient(object):
"Content-Type": "application/json",
}
- def save_failure_report_to_webhook(self, report: str):
+ def save_forensic_report_to_webhook(self, report: str):
try:
- self._send_to_webhook(self.failure_url, report)
+ self._send_to_webhook(self.forensic_url, report)
except Exception as error_:
logger.error("Webhook Error: {0}".format(error_.__str__()))
@@ -67,9 +67,3 @@ class WebhookClient(object):
def close(self):
"""Close the underlying HTTP session."""
self.session.close()
- # Backward-compatible aliases
- WebhookClient.save_forensic_report_to_webhook = (
- WebhookClient.save_failure_report_to_webhook
- )
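The webhook client keeps its shape; only the forensic naming returns, and the alias above is dropped. A sketch with placeholder URLs and an illustrative payload:

    import json

    from parsedmarc.webhook import WebhookClient  # module path assumed

    report = {"feedback_type": "auth-failure"}  # illustrative payload
    client = WebhookClient(
        aggregate_url="https://hooks.example.com/aggregate",
        forensic_url="https://hooks.example.com/forensic",  # was failure_url
        smtp_tls_url="https://hooks.example.com/smtp-tls",
    )
    # The report is posted as a JSON string; errors are logged, not raised
    client.save_forensic_report_to_webhook(json.dumps(report))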


@@ -50,7 +50,7 @@ dependencies = [
"lxml>=4.4.0",
"mailsuite>=1.11.2",
"msgraph-core==0.2.2",
"opensearch-py>=2.4.2,<=4.0.0",
"opensearch-py>=2.4.2,<=3.0.0",
"publicsuffixlist>=0.10.0",
"pygelf>=0.4.2",
"requests>=2.22.0",


@@ -1,48 +0,0 @@
<feedback xmlns="urn:ietf:params:xml:ns:dmarc-2.0">
<version>1.0</version>
<report_metadata>
<org_name>Sample Reporter</org_name>
<email>report_sender@example-reporter.com</email>
<extra_contact_info>...</extra_contact_info>
<report_id>3v98abbp8ya9n3va8yr8oa3ya</report_id>
<date_range>
<begin>302832000</begin>
<end>302918399</end>
</date_range>
<generator>Example DMARC Aggregate Reporter v1.2</generator>
</report_metadata>
<policy_published>
<domain>example.com</domain>
<p>quarantine</p>
<sp>none</sp>
<np>none</np>
<testing>n</testing>
<discovery_method>treewalk</discovery_method>
</policy_published>
<record>
<row>
<source_ip>192.0.2.123</source_ip>
<count>123</count>
<policy_evaluated>
<disposition>pass</disposition>
<dkim>pass</dkim>
<spf>fail</spf>
</policy_evaluated>
</row>
<identifiers>
<envelope_from>example.com</envelope_from>
<header_from>example.com</header_from>
</identifiers>
<auth_results>
<dkim>
<domain>example.com</domain>
<result>pass</result>
<selector>abc123</selector>
</dkim>
<spf>
<domain>example.com</domain>
<result>fail</result>
</spf>
</auth_results>
</record>
</feedback>


@@ -1,77 +0,0 @@
<?xml version="1.0"?>
<feedback>
<version>2.0</version>
<report_metadata>
<org_name>example.net</org_name>
<email>postmaster@example.net</email>
<report_id>dmarcbis-test-report-001</report_id>
<date_range>
<begin>1700000000</begin>
<end>1700086399</end>
</date_range>
</report_metadata>
<policy_published>
<domain>example.com</domain>
<adkim>s</adkim>
<aspf>s</aspf>
<p>reject</p>
<sp>quarantine</sp>
<np>reject</np>
<testing>y</testing>
<discovery_method>treewalk</discovery_method>
<fo>1</fo>
</policy_published>
<record>
<row>
<source_ip>198.51.100.1</source_ip>
<count>5</count>
<policy_evaluated>
<disposition>none</disposition>
<dkim>pass</dkim>
<spf>pass</spf>
</policy_evaluated>
</row>
<identifiers>
<envelope_from>example.com</envelope_from>
<header_from>example.com</header_from>
</identifiers>
<auth_results>
<dkim>
<domain>example.com</domain>
<selector>selector1</selector>
<result>pass</result>
</dkim>
<spf>
<domain>example.com</domain>
<scope>mfrom</scope>
<result>pass</result>
</spf>
</auth_results>
</record>
<record>
<row>
<source_ip>203.0.113.10</source_ip>
<count>2</count>
<policy_evaluated>
<disposition>reject</disposition>
<dkim>fail</dkim>
<spf>fail</spf>
<reason>
<type>other</type>
<comment>sender not authorized</comment>
</reason>
</policy_evaluated>
</row>
<identifiers>
<envelope_from>spoofed.example.com</envelope_from>
<header_from>example.com</header_from>
</identifiers>
<auth_results>
<spf>
<domain>spoofed.example.com</domain>
<scope>mfrom</scope>
<result>fail</result>
</spf>
</auth_results>
</record>
</feedback>
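Both deleted fixtures exercised the DMARCbis (version 2.0) policy fields np, testing, and discovery_method, the same fields removed from the typed definitions earlier in this diff. A stdlib sketch of reading them from the second, namespace-free sample; the first sample would additionally need its urn:ietf:params:xml:ns:dmarc-2.0 namespace handled:

    import xml.etree.ElementTree as ET

    # trimmed version of the deleted sample above, for illustration
    report_xml = """<feedback>
      <policy_published>
        <np>reject</np>
        <testing>y</testing>
        <discovery_method>treewalk</discovery_method>
      </policy_published>
    </feedback>"""

    root = ET.fromstring(report_xml)
    policy = root.find("policy_published")
    for tag in ("np", "testing", "discovery_method"):
        node = policy.find(tag)
        if node is not None:
            print(tag, "=", node.text)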


@@ -60,10 +60,10 @@ Create Dashboards
9. Click Save
10. Click Dashboards
11. Click Create New Dashboard
- 12. Use a descriptive title, such as "Failure DMARC Data"
+ 12. Use a descriptive title, such as "Forensic DMARC Data"
13. Click Create Dashboard
14. Click on the Source button
- 15. Paste the content of ``dmarc_failure_dashboard.xml`` into the source editor
+ 15. Paste the content of ``dmarc_forensic_dashboard.xml`` into the source editor
16. If the index storing the DMARC data is not named email, replace index="email" accordingly
17. Click Save


@@ -1,8 +1,8 @@
<form theme="dark" version="1.1">
- <label>Failure DMARC Data</label>
+ <label>Forensic DMARC Data</label>
<search id="base_search">
<query>
index="email" (sourcetype="dmarc:failure" OR sourcetype="dmarc:forensic") parsed_sample.headers.From=$header_from$ parsed_sample.headers.To=$header_to$ parsed_sample.headers.Subject=$header_subject$ source.ip_address=$source_ip_address$ source.reverse_dns=$source_reverse_dns$ source.country=$source_country$
index="email" sourcetype="dmarc:forensic" parsed_sample.headers.From=$header_from$ parsed_sample.headers.To=$header_to$ parsed_sample.headers.Subject=$header_subject$ source.ip_address=$source_ip_address$ source.reverse_dns=$source_reverse_dns$ source.country=$source_country$
| table *
</query>
<earliest>$time_range.earliest$</earliest>
@@ -43,7 +43,7 @@
</fieldset>
<row>
<panel>
- <title>Failure samples</title>
+ <title>Forensic samples</title>
<table>
<search base="base_search">
<query>| table arrival_date_utc authentication_results parsed_sample.headers.From,parsed_sample.headers.To,parsed_sample.headers.Subject | sort -arrival_date_utc</query>
@@ -59,7 +59,7 @@
</row>
<row>
<panel>
- <title>Failure samples by country</title>
+ <title>Forensic samples by country</title>
<map>
<search base="base_search">
<query>| iplocation source.ip_address| stats count by Country | geom geo_countries featureIdField="Country"</query>
@@ -72,7 +72,7 @@
</row>
<row>
<panel>
- <title>Failure samples by IP address</title>
+ <title>Forensic samples by IP address</title>
<table>
<search base="base_search">
<query>| iplocation source.ip_address | stats count by source.ip_address,source.reverse_dns | sort -count</query>
@@ -85,7 +85,7 @@
</table>
</panel>
<panel>
- <title>Failure samples by country ISO code</title>
+ <title>Forensic samples by country ISO code</title>
<table>
<search base="base_search">
<query>| stats count by source.country | sort - count</query>

tests.py: 3458 lines changed

File diff suppressed because it is too large