Compare commits


3 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
2eb91ed67d Address code review feedback on logging configuration
- Use exact type check (type(h) is logging.StreamHandler) instead of isinstance
  to avoid confusion with FileHandler subclass
- Catch specific exceptions (IOError, OSError, PermissionError) instead of
  bare Exception when creating FileHandler
- Kept logging.ERROR as default to maintain consistency with existing behavior

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2025-12-29 19:50:26 +00:00
copilot-swe-agent[bot]
da2cf46765 Fix logging configuration propagation to child parser processes
- Add _configure_logging() helper function to set up logging in child processes
- Modified cli_parse() to accept log_level and log_file parameters
- Pass current logging configuration from parent to child processes
- Logging warnings/errors from child processes now properly display

Fixes issue where logging handlers in parent process were not inherited by
child processes created via multiprocessing.Process(). Child processes now
configure their own logging with the same settings as the parent.

Tested with sample files and confirmed warnings from DNS exceptions in child
processes are now visible.

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2025-12-29 19:48:53 +00:00
copilot-swe-agent[bot]
359b2e9b8c Initial plan 2025-12-29 19:43:34 +00:00
57 changed files with 1334 additions and 6788 deletions

View File

@@ -1,18 +0,0 @@
{
"permissions": {
"allow": [
"Bash(git fetch:*)",
"Bash(python -c \"import py_compile; py_compile.compile\\('parsedmarc/cli.py', doraise=True\\)\")",
"Bash(ruff check:*)",
"Bash(ruff format:*)",
"Bash(GITHUB_ACTIONS=true pytest --cov tests.py)",
"Bash(ls tests*)",
"Bash(GITHUB_ACTIONS=true python -m pytest --cov tests.py -x)",
"Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)",
"Bash(python -m pytest tests.py --no-header -q)"
],
"additionalDirectories": [
"/tmp"
]
}
}

View File

@@ -1,72 +0,0 @@
name: Bug report
description: Report a reproducible parsedmarc bug
title: "[Bug]: "
labels:
- bug
body:
- type: input
id: version
attributes:
label: parsedmarc version
description: Include the parsedmarc version or commit if known.
placeholder: 9.x.x
validations:
required: true
- type: dropdown
id: input_backend
attributes:
label: Input backend
description: Which input path or mailbox backend is involved?
options:
- IMAP
- MS Graph
- Gmail API
- Maildir
- mbox
- Local file / direct parse
- Other
validations:
required: true
- type: textarea
id: environment
attributes:
label: Environment
description: Runtime, container image, OS, Python version, or deployment details.
placeholder: Docker on Debian, Python 3.12, parsedmarc installed from PyPI
validations:
required: true
- type: textarea
id: config
attributes:
label: Sanitized config
description: Include the relevant config fragment with secrets removed.
render: ini
- type: textarea
id: steps
attributes:
label: Steps to reproduce
description: Describe the smallest reproducible sequence you can.
placeholder: |
1. Configure parsedmarc with ...
2. Run ...
3. Observe ...
validations:
required: true
- type: textarea
id: expected_actual
attributes:
label: Expected vs actual behavior
description: What did you expect, and what happened instead?
validations:
required: true
- type: textarea
id: logs
attributes:
label: Logs or traceback
description: Paste sanitized logs or a traceback if available.
render: text
- type: textarea
id: samples
attributes:
label: Sample report availability
description: If you can share a sanitized sample report or message, note that here.

View File

@@ -1,5 +0,0 @@
blank_issues_enabled: true
contact_links:
- name: Security issue
url: https://github.com/domainaware/parsedmarc/security/policy
about: Please use the security policy and avoid filing public issues for undisclosed vulnerabilities.

View File

@@ -1,30 +0,0 @@
name: Feature request
description: Suggest a new feature or behavior change
title: "[Feature]: "
labels:
- enhancement
body:
- type: textarea
id: problem
attributes:
label: Problem statement
description: What workflow or limitation are you trying to solve?
validations:
required: true
- type: textarea
id: proposal
attributes:
label: Proposed behavior
description: Describe the feature or behavior you want.
validations:
required: true
- type: textarea
id: alternatives
attributes:
label: Alternatives considered
description: Describe workarounds or alternative approaches you considered.
- type: textarea
id: impact
attributes:
label: Compatibility or operational impact
description: Note config, output, performance, or deployment implications if relevant.

View File

@@ -1,24 +0,0 @@
## Summary
-
## Why
-
## Testing
-
## Backward Compatibility / Risk
-
## Related Issue
- Closes #
## Checklist
- [ ] Tests added or updated if behavior changed
- [ ] Docs updated if config or user-facing behavior changed

View File

@@ -10,32 +10,7 @@ on:
branches: [ master ]
jobs:
lint-docs-build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v5
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: "3.13"
- name: Install Python dependencies
run: |
python -m pip install --upgrade pip
pip install .[build]
- name: Check code style
run: |
ruff check .
- name: Test building documentation
run: |
cd docs
make html
- name: Test building packages
run: |
hatch build
test:
needs: lint-docs-build
build:
runs-on: ubuntu-latest
services:
@@ -55,7 +30,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
steps:
- uses: actions/checkout@v5
@@ -71,6 +46,13 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install .[build]
- name: Test building documentation
run: |
cd docs
make html
- name: Check code style
run: |
ruff check .
- name: Run unit tests
run: |
pytest --cov --cov-report=xml tests.py
@@ -78,7 +60,7 @@ jobs:
run: |
pip install -e .
parsedmarc --debug -c ci.ini samples/aggregate/*
parsedmarc --debug -c ci.ini samples/failure/*
parsedmarc --debug -c ci.ini samples/forensic/*
- name: Test building packages
run: |
hatch build

.gitignore vendored
View File

@@ -137,7 +137,7 @@ samples/private
*.html
*.sqlite-journal
parsedmarc*.ini
parsedmarc.ini
scratch.py
parsedmarc/resources/maps/base_reverse_dns.csv

View File

@@ -1,68 +0,0 @@
# AGENTS.md
This file provides guidance to AI agents when working with code in this repository.
## Project Overview
parsedmarc is a Python module and CLI utility for parsing DMARC aggregate (RUA), failure/forensic (RUF), and SMTP TLS reports. It supports both RFC 7489 and DMARCbis (draft-ietf-dmarc-dmarcbis-41, draft-ietf-dmarc-aggregate-reporting-32, draft-ietf-dmarc-failure-reporting-24) report formats. It reads reports from IMAP, Microsoft Graph, Gmail API, Maildir, mbox files, or direct file paths, and outputs to JSON/CSV, Elasticsearch, OpenSearch, Splunk, Kafka, S3, Azure Log Analytics, syslog, or webhooks.
## Common Commands
```bash
# Install with dev/build dependencies
pip install .[build]
# Run all tests with coverage
pytest --cov --cov-report=xml tests.py
# Run a single test
pytest tests.py::Test::testAggregateSamples
# Lint and format
ruff check .
ruff format .
# Test CLI with sample reports
parsedmarc --debug -c ci.ini samples/aggregate/*
parsedmarc --debug -c ci.ini samples/failure/*
# Build docs
cd docs && make html
# Build distribution
hatch build
```
To skip DNS lookups during testing, set `GITHUB_ACTIONS=true`.
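A sketch of how such an environment-variable guard might look in code (the helper name is illustrative):

```python
import os

def offline_mode() -> bool:
    # Skip live DNS lookups when tests run under GitHub Actions
    return os.environ.get("GITHUB_ACTIONS") == "true"
```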
## Architecture
**Data flow:** Input sources → CLI (`cli.py:_main`) → Parse (`__init__.py`) → Enrich (DNS/GeoIP via `utils.py`) → Output integrations
### Key modules
- `parsedmarc/__init__.py` — Core parsing logic. Main functions: `parse_report_file()`, `parse_report_email()`, `parse_aggregate_report_xml()`, `parse_failure_report()`, `parse_smtp_tls_report_json()`, `get_dmarc_reports_from_mailbox()`, `watch_inbox()`. Legacy aliases (`parse_forensic_report`, etc.) are preserved for backward compatibility.
- `parsedmarc/cli.py` — CLI entry point (`_main`), config file parsing, output orchestration. Accepts both old (`save_forensic`, `forensic_topic`) and new (`save_failure`, `failure_topic`) config keys.
- `parsedmarc/types.py` — TypedDict definitions for all report types (`AggregateReport`, `FailureReport`, `SMTPTLSReport`, `ParsingResults`). Legacy alias `ForensicReport = FailureReport` preserved.
- `parsedmarc/utils.py` — IP/DNS/GeoIP enrichment, base64 decoding, compression handling
- `parsedmarc/mail/` — Polymorphic mail connections: `IMAPConnection`, `GmailConnection`, `MSGraphConnection`, `MaildirConnection`
- `parsedmarc/{elastic,opensearch,splunk,kafkaclient,loganalytics,syslog,s3,webhook,gelf}.py` — Output integrations
### Report type system
`ReportType = Literal["aggregate", "failure", "smtp_tls"]`. Exception hierarchy: `ParserError` → `InvalidDMARCReport` → `InvalidAggregateReport`/`InvalidFailureReport`, and `InvalidSMTPTLSReport`. Legacy alias `InvalidForensicReport = InvalidFailureReport` preserved.
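The hierarchy above might be sketched as plain exception classes; the base-class choices (including the exact parent of `InvalidSMTPTLSReport`) are assumptions here, not the actual parsedmarc source:

```python
class ParserError(RuntimeError):
    """Base error raised when a report cannot be parsed."""

class InvalidDMARCReport(ParserError):
    """A DMARC report is malformed."""

class InvalidAggregateReport(InvalidDMARCReport):
    """An aggregate (rua) report is malformed."""

class InvalidFailureReport(InvalidDMARCReport):
    """A failure (ruf) report is malformed."""

class InvalidSMTPTLSReport(ParserError):
    """An SMTP TLS report is malformed."""

# Legacy alias preserved for backward compatibility
InvalidForensicReport = InvalidFailureReport
```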
### DMARCbis support
Aggregate reports support both RFC 7489 and DMARCbis formats. DMARCbis adds fields: `np` (non-existent subdomain policy), `testing` (replaces `pct`), `discovery_method` (`psl`/`treewalk`), `generator` (report metadata), and `human_result` (DKIM/SPF auth results). `pct` and `fo` default to `None` when absent (DMARCbis drops these). Namespaced XML is handled automatically.
### Caching
IP address info cached for 4 hours, seen aggregate report IDs cached for 1 hour (via `ExpiringDict`).
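A minimal stdlib sketch of this TTL-cache behavior; parsedmarc itself uses the `expiringdict` package, so treat this as an illustration only:

```python
import time

class TTLCache:
    """Dict-like cache whose entries expire after max_age_seconds."""

    def __init__(self, max_age_seconds: float):
        self.max_age = max_age_seconds
        self._data = {}  # key -> (value, stored_at)

    def get(self, key):
        entry = self._data.get(key)
        if entry is None:
            return None
        value, stored_at = entry
        if time.monotonic() - stored_at > self.max_age:
            del self._data[key]  # expired: drop and report a miss
            return None
        return value

    def set(self, key, value):
        self._data[key] = (value, time.monotonic())

ip_cache = TTLCache(max_age_seconds=4 * 60 * 60)     # IP info: 4 hours
seen_report_ids = TTLCache(max_age_seconds=60 * 60)  # report IDs: 1 hour
```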
## Code Style
- Ruff for formatting and linting (configured in `.vscode/settings.json`)
- TypedDict for structured data, type hints throughout
- Python ≥3.10 required
- Tests are in a single `tests.py` file using unittest; sample reports live in `samples/`

View File

@@ -1,142 +1,5 @@
# Changelog
## 10.0.0
### Enhancements
#### Support for DMARCbis reports
New fields from the XSD schema, added to types, parsing, CSV output, and Elasticsearch/OpenSearch mappings:
- `np` — non-existent subdomain policy (`none`/`quarantine`/`reject`)
- `testing` — testing mode flag (`n`/`y`), replaces RFC7489 `pct`
- `discovery_method` — policy discovery method (`psl`/`treewalk`)
- `generator` — report generator software identifier (metadata)
- `human_result` — optional descriptive text on DKIM/SPF auth results
Backwards compatibility to RFC7489 is maintained.
### Breaking changes
#### Forensic reports have been renamed to failure reports
Forensic reports have been renamed to failure reports throughout the project to reflect the proper naming of the reports since RFC7489.
- **Core**: `types.py`, `__init__.py`: `ForensicReport` → `FailureReport`, `parse_forensic_report` → `parse_failure_report`, report type `"failure"`
- **Output modules**: `elastic.py`, `opensearch.py`, `splunk.py`, `kafkaclient.py`, `syslog.py`, `gelf.py`, `webhook.py`, `loganalytics.py`, `s3.py`
- **CLI**: `cli.py` — args, config keys, index names (`dmarc_failure`)
- **Docs & dashboards**: all markdown, Grafana JSON, Kibana NDJSON, Splunk XML
##### Backward compatibility
- Old function/type names preserved as aliases: `parse_forensic_report = parse_failure_report`, `ForensicReport = FailureReport`, etc.
- CLI config accepts both old (`save_forensic`, `forensic_topic`) and new keys (`save_failure`, `failure_topic`)
- RFC 7489 reports parse with `None` for DMARCbis-only fields
- **Updated dashboards with queries are backward compatible**: queries match data indexed under both old (`dmarc_forensic*` / `dmarc:forensic`) and new (`dmarc_failure*` / `dmarc:failure`) names, so dashboards show data from before and after the rename:
- **Kibana**: Index pattern uses `dmarc_f*` to match both `dmarc_forensic*` and `dmarc_failure*`
- **Splunk**: Base search queries `(sourcetype="dmarc:failure" OR sourcetype="dmarc:forensic")`
- **Elasticsearch/OpenSearch**: Duplicate-check searches query across both `dmarc_failure*` and `dmarc_forensic*` index patterns
## 9.3.1
### Breaking changes
- Elasticsearch and OpenSearch now verify SSL certificates by default when `ssl = True`, even without a `cert_path`
- Added `skip_certificate_verification` option to the `elasticsearch` and `opensearch` configuration sections for consistency with `splunk_hec`
### Fixed
- Splunk HEC `skip_certificate_verification` now works correctly
- SMTP TLS reports no longer fail when saving to multiple output targets (e.g. Elasticsearch and OpenSearch) due to in-place mutation of the report dict
- Output client initialization errors now identify which module failed (e.g. "OpenSearch: ConnectionError..." instead of generic "Output client error")
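The in-place mutation fix above can be illustrated with a toy example; the client class and function names are hypothetical:

```python
import copy

def save_to_all(report: dict, clients: list) -> None:
    # If each output client mutated the shared dict in place, the second
    # client would receive an already altered document. Handing each client
    # a deep copy keeps the original report intact.
    for client in clients:
        client.save(copy.deepcopy(report))

class RenamingClient:
    """Toy output client that mutates the report it receives."""

    def save(self, report: dict) -> None:
        report["policy_domain"] = report.pop("domain")
```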
## 9.3.0
### Added
- SIGHUP-based configuration reload for watch mode — update output destinations, DNS/GeoIP settings, processing flags, and log level without restarting the service or interrupting in-progress report processing.
- Use `systemctl reload parsedmarc` when running under `systemd`.
- On a successful reload, old output clients are closed and recreated.
- On a failed reload, the previous configuration remains fully active.
- `close()` methods on `GelfClient`, `KafkaClient`, `SyslogClient`, `WebhookClient`, HECClient, and `S3Client` for clean resource teardown on reload.
- `config_reloading` parameter on all `MailboxConnection.watch()` implementations and `watch_inbox()` to ensure SIGHUP never triggers a new email batch mid-reload.
- Elasticsearch and OpenSearch connections are now tracked and cleaned up on reload via `_close_output_clients()`.
- Extracted `_parse_config_file()` and `_init_output_clients()` from `_main()` in `cli.py` to support config reload and reduce code duplication.
### Fixed
- `get_index_prefix()` crashed on failure reports with `TypeError` due to `report()` instead of `report[]` dict access.
- Missing `exit(1)` after IMAP user/password validation failure allowed execution to continue with `None` credentials.
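The first fix above is an instance of a common bug class: calling a dict instead of subscripting it. A minimal illustration (the key name is made up):

```python
report = {"report_type": "failure"}
try:
    report("report_type")  # calling a dict raises TypeError
except TypeError:
    pass
value = report["report_type"]  # correct subscript access
```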
## 9.2.1
### Added
- Better checking of `msgraph` configuration (PR #695)
### Changed
- Updated `dbip-country-lite` database to version `2026-03`
- DNS query error logging level from `warning` to `debug`
## 9.2.0
### Added
- OpenSearch AWS SigV4 authentication support (PR #673)
- IMAP move/delete compatibility fallbacks (PR #671)
- `fail_on_output_error` CLI option for sink failures (PR #672)
- Gmail service account auth mode for non-interactive runs (PR #676)
- Microsoft Graph certificate authentication support (PRs #692 and #693)
- Microsoft Graph well-known folder fallback for root listing failures (PR #618 and #684 close #609)
### Fixed
- Pass mailbox since filter through `watch_inbox` callback (PR #670 closes issue #581)
- `parsedmarc.mail.gmail.GmailConnection.delete_message` now properly calls the Gmail API (PR #668)
- Avoid extra mailbox fetch in batch and test mode (PR #691 closes #533)
## 9.1.2
### Fixes
- Fix duplicate detection for normalized aggregate reports in Elasticsearch/OpenSearch (PR #666 fixes issue #665)
## 9.1.1
### Fixes
- Fix the use of Elasticsearch and OpenSearch API keys (PR #660 fixes issue #653)
### Changes
- Drop support for Python 3.9 (PR #661)
## 9.1.0
### Enhancements
- Add TCP and TLS support for syslog output. (#656)
- Skip DNS lookups in GitHub Actions to prevent DNS timeouts during tests. (#657)
- Remove microseconds from DMARC aggregate report time ranges before parsing them.
## 9.0.10
- Support Python 3.14+
## 9.0.9
### Fixes
- Validate that a string is base64-encoded before trying to base64 decode it. (PRs #648 and #649)
## 9.0.8
### Fixes
- Fix logging configuration not propagating to child parser processes (#646).
- Update `mailsuite` dependency to `>=1.11.1` to solve issues with iCloud IMAP (#493).
## 9.0.7
### Fixes

View File

@@ -1,3 +0,0 @@
# CLAUDE.md
@AGENTS.md

View File

@@ -1,78 +0,0 @@
# Contributing
Thanks for contributing to parsedmarc.
## Local setup
Use a virtual environment for local development.
```bash
python3 -m venv .venv
. .venv/bin/activate
python -m pip install --upgrade pip
pip install .[build]
```
## Before opening a pull request
Run the checks that match your change:
```bash
ruff check .
pytest --cov --cov-report=xml tests.py
```
If you changed documentation:
```bash
cd docs
make html
```
If you changed CLI behavior or parsing logic, it is also useful to exercise the
sample reports:
```bash
parsedmarc --debug -c ci.ini samples/aggregate/*
parsedmarc --debug -c ci.ini samples/forensic/*
```
To skip DNS lookups during tests, set:
```bash
GITHUB_ACTIONS=true
```
## Pull request guidelines
- Keep pull requests small and focused. Separate bug fixes, docs updates, and
repo-maintenance changes where practical.
- Add or update tests when behavior changes.
- Update docs when configuration or user-facing behavior changes.
- Include a short summary, the reason for the change, and the testing you ran.
- Link the related issue when there is one.
## Branch maintenance
Upstream `master` may move quickly. Before asking for review or after another PR
lands, rebase your branch onto the current upstream branch and force-push with
`--force-with-lease` if needed:
```bash
git fetch upstream
git rebase upstream/master
git push --force-with-lease
```
## CI and coverage
GitHub Actions is the source of truth for linting, docs, and test status.
Codecov patch coverage is usually the most relevant signal for small PRs. Project
coverage can be noisier when the base comparison is stale, so interpret it in
the context of the actual diff.
## Questions
Use GitHub issues for bugs and feature requests. If you are not sure whether a
change is wanted, opening an issue first is usually the safest path.

View File

@@ -56,9 +56,9 @@ for RHEL or Debian.
| 3.6 | ❌ | Used in RHEL 8, but not supported by project dependencies |
| 3.7 | ❌ | End of Life (EOL) |
| 3.8 | ❌ | End of Life (EOL) |
| 3.9 | | Used in Debian 11 and RHEL 9, but not supported by project dependencies |
| 3.9 | | Supported until August 2026 (Debian 11); May 2032 (RHEL 9) |
| 3.10 | ✅ | Actively maintained |
| 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) |
| 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) |
| 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) |
| 3.14 | | Supported (requires `imapclient>=3.1.0`) |
| 3.14 | | Not currently supported due to [this imapclient bug](https://github.com/mjs/imapclient/issues/618)|

View File

@@ -1,29 +0,0 @@
# Security Policy
## Reporting a vulnerability
Please do not open a public GitHub issue for an undisclosed security
vulnerability. Use GitHub private vulnerability reporting in the Security tab of this project instead.
When reporting a vulnerability, include:
- the affected parsedmarc version or commit
- the component or integration involved
- clear reproduction details if available
- potential impact
- any suggested mitigation or workaround
## Supported versions
Security fixes will be applied to the latest released version and
the current `master` branch.
Older versions will not receive backported fixes.
## Disclosure process
After a report is received, maintainers can validate the issue, assess impact,
and coordinate a fix before public disclosure.
Please avoid publishing proof-of-concept details until maintainers have had a
reasonable opportunity to investigate and release a fix or mitigation.

ci.ini
View File

@@ -3,7 +3,6 @@ save_aggregate = True
save_forensic = True
save_smtp_tls = True
debug = True
offline = True
[elasticsearch]
hosts = http://localhost:9200

View File

@@ -1,11 +0,0 @@
codecov:
require_ci_to_pass: true
coverage:
status:
project:
default:
informational: true
patch:
default:
informational: false

View File

@@ -1,47 +0,0 @@
name: parsedmarc-dashboards
include:
- docker-compose.yml
services:
kibana:
image: docker.elastic.co/kibana/kibana:8.19.7
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
ports:
- "127.0.0.1:5601:5601"
depends_on:
elasticsearch:
condition: service_healthy
opensearch-dashboards:
image: opensearchproject/opensearch-dashboards:2
environment:
- OPENSEARCH_HOSTS=["https://opensearch:9200"]
ports:
- "127.0.0.1:5602:5601"
depends_on:
opensearch:
condition: service_healthy
grafana:
image: grafana/grafana:latest
environment:
- GRAFANA_PASSWORD=${GRAFANA_PASSWORD}
- GF_INSTALL_PLUGINS=grafana-piechart-panel,grafana-worldmap-panel
ports:
- "127.0.0.1:3000:3000"
depends_on:
elasticsearch:
condition: service_healthy
splunk:
image: splunk/splunk:latest
environment:
- SPLUNK_START_ARGS=--accept-license
- "SPLUNK_GENERAL_TERMS=--accept-sgt-current-at-splunk-com"
- SPLUNK_PASSWORD=${SPLUNK_PASSWORD}
- SPLUNK_HEC_TOKEN=${SPLUNK_HEC_TOKEN}
ports:
- "127.0.0.1:8000:8000"
- "127.0.0.1:8088:8088"

View File

@@ -48,7 +48,7 @@ services:
test:
[
"CMD-SHELL",
"curl -sk -u admin:${OPENSEARCH_INITIAL_ADMIN_PASSWORD} -XGET https://localhost:9200/_cluster/health?pretty | grep status | grep -q '\\(green\\|yellow\\)'"
"curl -s -XGET http://localhost:9201/_cluster/health?pretty | grep status | grep -q '\\(green\\|yellow\\)'"
]
interval: 10s
timeout: 10s

View File

@@ -214,7 +214,7 @@ Kibana index patterns with versions that match the upgraded indexes:
1. Login in to Kibana, and click on Management
2. Under Kibana, click on Saved Objects
3. Check the checkboxes for the `dmarc_aggregate` and `dmarc_failure`
3. Check the checkboxes for the `dmarc_aggregate` and `dmarc_forensic`
index patterns
4. Click Delete
5. Click Delete on the confirmation message

View File

@@ -2,7 +2,7 @@
[general]
save_aggregate = True
save_failure = True
save_forensic = True
[imap]
host = imap.example.com

View File

@@ -34,7 +34,7 @@ and Valimail.
## Features
- Parses draft and 1.0 standard aggregate/rua DMARC reports
- Parses failure/ruf DMARC reports
- Parses forensic/failure/ruf DMARC reports
- Parses reports from SMTP TLS Reporting
- Can parse reports from an inbox over IMAP, Microsoft Graph, or Gmail API
- Transparently handles gzip or zip compressed reports
@@ -56,12 +56,12 @@ for RHEL or Debian.
| 3.6 | ❌ | Used in RHEL 8, but not supported by project dependencies |
| 3.7 | ❌ | End of Life (EOL) |
| 3.8 | ❌ | End of Life (EOL) |
| 3.9 | | Used in Debian 11 and RHEL 9, but not supported by project dependencies |
| 3.9 | | Supported until August 2026 (Debian 11); May 2032 (RHEL 9) |
| 3.10 | ✅ | Actively maintained |
| 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) |
| 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) |
| 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) |
| 3.14 | | Supported (requires `imapclient>=3.1.0`) |
| 3.14 | | Not currently supported due to [this imapclient bug](https://github.com/mjs/imapclient/issues/618)|
```{toctree}
:caption: 'Contents'

View File

@@ -162,10 +162,10 @@ sudo -u parsedmarc virtualenv /opt/parsedmarc/venv
```
CentOS/RHEL 8 systems use Python 3.6 by default, so on those systems
explicitly tell `virtualenv` to use `python3.10` instead
explicitly tell `virtualenv` to use `python3.9` instead
```bash
sudo -u parsedmarc virtualenv -p python3.10 /opt/parsedmarc/venv
sudo -u parsedmarc virtualenv -p python3.9 /opt/parsedmarc/venv
```
Activate the virtualenv

View File

@@ -74,14 +74,14 @@ the DMARC Summary dashboard. To view failures only, use the pie chart.
Any other filters work the same way. You can also add your own custom temporary
filters by clicking on Add Filter at the upper right of the page.
## DMARC Failure Samples
## DMARC Forensic Samples
The DMARC Failure Samples dashboard contains information on DMARC failure
reports (also known as ruf reports). These reports contain
The DMARC Forensic Samples dashboard contains information on DMARC forensic
reports (also known as failure reports or ruf reports). These reports contain
samples of emails that have failed to pass DMARC.
:::{note}
Most recipients do not send failure/ruf reports at all to avoid
Most recipients do not send forensic/failure/ruf reports at all to avoid
privacy leaks. Some recipients (notably Chinese webmail services) will only
supply the headers of sample emails. Very few provide the entire email.
:::

View File

@@ -96,12 +96,12 @@ draft,acme.com,noreply-dmarc-support@acme.com,http://acme.com/dmarc/support,9391
```
## Sample failure report output
## Sample forensic report output
Thanks to GitHub user [xennn](https://github.com/xennn) for the anonymized
[failure report email sample](<https://github.com/domainaware/parsedmarc/raw/master/samples/failure/DMARC%20Failure%20Report%20for%20domain.de%20(mail-from%3Dsharepoint%40domain.de%2C%20ip%3D10.10.10.10).eml>).
[forensic report email sample](<https://github.com/domainaware/parsedmarc/raw/master/samples/forensic/DMARC%20Failure%20Report%20for%20domain.de%20(mail-from%3Dsharepoint%40domain.de%2C%20ip%3D10.10.10.10).eml>).
### JSON failure report
### JSON forensic report
```json
{
@@ -190,7 +190,7 @@ Thanks to GitHub user [xennn](https://github.com/xennn) for the anonymized
}
```
### CSV failure report
### CSV forensic report
```text
feedback_type,user_agent,version,original_envelope_id,original_mail_from,original_rcpt_to,arrival_date,arrival_date_utc,subject,message_id,authentication_results,dkim_domain,source_ip_address,source_country,source_reverse_dns,source_base_domain,delivery_result,auth_failure,reported_domain,authentication_mechanisms,sample_headers_only

View File

@@ -1,10 +1,10 @@
# Splunk
Starting in version 4.3.0 `parsedmarc` supports sending aggregate and/or
failure DMARC data to a Splunk [HTTP Event collector (HEC)].
forensic DMARC data to a Splunk [HTTP Event collector (HEC)].
The project repository contains [XML files] for premade Splunk
dashboards for aggregate and failure DMARC reports.
dashboards for aggregate and forensic DMARC reports.
Copy and paste the contents of each file into a separate Splunk
dashboard XML editor.

View File

@@ -4,9 +4,9 @@
```text
usage: parsedmarc [-h] [-c CONFIG_FILE] [--strip-attachment-payloads] [-o OUTPUT]
[--aggregate-json-filename AGGREGATE_JSON_FILENAME] [--failure-json-filename FAILURE_JSON_FILENAME]
[--aggregate-json-filename AGGREGATE_JSON_FILENAME] [--forensic-json-filename FORENSIC_JSON_FILENAME]
[--smtp-tls-json-filename SMTP_TLS_JSON_FILENAME] [--aggregate-csv-filename AGGREGATE_CSV_FILENAME]
[--failure-csv-filename FAILURE_CSV_FILENAME] [--smtp-tls-csv-filename SMTP_TLS_CSV_FILENAME]
[--forensic-csv-filename FORENSIC_CSV_FILENAME] [--smtp-tls-csv-filename SMTP_TLS_CSV_FILENAME]
[-n NAMESERVERS [NAMESERVERS ...]] [-t DNS_TIMEOUT] [--offline] [-s] [-w] [--verbose] [--debug]
[--log-file LOG_FILE] [--no-prettify-json] [-v]
[file_path ...]
@@ -14,26 +14,26 @@ usage: parsedmarc [-h] [-c CONFIG_FILE] [--strip-attachment-payloads] [-o OUTPUT
Parses DMARC reports
positional arguments:
file_path one or more paths to aggregate or failure report files, emails, or mbox files'
file_path one or more paths to aggregate or forensic report files, emails, or mbox files'
options:
-h, --help show this help message and exit
-c CONFIG_FILE, --config-file CONFIG_FILE
a path to a configuration file (--silent implied)
--strip-attachment-payloads
remove attachment payloads from failure report output
remove attachment payloads from forensic report output
-o OUTPUT, --output OUTPUT
write output files to the given directory
--aggregate-json-filename AGGREGATE_JSON_FILENAME
filename for the aggregate JSON output file
--failure-json-filename FAILURE_JSON_FILENAME
filename for the failure JSON output file
--forensic-json-filename FORENSIC_JSON_FILENAME
filename for the forensic JSON output file
--smtp-tls-json-filename SMTP_TLS_JSON_FILENAME
filename for the SMTP TLS JSON output file
--aggregate-csv-filename AGGREGATE_CSV_FILENAME
filename for the aggregate CSV output file
--failure-csv-filename FAILURE_CSV_FILENAME
filename for the failure CSV output file
--forensic-csv-filename FORENSIC_CSV_FILENAME
filename for the forensic CSV output file
--smtp-tls-csv-filename SMTP_TLS_CSV_FILENAME
filename for the SMTP TLS CSV output file
-n NAMESERVERS [NAMESERVERS ...], --nameservers NAMESERVERS [NAMESERVERS ...]
@@ -70,7 +70,7 @@ For example
[general]
save_aggregate = True
save_failure = True
save_forensic = True
[imap]
host = imap.example.com
@@ -109,7 +109,7 @@ mode = tcp
[webhook]
aggregate_url = https://aggregate_url.example.com
failure_url = https://failure_url.example.com
forensic_url = https://forensic_url.example.com
smtp_tls_url = https://smtp_tls_url.example.com
timeout = 60
```
@@ -119,7 +119,7 @@ The full set of configuration options are:
- `general`
- `save_aggregate` - bool: Save aggregate report data to
Elasticsearch, Splunk and/or S3
- `save_failure` - bool: Save failure report data to
- `save_forensic` - bool: Save forensic report data to
Elasticsearch, Splunk and/or S3
- `save_smtp_tls` - bool: Save SMTP TLS report data to
Elasticsearch, Splunk and/or S3
@@ -130,7 +130,7 @@ The full set of configuration options are:
- `output` - str: Directory to place JSON and CSV files in. This is required if you set either of the JSON output file options.
- `aggregate_json_filename` - str: filename for the aggregate
JSON output file
- `failure_json_filename` - str: filename for the failure
- `forensic_json_filename` - str: filename for the forensic
JSON output file
- `ip_db_path` - str: An optional custom path to a MMDB file
from MaxMind or DBIP
@@ -146,9 +146,6 @@ The full set of configuration options are:
- `dns_timeout` - float: DNS timeout period
- `debug` - bool: Print debugging messages
- `silent` - bool: Only print errors (Default: `True`)
- `fail_on_output_error` - bool: Exit with a non-zero status code if
any configured output destination fails while saving/publishing
reports (Default: `False`)
- `log_file` - str: Write log messages to a file at this path
- `n_procs` - int: Number of processes to run in parallel when
parsing in CLI mode (Default: `1`)
@@ -174,8 +171,8 @@ The full set of configuration options are:
- `check_timeout` - int: Number of seconds to wait for an IMAP
IDLE response or the number of seconds until the next
mail check (Default: `30`)
- `since` - str: Search for messages since certain time. (Examples: `5m|3h|2d|1w`)
Acceptable units - {"m":"minutes", "h":"hours", "d":"days", "w":"weeks"}.
- `since` - str: Search for messages since certain time. (Examples: `5m|3h|2d|1w`)
Acceptable units - {"m":"minutes", "h":"hours", "d":"days", "w":"weeks"}.
Defaults to `1d` if incorrect value is provided.
- `imap`
- `host` - str: The IMAP server hostname or IP address
@@ -203,7 +200,7 @@ The full set of configuration options are:
- `password` - str: The IMAP password
- `msgraph`
- `auth_method` - str: Authentication method, valid types are
`UsernamePassword`, `DeviceCode`, `ClientSecret`, or `Certificate`
`UsernamePassword`, `DeviceCode`, or `ClientSecret`
(Default: `UsernamePassword`).
- `user` - str: The M365 user, required when the auth method is
UsernamePassword
@@ -211,11 +208,6 @@ The full set of configuration options are:
method is UsernamePassword
- `client_id` - str: The app registration's client ID
- `client_secret` - str: The app registration's secret
- `certificate_path` - str: Path to a PEM or PKCS12 certificate
including the private key. Required when the auth method is
`Certificate`
- `certificate_password` - str: Optional password for the
certificate file when using `Certificate` auth
- `tenant_id` - str: The Azure AD tenant ID. This is required
for all auth methods except UsernamePassword.
- `mailbox` - str: The mailbox name. This defaults to the
@@ -248,14 +240,11 @@ The full set of configuration options are:
group and use that as the group id.
```powershell
New-ApplicationAccessPolicy -AccessRight RestrictAccess
New-ApplicationAccessPolicy -AccessRight RestrictAccess
-AppId "<CLIENT_ID>" -PolicyScopeGroupId "<MAILBOX>"
-Description "Restrict access to dmarc reports mailbox."
```
The same application permission and mailbox scoping guidance
applies to the `Certificate` auth method.
:::
- `elasticsearch`
- `hosts` - str: A comma separated list of hostnames and ports
@@ -273,8 +262,6 @@ The full set of configuration options are:
(Default: `True`)
- `timeout` - float: Timeout in seconds (Default: 60)
- `cert_path` - str: Path to a trusted certificates
- `skip_certificate_verification` - bool: Skip certificate
verification (not recommended)
- `index_suffix` - str: A suffix to apply to the index names
- `index_prefix` - str: A prefix to apply to the index names
- `monthly_indexes` - bool: Use monthly indexes instead of daily indexes
@@ -294,16 +281,10 @@ The full set of configuration options are:
- `user` - str: Basic auth username
- `password` - str: Basic auth password
- `api_key` - str: API key
- `auth_type` - str: Authentication type: `basic` (default) or `awssigv4`
(the key `authentication_type` is accepted as an alias for this option)
- `aws_region` - str: AWS region for SigV4 authentication
(required when `auth_type = awssigv4`)
- `aws_service` - str: AWS service for SigV4 signing (Default: `es`)
- `ssl` - bool: Use an encrypted SSL/TLS connection
(Default: `True`)
- `timeout` - float: Timeout in seconds (Default: 60)
- `cert_path` - str: Path to a trusted certificates
- `skip_certificate_verification` - bool: Skip certificate
verification (not recommended)
- `index_suffix` - str: A suffix to apply to the index names
- `index_prefix` - str: A prefix to apply to the index names
- `monthly_indexes` - bool: Use monthly indexes instead of daily indexes
@@ -325,7 +306,7 @@ The full set of configuration options are:
- `skip_certificate_verification` - bool: Skip certificate
verification (not recommended)
- `aggregate_topic` - str: The Kafka topic for aggregate reports
- `failure_topic` - str: The Kafka topic for failure reports
- `forensic_topic` - str: The Kafka topic for forensic reports
- `smtp`
- `host` - str: The SMTP hostname
- `port` - int: The SMTP port (Default: `25`)
@@ -355,78 +336,16 @@ The full set of configuration options are:
- `secret_access_key` - str: The secret access key (Optional)
- `syslog`
- `server` - str: The Syslog server name or IP address
- `port` - int: The port to use (Default: `514`)
- `protocol` - str: The protocol to use: `udp`, `tcp`, or `tls` (Default: `udp`)
- `cafile_path` - str: Path to CA certificate file for TLS server verification (Optional)
- `certfile_path` - str: Path to client certificate file for TLS authentication (Optional)
- `keyfile_path` - str: Path to client private key file for TLS authentication (Optional)
- `timeout` - float: Connection timeout in seconds for TCP/TLS (Default: `5.0`)
- `retry_attempts` - int: Number of retry attempts for failed connections (Default: `3`)
- `retry_delay` - int: Delay in seconds between retry attempts (Default: `5`)
**Example UDP configuration (default):**
```ini
[syslog]
server = syslog.example.com
port = 514
```
**Example TCP configuration:**
```ini
[syslog]
server = syslog.example.com
port = 6514
protocol = tcp
timeout = 10.0
retry_attempts = 5
```
**Example TLS configuration with server verification:**
```ini
[syslog]
server = syslog.example.com
port = 6514
protocol = tls
cafile_path = /path/to/ca-cert.pem
timeout = 10.0
```
**Example TLS configuration with mutual authentication:**
```ini
[syslog]
server = syslog.example.com
port = 6514
protocol = tls
cafile_path = /path/to/ca-cert.pem
certfile_path = /path/to/client-cert.pem
keyfile_path = /path/to/client-key.pem
timeout = 10.0
retry_attempts = 3
retry_delay = 5
```
- `port` - int: The UDP port to use (Default: `514`)
- `gmail_api`
- `credentials_file` - str: Path to file containing the
credentials, None to disable (Default: `None`)
- `token_file` - str: Path to save the token file
(Default: `.token`)
- `auth_mode` - str: Authentication mode, `installed_app` (default)
or `service_account`
- `service_account_user` - str: Delegated mailbox user for Gmail
service account auth (required for domain-wide delegation). Also
accepted as `delegated_user` for backward compatibility.
:::{note}
credentials_file and token_file can be obtained by following the [quickstart](https://developers.google.com/gmail/api/quickstart/python). Please change the scope to `https://www.googleapis.com/auth/gmail.modify`.
:::
:::{note}
When `auth_mode = service_account`, `credentials_file` must point to a
Google service account key JSON file, and `token_file` is not used.
:::
- `include_spam_trash` - bool: Include messages in Spam and
Trash when searching reports (Default: `False`)
- `scopes` - str: Comma separated list of scopes to use when
@@ -443,11 +362,11 @@ The full set of configuration options are:
- `dce` - str: The Data Collection Endpoint (DCE). Example: `https://{DCE-NAME}.{REGION}.ingest.monitor.azure.com`.
- `dcr_immutable_id` - str: The immutable ID of the Data Collection Rule (DCR)
- `dcr_aggregate_stream` - str: The stream name for aggregate reports in the DCR
- `dcr_failure_stream` - str: The stream name for the failure reports in the DCR
- `dcr_forensic_stream` - str: The stream name for the forensic reports in the DCR
- `dcr_smtp_tls_stream` - str: The stream name for the SMTP TLS reports in the DCR
:::{note}
Information regarding the setup of the Data Collection Rule can be found [in the Azure documentation](https://learn.microsoft.com/en-us/azure/azure-monitor/logs/tutorial-logs-ingestion-portal).
Information regarding the setup of the Data Collection Rule can be found [here](https://learn.microsoft.com/en-us/azure/azure-monitor/logs/tutorial-logs-ingestion-portal).
:::
- `gelf`
- `host` - str: The GELF server name or IP address
@@ -460,7 +379,7 @@ The full set of configuration options are:
- `webhook` - Post the individual reports to a webhook url with the report as the JSON body
- `aggregate_url` - str: URL of the webhook which should receive the aggregate reports
- `failure_url` - str: URL of the webhook which should receive the failure reports
- `forensic_url` - str: URL of the webhook which should receive the forensic reports
- `smtp_tls_url` - str: URL of the webhook which should receive the smtp_tls reports
- `timeout` - int: Interval in which the webhook call should timeout
@@ -475,26 +394,26 @@ blocks DNS requests to outside resolvers.
:::
:::{note}
`save_aggregate` and `save_failure` are separate options
because you may not want to save failure reports
(formerly known as forensic reports) to your Elasticsearch instance,
`save_aggregate` and `save_forensic` are separate options
because you may not want to save forensic reports
(also known as failure reports) to your Elasticsearch instance,
particularly if you are in a highly-regulated industry that
handles sensitive data, such as healthcare or finance. If your
legitimate outgoing email fails DMARC, it is possible
that email may appear later in a failure report.
that email may appear later in a forensic report.
Failure reports contain the original headers of an email that
Forensic reports contain the original headers of an email that
failed a DMARC check, and sometimes may also include the
full message body, depending on the policy of the reporting
organization.
Most reporting organizations do not send failure reports of any
Most reporting organizations do not send forensic reports of any
kind for privacy reasons. While aggregate DMARC reports are sent
at least daily, it is normal to receive very few failure reports.
at least daily, it is normal to receive very few forensic reports.
An alternative approach is to still collect failure/ruf
An alternative approach is to still collect forensic/failure/ruf
reports in your DMARC inbox, but run `parsedmarc` with
```save_failure = True``` manually on a separate IMAP folder (using
```save_forensic = True``` manually on a separate IMAP folder (using
the ```reports_folder``` option), after you have manually moved
known samples you want to save to that folder
(e.g. malicious samples and non-sensitive legitimate samples).
@@ -523,7 +442,7 @@ Update the limit to 2k per example:
PUT _cluster/settings
{
"persistent" : {
"cluster.max_shards_per_node" : 2000
"cluster.max_shards_per_node" : 2000
}
}
```
@@ -531,33 +450,6 @@ PUT _cluster/settings
Increasing this value increases resource usage.
:::
## Performance tuning
For large mailbox imports or backfills, parsedmarc can consume a noticeable amount
of memory, especially when it runs on the same host as Elasticsearch or
OpenSearch. The following settings can reduce peak memory usage and make long
imports more predictable:
- Reduce `mailbox.batch_size` to smaller values such as `100-500` instead of
processing a very large message set at once. Smaller batches trade throughput
for lower peak memory use and less sink pressure.
- Keep `n_procs` low for mailbox-heavy runs. In practice, `1-2` workers is often
a safer starting point for large backfills than aggressive parallelism.
- Use `mailbox.since` to process reports in smaller time windows such as `1d`,
`7d`, or another interval that fits the backlog. This makes it easier to catch
up incrementally instead of loading an entire mailbox history in one run.
- Set `strip_attachment_payloads = True` when forensic reports contain large
attachments and you do not need to retain the raw payloads in the parsed
output.
- Prefer running parsedmarc separately from Elasticsearch or OpenSearch, or
reserve enough RAM for both services if they must share a host.
- For very large imports, prefer incremental supervised runs, such as a
scheduler or systemd service, over infrequent massive backfills.
These are operational tuning recommendations rather than hard requirements, but
they are often enough to avoid memory pressure and reduce failures during
high-volume mailbox processing.
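As an illustration only, the recommendations above might combine into a configuration fragment like the following. The values are starting points, not requirements, and the section placement of each option should be checked against your configuration reference:

```ini
[general]
# Fewer workers lowers peak memory during large backfills
n_procs = 1
# Drop raw attachment payloads from parsed forensic reports
strip_attachment_payloads = True

[mailbox]
# Smaller batches trade throughput for lower peak memory use
batch_size = 200
# Process the backlog in smaller time windows
since = 7d
```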
## Multi-tenant support
Starting in `8.19.0`, parsedmarc provides multi-tenant support by placing data into separate OpenSearch or Elasticsearch index prefixes. To set this up, create a YAML file where each key is a tenant name and the value is a list of domains related to that tenant (not including subdomains), like this:
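A minimal sketch of such a tenant map; the tenant names and domains below are illustrative, not taken from the project:

```yaml
# Each top-level key is a tenant name; each value is a list of
# base domains for that tenant (subdomains are not listed).
tenant_a:
  - example.com
  - example.net
tenant_b:
  - example.org
```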
@@ -607,7 +499,6 @@ After=network.target network-online.target elasticsearch.service
[Service]
ExecStart=/opt/parsedmarc/venv/bin/parsedmarc -c /etc/parsedmarc.ini
ExecReload=/bin/kill -HUP $MAINPID
User=parsedmarc
Group=parsedmarc
Restart=always
@@ -640,51 +531,6 @@ sudo service parsedmarc restart
:::
### Reloading configuration without restarting
When running in watch mode, `parsedmarc` supports reloading its
configuration file without restarting the service or interrupting
report processing that is already in progress. Send a `SIGHUP` signal
to the process, or use `systemctl reload` if the unit file includes
the `ExecReload` line shown above:
```bash
sudo systemctl reload parsedmarc
```
The reload takes effect after the current batch of reports finishes
processing and all output operations (Elasticsearch, Kafka, S3, etc.)
for that batch have completed. The following settings are reloaded:
- All output destinations (Elasticsearch, OpenSearch, Kafka, S3,
Splunk, syslog, GELF, webhooks, Log Analytics)
- Multi-tenant index prefix domain map (`index_prefix_domain_map` —
the referenced YAML file is re-read on reload)
- DNS and GeoIP settings (`nameservers`, `dns_timeout`, `ip_db_path`,
`offline`, etc.)
- Processing flags (`strip_attachment_payloads`, `batch_size`,
`check_timeout`, etc.)
- Log level (`debug`, `verbose`, `warnings`, `silent`)
Mailbox connection settings (IMAP host/credentials, Microsoft Graph,
Gmail API, Maildir path) are **not** reloaded — changing those still
requires a full restart.
On a **successful** reload, existing output client connections are
closed and new ones are created from the updated configuration. The
service then resumes watching with the new settings.
If the new configuration file contains errors (missing required
settings, unreachable output destinations, etc.), the **entire reload
is aborted** — no output clients are replaced and the previous
configuration remains fully active. This means a typo in one section
will not take down an otherwise working setup. Check the logs for
details:
```bash
journalctl -u parsedmarc.service -r
```
To check the status of the service, run:
```bash
@@ -83,7 +83,7 @@
"id": 28,
"panels": [
{
"content": "# DMARC Summary\r\nAs the name suggests, this dashboard is the best place to start reviewing your aggregate DMARC data.\r\n\r\nAcross the top of the dashboard, three pie charts display the percentage of alignment pass/fail for SPF, DKIM, and DMARC. Clicking on any chart segment will filter for that value.\r\n\r\n***Note***\r\nMessages should not be considered malicious just because they failed to pass DMARC; especially if you have just started collecting data. It may be a legitimate service that needs SPF and DKIM configured correctly.\r\n\r\nStart by filtering the results to only show failed DKIM alignment. While DMARC passes if a message passes SPF or DKIM alignment, only DKIM alignment remains valid when a message is forwarded without changing the from address, which is often caused by a mailbox forwarding rule. This is because DKIM signatures are part of the message headers, whereas SPF relies on SMTP session headers.\r\n\r\nUnderneath the pie charts. you can see graphs of DMARC passage and message disposition over time.\r\n\r\nUnder the graphs you will find the most useful data tables on the dashboard. On the left, there is a list of organizations that are sending you DMARC reports. In the center, there is a list of sending servers grouped by the base domain in their reverse DNS. On the right, there is a list of email from domains, sorted by message volume.\r\n\r\nBy hovering your mouse over a data table value and using the magnifying glass icons, you can filter on or filter out different values. Start by looking at the Message Sources by Reverse DNS table. Find a sender that you recognize, such as an email marketing service, hover over it, and click on the plus (+) magnifying glass icon, to add a filter that only shows results for that sender. Now, look at the Message From Header table to the right. That shows you the domains that a sender is sending as, which might tell you which brand/business is using a particular service. 
With that information, you can contact them and have them set up DKIM.\r\n\r\n***Note***\r\nIf you have a lot of B2C customers, you may see a high volume of emails as your domains coming from consumer email services, such as Google/Gmail and Yahoo! This occurs when customers have mailbox rules in place that forward emails from an old account to a new account, which is why DKIM authentication is so important, as mentioned earlier. Similar patterns may be observed with businesses who send from reverse DNS addressees of parent, subsidiary, and outdated brands.\r\n\r\n***Note***\r\nYou can add your own custom temporary filters by clicking on Add Filter at the upper right of the page.\r\n\r\n# DMARC Failure Samples\r\nThe DMARC Failure Samples section contains information on DMARC failure reports (also known as ruf reports). These reports contain samples of emails that have failed to pass DMARC.\r\n\r\n***Note***\r\nMost recipients do not send failure/ruf reports at all to avoid privacy leaks. Some recipients (notably Chinese webmail services) will only supply the headers of sample emails. 
Very few provide the entire email.\r\n\r\n# DMARC Alignment Guide\r\nDMARC ensures that SPF and DKIM authentication mechanisms actually authenticate against the same domain that the end user sees.\r\n\r\nA message passes a DMARC check by passing DKIM or SPF, **as long as the related indicators are also in alignment.**\r\n\r\n| \t| DKIM \t| SPF \t|\r\n|-----------\t|--------------------------------------------------------------------------------------------------------------------------------------------------\t|----------------------------------------------------------------------------------------------------------------\t|\r\n| **Passing** \t| The signature in the DKIM header is validated using a public key that is published as a DNS record of the domain name specified in the signature \t| The mail server's IP address is listed in the SPF record of the domain in the SMTP envelope's mail from header \t|\r\n| **Alignment** \t| The signing domain aligns with the domain in the message's from header \t| The domain in the SMTP envelope's mail from header aligns with the domain in the message's from header \t|\r\n\r\n\r\n# Further Reading\r\n[Demystifying DMARC: A guide to preventing email spoofing](https://seanthegeek.net/459/demystifying-dmarc/amp/)\r\n\r\n[DMARC Manual](https://menainfosec.com/wp-content/uploads/2017/12/DMARC_Service_Manual.pdf)\r\n\r\n[What is “External Destination Verification”?](https://dmarcian.com/what-is-external-destination-verification/)",
"content": "# DMARC Summary\r\nAs the name suggests, this dashboard is the best place to start reviewing your aggregate DMARC data.\r\n\r\nAcross the top of the dashboard, three pie charts display the percentage of alignment pass/fail for SPF, DKIM, and DMARC. Clicking on any chart segment will filter for that value.\r\n\r\n***Note***\r\nMessages should not be considered malicious just because they failed to pass DMARC; especially if you have just started collecting data. It may be a legitimate service that needs SPF and DKIM configured correctly.\r\n\r\nStart by filtering the results to only show failed DKIM alignment. While DMARC passes if a message passes SPF or DKIM alignment, only DKIM alignment remains valid when a message is forwarded without changing the from address, which is often caused by a mailbox forwarding rule. This is because DKIM signatures are part of the message headers, whereas SPF relies on SMTP session headers.\r\n\r\nUnderneath the pie charts. you can see graphs of DMARC passage and message disposition over time.\r\n\r\nUnder the graphs you will find the most useful data tables on the dashboard. On the left, there is a list of organizations that are sending you DMARC reports. In the center, there is a list of sending servers grouped by the base domain in their reverse DNS. On the right, there is a list of email from domains, sorted by message volume.\r\n\r\nBy hovering your mouse over a data table value and using the magnifying glass icons, you can filter on or filter out different values. Start by looking at the Message Sources by Reverse DNS table. Find a sender that you recognize, such as an email marketing service, hover over it, and click on the plus (+) magnifying glass icon, to add a filter that only shows results for that sender. Now, look at the Message From Header table to the right. That shows you the domains that a sender is sending as, which might tell you which brand/business is using a particular service. 
With that information, you can contact them and have them set up DKIM.\r\n\r\n***Note***\r\nIf you have a lot of B2C customers, you may see a high volume of emails as your domains coming from consumer email services, such as Google/Gmail and Yahoo! This occurs when customers have mailbox rules in place that forward emails from an old account to a new account, which is why DKIM authentication is so important, as mentioned earlier. Similar patterns may be observed with businesses who send from reverse DNS addressees of parent, subsidiary, and outdated brands.\r\n\r\n***Note***\r\nYou can add your own custom temporary filters by clicking on Add Filter at the upper right of the page.\r\n\r\n# DMARC Forensic Samples\r\nThe DMARC Forensic Samples section contains information on DMARC forensic reports (also known as failure reports or ruf reports). These reports contain samples of emails that have failed to pass DMARC.\r\n\r\n***Note***\r\nMost recipients do not send forensic/failure/ruf reports at all to avoid privacy leaks. Some recipients (notably Chinese webmail services) will only supply the headers of sample emails. 
Very few provide the entire email.\r\n\r\n# DMARC Alignment Guide\r\nDMARC ensures that SPF and DKIM authentication mechanisms actually authenticate against the same domain that the end user sees.\r\n\r\nA message passes a DMARC check by passing DKIM or SPF, **as long as the related indicators are also in alignment.**\r\n\r\n| \t| DKIM \t| SPF \t|\r\n|-----------\t|--------------------------------------------------------------------------------------------------------------------------------------------------\t|----------------------------------------------------------------------------------------------------------------\t|\r\n| **Passing** \t| The signature in the DKIM header is validated using a public key that is published as a DNS record of the domain name specified in the signature \t| The mail server's IP address is listed in the SPF record of the domain in the SMTP envelope's mail from header \t|\r\n| **Alignment** \t| The signing domain aligns with the domain in the message's from header \t| The domain in the SMTP envelope's mail from header aligns with the domain in the message's from header \t|\r\n\r\n\r\n# Further Reading\r\n[Demystifying DMARC: A guide to preventing email spoofing](https://seanthegeek.net/459/demystifying-dmarc/amp/)\r\n\r\n[DMARC Manual](https://menainfosec.com/wp-content/uploads/2017/12/DMARC_Service_Manual.pdf)\r\n\r\n[What is “External Destination Verification”?](https://dmarcian.com/what-is-external-destination-verification/)",
"datasource": null,
"fieldConfig": {
"defaults": {
@@ -101,7 +101,7 @@
"links": [],
"mode": "markdown",
"options": {
"content": "# DMARC Summary\r\nAs the name suggests, this dashboard is the best place to start reviewing your aggregate DMARC data.\r\n\r\nAcross the top of the dashboard, three pie charts display the percentage of alignment pass/fail for SPF, DKIM, and DMARC. Clicking on any chart segment will filter for that value.\r\n\r\n***Note***\r\nMessages should not be considered malicious just because they failed to pass DMARC; especially if you have just started collecting data. It may be a legitimate service that needs SPF and DKIM configured correctly.\r\n\r\nStart by filtering the results to only show failed DKIM alignment. While DMARC passes if a message passes SPF or DKIM alignment, only DKIM alignment remains valid when a message is forwarded without changing the from address, which is often caused by a mailbox forwarding rule. This is because DKIM signatures are part of the message headers, whereas SPF relies on SMTP session headers.\r\n\r\nUnderneath the pie charts. you can see graphs of DMARC passage and message disposition over time.\r\n\r\nUnder the graphs you will find the most useful data tables on the dashboard. On the left, there is a list of organizations that are sending you DMARC reports. In the center, there is a list of sending servers grouped by the base domain in their reverse DNS. On the right, there is a list of email from domains, sorted by message volume.\r\n\r\nBy hovering your mouse over a data table value and using the magnifying glass icons, you can filter on or filter out different values. Start by looking at the Message Sources by Reverse DNS table. Find a sender that you recognize, such as an email marketing service, hover over it, and click on the plus (+) magnifying glass icon, to add a filter that only shows results for that sender. Now, look at the Message From Header table to the right. That shows you the domains that a sender is sending as, which might tell you which brand/business is using a particular service. 
With that information, you can contact them and have them set up DKIM.\r\n\r\n***Note***\r\nIf you have a lot of B2C customers, you may see a high volume of emails as your domains coming from consumer email services, such as Google/Gmail and Yahoo! This occurs when customers have mailbox rules in place that forward emails from an old account to a new account, which is why DKIM authentication is so important, as mentioned earlier. Similar patterns may be observed with businesses who send from reverse DNS addressees of parent, subsidiary, and outdated brands.\r\n\r\n***Note***\r\nYou can add your own custom temporary filters by clicking on Add Filter at the upper right of the page.\r\n\r\n# DMARC Failure Samples\r\nThe DMARC Failure Samples section contains information on DMARC failure reports (also known as ruf reports). These reports contain samples of emails that have failed to pass DMARC.\r\n\r\n***Note***\r\nMost recipients do not send failure/ruf reports at all to avoid privacy leaks. Some recipients (notably Chinese webmail services) will only supply the headers of sample emails. 
Very few provide the entire email.\r\n\r\n# DMARC Alignment Guide\r\nDMARC ensures that SPF and DKIM authentication mechanisms actually authenticate against the same domain that the end user sees.\r\n\r\nA message passes a DMARC check by passing DKIM or SPF, **as long as the related indicators are also in alignment.**\r\n\r\n| \t| DKIM \t| SPF \t|\r\n|-----------\t|--------------------------------------------------------------------------------------------------------------------------------------------------\t|----------------------------------------------------------------------------------------------------------------\t|\r\n| **Passing** \t| The signature in the DKIM header is validated using a public key that is published as a DNS record of the domain name specified in the signature \t| The mail server's IP address is listed in the SPF record of the domain in the SMTP envelope's mail from header \t|\r\n| **Alignment** \t| The signing domain aligns with the domain in the message's from header \t| The domain in the SMTP envelope's mail from header aligns with the domain in the message's from header \t|\r\n\r\n\r\n# Further Reading\r\n[Demystifying DMARC: A guide to preventing email spoofing](https://seanthegeek.net/459/demystifying-dmarc/amp/)\r\n\r\n[DMARC Manual](https://menainfosec.com/wp-content/uploads/2017/12/DMARC_Service_Manual.pdf)\r\n\r\n[What is “External Destination Verification”?](https://dmarcian.com/what-is-external-destination-verification/)",
"content": "# DMARC Summary\r\nAs the name suggests, this dashboard is the best place to start reviewing your aggregate DMARC data.\r\n\r\nAcross the top of the dashboard, three pie charts display the percentage of alignment pass/fail for SPF, DKIM, and DMARC. Clicking on any chart segment will filter for that value.\r\n\r\n***Note***\r\nMessages should not be considered malicious just because they failed to pass DMARC; especially if you have just started collecting data. It may be a legitimate service that needs SPF and DKIM configured correctly.\r\n\r\nStart by filtering the results to only show failed DKIM alignment. While DMARC passes if a message passes SPF or DKIM alignment, only DKIM alignment remains valid when a message is forwarded without changing the from address, which is often caused by a mailbox forwarding rule. This is because DKIM signatures are part of the message headers, whereas SPF relies on SMTP session headers.\r\n\r\nUnderneath the pie charts. you can see graphs of DMARC passage and message disposition over time.\r\n\r\nUnder the graphs you will find the most useful data tables on the dashboard. On the left, there is a list of organizations that are sending you DMARC reports. In the center, there is a list of sending servers grouped by the base domain in their reverse DNS. On the right, there is a list of email from domains, sorted by message volume.\r\n\r\nBy hovering your mouse over a data table value and using the magnifying glass icons, you can filter on or filter out different values. Start by looking at the Message Sources by Reverse DNS table. Find a sender that you recognize, such as an email marketing service, hover over it, and click on the plus (+) magnifying glass icon, to add a filter that only shows results for that sender. Now, look at the Message From Header table to the right. That shows you the domains that a sender is sending as, which might tell you which brand/business is using a particular service. 
With that information, you can contact them and have them set up DKIM.\r\n\r\n***Note***\r\nIf you have a lot of B2C customers, you may see a high volume of emails as your domains coming from consumer email services, such as Google/Gmail and Yahoo! This occurs when customers have mailbox rules in place that forward emails from an old account to a new account, which is why DKIM authentication is so important, as mentioned earlier. Similar patterns may be observed with businesses who send from reverse DNS addressees of parent, subsidiary, and outdated brands.\r\n\r\n***Note***\r\nYou can add your own custom temporary filters by clicking on Add Filter at the upper right of the page.\r\n\r\n# DMARC Forensic Samples\r\nThe DMARC Forensic Samples section contains information on DMARC forensic reports (also known as failure reports or ruf reports). These reports contain samples of emails that have failed to pass DMARC.\r\n\r\n***Note***\r\nMost recipients do not send forensic/failure/ruf reports at all to avoid privacy leaks. Some recipients (notably Chinese webmail services) will only supply the headers of sample emails. 
Very few provide the entire email.\r\n\r\n# DMARC Alignment Guide\r\nDMARC ensures that SPF and DKIM authentication mechanisms actually authenticate against the same domain that the end user sees.\r\n\r\nA message passes a DMARC check by passing DKIM or SPF, **as long as the related indicators are also in alignment.**\r\n\r\n| \t| DKIM \t| SPF \t|\r\n|-----------\t|--------------------------------------------------------------------------------------------------------------------------------------------------\t|----------------------------------------------------------------------------------------------------------------\t|\r\n| **Passing** \t| The signature in the DKIM header is validated using a public key that is published as a DNS record of the domain name specified in the signature \t| The mail server's IP address is listed in the SPF record of the domain in the SMTP envelope's mail from header \t|\r\n| **Alignment** \t| The signing domain aligns with the domain in the message's from header \t| The domain in the SMTP envelope's mail from header aligns with the domain in the message's from header \t|\r\n\r\n\r\n# Further Reading\r\n[Demystifying DMARC: A guide to preventing email spoofing](https://seanthegeek.net/459/demystifying-dmarc/amp/)\r\n\r\n[DMARC Manual](https://menainfosec.com/wp-content/uploads/2017/12/DMARC_Service_Manual.pdf)\r\n\r\n[What is “External Destination Verification”?](https://dmarcian.com/what-is-external-destination-verification/)",
"mode": "markdown"
},
"pluginVersion": "7.1.0",
File diff suppressed because one or more lines are too long
@@ -48,8 +48,7 @@ from parsedmarc.mail import (
)
from parsedmarc.types import (
AggregateReport,
FailureReport,
ForensicReport as ForensicReport,
ForensicReport,
ParsedReport,
ParsingResults,
SMTPTLSReport,
@@ -74,7 +73,6 @@ text_report_regex = re.compile(r"\s*([a-zA-Z\s]+):\s(.+)", re.MULTILINE)
MAGIC_ZIP = b"\x50\x4b\x03\x04"
MAGIC_GZIP = b"\x1f\x8b"
MAGIC_XML = b"\x3c\x3f\x78\x6d\x6c\x20"
MAGIC_XML_TAG = b"\x3c" # '<' - XML starting with an element tag (no declaration)
MAGIC_JSON = b"\x7b"  # '{' - start of a JSON object
EMAIL_SAMPLE_CONTENT_TYPES = (
@@ -109,12 +107,8 @@ class InvalidAggregateReport(InvalidDMARCReport):
"""Raised when an invalid DMARC aggregate report is encountered"""
class InvalidFailureReport(InvalidDMARCReport):
"""Raised when an invalid DMARC failure report is encountered"""
# Backward-compatible alias
InvalidForensicReport = InvalidFailureReport
class InvalidForensicReport(InvalidDMARCReport):
"""Raised when an invalid DMARC forensic report is encountered"""
def _bucket_interval_by_day(
@@ -354,6 +348,8 @@ def _parse_report_record(
}
if "disposition" in policy_evaluated:
new_policy_evaluated["disposition"] = policy_evaluated["disposition"]
if new_policy_evaluated["disposition"].strip().lower() == "pass":
new_policy_evaluated["disposition"] = "none"
if "dkim" in policy_evaluated:
new_policy_evaluated["dkim"] = policy_evaluated["dkim"]
if "spf" in policy_evaluated:
@@ -414,7 +410,6 @@ def _parse_report_record(
new_result["result"] = result["result"]
else:
new_result["result"] = "none"
new_result["human_result"] = result.get("human_result", None)
new_record["auth_results"]["dkim"].append(new_result)
if not isinstance(auth_results["spf"], list):
@@ -430,7 +425,6 @@ def _parse_report_record(
new_result["result"] = result["result"]
else:
new_result["result"] = "none"
new_result["human_result"] = result.get("human_result", None)
new_record["auth_results"]["spf"].append(new_result)
if "envelope_from" not in new_record["identifiers"]:
@@ -757,8 +751,8 @@ def parse_aggregate_report_xml(
new_report_metadata["report_id"] = report_id
date_range = report["report_metadata"]["date_range"]
begin_ts = int(date_range["begin"].split(".")[0])
end_ts = int(date_range["end"].split(".")[0])
begin_ts = int(date_range["begin"])
end_ts = int(date_range["end"])
span_seconds = end_ts - begin_ts
normalize_timespan = span_seconds > normalize_timespan_threshold_hours * 3600
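The timestamp change in the hunk above accounts for reports that contain fractional epoch seconds (e.g. `"1700000000.123"`), which `int()` alone rejects. A minimal sketch of the same conversion:

```python
# Sketch of the epoch handling above: drop any fractional part before
# converting, so both "1700000000" and "1700000000.123" parse cleanly.
def epoch_to_int(value: str) -> int:
    return int(value.split(".")[0])
```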
@@ -783,10 +777,6 @@ def parse_aggregate_report_xml(
else:
errors = report["report_metadata"]["error"]
new_report_metadata["errors"] = errors
generator = None
if "generator" in report_metadata:
generator = report_metadata["generator"]
new_report_metadata["generator"] = generator
new_report["report_metadata"] = new_report_metadata
records = []
policy_published = report["policy_published"]
@@ -810,39 +800,16 @@ def parse_aggregate_report_xml(
if policy_published["sp"] is not None:
sp = policy_published["sp"]
new_policy_published["sp"] = sp
pct = None
pct = "100"
if "pct" in policy_published:
if policy_published["pct"] is not None:
pct = policy_published["pct"]
new_policy_published["pct"] = pct
fo = None
fo = "0"
if "fo" in policy_published:
if policy_published["fo"] is not None:
fo = policy_published["fo"]
new_policy_published["fo"] = fo
np_ = None
if "np" in policy_published:
if policy_published["np"] is not None:
np_ = policy_published["np"]
if np_ not in ("none", "quarantine", "reject"):
logger.warning("Invalid np value: {0}".format(np_))
new_policy_published["np"] = np_
testing = None
if "testing" in policy_published:
if policy_published["testing"] is not None:
testing = policy_published["testing"]
if testing not in ("n", "y"):
logger.warning("Invalid testing value: {0}".format(testing))
new_policy_published["testing"] = testing
discovery_method = None
if "discovery_method" in policy_published:
if policy_published["discovery_method"] is not None:
discovery_method = policy_published["discovery_method"]
if discovery_method not in ("psl", "treewalk"):
logger.warning(
"Invalid discovery_method value: {0}".format(discovery_method)
)
new_policy_published["discovery_method"] = discovery_method
new_report["policy_published"] = new_policy_published
if type(report["record"]) is list:
@@ -925,11 +892,7 @@ def extract_report(content: Union[bytes, str, BinaryIO]) -> str:
try:
if isinstance(content, str):
try:
file_object = BytesIO(
b64decode(
content.replace("\n", "").replace("\r", ""), validate=True
)
)
file_object = BytesIO(b64decode(content))
except binascii.Error:
return content
header = file_object.read(6)
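The base64 handling changed in the hunk above strips newlines and passes `validate=True`, so `b64decode` raises `binascii.Error` on non-base64 input instead of silently skipping invalid characters, letting plain-text content fall through unchanged. A self-contained sketch of that pattern:

```python
# Sketch of strict base64 detection: decode only when the input is valid
# base64 (after removing line breaks); otherwise return it untouched.
import binascii
from base64 import b64decode
from typing import Union

def decode_if_base64(content: str) -> Union[bytes, str]:
    try:
        return b64decode(content.replace("\n", "").replace("\r", ""), validate=True)
    except binascii.Error:
        return content
```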
@@ -975,7 +938,6 @@ def extract_report(content: Union[bytes, str, BinaryIO]) -> str:
)
elif (
header[: len(MAGIC_XML)] == MAGIC_XML
or header[: len(MAGIC_XML_TAG)] == MAGIC_XML_TAG
or header[: len(MAGIC_JSON)] == MAGIC_JSON
):
report = file_object.read().decode(errors="ignore")
@@ -996,12 +958,10 @@ def extract_report(content: Union[bytes, str, BinaryIO]) -> str:
return report
def extract_report_from_file_path(
file_path: Union[str, bytes, os.PathLike[str], os.PathLike[bytes]],
) -> str:
def extract_report_from_file_path(file_path: str):
"""Extracts report from a file at the given file_path"""
try:
with open(os.fspath(file_path), "rb") as report_file:
with open(file_path, "rb") as report_file:
return extract_report(report_file.read())
except FileNotFoundError:
raise ParserError("File was not found")
@@ -1101,9 +1061,6 @@ def parsed_aggregate_reports_to_csv_rows(
sp = report["policy_published"]["sp"]
pct = report["policy_published"]["pct"]
fo = report["policy_published"]["fo"]
np_ = report["policy_published"].get("np", None)
testing = report["policy_published"].get("testing", None)
discovery_method = report["policy_published"].get("discovery_method", None)
report_dict: dict[str, Any] = dict(
xml_schema=xml_schema,
@@ -1120,11 +1077,8 @@ def parsed_aggregate_reports_to_csv_rows(
aspf=aspf,
p=p,
sp=sp,
np=np_,
pct=pct,
fo=fo,
testing=testing,
discovery_method=discovery_method,
)
for record in report["records"]:
@@ -1220,11 +1174,8 @@ def parsed_aggregate_reports_to_csv(
"aspf",
"p",
"sp",
"np",
"pct",
"fo",
"testing",
"discovery_method",
"source_ip_address",
"source_country",
"source_reverse_dns",
@@ -1262,7 +1213,7 @@ def parsed_aggregate_reports_to_csv(
return csv_file_object.getvalue()
def parse_failure_report(
def parse_forensic_report(
feedback_report: str,
sample: str,
msg_date: datetime,
@@ -1275,9 +1226,9 @@ def parse_failure_report(
nameservers: Optional[list[str]] = None,
dns_timeout: float = 2.0,
strip_attachment_payloads: bool = False,
) -> FailureReport:
) -> ForensicReport:
"""
Converts a DMARC failure report and sample to a dict
Converts a DMARC forensic report and sample to a dict
Args:
feedback_report (str): A message's feedback report as a string
@@ -1292,7 +1243,7 @@ def parse_failure_report(
(Cloudflare's public DNS resolvers by default)
dns_timeout (float): Sets the DNS timeout in seconds
strip_attachment_payloads (bool): Remove attachment payloads from
failure report results
forensic report results
Returns:
dict: A parsed report and sample
@@ -1308,7 +1259,7 @@ def parse_failure_report(
if "arrival_date" not in parsed_report:
if msg_date is None:
raise InvalidFailureReport("Failure sample is not a valid email")
raise InvalidForensicReport("Forensic sample is not a valid email")
parsed_report["arrival_date"] = msg_date.isoformat()
if "version" not in parsed_report:
@@ -1394,27 +1345,27 @@ def parse_failure_report(
parsed_report["sample"] = sample
parsed_report["parsed_sample"] = parsed_sample
return cast(FailureReport, parsed_report)
return cast(ForensicReport, parsed_report)
except KeyError as error:
raise InvalidFailureReport("Missing value: {0}".format(error.__str__()))
raise InvalidForensicReport("Missing value: {0}".format(error.__str__()))
except Exception as error:
raise InvalidFailureReport("Unexpected error: {0}".format(error.__str__()))
raise InvalidForensicReport("Unexpected error: {0}".format(error.__str__()))
def parsed_failure_reports_to_csv_rows(
reports: Union[FailureReport, list[FailureReport]],
def parsed_forensic_reports_to_csv_rows(
reports: Union[ForensicReport, list[ForensicReport]],
) -> list[dict[str, Any]]:
"""
Converts one or more parsed failure reports to a list of dicts in flat CSV
Converts one or more parsed forensic reports to a list of dicts in flat CSV
format
Args:
reports: A parsed failure report or list of parsed failure reports
reports: A parsed forensic report or list of parsed forensic reports
Returns:
list: Parsed failure report data as a list of dicts in flat CSV format
list: Parsed forensic report data as a list of dicts in flat CSV format
"""
if isinstance(reports, dict):
reports = [reports]
@@ -1441,18 +1392,18 @@ def parsed_failure_reports_to_csv_rows(
return rows
def parsed_failure_reports_to_csv(
reports: Union[FailureReport, list[FailureReport]],
def parsed_forensic_reports_to_csv(
reports: Union[ForensicReport, list[ForensicReport]],
) -> str:
"""
Converts one or more parsed failure reports to flat CSV format, including
Converts one or more parsed forensic reports to flat CSV format, including
headers
Args:
reports: A parsed failure report or list of parsed failure reports
reports: A parsed forensic report or list of parsed forensic reports
Returns:
str: Parsed failure report data in flat CSV format, including headers
str: Parsed forensic report data in flat CSV format, including headers
"""
fields = [
"feedback_type",
@@ -1484,7 +1435,7 @@ def parsed_failure_reports_to_csv(
csv_writer = DictWriter(csv_file, fieldnames=fields)
csv_writer.writeheader()
rows = parsed_failure_reports_to_csv_rows(reports)
rows = parsed_forensic_reports_to_csv_rows(reports)
for row in rows:
new_row: dict[str, Any] = {}
@@ -1522,13 +1473,13 @@ def parse_report_email(
nameservers (list): A list of one or more nameservers to use
dns_timeout (float): Sets the DNS timeout in seconds
strip_attachment_payloads (bool): Remove attachment payloads from
failure report results
forensic report results
keep_alive (callable): keep alive function
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
Returns:
dict:
* ``report_type``: ``aggregate`` or ``failure``
* ``report_type``: ``aggregate`` or ``forensic``
* ``report``: The parsed report
"""
result: Optional[ParsedReport] = None
@@ -1671,7 +1622,7 @@ def parse_report_email(
if feedback_report and sample:
try:
failure_report = parse_failure_report(
forensic_report = parse_forensic_report(
feedback_report,
sample,
msg_date,
@@ -1684,17 +1635,17 @@ def parse_report_email(
dns_timeout=dns_timeout,
strip_attachment_payloads=strip_attachment_payloads,
)
except InvalidFailureReport as e:
except InvalidForensicReport as e:
error = (
'Message with subject "{0}" '
"is not a valid "
"failure DMARC report: {1}".format(subject, e)
"forensic DMARC report: {1}".format(subject, e)
)
raise InvalidFailureReport(error)
raise InvalidForensicReport(error)
except Exception as e:
raise InvalidFailureReport(e.__str__())
raise InvalidForensicReport(e.__str__())
result = {"report_type": "failure", "report": failure_report}
result = {"report_type": "forensic", "report": forensic_report}
return result
if result is None:
@@ -1705,7 +1656,7 @@ def parse_report_email(
def parse_report_file(
input_: Union[bytes, str, os.PathLike[str], os.PathLike[bytes], BinaryIO],
input_: Union[bytes, str, BinaryIO],
*,
nameservers: Optional[list[str]] = None,
dns_timeout: float = 2.0,
@@ -1718,17 +1669,16 @@ def parse_report_file(
keep_alive: Optional[Callable] = None,
normalize_timespan_threshold_hours: float = 24,
) -> ParsedReport:
"""Parses a DMARC aggregate or failure file at the given path, a
"""Parses a DMARC aggregate or forensic file at the given path, a
file-like object, or bytes
Args:
input_ (str | os.PathLike | bytes | BinaryIO): A path to a file,
a file-like object, or bytes
input_ (str | bytes | BinaryIO): A path to a file, a file like object, or bytes
nameservers (list): A list of one or more nameservers to use
(Cloudflare's public DNS resolvers by default)
dns_timeout (float): Sets the DNS timeout in seconds
strip_attachment_payloads (bool): Remove attachment payloads from
failure report results
forensic report results
ip_db_path (str): Path to a MMDB file from MaxMind or DBIP
always_use_local_files (bool): Do not download files
reverse_dns_map_path (str): Path to a reverse DNS map
@@ -1740,10 +1690,9 @@ def parse_report_file(
dict: The parsed DMARC report
"""
file_object: BinaryIO
if isinstance(input_, (str, os.PathLike)):
file_path = os.fspath(input_)
logger.debug("Parsing {0}".format(file_path))
file_object = open(file_path, "rb")
if isinstance(input_, str):
logger.debug("Parsing {0}".format(input_))
file_object = open(input_, "rb")
elif isinstance(input_, (bytes, bytearray, memoryview)):
file_object = BytesIO(bytes(input_))
else:
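The signature change in the hunk above broadens `input_` from `str` to `str | os.PathLike`, normalizing with `os.fspath()` so callers can pass `pathlib.Path` objects as well as plain strings. A minimal sketch of that acceptance pattern (helper name is illustrative):

```python
# Sketch of path normalization: os.fspath() converts any os.PathLike
# (such as pathlib.Path) to a plain string path.
import os
from pathlib import Path

def to_path_str(p) -> str:
    if isinstance(p, (str, os.PathLike)):
        return os.fspath(p)
    raise TypeError("expected a str or os.PathLike")
```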
@@ -1819,7 +1768,7 @@ def get_dmarc_reports_from_mbox(
(Cloudflare's public DNS resolvers by default)
dns_timeout (float): Sets the DNS timeout in seconds
strip_attachment_payloads (bool): Remove attachment payloads from
failure report results
forensic report results
always_use_local_files (bool): Do not download files
reverse_dns_map_path (str): Path to a reverse DNS map file
reverse_dns_map_url (str): URL to a reverse DNS map file
@@ -1828,11 +1777,11 @@ def get_dmarc_reports_from_mbox(
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
Returns:
dict: Lists of ``aggregate_reports``, ``failure_reports``, and ``smtp_tls_reports``
dict: Lists of ``aggregate_reports``, ``forensic_reports``, and ``smtp_tls_reports``
"""
aggregate_reports: list[AggregateReport] = []
failure_reports: list[FailureReport] = []
forensic_reports: list[ForensicReport] = []
smtp_tls_reports: list[SMTPTLSReport] = []
try:
mbox = mailbox.mbox(input_)
@@ -1869,8 +1818,8 @@ def get_dmarc_reports_from_mbox(
"Skipping duplicate aggregate report "
f"from {report_org} with ID: {report_id}"
)
elif parsed_email["report_type"] == "failure":
failure_reports.append(parsed_email["report"])
elif parsed_email["report_type"] == "forensic":
forensic_reports.append(parsed_email["report"])
elif parsed_email["report_type"] == "smtp_tls":
smtp_tls_reports.append(parsed_email["report"])
except InvalidDMARCReport as error:
@@ -1879,7 +1828,7 @@ def get_dmarc_reports_from_mbox(
raise InvalidDMARCReport("Mailbox {0} does not exist".format(input_))
return {
"aggregate_reports": aggregate_reports,
"failure_reports": failure_reports,
"forensic_reports": forensic_reports,
"smtp_tls_reports": smtp_tls_reports,
}
@@ -1922,7 +1871,7 @@ def get_dmarc_reports_from_mailbox(
nameservers (list): A list of DNS nameservers to query
dns_timeout (float): Set the DNS query timeout
strip_attachment_payloads (bool): Remove attachment payloads from
failure report results
forensic report results
results (dict): Results from the previous run
batch_size (int): Number of messages to read and process before saving
(use 0 for no limit)
@@ -1933,7 +1882,7 @@ def get_dmarc_reports_from_mailbox(
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
Returns:
dict: Lists of ``aggregate_reports``, ``failure_reports``, and ``smtp_tls_reports``
dict: Lists of ``aggregate_reports``, ``forensic_reports``, and ``smtp_tls_reports``
"""
if delete and test:
raise ValueError("delete and test options are mutually exclusive")
@@ -1945,25 +1894,25 @@ def get_dmarc_reports_from_mailbox(
current_time: Optional[Union[datetime, date, str]] = None
aggregate_reports: list[AggregateReport] = []
failure_reports: list[FailureReport] = []
forensic_reports: list[ForensicReport] = []
smtp_tls_reports: list[SMTPTLSReport] = []
aggregate_report_msg_uids = []
failure_report_msg_uids = []
forensic_report_msg_uids = []
smtp_tls_msg_uids = []
aggregate_reports_folder = "{0}/Aggregate".format(archive_folder)
failure_reports_folder = "{0}/Forensic".format(archive_folder)
forensic_reports_folder = "{0}/Forensic".format(archive_folder)
smtp_tls_reports_folder = "{0}/SMTP-TLS".format(archive_folder)
invalid_reports_folder = "{0}/Invalid".format(archive_folder)
if results:
aggregate_reports = results["aggregate_reports"].copy()
failure_reports = results["failure_reports"].copy()
forensic_reports = results["forensic_reports"].copy()
smtp_tls_reports = results["smtp_tls_reports"].copy()
if not test and create_folders:
connection.create_folder(archive_folder)
connection.create_folder(aggregate_reports_folder)
connection.create_folder(failure_reports_folder)
connection.create_folder(forensic_reports_folder)
connection.create_folder(smtp_tls_reports_folder)
connection.create_folder(invalid_reports_folder)
@@ -2067,9 +2016,9 @@ def get_dmarc_reports_from_mailbox(
f"Skipping duplicate aggregate report with ID: {report_id}"
)
aggregate_report_msg_uids.append(message_id)
elif parsed_email["report_type"] == "failure":
failure_reports.append(parsed_email["report"])
failure_report_msg_uids.append(message_id)
elif parsed_email["report_type"] == "forensic":
forensic_reports.append(parsed_email["report"])
forensic_report_msg_uids.append(message_id)
elif parsed_email["report_type"] == "smtp_tls":
smtp_tls_reports.append(parsed_email["report"])
smtp_tls_msg_uids.append(message_id)
@@ -2096,7 +2045,7 @@ def get_dmarc_reports_from_mailbox(
if not test:
if delete:
processed_messages = (
aggregate_report_msg_uids + failure_report_msg_uids + smtp_tls_msg_uids
aggregate_report_msg_uids + forensic_report_msg_uids + smtp_tls_msg_uids
)
number_of_processed_msgs = len(processed_messages)
@@ -2136,24 +2085,24 @@ def get_dmarc_reports_from_mailbox(
message = "Error moving message UID"
e = "{0} {1}: {2}".format(message, msg_uid, e)
logger.error("Mailbox error: {0}".format(e))
if len(failure_report_msg_uids) > 0:
message = "Moving failure report messages from"
if len(forensic_report_msg_uids) > 0:
message = "Moving forensic report messages from"
logger.debug(
"{0} {1} to {2}".format(
message, reports_folder, failure_reports_folder
message, reports_folder, forensic_reports_folder
)
)
number_of_failure_msgs = len(failure_report_msg_uids)
for i in range(number_of_failure_msgs):
msg_uid = failure_report_msg_uids[i]
number_of_forensic_msgs = len(forensic_report_msg_uids)
for i in range(number_of_forensic_msgs):
msg_uid = forensic_report_msg_uids[i]
message = "Moving message"
logger.debug(
"{0} {1} of {2}: UID {3}".format(
message, i + 1, number_of_failure_msgs, msg_uid
message, i + 1, number_of_forensic_msgs, msg_uid
)
)
try:
connection.move_message(msg_uid, failure_reports_folder)
connection.move_message(msg_uid, forensic_reports_folder)
except Exception as e:
e = "Error moving message UID {0}: {1}".format(msg_uid, e)
logger.error("Mailbox error: {0}".format(e))
@@ -2180,21 +2129,18 @@ def get_dmarc_reports_from_mailbox(
logger.error("Mailbox error: {0}".format(e))
results = {
"aggregate_reports": aggregate_reports,
"failure_reports": failure_reports,
"forensic_reports": forensic_reports,
"smtp_tls_reports": smtp_tls_reports,
}
if not test and not batch_size:
if current_time:
total_messages = len(
connection.fetch_messages(reports_folder, since=current_time)
)
else:
total_messages = len(connection.fetch_messages(reports_folder))
if current_time:
total_messages = len(
connection.fetch_messages(reports_folder, since=current_time)
)
else:
total_messages = 0
total_messages = len(connection.fetch_messages(reports_folder))
if total_messages > 0:
if not test and not batch_size and total_messages > 0:
# Process emails that came in during the last run
results = get_dmarc_reports_from_mailbox(
connection=connection,
@@ -2236,9 +2182,7 @@ def watch_inbox(
dns_timeout: float = 6.0,
strip_attachment_payloads: bool = False,
batch_size: int = 10,
since: Optional[Union[datetime, date, str]] = None,
normalize_timespan_threshold_hours: float = 24,
config_reloading: Optional[Callable] = None,
):
"""
Watches the mailbox for new messages and
@@ -2262,12 +2206,9 @@ def watch_inbox(
(Cloudflare's public DNS resolvers by default)
dns_timeout (float): Set the DNS query timeout
strip_attachment_payloads (bool): Replace attachment payloads in
failure report samples with None
forensic report samples with None
batch_size (int): Number of messages to read and process before saving
since: Search for messages since certain time
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
config_reloading: Optional callable that returns True when a config
reload has been requested (e.g. via SIGHUP)
"""
def check_callback(connection):
@@ -2286,27 +2227,19 @@ def watch_inbox(
dns_timeout=dns_timeout,
strip_attachment_payloads=strip_attachment_payloads,
batch_size=batch_size,
since=since,
create_folders=False,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
)
callback(res)
watch_kwargs: dict = {
"check_callback": check_callback,
"check_timeout": check_timeout,
}
if config_reloading is not None:
watch_kwargs["config_reloading"] = config_reloading
mailbox_connection.watch(**watch_kwargs)
mailbox_connection.watch(check_callback=check_callback, check_timeout=check_timeout)
def append_json(
filename: str,
reports: Union[
Sequence[AggregateReport],
Sequence[FailureReport],
Sequence[ForensicReport],
Sequence[SMTPTLSReport],
],
) -> None:
@@ -2349,10 +2282,10 @@ def save_output(
*,
output_directory: str = "output",
aggregate_json_filename: str = "aggregate.json",
failure_json_filename: str = "failure.json",
forensic_json_filename: str = "forensic.json",
smtp_tls_json_filename: str = "smtp_tls.json",
aggregate_csv_filename: str = "aggregate.csv",
failure_csv_filename: str = "failure.csv",
forensic_csv_filename: str = "forensic.csv",
smtp_tls_csv_filename: str = "smtp_tls.csv",
):
"""
@@ -2362,15 +2295,15 @@ def save_output(
results: Parsing results
output_directory (str): The path to the directory to save in
aggregate_json_filename (str): Filename for the aggregate JSON file
failure_json_filename (str): Filename for the failure JSON file
forensic_json_filename (str): Filename for the forensic JSON file
smtp_tls_json_filename (str): Filename for the SMTP TLS JSON file
aggregate_csv_filename (str): Filename for the aggregate CSV file
failure_csv_filename (str): Filename for the failure CSV file
forensic_csv_filename (str): Filename for the forensic CSV file
smtp_tls_csv_filename (str): Filename for the SMTP TLS CSV file
"""
aggregate_reports = results["aggregate_reports"]
failure_reports = results["failure_reports"]
forensic_reports = results["forensic_reports"]
smtp_tls_reports = results["smtp_tls_reports"]
output_directory = os.path.expanduser(output_directory)
@@ -2389,11 +2322,13 @@ def save_output(
parsed_aggregate_reports_to_csv(aggregate_reports),
)
append_json(os.path.join(output_directory, failure_json_filename), failure_reports)
append_json(
os.path.join(output_directory, forensic_json_filename), forensic_reports
)
append_csv(
os.path.join(output_directory, failure_csv_filename),
parsed_failure_reports_to_csv(failure_reports),
os.path.join(output_directory, forensic_csv_filename),
parsed_forensic_reports_to_csv(forensic_reports),
)
append_json(
@@ -2410,10 +2345,10 @@ def save_output(
os.makedirs(samples_directory)
sample_filenames = []
for failure_report in failure_reports:
sample = failure_report["sample"]
for forensic_report in forensic_reports:
sample = forensic_report["sample"]
message_count = 0
parsed_sample = failure_report["parsed_sample"]
parsed_sample = forensic_report["parsed_sample"]
subject = (
parsed_sample.get("filename_safe_subject")
or parsed_sample.get("subject")
@@ -2547,9 +2482,3 @@ def email_results(
attachments=attachments,
plain_message=message,
)
# Backward-compatible aliases
parse_forensic_report = parse_failure_report
parsed_forensic_reports_to_csv_rows = parsed_failure_reports_to_csv_rows
parsed_forensic_reports_to_csv = parsed_failure_reports_to_csv
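The alias block above keeps old public names importable after the forensic-to-failure rename: binding the old name to the renamed callable at module level means existing `from parsedmarc import parse_forensic_report` imports keep working. A minimal sketch of the pattern (function body is illustrative, not the library's implementation):

```python
# Sketch of backward-compatible aliasing after a rename: the old name is
# simply another binding to the same function object.
def parse_failure_report(report: str) -> dict:
    return {"feedback_type": "auth-failure", "raw": report}

# Old name preserved so legacy imports and calls continue to work.
parse_forensic_report = parse_failure_report
```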

File diff suppressed because it is too large Load Diff

View File

@@ -1,3 +1,3 @@
__version__ = "10.0.0"
__version__ = "9.0.7"
USER_AGENT = f"parsedmarc/{__version__}"

View File

@@ -13,7 +13,6 @@ from elasticsearch_dsl import (
InnerDoc,
Integer,
Ip,
Keyword,
Nested,
Object,
Search,
@@ -22,7 +21,7 @@ from elasticsearch_dsl import (
)
from elasticsearch_dsl.search import Q
from parsedmarc import InvalidFailureReport
from parsedmarc import InvalidForensicReport
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
@@ -44,23 +43,18 @@ class _PublishedPolicy(InnerDoc):
sp = Text()
pct = Integer()
fo = Text()
np = Keyword()
testing = Keyword()
discovery_method = Keyword()
class _DKIMResult(InnerDoc):
domain = Text()
selector = Text()
result = Text()
human_result = Text()
class _SPFResult(InnerDoc):
domain = Text()
scope = Text()
results = Text()
human_result = Text()
class _AggregateReportDoc(Document):
@@ -96,45 +90,17 @@ class _AggregateReportDoc(Document):
envelope_to = Text()
dkim_results = Nested(_DKIMResult)
spf_results = Nested(_SPFResult)
np = Keyword()
testing = Keyword()
discovery_method = Keyword()
generator = Text()
def add_policy_override(self, type_: str, comment: str):
self.policy_overrides.append(_PolicyOverride(type=type_, comment=comment)) # pyright: ignore[reportCallIssue]
def add_dkim_result(
self,
domain: str,
selector: str,
result: _DKIMResult,
human_result: str = None,
):
def add_dkim_result(self, domain: str, selector: str, result: _DKIMResult):
self.dkim_results.append(
_DKIMResult(
domain=domain,
selector=selector,
result=result,
human_result=human_result,
)
_DKIMResult(domain=domain, selector=selector, result=result)
) # pyright: ignore[reportCallIssue]
def add_spf_result(
self,
domain: str,
scope: str,
result: _SPFResult,
human_result: str = None,
):
self.spf_results.append(
_SPFResult(
domain=domain,
scope=scope,
result=result,
human_result=human_result,
)
) # pyright: ignore[reportCallIssue]
def add_spf_result(self, domain: str, scope: str, result: _SPFResult):
self.spf_results.append(_SPFResult(domain=domain, scope=scope, result=result)) # pyright: ignore[reportCallIssue]
def save(self, **kwargs): # pyright: ignore[reportIncompatibleMethodOverride]
self.passed_dmarc = False
@@ -154,7 +120,7 @@ class _EmailAttachmentDoc(Document):
sha256 = Text()
class _FailureSampleDoc(InnerDoc):
class _ForensicSampleDoc(InnerDoc):
raw = Text()
headers = Object()
headers_only = Boolean()
@@ -191,9 +157,9 @@ class _FailureSampleDoc(InnerDoc):
) # pyright: ignore[reportCallIssue]
class _FailureReportDoc(Document):
class _ForensicReportDoc(Document):
class Index:
name = "dmarc_failure"
name = "dmarc_forensic"
feedback_type = Text()
user_agent = Text()
@@ -211,7 +177,7 @@ class _FailureReportDoc(Document):
source_auth_failures = Text()
dkim_domain = Text()
original_rcpt_to = Text()
sample = Object(_FailureSampleDoc)
sample = Object(_ForensicSampleDoc)
class _SMTPTLSFailureDetailsDoc(InnerDoc):
@@ -302,7 +268,6 @@ def set_hosts(
*,
use_ssl: bool = False,
ssl_cert_path: Optional[str] = None,
skip_certificate_verification: bool = False,
username: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
@@ -315,7 +280,6 @@ def set_hosts(
hosts (str | list[str]): A single hostname or URL, or list of hostnames or URLs
use_ssl (bool): Use an HTTPS connection to the server
ssl_cert_path (str): Path to the certificate chain
skip_certificate_verification (bool): Skip certificate verification
username (str): The username to use for authentication
password (str): The password to use for authentication
api_key (str): The Base64 encoded API key to use for authentication
@@ -327,11 +291,10 @@ def set_hosts(
if use_ssl:
conn_params["use_ssl"] = True
if ssl_cert_path:
conn_params["ca_certs"] = ssl_cert_path
if skip_certificate_verification:
conn_params["verify_certs"] = False
else:
conn_params["verify_certs"] = True
conn_params["ca_certs"] = ssl_cert_path
else:
conn_params["verify_certs"] = False
if username and password:
conn_params["http_auth"] = username + ":" + password
if api_key:
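The `set_hosts` change in the hunk above keeps certificate verification on by default, honors an optional CA bundle, and disables verification only when explicitly requested. A hypothetical standalone sketch of that parameter construction:

```python
# Sketch of the connection-parameter logic above: verify_certs defaults to
# True; ca_certs is set only when a bundle path is given; verification is
# turned off only via an explicit opt-out flag.
def build_conn_params(use_ssl=False, ssl_cert_path=None, skip_verify=False):
    params = {}
    if use_ssl:
        params["use_ssl"] = True
    if ssl_cert_path:
        params["ca_certs"] = ssl_cert_path
    params["verify_certs"] = not skip_verify
    return params
```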
@@ -364,20 +327,20 @@ def create_indexes(names: list[str], settings: Optional[dict[str, Any]] = None):
def migrate_indexes(
aggregate_indexes: Optional[list[str]] = None,
failure_indexes: Optional[list[str]] = None,
forensic_indexes: Optional[list[str]] = None,
):
"""
Updates index mappings
Args:
aggregate_indexes (list): A list of aggregate index names
failure_indexes (list): A list of failure index names
forensic_indexes (list): A list of forensic index names
"""
version = 2
if aggregate_indexes is None:
aggregate_indexes = []
if failure_indexes is None:
failure_indexes = []
if forensic_indexes is None:
forensic_indexes = []
for aggregate_index_name in aggregate_indexes:
if not Index(aggregate_index_name).exists():
continue
@@ -407,7 +370,7 @@ def migrate_indexes(
reindex(connections.get_connection(), aggregate_index_name, new_index_name) # pyright: ignore[reportArgumentType]
Index(aggregate_index_name).delete()
for failure_index in failure_indexes:
for forensic_index in forensic_indexes:
pass
@@ -423,7 +386,7 @@ def save_aggregate_report_to_elasticsearch(
Saves a parsed DMARC aggregate report to Elasticsearch
Args:
aggregate_report (dict): A parsed aggregate report
aggregate_report (dict): A parsed forensic report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily indexes
@@ -450,8 +413,8 @@ def save_aggregate_report_to_elasticsearch(
org_name_query = Q(dict(match_phrase=dict(org_name=org_name))) # type: ignore
report_id_query = Q(dict(match_phrase=dict(report_id=report_id))) # pyright: ignore[reportArgumentType]
domain_query = Q(dict(match_phrase={"published_policy.domain": domain})) # pyright: ignore[reportArgumentType]
begin_date_query = Q(dict(range=dict(date_begin=dict(gte=begin_date)))) # pyright: ignore[reportArgumentType]
end_date_query = Q(dict(range=dict(date_end=dict(lte=end_date)))) # pyright: ignore[reportArgumentType]
begin_date_query = Q(dict(match=dict(date_begin=begin_date))) # pyright: ignore[reportArgumentType]
end_date_query = Q(dict(match=dict(date_end=end_date))) # pyright: ignore[reportArgumentType]
if index_suffix is not None:
search_index = "dmarc_aggregate_{0}*".format(index_suffix)
@@ -491,9 +454,6 @@ def save_aggregate_report_to_elasticsearch(
sp=aggregate_report["policy_published"]["sp"],
pct=aggregate_report["policy_published"]["pct"],
fo=aggregate_report["policy_published"]["fo"],
np=aggregate_report["policy_published"].get("np"),
testing=aggregate_report["policy_published"].get("testing"),
discovery_method=aggregate_report["policy_published"].get("discovery_method"),
)
for record in aggregate_report["records"]:
@@ -535,12 +495,6 @@ def save_aggregate_report_to_elasticsearch(
header_from=record["identifiers"]["header_from"],
envelope_from=record["identifiers"]["envelope_from"],
envelope_to=record["identifiers"]["envelope_to"],
np=aggregate_report["policy_published"].get("np"),
testing=aggregate_report["policy_published"].get("testing"),
discovery_method=aggregate_report["policy_published"].get(
"discovery_method"
),
generator=metadata.get("generator"),
)
for override in record["policy_evaluated"]["policy_override_reasons"]:
@@ -553,7 +507,6 @@ def save_aggregate_report_to_elasticsearch(
domain=dkim_result["domain"],
selector=dkim_result["selector"],
result=dkim_result["result"],
human_result=dkim_result.get("human_result"),
)
for spf_result in record["auth_results"]["spf"]:
@@ -561,7 +514,6 @@ def save_aggregate_report_to_elasticsearch(
domain=spf_result["domain"],
scope=spf_result["scope"],
result=spf_result["result"],
human_result=spf_result.get("human_result"),
)
index = "dmarc_aggregate"
@@ -583,8 +535,8 @@ def save_aggregate_report_to_elasticsearch(
raise ElasticsearchError("Elasticsearch error: {0}".format(e.__str__()))
def save_failure_report_to_elasticsearch(
failure_report: dict[str, Any],
def save_forensic_report_to_elasticsearch(
forensic_report: dict[str, Any],
index_suffix: Optional[Any] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
@@ -592,10 +544,10 @@ def save_failure_report_to_elasticsearch(
number_of_replicas: int = 0,
):
"""
Saves a parsed DMARC failure report to Elasticsearch
Saves a parsed DMARC forensic report to Elasticsearch
Args:
failure_report (dict): A parsed failure report
forensic_report (dict): A parsed forensic report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily
@@ -608,28 +560,26 @@ def save_failure_report_to_elasticsearch(
AlreadySaved
"""
logger.info("Saving failure report to Elasticsearch")
failure_report = failure_report.copy()
logger.info("Saving forensic report to Elasticsearch")
forensic_report = forensic_report.copy()
sample_date = None
if failure_report["parsed_sample"]["date"] is not None:
sample_date = failure_report["parsed_sample"]["date"]
if forensic_report["parsed_sample"]["date"] is not None:
sample_date = forensic_report["parsed_sample"]["date"]
sample_date = human_timestamp_to_datetime(sample_date)
original_headers = failure_report["parsed_sample"]["headers"]
original_headers = forensic_report["parsed_sample"]["headers"]
headers: dict[str, Any] = {}
for original_header in original_headers:
headers[original_header.lower()] = original_headers[original_header]
arrival_date = human_timestamp_to_datetime(failure_report["arrival_date_utc"])
arrival_date = human_timestamp_to_datetime(forensic_report["arrival_date_utc"])
arrival_date_epoch_milliseconds = int(arrival_date.timestamp() * 1000)
if index_suffix is not None:
search_index = "dmarc_failure_{0}*,dmarc_forensic_{0}*".format(index_suffix)
search_index = "dmarc_forensic_{0}*".format(index_suffix)
else:
search_index = "dmarc_failure*,dmarc_forensic*"
search_index = "dmarc_forensic*"
if index_prefix is not None:
search_index = ",".join(
"{0}{1}".format(index_prefix, part) for part in search_index.split(",")
)
search_index = "{0}{1}".format(index_prefix, search_index)
search = Search(index=search_index)
q = Q(dict(match=dict(arrival_date=arrival_date_epoch_milliseconds))) # pyright: ignore[reportArgumentType]
@@ -670,64 +620,64 @@ def save_failure_report_to_elasticsearch(
if len(existing) > 0:
raise AlreadySaved(
"A failure sample to {0} from {1} "
"A forensic sample to {0} from {1} "
"with a subject of {2} and arrival date of {3} "
"already exists in "
"Elasticsearch".format(
to_, from_, subject, failure_report["arrival_date_utc"]
to_, from_, subject, forensic_report["arrival_date_utc"]
)
)
parsed_sample = failure_report["parsed_sample"]
sample = _FailureSampleDoc(
raw=failure_report["sample"],
parsed_sample = forensic_report["parsed_sample"]
sample = _ForensicSampleDoc(
raw=forensic_report["sample"],
headers=headers,
headers_only=failure_report["sample_headers_only"],
headers_only=forensic_report["sample_headers_only"],
date=sample_date,
subject=failure_report["parsed_sample"]["subject"],
subject=forensic_report["parsed_sample"]["subject"],
filename_safe_subject=parsed_sample["filename_safe_subject"],
body=failure_report["parsed_sample"]["body"],
body=forensic_report["parsed_sample"]["body"],
)
for address in failure_report["parsed_sample"]["to"]:
for address in forensic_report["parsed_sample"]["to"]:
sample.add_to(display_name=address["display_name"], address=address["address"])
for address in failure_report["parsed_sample"]["reply_to"]:
for address in forensic_report["parsed_sample"]["reply_to"]:
sample.add_reply_to(
display_name=address["display_name"], address=address["address"]
)
for address in failure_report["parsed_sample"]["cc"]:
for address in forensic_report["parsed_sample"]["cc"]:
sample.add_cc(display_name=address["display_name"], address=address["address"])
for address in failure_report["parsed_sample"]["bcc"]:
for address in forensic_report["parsed_sample"]["bcc"]:
sample.add_bcc(display_name=address["display_name"], address=address["address"])
for attachment in failure_report["parsed_sample"]["attachments"]:
for attachment in forensic_report["parsed_sample"]["attachments"]:
sample.add_attachment(
filename=attachment["filename"],
content_type=attachment["mail_content_type"],
sha256=attachment["sha256"],
)
try:
failure_doc = _FailureReportDoc(
feedback_type=failure_report["feedback_type"],
user_agent=failure_report["user_agent"],
version=failure_report["version"],
original_mail_from=failure_report["original_mail_from"],
forensic_doc = _ForensicReportDoc(
feedback_type=forensic_report["feedback_type"],
user_agent=forensic_report["user_agent"],
version=forensic_report["version"],
original_mail_from=forensic_report["original_mail_from"],
arrival_date=arrival_date_epoch_milliseconds,
domain=failure_report["reported_domain"],
original_envelope_id=failure_report["original_envelope_id"],
authentication_results=failure_report["authentication_results"],
delivery_results=failure_report["delivery_result"],
source_ip_address=failure_report["source"]["ip_address"],
source_country=failure_report["source"]["country"],
source_reverse_dns=failure_report["source"]["reverse_dns"],
source_base_domain=failure_report["source"]["base_domain"],
authentication_mechanisms=failure_report["authentication_mechanisms"],
auth_failure=failure_report["auth_failure"],
dkim_domain=failure_report["dkim_domain"],
original_rcpt_to=failure_report["original_rcpt_to"],
domain=forensic_report["reported_domain"],
original_envelope_id=forensic_report["original_envelope_id"],
authentication_results=forensic_report["authentication_results"],
delivery_results=forensic_report["delivery_result"],
source_ip_address=forensic_report["source"]["ip_address"],
source_country=forensic_report["source"]["country"],
source_reverse_dns=forensic_report["source"]["reverse_dns"],
source_base_domain=forensic_report["source"]["base_domain"],
authentication_mechanisms=forensic_report["authentication_mechanisms"],
auth_failure=forensic_report["auth_failure"],
dkim_domain=forensic_report["dkim_domain"],
original_rcpt_to=forensic_report["original_rcpt_to"],
sample=sample,
)
index = "dmarc_failure"
index = "dmarc_forensic"
if index_suffix:
index = "{0}_{1}".format(index, index_suffix)
if index_prefix:
@@ -741,14 +691,14 @@ def save_failure_report_to_elasticsearch(
number_of_shards=number_of_shards, number_of_replicas=number_of_replicas
)
create_indexes([index], index_settings)
failure_doc.meta.index = index # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
forensic_doc.meta.index = index # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
try:
failure_doc.save()
forensic_doc.save()
except Exception as e:
raise ElasticsearchError("Elasticsearch error: {0}".format(e.__str__()))
except KeyError as e:
raise InvalidFailureReport(
"Failure report missing required field: {0}".format(e.__str__())
raise InvalidForensicReport(
"Forensic report missing required field: {0}".format(e.__str__())
)
@@ -785,7 +735,6 @@ def save_smtp_tls_report_to_elasticsearch(
index_date = begin_date.strftime("%Y-%m")
else:
index_date = begin_date.strftime("%Y-%m-%d")
report = report.copy()
report["begin_date"] = begin_date
report["end_date"] = end_date
@@ -902,9 +851,3 @@ def save_smtp_tls_report_to_elasticsearch(
smtp_tls_doc.save()
except Exception as e:
raise ElasticsearchError("Elasticsearch error: {0}".format(e.__str__()))
# Backward-compatible aliases
_ForensicSampleDoc = _FailureSampleDoc
_ForensicReportDoc = _FailureReportDoc
save_forensic_report_to_elasticsearch = save_failure_report_to_elasticsearch
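The hunks above remove module-level aliases like these. As a reference, a minimal self-contained sketch of that backward-compatibility pattern (function names here are illustrative, not parsedmarc's API): after renaming a public function, bind the old name to the new object so existing imports keep working.

```python
# Sketch of the backward-compatible alias pattern the removed lines used.
# The names below are placeholders for illustration only.

def save_failure_report(report: dict) -> str:
    """New canonical name after a rename."""
    return "saved {0}".format(report["report_id"])

# The old name stays importable and is the *same* object, not a wrapper,
# so monkeypatching or comparing identity works on both names.
save_forensic_report = save_failure_report
```

Because the alias is a plain rebinding, it costs nothing at call time; the trade-off is that the old name never appears in tracebacks, which is usually acceptable for a deprecation window.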

View File

@@ -3,18 +3,17 @@
from __future__ import annotations
import logging
import logging.handlers
import threading
from typing import Any
from pygelf import GelfTcpHandler, GelfTlsHandler, GelfUdpHandler
from parsedmarc import (
parsed_aggregate_reports_to_csv_rows,
parsed_failure_reports_to_csv_rows,
parsed_forensic_reports_to_csv_rows,
parsed_smtp_tls_reports_to_csv_rows,
)
from typing import Any
from parsedmarc.types import AggregateReport, SMTPTLSReport
log_context_data = threading.local()
@@ -38,7 +37,7 @@ class GelfClient(object):
"""
self.host = host
self.port = port
self.logger = logging.getLogger("parsedmarc_gelf")
self.logger = logging.getLogger("parsedmarc_syslog")
self.logger.setLevel(logging.INFO)
self.logger.addFilter(ContextFilter())
self.gelf_mode = {
@@ -51,7 +50,7 @@ class GelfClient(object):
)
self.logger.addHandler(self.handler)
def save_aggregate_report_to_gelf(self, aggregate_reports: list[AggregateReport]):
def save_aggregate_report_to_gelf(self, aggregate_reports: list[dict[str, Any]]):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows:
log_context_data.parsedmarc = row
@@ -59,23 +58,14 @@ class GelfClient(object):
log_context_data.parsedmarc = None
def save_failure_report_to_gelf(self, failure_reports: list[dict[str, Any]]):
rows = parsed_failure_reports_to_csv_rows(failure_reports)
def save_forensic_report_to_gelf(self, forensic_reports: list[dict[str, Any]]):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows:
log_context_data.parsedmarc = row
self.logger.info("parsedmarc failure report")
self.logger.info("parsedmarc forensic report")
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: SMTPTLSReport):
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: dict[str, Any]):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
for row in rows:
log_context_data.parsedmarc = row
self.logger.info("parsedmarc smtptls report")
def close(self):
"""Remove and close the GELF handler, releasing its connection."""
self.logger.removeHandler(self.handler)
self.handler.close()
# Backward-compatible aliases
GelfClient.save_forensic_report_to_gelf = GelfClient.save_failure_report_to_gelf
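The GELF hunks above revolve around a `logging.Filter` that copies `threading.local` context onto each record before it is shipped. A simplified, self-contained sketch of that mechanism (the `ListHandler` capture and logger name are mine, for demonstration; parsedmarc sends records to a GELF handler instead):

```python
import logging
import threading

log_context_data = threading.local()

class ContextFilter(logging.Filter):
    """Attach the current thread's report row to every log record."""
    def filter(self, record: logging.LogRecord) -> bool:
        record.parsedmarc = getattr(log_context_data, "parsedmarc", None)
        return True

captured = []

class ListHandler(logging.Handler):
    """Test stand-in for a GELF handler: just collect records."""
    def emit(self, record: logging.LogRecord) -> None:
        captured.append(record)

logger = logging.getLogger("sketch_gelf")
logger.setLevel(logging.INFO)
logger.addFilter(ContextFilter())
logger.addHandler(ListHandler())

# Same shape as save_aggregate_report_to_gelf: set context, log, clear it.
log_context_data.parsedmarc = {"source_ip": "192.0.2.1"}
logger.info("parsedmarc aggregate report")
log_context_data.parsedmarc = None
```

Since the filter is attached to the logger (not a handler), the context is stamped once per record regardless of how many handlers fan out.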

View File

@@ -62,10 +62,6 @@ class KafkaClient(object):
except NoBrokersAvailable:
raise KafkaError("No Kafka brokers available")
def close(self):
"""Close the Kafka producer, releasing background threads and sockets."""
self.producer.close()
@staticmethod
def strip_metadata(report: dict[str, Any]):
"""
@@ -143,31 +139,31 @@ class KafkaClient(object):
except Exception as e:
raise KafkaError("Kafka error: {0}".format(e.__str__()))
def save_failure_reports_to_kafka(
def save_forensic_reports_to_kafka(
self,
failure_reports: Union[dict[str, Any], list[dict[str, Any]]],
failure_topic: str,
forensic_reports: Union[dict[str, Any], list[dict[str, Any]]],
forensic_topic: str,
):
"""
Saves failure DMARC reports to Kafka, sends individual
Saves forensic DMARC reports to Kafka, sends individual
records (slices) since Kafka requires messages to be <= 1MB
by default.
Args:
failure_reports (list): A list of failure report dicts
forensic_reports (list): A list of forensic report dicts
to save to Kafka
failure_topic (str): The name of the Kafka topic
forensic_topic (str): The name of the Kafka topic
"""
if isinstance(failure_reports, dict):
failure_reports = [failure_reports]
if isinstance(forensic_reports, dict):
forensic_reports = [forensic_reports]
if len(failure_reports) < 1:
if len(forensic_reports) < 1:
return
try:
logger.debug("Saving failure reports to Kafka")
self.producer.send(failure_topic, failure_reports)
logger.debug("Saving forensic reports to Kafka")
self.producer.send(forensic_topic, forensic_reports)
except UnknownTopicOrPartitionError:
raise KafkaError("Kafka error: Unknown topic or partition on broker")
except Exception as e:
@@ -188,7 +184,7 @@ class KafkaClient(object):
by default.
Args:
smtp_tls_reports (list): A list of SMTP TLS report dicts
smtp_tls_reports (list): A list of forensic report dicts
to save to Kafka
smtp_tls_topic (str): The name of the Kafka topic
@@ -200,7 +196,7 @@ class KafkaClient(object):
return
try:
logger.debug("Saving SMTP TLS reports to Kafka")
logger.debug("Saving forensic reports to Kafka")
self.producer.send(smtp_tls_topic, smtp_tls_reports)
except UnknownTopicOrPartitionError:
raise KafkaError("Kafka error: Unknown topic or partition on broker")
@@ -210,7 +206,3 @@ class KafkaClient(object):
self.producer.flush()
except Exception as e:
raise KafkaError("Kafka error: {0}".format(e.__str__()))
# Backward-compatible aliases
KafkaClient.save_forensic_reports_to_kafka = KafkaClient.save_failure_reports_to_kafka
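The Kafka docstrings above note that records are sent as individual slices because Kafka caps messages at roughly 1 MB by default. A hedged sketch of one way to respect that cap (the `batch_by_size` helper and greedy strategy are my own illustration, not parsedmarc code, which sends the report list per topic as-is):

```python
import json

def batch_by_size(reports: list[dict], max_bytes: int = 1_000_000) -> list[list[dict]]:
    """Greedily group report dicts so each JSON-serialized batch stays
    under max_bytes. An oversized single report still goes out alone."""
    batches: list[list[dict]] = []
    current: list[dict] = []
    size = 2  # account for the enclosing "[]"
    for report in reports:
        item = len(json.dumps(report).encode()) + 2  # ", " separator slack
        if current and size + item > max_bytes:
            batches.append(current)
            current, size = [], 2
        current.append(report)
        size += item
    if current:
        batches.append(current)
    return batches
```

Each batch could then be passed to `producer.send(topic, batch)` in turn, keeping every message under the broker's `message.max.bytes`.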

View File

@@ -38,9 +38,9 @@ class LogAnalyticsConfig:
The Stream name where
the Aggregate DMARC reports
need to be pushed.
dcr_failure_stream (str):
dcr_forensic_stream (str):
The Stream name where
the Failure DMARC reports
the Forensic DMARC reports
need to be pushed.
dcr_smtp_tls_stream (str):
The Stream name where
@@ -56,7 +56,7 @@ class LogAnalyticsConfig:
dce: str,
dcr_immutable_id: str,
dcr_aggregate_stream: str,
dcr_failure_stream: str,
dcr_forensic_stream: str,
dcr_smtp_tls_stream: str,
):
self.client_id = client_id
@@ -65,7 +65,7 @@ class LogAnalyticsConfig:
self.dce = dce
self.dcr_immutable_id = dcr_immutable_id
self.dcr_aggregate_stream = dcr_aggregate_stream
self.dcr_failure_stream = dcr_failure_stream
self.dcr_forensic_stream = dcr_forensic_stream
self.dcr_smtp_tls_stream = dcr_smtp_tls_stream
@@ -84,7 +84,7 @@ class LogAnalyticsClient(object):
dce: str,
dcr_immutable_id: str,
dcr_aggregate_stream: str,
dcr_failure_stream: str,
dcr_forensic_stream: str,
dcr_smtp_tls_stream: str,
):
self.conf = LogAnalyticsConfig(
@@ -94,7 +94,7 @@ class LogAnalyticsClient(object):
dce=dce,
dcr_immutable_id=dcr_immutable_id,
dcr_aggregate_stream=dcr_aggregate_stream,
dcr_failure_stream=dcr_failure_stream,
dcr_forensic_stream=dcr_forensic_stream,
dcr_smtp_tls_stream=dcr_smtp_tls_stream,
)
if (
@@ -135,7 +135,7 @@ class LogAnalyticsClient(object):
self,
results: dict[str, Any],
save_aggregate: bool,
save_failure: bool,
save_forensic: bool,
save_smtp_tls: bool,
):
"""
@@ -146,13 +146,13 @@ class LogAnalyticsClient(object):
Args:
results (list):
The DMARC reports (Aggregate & Failure)
The DMARC reports (Aggregate & Forensic)
save_aggregate (bool):
Whether Aggregate reports can be saved into Log Analytics
save_failure (bool):
Whether Failure reports can be saved into Log Analytics
save_forensic (bool):
Whether Forensic reports can be saved into Log Analytics
save_smtp_tls (bool):
Whether Failure reports can be saved into Log Analytics
Whether Forensic reports can be saved into Log Analytics
"""
conf = self.conf
credential = ClientSecretCredential(
@@ -173,16 +173,16 @@ class LogAnalyticsClient(object):
)
logger.info("Successfully pushed aggregate reports.")
if (
results["failure_reports"]
and conf.dcr_failure_stream
and len(results["failure_reports"]) > 0
and save_failure
results["forensic_reports"]
and conf.dcr_forensic_stream
and len(results["forensic_reports"]) > 0
and save_forensic
):
logger.info("Publishing failure reports.")
logger.info("Publishing forensic reports.")
self.publish_json(
results["failure_reports"], logs_client, conf.dcr_failure_stream
results["forensic_reports"], logs_client, conf.dcr_forensic_stream
)
logger.info("Successfully pushed failure reports.")
logger.info("Successfully pushed forensic reports.")
if (
results["smtp_tls_reports"]
and conf.dcr_smtp_tls_stream

View File

@@ -10,7 +10,6 @@ from typing import List
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google.oauth2 import service_account
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
@@ -19,29 +18,7 @@ from parsedmarc.log import logger
from parsedmarc.mail.mailbox_connection import MailboxConnection
def _get_creds(
token_file,
credentials_file,
scopes,
oauth2_port,
auth_mode="installed_app",
service_account_user=None,
):
normalized_auth_mode = (auth_mode or "installed_app").strip().lower()
if normalized_auth_mode == "service_account":
creds = service_account.Credentials.from_service_account_file(
credentials_file,
scopes=scopes,
)
if service_account_user:
creds = creds.with_subject(service_account_user)
return creds
if normalized_auth_mode != "installed_app":
raise ValueError(
f"Unsupported Gmail auth_mode '{auth_mode}'. "
"Expected 'installed_app' or 'service_account'."
)
def _get_creds(token_file, credentials_file, scopes, oauth2_port):
creds = None
if Path(token_file).exists():
@@ -70,17 +47,8 @@ class GmailConnection(MailboxConnection):
reports_folder: str,
oauth2_port: int,
paginate_messages: bool,
auth_mode: str = "installed_app",
service_account_user: str | None = None,
):
creds = _get_creds(
token_file,
credentials_file,
scopes,
oauth2_port,
auth_mode=auth_mode,
service_account_user=service_account_user,
)
creds = _get_creds(token_file, credentials_file, scopes, oauth2_port)
self.service = build("gmail", "v1", credentials=creds)
self.include_spam_trash = include_spam_trash
self.reports_label_id = self._find_label_id_for_label(reports_folder)
@@ -158,7 +126,7 @@ class GmailConnection(MailboxConnection):
return urlsafe_b64decode(msg["raw"]).decode(errors="replace")
def delete_message(self, message_id: str):
self.service.users().messages().delete(userId="me", id=message_id).execute()
self.service.users().messages().delete(userId="me", id=message_id)
def move_message(self, message_id: str, folder_name: str):
label_id = self._find_label_id_for_label(folder_name)
@@ -175,14 +143,10 @@ class GmailConnection(MailboxConnection):
# Not needed
pass
def watch(self, check_callback, check_timeout, config_reloading=None):
def watch(self, check_callback, check_timeout):
"""Checks the mailbox for new messages every n seconds"""
while True:
if config_reloading and config_reloading():
return
sleep(check_timeout)
if config_reloading and config_reloading():
return
check_callback(self)
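The `watch()` hunks above drop the `config_reloading` hook from the polling loop. As a reference, a minimal self-contained sketch of that pattern (the `_sleep` injection is mine, so the loop can be exercised without real delays): the reload check runs both before and after the sleep so a pending reload never waits out a full `check_timeout`.

```python
def watch(connection, check_callback, check_timeout, config_reloading=None,
          _sleep=lambda seconds: None):
    """Poll for new mail every check_timeout seconds; return cleanly
    before and after each sleep when a config reload is pending."""
    while True:
        if config_reloading and config_reloading():
            return
        _sleep(check_timeout)
        if config_reloading and config_reloading():
            return
        check_callback(connection)

calls = []

def reload_pending():
    # Pretend a config reload is requested after two successful checks.
    return len(calls) >= 2

watch("mailbox", calls.append, check_timeout=0, config_reloading=reload_pending)
```

When `config_reloading` is `None` (the post-revert signature), the two guards are no-ops and the loop runs forever, which is the original behavior.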
@lru_cache(maxsize=10)

View File

@@ -12,25 +12,19 @@ from azure.identity import (
UsernamePasswordCredential,
DeviceCodeCredential,
ClientSecretCredential,
CertificateCredential,
TokenCachePersistenceOptions,
AuthenticationRecord,
)
from msgraph.core import GraphClient
from requests.exceptions import RequestException
from parsedmarc.log import logger
from parsedmarc.mail.mailbox_connection import MailboxConnection
GRAPH_REQUEST_RETRY_ATTEMPTS = 3
GRAPH_REQUEST_RETRY_DELAY_SECONDS = 5
class AuthMethod(Enum):
DeviceCode = 1
UsernamePassword = 2
ClientSecret = 3
Certificate = 4
def _get_cache_args(token_path: Path, allow_unencrypted_storage):
@@ -89,55 +83,30 @@ def _generate_credential(auth_method: str, token_path: Path, **kwargs):
tenant_id=kwargs["tenant_id"],
client_secret=kwargs["client_secret"],
)
elif auth_method == AuthMethod.Certificate.name:
cert_path = kwargs.get("certificate_path")
if not cert_path:
raise ValueError(
"certificate_path is required when auth_method is 'Certificate'"
)
credential = CertificateCredential(
client_id=kwargs["client_id"],
tenant_id=kwargs["tenant_id"],
certificate_path=cert_path,
password=kwargs.get("certificate_password"),
)
else:
raise RuntimeError(f"Auth method {auth_method} not found")
return credential
class MSGraphConnection(MailboxConnection):
_WELL_KNOWN_FOLDERS = {
"inbox": "inbox",
"archive": "archive",
"drafts": "drafts",
"sentitems": "sentitems",
"deleteditems": "deleteditems",
"junkemail": "junkemail",
}
def __init__(
self,
auth_method: str,
mailbox: str,
graph_url: str,
client_id: str,
client_secret: Optional[str],
username: Optional[str],
password: Optional[str],
client_secret: str,
username: str,
password: str,
tenant_id: str,
token_file: str,
allow_unencrypted_storage: bool,
certificate_path: Optional[str] = None,
certificate_password: Optional[Union[str, bytes]] = None,
):
token_path = Path(token_file)
credential = _generate_credential(
auth_method,
client_id=client_id,
client_secret=client_secret,
certificate_path=certificate_path,
certificate_password=certificate_password,
username=username,
password=password,
tenant_id=tenant_id,
@@ -148,10 +117,10 @@ class MSGraphConnection(MailboxConnection):
"credential": credential,
"cloud": graph_url,
}
if not isinstance(credential, (ClientSecretCredential, CertificateCredential)):
if not isinstance(credential, ClientSecretCredential):
scopes = ["Mail.ReadWrite"]
# Detect if mailbox is shared
if mailbox and username and username != mailbox:
if mailbox and username != mailbox:
scopes = ["Mail.ReadWrite.Shared"]
auth_record = credential.authenticate(scopes=scopes)
_cache_auth_record(auth_record, token_path)
@@ -160,23 +129,6 @@ class MSGraphConnection(MailboxConnection):
self._client = GraphClient(**client_params)
self.mailbox_name = mailbox
def _request_with_retries(self, method_name: str, *args, **kwargs):
for attempt in range(1, GRAPH_REQUEST_RETRY_ATTEMPTS + 1):
try:
return getattr(self._client, method_name)(*args, **kwargs)
except RequestException as error:
if attempt == GRAPH_REQUEST_RETRY_ATTEMPTS:
raise
logger.warning(
"Transient MS Graph %s error on attempt %s/%s: %s",
method_name.upper(),
attempt,
GRAPH_REQUEST_RETRY_ATTEMPTS,
error,
)
sleep(GRAPH_REQUEST_RETRY_DELAY_SECONDS)
raise RuntimeError("no retry attempts configured")
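The `_request_with_retries` helper removed above retries transient `requests` failures a fixed number of times before re-raising. A generic, self-contained sketch of that retry shape (constant values, the `TransientError` stand-in, and the injectable `_sleep` are my assumptions; the real helper catches `RequestException` and sleeps 5 s):

```python
RETRY_ATTEMPTS = 3

class TransientError(Exception):
    """Stand-in for requests.exceptions.RequestException."""

def call_with_retries(func, *args, attempts=RETRY_ATTEMPTS,
                      delay_seconds=0, _sleep=lambda s: None, **kwargs):
    """Call func, retrying on TransientError; re-raise on the final attempt."""
    for attempt in range(1, attempts + 1):
        try:
            return func(*args, **kwargs)
        except TransientError:
            if attempt == attempts:
                raise
            _sleep(delay_seconds)
    # Unreachable when attempts >= 1; mirrors the defensive final raise.
    raise RuntimeError("no retry attempts configured")

failures = {"left": 2}

def flaky():
    if failures["left"] > 0:
        failures["left"] -= 1
        raise TransientError("boom")
    return "ok"
```

With two injected failures and three attempts, the third call succeeds; with `attempts=2` the same function would re-raise to the caller.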
def create_folder(self, folder_name: str):
sub_url = ""
path_parts = folder_name.split("/")
@@ -191,7 +143,7 @@ class MSGraphConnection(MailboxConnection):
request_body = {"displayName": folder_name}
request_url = f"/users/{self.mailbox_name}/mailFolders{sub_url}"
resp = self._request_with_retries("post", request_url, json=request_body)
resp = self._client.post(request_url, json=request_body)
if resp.status_code == 409:
logger.debug(f"Folder {folder_name} already exists, skipping creation")
elif resp.status_code == 201:
@@ -221,7 +173,7 @@ class MSGraphConnection(MailboxConnection):
params["$top"] = batch_size
else:
params["$top"] = 100
result = self._request_with_retries("get", url, params=params)
result = self._client.get(url, params=params)
if result.status_code != 200:
raise RuntimeError(f"Failed to fetch messages {result.text}")
messages = result.json()["value"]
@@ -229,7 +181,7 @@ class MSGraphConnection(MailboxConnection):
while "@odata.nextLink" in result.json() and (
since is not None or (batch_size == 0 or batch_size - len(messages) > 0)
):
result = self._request_with_retries("get", result.json()["@odata.nextLink"])
result = self._client.get(result.json()["@odata.nextLink"])
if result.status_code != 200:
raise RuntimeError(f"Failed to fetch messages {result.text}")
messages.extend(result.json()["value"])
@@ -238,7 +190,7 @@ class MSGraphConnection(MailboxConnection):
def mark_message_read(self, message_id: str):
"""Marks a message as read"""
url = f"/users/{self.mailbox_name}/messages/{message_id}"
resp = self._request_with_retries("patch", url, json={"isRead": "true"})
resp = self._client.patch(url, json={"isRead": "true"})
if resp.status_code != 200:
raise RuntimeWarning(
f"Failed to mark message read{resp.status_code}: {resp.json()}"
@@ -246,7 +198,7 @@ class MSGraphConnection(MailboxConnection):
def fetch_message(self, message_id: str, **kwargs):
url = f"/users/{self.mailbox_name}/messages/{message_id}/$value"
result = self._request_with_retries("get", url)
result = self._client.get(url)
if result.status_code != 200:
raise RuntimeWarning(
f"Failed to fetch message{result.status_code}: {result.json()}"
@@ -258,7 +210,7 @@ class MSGraphConnection(MailboxConnection):
def delete_message(self, message_id: str):
url = f"/users/{self.mailbox_name}/messages/{message_id}"
resp = self._request_with_retries("delete", url)
resp = self._client.delete(url)
if resp.status_code != 204:
raise RuntimeWarning(
f"Failed to delete message {resp.status_code}: {resp.json()}"
@@ -268,7 +220,7 @@ class MSGraphConnection(MailboxConnection):
folder_id = self._find_folder_id_from_folder_path(folder_name)
request_body = {"destinationId": folder_id}
url = f"/users/{self.mailbox_name}/messages/{message_id}/move"
resp = self._request_with_retries("post", url, json=request_body)
resp = self._client.post(url, json=request_body)
if resp.status_code != 201:
raise RuntimeWarning(
f"Failed to move message {resp.status_code}: {resp.json()}"
@@ -278,14 +230,10 @@ class MSGraphConnection(MailboxConnection):
# Not needed
pass
def watch(self, check_callback, check_timeout, config_reloading=None):
def watch(self, check_callback, check_timeout):
"""Checks the mailbox for new messages every n seconds"""
while True:
if config_reloading and config_reloading():
return
sleep(check_timeout)
if config_reloading and config_reloading():
return
check_callback(self)
@lru_cache(maxsize=10)
@@ -300,19 +248,6 @@ class MSGraphConnection(MailboxConnection):
else:
return self._find_folder_id_with_parent(folder_name, None)
def _get_well_known_folder_id(self, folder_name: str) -> Optional[str]:
folder_key = folder_name.lower().replace(" ", "").replace("-", "")
alias = self._WELL_KNOWN_FOLDERS.get(folder_key)
if alias is None:
return None
url = f"/users/{self.mailbox_name}/mailFolders/{alias}?$select=id,displayName"
folder_resp = self._request_with_retries("get", url)
if folder_resp.status_code != 200:
return None
payload = folder_resp.json()
return payload.get("id")
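The `_get_well_known_folder_id` fallback removed above normalizes a display name into one of Graph's well-known folder aliases before querying. The normalization step on its own is easy to sketch (the table below copies the `_WELL_KNOWN_FOLDERS` mapping from the removed hunk; the Graph lookup itself is omitted):

```python
WELL_KNOWN_FOLDERS = {
    "inbox": "inbox",
    "archive": "archive",
    "drafts": "drafts",
    "sentitems": "sentitems",
    "deleteditems": "deleteditems",
    "junkemail": "junkemail",
}

def well_known_alias(folder_name: str):
    """Map a human display name (e.g. 'Deleted Items') to a Graph
    well-known folder alias, or None when it isn't one."""
    key = folder_name.lower().replace(" ", "").replace("-", "")
    return WELL_KNOWN_FOLDERS.get(key)
```

In the removed code, a non-`None` alias was substituted into `/users/{mailbox}/mailFolders/{alias}` as a last resort when the displayName filter found nothing.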
def _find_folder_id_with_parent(
self, folder_name: str, parent_folder_id: Optional[str]
):
@@ -321,12 +256,8 @@ class MSGraphConnection(MailboxConnection):
sub_url = f"/{parent_folder_id}/childFolders"
url = f"/users/{self.mailbox_name}/mailFolders{sub_url}"
filter = f"?$filter=displayName eq '{folder_name}'"
folders_resp = self._request_with_retries("get", url + filter)
folders_resp = self._client.get(url + filter)
if folders_resp.status_code != 200:
if parent_folder_id is None:
well_known_folder_id = self._get_well_known_folder_id(folder_name)
if well_known_folder_id:
return well_known_folder_id
raise RuntimeWarning(f"Failed to list folders.{folders_resp.json()}")
folders: list = folders_resp.json()["value"]
matched_folders = [

View File

@@ -55,33 +55,15 @@ class IMAPConnection(MailboxConnection):
return cast(str, self._client.fetch_message(message_id, parse=False))
def delete_message(self, message_id: int):
try:
self._client.delete_messages([message_id])
except IMAPClientError as error:
logger.warning(
"IMAP delete fallback for message %s due to server error: %s",
message_id,
error,
)
self._client.add_flags([message_id], [r"\Deleted"], silent=True)
self._client.expunge()
self._client.delete_messages([message_id])
def move_message(self, message_id: int, folder_name: str):
try:
self._client.move_messages([message_id], folder_name)
except IMAPClientError as error:
logger.warning(
"IMAP move fallback for message %s due to server error: %s",
message_id,
error,
)
self._client.copy([message_id], folder_name)
self.delete_message(message_id)
self._client.move_messages([message_id], folder_name)
def keepalive(self):
self._client.noop()
def watch(self, check_callback, check_timeout, config_reloading=None):
def watch(self, check_callback, check_timeout):
"""
Use an IDLE IMAP connection to parse incoming emails,
and pass the results to a callback function
@@ -94,8 +76,6 @@ class IMAPConnection(MailboxConnection):
check_callback(self)
while True:
if config_reloading and config_reloading():
return
try:
IMAPClient(
host=self._client.host,
@@ -113,5 +93,3 @@ class IMAPConnection(MailboxConnection):
except Exception as e:
logger.warning("IMAP connection error. {0}. Reconnecting...".format(e))
sleep(check_timeout)
if config_reloading and config_reloading():
return

View File

@@ -28,5 +28,5 @@ class MailboxConnection(ABC):
def keepalive(self):
raise NotImplementedError
def watch(self, check_callback, check_timeout, config_reloading=None):
def watch(self, check_callback, check_timeout):
raise NotImplementedError

View File

@@ -63,14 +63,10 @@ class MaildirConnection(MailboxConnection):
def keepalive(self):
return
def watch(self, check_callback, check_timeout, config_reloading=None):
def watch(self, check_callback, check_timeout):
while True:
if config_reloading and config_reloading():
return
try:
check_callback(self)
except Exception as e:
logger.warning("Maildir init error. {0}".format(e))
if config_reloading and config_reloading():
return
sleep(check_timeout)

View File

@@ -4,9 +4,7 @@ from __future__ import annotations
from typing import Any, Optional, Union
import boto3
from opensearchpy import (
AWSV4SignerAuth,
Boolean,
Date,
Document,
@@ -14,18 +12,16 @@ from opensearchpy import (
InnerDoc,
Integer,
Ip,
Keyword,
Nested,
Object,
Q,
RequestsHttpConnection,
Search,
Text,
connections,
)
from opensearchpy.helpers import reindex
from parsedmarc import InvalidFailureReport
from parsedmarc import InvalidForensicReport
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
@@ -47,23 +43,18 @@ class _PublishedPolicy(InnerDoc):
sp = Text()
pct = Integer()
fo = Text()
np = Keyword()
testing = Keyword()
discovery_method = Keyword()
class _DKIMResult(InnerDoc):
domain = Text()
selector = Text()
result = Text()
human_result = Text()
class _SPFResult(InnerDoc):
domain = Text()
scope = Text()
results = Text()
human_result = Text()
class _AggregateReportDoc(Document):
@@ -99,45 +90,17 @@ class _AggregateReportDoc(Document):
envelope_to = Text()
dkim_results = Nested(_DKIMResult)
spf_results = Nested(_SPFResult)
np = Keyword()
testing = Keyword()
discovery_method = Keyword()
generator = Text()
def add_policy_override(self, type_: str, comment: str):
self.policy_overrides.append(_PolicyOverride(type=type_, comment=comment))
def add_dkim_result(
self,
domain: str,
selector: str,
result: _DKIMResult,
human_result: str = None,
):
def add_dkim_result(self, domain: str, selector: str, result: _DKIMResult):
self.dkim_results.append(
_DKIMResult(
domain=domain,
selector=selector,
result=result,
human_result=human_result,
)
_DKIMResult(domain=domain, selector=selector, result=result)
)
def add_spf_result(
self,
domain: str,
scope: str,
result: _SPFResult,
human_result: str = None,
):
self.spf_results.append(
_SPFResult(
domain=domain,
scope=scope,
result=result,
human_result=human_result,
)
)
def add_spf_result(self, domain: str, scope: str, result: _SPFResult):
self.spf_results.append(_SPFResult(domain=domain, scope=scope, result=result))
def save(self, **kwargs): # pyright: ignore[reportIncompatibleMethodOverride]
self.passed_dmarc = False
@@ -157,7 +120,7 @@ class _EmailAttachmentDoc(Document):
sha256 = Text()
class _FailureSampleDoc(InnerDoc):
class _ForensicSampleDoc(InnerDoc):
raw = Text()
headers = Object()
headers_only = Boolean()
@@ -194,9 +157,9 @@ class _FailureSampleDoc(InnerDoc):
)
class _FailureReportDoc(Document):
class _ForensicReportDoc(Document):
class Index:
name = "dmarc_failure"
name = "dmarc_forensic"
feedback_type = Text()
user_agent = Text()
@@ -214,7 +177,7 @@ class _FailureReportDoc(Document):
source_auth_failures = Text()
dkim_domain = Text()
original_rcpt_to = Text()
sample = Object(_FailureSampleDoc)
sample = Object(_ForensicSampleDoc)
class _SMTPTLSFailureDetailsDoc(InnerDoc):
@@ -305,14 +268,10 @@ def set_hosts(
*,
use_ssl: Optional[bool] = False,
ssl_cert_path: Optional[str] = None,
skip_certificate_verification: bool = False,
username: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
timeout: Optional[float] = 60.0,
auth_type: str = "basic",
aws_region: Optional[str] = None,
aws_service: str = "es",
):
"""
Sets the OpenSearch hosts to use
@@ -321,14 +280,10 @@ def set_hosts(
hosts (str|list[str]): A single hostname or URL, or list of hostnames or URLs
use_ssl (bool): Use an HTTPS connection to the server
ssl_cert_path (str): Path to the certificate chain
skip_certificate_verification (bool): Skip certificate verification
username (str): The username to use for authentication
password (str): The password to use for authentication
api_key (str): The Base64 encoded API key to use for authentication
timeout (float): Timeout in seconds
auth_type (str): OpenSearch auth mode: basic (default) or awssigv4
aws_region (str): AWS region for SigV4 auth (required for awssigv4)
aws_service (str): AWS service for SigV4 signing (default: es)
"""
if not isinstance(hosts, list):
hosts = [hosts]
@@ -336,35 +291,14 @@ def set_hosts(
if use_ssl:
conn_params["use_ssl"] = True
if ssl_cert_path:
conn_params["ca_certs"] = ssl_cert_path
if skip_certificate_verification:
conn_params["verify_certs"] = False
else:
conn_params["verify_certs"] = True
normalized_auth_type = (auth_type or "basic").strip().lower()
if normalized_auth_type == "awssigv4":
if not aws_region:
raise OpenSearchError(
"OpenSearch AWS SigV4 auth requires 'aws_region' to be set"
)
session = boto3.Session()
credentials = session.get_credentials()
if credentials is None:
raise OpenSearchError(
"Unable to load AWS credentials for OpenSearch SigV4 authentication"
)
conn_params["http_auth"] = AWSV4SignerAuth(credentials, aws_region, aws_service)
conn_params["connection_class"] = RequestsHttpConnection
elif normalized_auth_type == "basic":
if username and password:
conn_params["http_auth"] = username + ":" + password
if api_key:
conn_params["api_key"] = api_key
else:
raise OpenSearchError(
f"Unsupported OpenSearch auth_type '{auth_type}'. "
"Expected 'basic' or 'awssigv4'."
)
conn_params["ca_certs"] = ssl_cert_path
else:
conn_params["verify_certs"] = False
if username and password:
conn_params["http_auth"] = username + ":" + password
if api_key:
conn_params["api_key"] = api_key
connections.create_connection(**conn_params)
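The `set_hosts` hunk above removes a dispatch on `auth_type` between basic auth and AWS SigV4. A simplified, self-contained sketch of that selection logic (validation and shape only; the tuple stands in for the real `AWSV4SignerAuth` built from boto3 session credentials, which I have left out so the sketch runs anywhere):

```python
def build_auth_params(auth_type="basic", username=None, password=None,
                      api_key=None, aws_region=None) -> dict:
    """Normalize auth_type, validate SigV4 prerequisites, and return
    connection kwargs. Placeholder signer instead of AWSV4SignerAuth."""
    params: dict = {}
    normalized = (auth_type or "basic").strip().lower()
    if normalized == "awssigv4":
        if not aws_region:
            raise ValueError("AWS SigV4 auth requires 'aws_region' to be set")
        params["http_auth"] = ("awssigv4", aws_region)  # placeholder signer
    elif normalized == "basic":
        if username and password:
            params["http_auth"] = username + ":" + password
        if api_key:
            params["api_key"] = api_key
    else:
        raise ValueError(
            "Unsupported auth_type {0!r}; expected 'basic' or 'awssigv4'".format(
                auth_type
            )
        )
    return params
```

Normalizing before dispatch (`strip().lower()`) keeps config values like `"AWSSigV4"` working, while still rejecting genuinely unknown modes early instead of silently falling back to unauthenticated connections.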
@@ -393,20 +327,20 @@ def create_indexes(names: list[str], settings: Optional[dict[str, Any]] = None):
def migrate_indexes(
aggregate_indexes: Optional[list[str]] = None,
failure_indexes: Optional[list[str]] = None,
forensic_indexes: Optional[list[str]] = None,
):
"""
Updates index mappings
Args:
aggregate_indexes (list): A list of aggregate index names
failure_indexes (list): A list of failure index names
forensic_indexes (list): A list of forensic index names
"""
version = 2
if aggregate_indexes is None:
aggregate_indexes = []
if failure_indexes is None:
failure_indexes = []
if forensic_indexes is None:
forensic_indexes = []
for aggregate_index_name in aggregate_indexes:
if not Index(aggregate_index_name).exists():
continue
@@ -436,7 +370,7 @@ def migrate_indexes(
reindex(connections.get_connection(), aggregate_index_name, new_index_name)
Index(aggregate_index_name).delete()
for failure_index in failure_indexes:
for forensic_index in forensic_indexes:
pass
@@ -452,7 +386,7 @@ def save_aggregate_report_to_opensearch(
Saves a parsed DMARC aggregate report to OpenSearch
Args:
aggregate_report (dict): A parsed aggregate report
aggregate_report (dict): A parsed forensic report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily indexes
@@ -479,8 +413,8 @@ def save_aggregate_report_to_opensearch(
org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
begin_date_query = Q(dict(range=dict(date_begin=dict(gte=begin_date))))
end_date_query = Q(dict(range=dict(date_end=dict(lte=end_date))))
begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
end_date_query = Q(dict(match=dict(date_end=end_date)))
if index_suffix is not None:
search_index = "dmarc_aggregate_{0}*".format(index_suffix)
@@ -520,9 +454,6 @@ def save_aggregate_report_to_opensearch(
sp=aggregate_report["policy_published"]["sp"],
pct=aggregate_report["policy_published"]["pct"],
fo=aggregate_report["policy_published"]["fo"],
np=aggregate_report["policy_published"].get("np"),
testing=aggregate_report["policy_published"].get("testing"),
discovery_method=aggregate_report["policy_published"].get("discovery_method"),
)
for record in aggregate_report["records"]:
@@ -564,12 +495,6 @@ def save_aggregate_report_to_opensearch(
header_from=record["identifiers"]["header_from"],
envelope_from=record["identifiers"]["envelope_from"],
envelope_to=record["identifiers"]["envelope_to"],
np=aggregate_report["policy_published"].get("np"),
testing=aggregate_report["policy_published"].get("testing"),
discovery_method=aggregate_report["policy_published"].get(
"discovery_method"
),
generator=metadata.get("generator"),
)
for override in record["policy_evaluated"]["policy_override_reasons"]:
@@ -582,7 +507,6 @@ def save_aggregate_report_to_opensearch(
domain=dkim_result["domain"],
selector=dkim_result["selector"],
result=dkim_result["result"],
human_result=dkim_result.get("human_result"),
)
for spf_result in record["auth_results"]["spf"]:
@@ -590,7 +514,6 @@ def save_aggregate_report_to_opensearch(
domain=spf_result["domain"],
scope=spf_result["scope"],
result=spf_result["result"],
human_result=spf_result.get("human_result"),
)
index = "dmarc_aggregate"
@@ -612,8 +535,8 @@ def save_aggregate_report_to_opensearch(
raise OpenSearchError("OpenSearch error: {0}".format(e.__str__()))
def save_failure_report_to_opensearch(
failure_report: dict[str, Any],
def save_forensic_report_to_opensearch(
forensic_report: dict[str, Any],
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: bool = False,
@@ -621,10 +544,10 @@ def save_failure_report_to_opensearch(
number_of_replicas: int = 0,
):
"""
Saves a parsed DMARC failure report to OpenSearch
Saves a parsed DMARC forensic report to OpenSearch
Args:
failure_report (dict): A parsed failure report
forensic_report (dict): A parsed forensic report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily
@@ -637,28 +560,26 @@ def save_failure_report_to_opensearch(
AlreadySaved
"""
logger.info("Saving failure report to OpenSearch")
failure_report = failure_report.copy()
logger.info("Saving forensic report to OpenSearch")
forensic_report = forensic_report.copy()
sample_date = None
if failure_report["parsed_sample"]["date"] is not None:
sample_date = failure_report["parsed_sample"]["date"]
if forensic_report["parsed_sample"]["date"] is not None:
sample_date = forensic_report["parsed_sample"]["date"]
sample_date = human_timestamp_to_datetime(sample_date)
original_headers = failure_report["parsed_sample"]["headers"]
original_headers = forensic_report["parsed_sample"]["headers"]
headers: dict[str, Any] = {}
for original_header in original_headers:
headers[original_header.lower()] = original_headers[original_header]
arrival_date = human_timestamp_to_datetime(failure_report["arrival_date_utc"])
arrival_date = human_timestamp_to_datetime(forensic_report["arrival_date_utc"])
arrival_date_epoch_milliseconds = int(arrival_date.timestamp() * 1000)
if index_suffix is not None:
search_index = "dmarc_failure_{0}*,dmarc_forensic_{0}*".format(index_suffix)
search_index = "dmarc_forensic_{0}*".format(index_suffix)
else:
search_index = "dmarc_failure*,dmarc_forensic*"
search_index = "dmarc_forensic*"
if index_prefix is not None:
search_index = ",".join(
"{0}{1}".format(index_prefix, part) for part in search_index.split(",")
)
search_index = "{0}{1}".format(index_prefix, search_index)
search = Search(index=search_index)
q = Q(dict(match=dict(arrival_date=arrival_date_epoch_milliseconds)))
@@ -699,62 +620,64 @@ def save_failure_report_to_opensearch(
if len(existing) > 0:
raise AlreadySaved(
"A failure sample to {0} from {1} "
"A forensic sample to {0} from {1} "
"with a subject of {2} and arrival date of {3} "
"already exists in "
"OpenSearch".format(to_, from_, subject, failure_report["arrival_date_utc"])
"OpenSearch".format(
to_, from_, subject, forensic_report["arrival_date_utc"]
)
)
parsed_sample = failure_report["parsed_sample"]
sample = _FailureSampleDoc(
raw=failure_report["sample"],
parsed_sample = forensic_report["parsed_sample"]
sample = _ForensicSampleDoc(
raw=forensic_report["sample"],
headers=headers,
headers_only=failure_report["sample_headers_only"],
headers_only=forensic_report["sample_headers_only"],
date=sample_date,
subject=failure_report["parsed_sample"]["subject"],
subject=forensic_report["parsed_sample"]["subject"],
filename_safe_subject=parsed_sample["filename_safe_subject"],
body=failure_report["parsed_sample"]["body"],
body=forensic_report["parsed_sample"]["body"],
)
for address in failure_report["parsed_sample"]["to"]:
for address in forensic_report["parsed_sample"]["to"]:
sample.add_to(display_name=address["display_name"], address=address["address"])
for address in failure_report["parsed_sample"]["reply_to"]:
for address in forensic_report["parsed_sample"]["reply_to"]:
sample.add_reply_to(
display_name=address["display_name"], address=address["address"]
)
for address in failure_report["parsed_sample"]["cc"]:
for address in forensic_report["parsed_sample"]["cc"]:
sample.add_cc(display_name=address["display_name"], address=address["address"])
for address in failure_report["parsed_sample"]["bcc"]:
for address in forensic_report["parsed_sample"]["bcc"]:
sample.add_bcc(display_name=address["display_name"], address=address["address"])
for attachment in failure_report["parsed_sample"]["attachments"]:
for attachment in forensic_report["parsed_sample"]["attachments"]:
sample.add_attachment(
filename=attachment["filename"],
content_type=attachment["mail_content_type"],
sha256=attachment["sha256"],
)
try:
failure_doc = _FailureReportDoc(
feedback_type=failure_report["feedback_type"],
user_agent=failure_report["user_agent"],
version=failure_report["version"],
original_mail_from=failure_report["original_mail_from"],
forensic_doc = _ForensicReportDoc(
feedback_type=forensic_report["feedback_type"],
user_agent=forensic_report["user_agent"],
version=forensic_report["version"],
original_mail_from=forensic_report["original_mail_from"],
arrival_date=arrival_date_epoch_milliseconds,
domain=failure_report["reported_domain"],
original_envelope_id=failure_report["original_envelope_id"],
authentication_results=failure_report["authentication_results"],
delivery_results=failure_report["delivery_result"],
source_ip_address=failure_report["source"]["ip_address"],
source_country=failure_report["source"]["country"],
source_reverse_dns=failure_report["source"]["reverse_dns"],
source_base_domain=failure_report["source"]["base_domain"],
authentication_mechanisms=failure_report["authentication_mechanisms"],
auth_failure=failure_report["auth_failure"],
dkim_domain=failure_report["dkim_domain"],
original_rcpt_to=failure_report["original_rcpt_to"],
domain=forensic_report["reported_domain"],
original_envelope_id=forensic_report["original_envelope_id"],
authentication_results=forensic_report["authentication_results"],
delivery_results=forensic_report["delivery_result"],
source_ip_address=forensic_report["source"]["ip_address"],
source_country=forensic_report["source"]["country"],
source_reverse_dns=forensic_report["source"]["reverse_dns"],
source_base_domain=forensic_report["source"]["base_domain"],
authentication_mechanisms=forensic_report["authentication_mechanisms"],
auth_failure=forensic_report["auth_failure"],
dkim_domain=forensic_report["dkim_domain"],
original_rcpt_to=forensic_report["original_rcpt_to"],
sample=sample,
)
index = "dmarc_failure"
index = "dmarc_forensic"
if index_suffix:
index = "{0}_{1}".format(index, index_suffix)
if index_prefix:
@@ -768,14 +691,14 @@ def save_failure_report_to_opensearch(
number_of_shards=number_of_shards, number_of_replicas=number_of_replicas
)
create_indexes([index], index_settings)
failure_doc.meta.index = index
forensic_doc.meta.index = index
try:
failure_doc.save()
forensic_doc.save()
except Exception as e:
raise OpenSearchError("OpenSearch error: {0}".format(e.__str__()))
except KeyError as e:
raise InvalidFailureReport(
"Failure report missing required field: {0}".format(e.__str__())
raise InvalidForensicReport(
"Forensic report missing required field: {0}".format(e.__str__())
)
@@ -812,7 +735,6 @@ def save_smtp_tls_report_to_opensearch(
index_date = begin_date.strftime("%Y-%m")
else:
index_date = begin_date.strftime("%Y-%m-%d")
report = report.copy()
report["begin_date"] = begin_date
report["end_date"] = end_date
@@ -929,9 +851,3 @@ def save_smtp_tls_report_to_opensearch(
smtp_tls_doc.save()
except Exception as e:
raise OpenSearchError("OpenSearch error: {0}".format(e.__str__()))
# Backward-compatible aliases
_ForensicSampleDoc = _FailureSampleDoc
_ForensicReportDoc = _FailureReportDoc
save_forensic_report_to_opensearch = save_failure_report_to_opensearch
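The `set_hosts` hunk above dispatches on a normalized `auth_type` string, falling back to basic auth. A dependency-free sketch of that dispatch (function and parameter names are illustrative; the real code hands `http_auth` an `opensearchpy.AWSV4SignerAuth` built from `boto3` session credentials, injected here as a plain callable so the sketch runs standalone):

```python
def build_conn_params(auth_type=None, username=None, password=None,
                      api_key=None, aws_region=None, make_signer=None):
    """Mirrors the auth dispatch shown above (illustrative names only)."""
    params = {}
    mode = (auth_type or "basic").strip().lower()
    if mode == "awssigv4":
        if not aws_region:
            raise ValueError("awssigv4 auth requires aws_region")
        # Real code: AWSV4SignerAuth(credentials, aws_region, aws_service)
        params["http_auth"] = make_signer(aws_region)
    elif mode == "basic":
        if username and password:
            params["http_auth"] = username + ":" + password
        if api_key:
            params["api_key"] = api_key
    else:
        raise ValueError(f"Unsupported auth_type {auth_type!r}")
    return params
```

Note the `.strip().lower()` normalization, which keeps `" AWSSigV4 "` and `"awssigv4"` equivalent while still rejecting unknown modes loudly.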


@@ -56,8 +56,8 @@ class S3Client(object):
def save_aggregate_report_to_s3(self, report: dict[str, Any]):
self.save_report_to_s3(report, "aggregate")
def save_failure_report_to_s3(self, report: dict[str, Any]):
self.save_report_to_s3(report, "failure")
def save_forensic_report_to_s3(self, report: dict[str, Any]):
self.save_report_to_s3(report, "forensic")
def save_smtp_tls_report_to_s3(self, report: dict[str, Any]):
self.save_report_to_s3(report, "smtp_tls")
@@ -93,15 +93,3 @@ class S3Client(object):
self.bucket.put_object(
Body=json.dumps(report), Key=object_path, Metadata=object_metadata
)
def close(self):
"""Clean up the boto3 resource."""
try:
if self.s3.meta is not None:
self.s3.meta.client.close()
except Exception:
pass
# Backward-compatible aliases
S3Client.save_forensic_report_to_s3 = S3Client.save_failure_report_to_s3


@@ -58,7 +58,7 @@ class HECClient(object):
self.source = source
self.session = requests.Session()
self.timeout = timeout
self.verify = verify
self.session.verify = verify
self._common_data: dict[str, Union[str, int, float, dict]] = dict(
host=self.host, source=self.source, index=self.index
)
@@ -124,51 +124,47 @@ class HECClient(object):
data["event"] = new_report.copy()
json_str += "{0}\n".format(json.dumps(data))
if not self.verify:
if not self.session.verify:
logger.debug("Skipping certificate verification for Splunk HEC")
try:
response = self.session.post(
self.url, data=json_str, verify=self.verify, timeout=self.timeout
)
response = self.session.post(self.url, data=json_str, timeout=self.timeout)
response = response.json()
except Exception as e:
raise SplunkError(e.__str__())
if response["code"] != 0:
raise SplunkError(response["text"])
def save_failure_reports_to_splunk(
def save_forensic_reports_to_splunk(
self,
failure_reports: Union[list[dict[str, Any]], dict[str, Any]],
forensic_reports: Union[list[dict[str, Any]], dict[str, Any]],
):
"""
Saves failure DMARC reports to Splunk
Saves forensic DMARC reports to Splunk
Args:
failure_reports (list): A list of failure report dictionaries
forensic_reports (list): A list of forensic report dictionaries
to save in Splunk
"""
logger.debug("Saving failure reports to Splunk")
if isinstance(failure_reports, dict):
failure_reports = [failure_reports]
logger.debug("Saving forensic reports to Splunk")
if isinstance(forensic_reports, dict):
forensic_reports = [forensic_reports]
if len(failure_reports) < 1:
if len(forensic_reports) < 1:
return
json_str = ""
for report in failure_reports:
for report in forensic_reports:
data = self._common_data.copy()
data["sourcetype"] = "dmarc:failure"
data["sourcetype"] = "dmarc:forensic"
timestamp = human_timestamp_to_unix_timestamp(report["arrival_date_utc"])
data["time"] = timestamp
data["event"] = report.copy()
json_str += "{0}\n".format(json.dumps(data))
if not self.verify:
if not self.session.verify:
logger.debug("Skipping certificate verification for Splunk HEC")
try:
response = self.session.post(
self.url, data=json_str, verify=self.verify, timeout=self.timeout
)
response = self.session.post(self.url, data=json_str, timeout=self.timeout)
response = response.json()
except Exception as e:
raise SplunkError(e.__str__())
@@ -202,22 +198,12 @@ class HECClient(object):
data["event"] = report.copy()
json_str += "{0}\n".format(json.dumps(data))
if not self.verify:
if not self.session.verify:
logger.debug("Skipping certificate verification for Splunk HEC")
try:
response = self.session.post(
self.url, data=json_str, verify=self.verify, timeout=self.timeout
)
response = self.session.post(self.url, data=json_str, timeout=self.timeout)
response = response.json()
except Exception as e:
raise SplunkError(e.__str__())
if response["code"] != 0:
raise SplunkError(response["text"])
def close(self):
"""Close the underlying HTTP session."""
self.session.close()
# Backward-compatible aliases
HECClient.save_forensic_reports_to_splunk = HECClient.save_failure_reports_to_splunk
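Each save path above builds one newline-delimited JSON payload, one HEC event per report sharing the client's `host`/`source`/`index`, before a single `session.post`. A minimal stand-in for that batching step (names are illustrative):

```python
import json

COMMON = {"host": "mailhost", "source": "dmarc", "index": "email"}

def hec_payload(reports, sourcetype):
    """One newline-terminated JSON event per report, as Splunk HEC expects."""
    lines = []
    for report in reports:
        event = dict(COMMON)
        event["sourcetype"] = sourcetype
        event["event"] = report
        lines.append(json.dumps(event))
    return "".join(line + "\n" for line in lines)

payload = hec_payload([{"a": 1}, {"b": 2}], "dmarc:forensic")
```

Batching this way means one HTTP round trip per report type instead of one per report.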


@@ -6,14 +6,11 @@ from __future__ import annotations
import json
import logging
import logging.handlers
import socket
import ssl
import time
from typing import Any, Optional
from typing import Any
from parsedmarc import (
parsed_aggregate_reports_to_csv_rows,
parsed_failure_reports_to_csv_rows,
parsed_forensic_reports_to_csv_rows,
parsed_smtp_tls_reports_to_csv_rows,
)
@@ -21,157 +18,27 @@ from parsedmarc import (
class SyslogClient(object):
"""A client for Syslog"""
def __init__(
self,
server_name: str,
server_port: int,
protocol: str = "udp",
cafile_path: Optional[str] = None,
certfile_path: Optional[str] = None,
keyfile_path: Optional[str] = None,
timeout: float = 5.0,
retry_attempts: int = 3,
retry_delay: int = 5,
):
def __init__(self, server_name: str, server_port: int):
"""
Initializes the SyslogClient
Args:
server_name (str): The Syslog server
server_port (int): The Syslog port
protocol (str): The protocol to use: "udp", "tcp", or "tls" (Default: "udp")
cafile_path (str): Path to CA certificate file for TLS server verification (Optional)
certfile_path (str): Path to client certificate file for TLS authentication (Optional)
keyfile_path (str): Path to client private key file for TLS authentication (Optional)
timeout (float): Connection timeout in seconds for TCP/TLS (Default: 5.0)
retry_attempts (int): Number of retry attempts for failed connections (Default: 3)
retry_delay (int): Delay in seconds between retry attempts (Default: 5)
server_port (int): The Syslog UDP port
"""
self.server_name = server_name
self.server_port = server_port
self.protocol = protocol.lower()
self.timeout = timeout
self.retry_attempts = retry_attempts
self.retry_delay = retry_delay
self.logger = logging.getLogger("parsedmarc_syslog")
self.logger.setLevel(logging.INFO)
# Create the appropriate syslog handler based on protocol
self.log_handler = self._create_syslog_handler(
server_name,
server_port,
self.protocol,
cafile_path,
certfile_path,
keyfile_path,
timeout,
retry_attempts,
retry_delay,
)
self.logger.addHandler(self.log_handler)
def _create_syslog_handler(
self,
server_name: str,
server_port: int,
protocol: str,
cafile_path: Optional[str],
certfile_path: Optional[str],
keyfile_path: Optional[str],
timeout: float,
retry_attempts: int,
retry_delay: int,
) -> logging.handlers.SysLogHandler:
"""
Creates a SysLogHandler with the specified protocol and TLS settings
"""
if protocol == "udp":
# UDP protocol (default, backward compatible)
return logging.handlers.SysLogHandler(
address=(server_name, server_port),
socktype=socket.SOCK_DGRAM,
)
elif protocol in ["tcp", "tls"]:
# TCP or TLS protocol with retry logic
for attempt in range(1, retry_attempts + 1):
try:
if protocol == "tcp":
# TCP without TLS
handler = logging.handlers.SysLogHandler(
address=(server_name, server_port),
socktype=socket.SOCK_STREAM,
)
# Set timeout on the socket
if hasattr(handler, "socket") and handler.socket:
handler.socket.settimeout(timeout)
return handler
else:
# TLS protocol
# Create SSL context with secure defaults
ssl_context = ssl.create_default_context()
# Explicitly set minimum TLS version to 1.2 for security
ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
# Configure server certificate verification
if cafile_path:
ssl_context.load_verify_locations(cafile=cafile_path)
# Configure client certificate authentication
if certfile_path and keyfile_path:
ssl_context.load_cert_chain(
certfile=certfile_path,
keyfile=keyfile_path,
)
elif certfile_path or keyfile_path:
# Warn if only one of the two required parameters is provided
self.logger.warning(
"Both certfile_path and keyfile_path are required for "
"client certificate authentication. Client authentication "
"will not be used."
)
# Create TCP handler first
handler = logging.handlers.SysLogHandler(
address=(server_name, server_port),
socktype=socket.SOCK_STREAM,
)
# Wrap socket with TLS
if hasattr(handler, "socket") and handler.socket:
handler.socket = ssl_context.wrap_socket(
handler.socket,
server_hostname=server_name,
)
handler.socket.settimeout(timeout)
return handler
except Exception as e:
if attempt < retry_attempts:
self.logger.warning(
f"Syslog connection attempt {attempt}/{retry_attempts} failed: {e}. "
f"Retrying in {retry_delay} seconds..."
)
time.sleep(retry_delay)
else:
self.logger.error(
f"Syslog connection failed after {retry_attempts} attempts: {e}"
)
raise
else:
raise ValueError(
f"Invalid protocol '{protocol}'. Must be 'udp', 'tcp', or 'tls'."
)
log_handler = logging.handlers.SysLogHandler(address=(server_name, server_port))
self.logger.addHandler(log_handler)
def save_aggregate_report_to_syslog(self, aggregate_reports: list[dict[str, Any]]):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows:
self.logger.info(json.dumps(row))
def save_failure_report_to_syslog(self, failure_reports: list[dict[str, Any]]):
rows = parsed_failure_reports_to_csv_rows(failure_reports)
def save_forensic_report_to_syslog(self, forensic_reports: list[dict[str, Any]]):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows:
self.logger.info(json.dumps(row))
@@ -179,12 +46,3 @@ class SyslogClient(object):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
for row in rows:
self.logger.info(json.dumps(row))
def close(self):
"""Remove and close the syslog handler, releasing its socket."""
self.logger.removeHandler(self.log_handler)
self.log_handler.close()
# Backward-compatible aliases
SyslogClient.save_forensic_report_to_syslog = SyslogClient.save_failure_report_to_syslog
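The constructor above selects the transport through `SysLogHandler`'s `socktype` argument (`SOCK_DGRAM` for UDP, `SOCK_STREAM` for TCP, with the TCP socket optionally wrapped by an `ssl` context for TLS). A self-contained UDP round trip against a throwaway local socket shows the mechanism:

```python
import logging
import logging.handlers
import socket

# Throwaway "syslog server": a UDP socket on an ephemeral localhost port.
server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind(("127.0.0.1", 0))
server.settimeout(5)

log = logging.getLogger("syslog_sketch")
log.setLevel(logging.INFO)
handler = logging.handlers.SysLogHandler(
    address=server.getsockname(),
    socktype=socket.SOCK_DGRAM,  # SOCK_STREAM here switches to TCP framing
)
log.addHandler(handler)
log.info('{"source_ip_address": "192.0.2.123"}')

datagram, _ = server.recvfrom(4096)

log.removeHandler(handler)
handler.close()
server.close()
```

The handler prepends the syslog priority (`<14>` for user.info by default) to each record, so the receiver sees the JSON row with that prefix.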


@@ -2,13 +2,13 @@ from __future__ import annotations
from typing import Any, Dict, List, Literal, Optional, TypedDict, Union
# NOTE: This module is intentionally Python 3.10 compatible.
# NOTE: This module is intentionally Python 3.9 compatible.
# - No PEP 604 unions (A | B)
# - No typing.NotRequired / Required (3.11+) to avoid an extra dependency.
# For optional keys, use total=False TypedDicts.
ReportType = Literal["aggregate", "failure", "smtp_tls"]
ReportType = Literal["aggregate", "forensic", "smtp_tls"]
class AggregateReportMetadata(TypedDict):
@@ -21,7 +21,6 @@ class AggregateReportMetadata(TypedDict):
timespan_requires_normalization: bool
original_timespan_seconds: int
errors: List[str]
generator: Optional[str]
class AggregatePolicyPublished(TypedDict):
@@ -30,11 +29,8 @@ class AggregatePolicyPublished(TypedDict):
aspf: str
p: str
sp: str
pct: Optional[str]
fo: Optional[str]
np: Optional[str]
testing: Optional[str]
discovery_method: Optional[str]
pct: str
fo: str
class IPSourceInfo(TypedDict):
@@ -67,14 +63,12 @@ class AggregateAuthResultDKIM(TypedDict):
domain: str
result: str
selector: str
human_result: Optional[str]
class AggregateAuthResultSPF(TypedDict):
domain: str
result: str
scope: str
human_result: Optional[str]
class AggregateAuthResults(TypedDict):
@@ -125,7 +119,7 @@ ParsedEmail = TypedDict(
"ParsedEmail",
{
# This is a lightly-specified version of mailsuite/mailparser JSON.
# It focuses on the fields parsedmarc uses in failure report handling.
# It focuses on the fields parsedmarc uses in forensic handling.
"headers": Dict[str, Any],
"subject": Optional[str],
"filename_safe_subject": Optional[str],
@@ -144,7 +138,7 @@ ParsedEmail = TypedDict(
)
class FailureReport(TypedDict):
class ForensicReport(TypedDict):
feedback_type: Optional[str]
user_agent: Optional[str]
version: Optional[str]
@@ -165,10 +159,6 @@ class FailureReport(TypedDict):
parsed_sample: ParsedEmail
# Backward-compatible alias
ForensicReport = FailureReport
class SMTPTLSFailureDetails(TypedDict):
result_type: str
failed_session_count: int
@@ -211,13 +201,9 @@ class AggregateParsedReport(TypedDict):
report: AggregateReport
class FailureParsedReport(TypedDict):
report_type: Literal["failure"]
report: FailureReport
# Backward-compatible alias
ForensicParsedReport = FailureParsedReport
class ForensicParsedReport(TypedDict):
report_type: Literal["forensic"]
report: ForensicReport
class SMTPTLSParsedReport(TypedDict):
@@ -225,10 +211,10 @@ class SMTPTLSParsedReport(TypedDict):
report: SMTPTLSReport
ParsedReport = Union[AggregateParsedReport, FailureParsedReport, SMTPTLSParsedReport]
ParsedReport = Union[AggregateParsedReport, ForensicParsedReport, SMTPTLSParsedReport]
class ParsingResults(TypedDict):
aggregate_reports: List[AggregateReport]
failure_reports: List[FailureReport]
forensic_reports: List[ForensicReport]
smtp_tls_reports: List[SMTPTLSReport]
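As the module's own comment notes, optional keys on pre-3.11 Python are expressed with `total=False` TypedDicts rather than `typing.NotRequired`. A sketch of that pattern with hypothetical names:

```python
from typing import Optional, TypedDict

class _PolicyRequired(TypedDict):
    domain: str
    p: str

class PolicyPublished(_PolicyRequired, total=False):
    # Optional keys live in a total=False subclass instead of
    # typing.NotRequired, which needs Python 3.11+.
    np: Optional[str]
    testing: Optional[str]

# Valid: required keys present, optional keys omitted entirely.
policy: PolicyPublished = {"domain": "example.com", "p": "reject"}
```

Type checkers then flag a missing `domain` or `p`, while `np` and `testing` may be absent, which is why consumers read them with `.get()`.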


@@ -205,7 +205,8 @@ def get_reverse_dns(
)[0]
except dns.exception.DNSException as e:
logger.debug(f"get_reverse_dns({ip_address}) exception: {e}")
logger.warning(f"get_reverse_dns({ip_address}) exception: {e}")
pass
return hostname
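For context, the helper patched above swallows `dns.exception.DNSException` and returns whatever hostname it has, logging instead of raising. A stdlib stand-in for the same best-effort behavior (parsedmarc itself resolves PTR records via dnspython, not `socket`):

```python
import socket

def best_effort_reverse_dns(ip_address):
    """Return the PTR hostname, or None when the lookup fails (sketch)."""
    try:
        hostname, _aliases, _addresses = socket.gethostbyaddr(ip_address)
        return hostname
    except OSError:
        # Mirrors the helper above: log-and-continue territory, never raise.
        return None

result = best_effort_reverse_dns("127.0.0.1")
```

A failed lookup is routine for DMARC source IPs, which is why the change above only raises the log level of the message rather than turning the failure into an error.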


@@ -16,7 +16,7 @@ class WebhookClient(object):
def __init__(
self,
aggregate_url: str,
failure_url: str,
forensic_url: str,
smtp_tls_url: str,
timeout: Optional[int] = 60,
):
@@ -24,12 +24,12 @@ class WebhookClient(object):
Initializes the WebhookClient
Args:
aggregate_url (str): The aggregate report webhook url
failure_url (str): The failure report webhook url
forensic_url (str): The forensic report webhook url
smtp_tls_url (str): The smtp_tls report webhook url
timeout (int): The timeout to use when calling the webhooks
"""
self.aggregate_url = aggregate_url
self.failure_url = failure_url
self.forensic_url = forensic_url
self.smtp_tls_url = smtp_tls_url
self.timeout = timeout
self.session = requests.Session()
@@ -38,9 +38,9 @@ class WebhookClient(object):
"Content-Type": "application/json",
}
def save_failure_report_to_webhook(self, report: str):
def save_forensic_report_to_webhook(self, report: str):
try:
self._send_to_webhook(self.failure_url, report)
self._send_to_webhook(self.forensic_url, report)
except Exception as error_:
logger.error("Webhook Error: {0}".format(error_.__str__()))
@@ -63,13 +63,3 @@ class WebhookClient(object):
self.session.post(webhook_url, data=payload, timeout=self.timeout)
except Exception as error_:
logger.error("Webhook Error: {0}".format(error_.__str__()))
def close(self):
"""Close the underlying HTTP session."""
self.session.close()
# Backward-compatible aliases
WebhookClient.save_forensic_report_to_webhook = (
WebhookClient.save_failure_report_to_webhook
)
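The alias lines in the hunk above all follow one pattern: bind the old method name to the new function object at class level, so existing call sites keep working without a wrapper or deprecation shim. Sketched with hypothetical names:

```python
class ReportClient:
    def save_failure_report(self, report):
        return ("failure", report)

# Backward-compatible alias: the old name points at the same function object.
ReportClient.save_forensic_report = ReportClient.save_failure_report

client = ReportClient()
```

Because the alias is the same function object, not a copy, later patches to `save_failure_report` on the class would not be picked up by the alias; it is a rename bridge, not a dynamic forwarder.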


@@ -2,7 +2,7 @@
requires = [
"hatchling>=1.27.0",
]
requires_python = ">=3.10,<3.15"
requires_python = ">=3.9,<3.14"
build-backend = "hatchling.build"
[project]
@@ -29,7 +29,7 @@ classifiers = [
"Operating System :: OS Independent",
"Programming Language :: Python :: 3"
]
requires-python = ">=3.10"
requires-python = ">=3.9, <3.14"
dependencies = [
"azure-identity>=1.8.0",
"azure-monitor-ingestion>=1.0.0",
@@ -45,10 +45,10 @@ dependencies = [
"google-auth-httplib2>=0.1.0",
"google-auth-oauthlib>=0.4.6",
"google-auth>=2.3.3",
"imapclient>=3.1.0",
"imapclient>=2.1.0",
"kafka-python-ng>=2.2.2",
"lxml>=4.4.0",
"mailsuite>=1.11.2",
"mailsuite>=1.11.0",
"msgraph-core==0.2.2",
"opensearch-py>=2.4.2,<=3.0.0",
"publicsuffixlist>=0.10.0",


@@ -1,48 +0,0 @@
<feedback xmlns="urn:ietf:params:xml:ns:dmarc-2.0">
<version>1.0</version>
<report_metadata>
<org_name>Sample Reporter</org_name>
<email>report_sender@example-reporter.com</email>
<extra_contact_info>...</extra_contact_info>
<report_id>3v98abbp8ya9n3va8yr8oa3ya</report_id>
<date_range>
<begin>302832000</begin>
<end>302918399</end>
</date_range>
<generator>Example DMARC Aggregate Reporter v1.2</generator>
</report_metadata>
<policy_published>
<domain>example.com</domain>
<p>quarantine</p>
<sp>none</sp>
<np>none</np>
<testing>n</testing>
<discovery_method>treewalk</discovery_method>
</policy_published>
<record>
<row>
<source_ip>192.0.2.123</source_ip>
<count>123</count>
<policy_evaluated>
<disposition>pass</disposition>
<dkim>pass</dkim>
<spf>fail</spf>
</policy_evaluated>
</row>
<identifiers>
<envelope_from>example.com</envelope_from>
<header_from>example.com</header_from>
</identifiers>
<auth_results>
<dkim>
<domain>example.com</domain>
<result>pass</result>
<selector>abc123</selector>
</dkim>
<spf>
<domain>example.com</domain>
<result>fail</result>
</spf>
</auth_results>
</record>
</feedback>


@@ -1,77 +0,0 @@
<?xml version="1.0"?>
<feedback>
<version>2.0</version>
<report_metadata>
<org_name>example.net</org_name>
<email>postmaster@example.net</email>
<report_id>dmarcbis-test-report-001</report_id>
<date_range>
<begin>1700000000</begin>
<end>1700086399</end>
</date_range>
</report_metadata>
<policy_published>
<domain>example.com</domain>
<adkim>s</adkim>
<aspf>s</aspf>
<p>reject</p>
<sp>quarantine</sp>
<np>reject</np>
<testing>y</testing>
<discovery_method>treewalk</discovery_method>
<fo>1</fo>
</policy_published>
<record>
<row>
<source_ip>198.51.100.1</source_ip>
<count>5</count>
<policy_evaluated>
<disposition>none</disposition>
<dkim>pass</dkim>
<spf>pass</spf>
</policy_evaluated>
</row>
<identifiers>
<envelope_from>example.com</envelope_from>
<header_from>example.com</header_from>
</identifiers>
<auth_results>
<dkim>
<domain>example.com</domain>
<selector>selector1</selector>
<result>pass</result>
</dkim>
<spf>
<domain>example.com</domain>
<scope>mfrom</scope>
<result>pass</result>
</spf>
</auth_results>
</record>
<record>
<row>
<source_ip>203.0.113.10</source_ip>
<count>2</count>
<policy_evaluated>
<disposition>reject</disposition>
<dkim>fail</dkim>
<spf>fail</spf>
<reason>
<type>other</type>
<comment>sender not authorized</comment>
</reason>
</policy_evaluated>
</row>
<identifiers>
<envelope_from>spoofed.example.com</envelope_from>
<header_from>example.com</header_from>
</identifiers>
<auth_results>
<spf>
<domain>spoofed.example.com</domain>
<scope>mfrom</scope>
<result>fail</result>
</spf>
</auth_results>
</record>
</feedback>


@@ -60,10 +60,10 @@ Create Dashboards
9. Click Save
10. Click Dashboards
11. Click Create New Dashboard
12. Use a descriptive title, such as "Failure DMARC Data"
12. Use a descriptive title, such as "Forensic DMARC Data"
13. Click Create Dashboard
14. Click on the Source button
15. Paste the content of ``dmarc_failure_dashboard.xml`` into the source editor
15. Paste the content of ``dmarc_forensic_dashboard.xml`` into the source editor
16. If the index storing the DMARC data is not named email, replace index="email" accordingly
17. Click Save


@@ -1,8 +1,8 @@
<form theme="dark" version="1.1">
<label>Failure DMARC Data</label>
<label>Forensic DMARC Data</label>
<search id="base_search">
<query>
index="email" (sourcetype="dmarc:failure" OR sourcetype="dmarc:forensic") parsed_sample.headers.From=$header_from$ parsed_sample.headers.To=$header_to$ parsed_sample.headers.Subject=$header_subject$ source.ip_address=$source_ip_address$ source.reverse_dns=$source_reverse_dns$ source.country=$source_country$
index="email" sourcetype="dmarc:forensic" parsed_sample.headers.From=$header_from$ parsed_sample.headers.To=$header_to$ parsed_sample.headers.Subject=$header_subject$ source.ip_address=$source_ip_address$ source.reverse_dns=$source_reverse_dns$ source.country=$source_country$
| table *
</query>
<earliest>$time_range.earliest$</earliest>
@@ -43,7 +43,7 @@
</fieldset>
<row>
<panel>
<title>Failure samples</title>
<title>Forensic samples</title>
<table>
<search base="base_search">
<query>| table arrival_date_utc authentication_results parsed_sample.headers.From,parsed_sample.headers.To,parsed_sample.headers.Subject | sort -arrival_date_utc</query>
@@ -59,7 +59,7 @@
</row>
<row>
<panel>
<title>Failure samples by country</title>
<title>Forensic samples by country</title>
<map>
<search base="base_search">
<query>| iplocation source.ip_address| stats count by Country | geom geo_countries featureIdField="Country"</query>
@@ -72,7 +72,7 @@
</row>
<row>
<panel>
<title>Failure samples by IP address</title>
<title>Forensic samples by IP address</title>
<table>
<search base="base_search">
<query>| iplocation source.ip_address | stats count by source.ip_address,source.reverse_dns | sort -count</query>
@@ -85,7 +85,7 @@
</table>
</panel>
<panel>
<title>Failure samples by country ISO code</title>
<title>Forensic samples by country ISO code</title>
<table>
<search base="base_search">
<query>| stats count by source.country | sort - count</query>

tests.py — 3825 changed lines (diff suppressed because it is too large)