Compare commits

...

68 Commits

Author SHA1 Message Date
Sean Whalen
691b0fcd41 Fix changelog headings 2026-03-10 20:34:13 -04:00
Sean Whalen
b9343a295f 9.2.1
- Better checking of `msgraph` configuration (PR #695)
- Updated `dbip-country-lite` database to version `2026-03`
- Changed DNS query error logging level from `warning` to `debug`
2026-03-10 20:32:33 -04:00
Kili
b51a62463f Fail fast on invalid MS Graph username/password config (#695) 2026-03-10 19:34:16 -04:00
Kili
66ba5b0e5e Add MS Graph auth matrix regression tests (#696)
* Rebase MS Graph auth matrix tests onto current master

* Expand ClientSecret auth matrix coverage
2026-03-10 19:33:37 -04:00
Sean Whalen
7929919223 9.2.0
### Added

- OpenSearch AWS SigV4 authentication support (PR #673)
- IMAP move/delete compatibility fallbacks (PR #671)
- `fail_on_output_error` CLI option for sink failures (PR #672)
- Gmail service account auth mode for non-interactive runs (PR #676)
- Microsoft Graph certificate authentication support (PRs #692 and #693)
- Microsoft Graph well-known folder fallback for root listing failures (PRs #618 and #684 close #609)

### Fixed

- Pass mailbox since filter through `watch_inbox` callback (PR #670 closes issue #581)
- `parsedmarc.mail.gmail.GmailConnection.delete_message` now properly calls the Gmail API (PR #668)
- Avoid extra mailbox fetch in batch and test mode (PR #691 closes #533)
2026-03-10 11:41:37 -04:00
Kili
faa68333a9 Avoid extra mailbox fetch in batch/test mode and add regression test (#691)
Co-authored-by: Sean Whalen <44679+seanthegeek@users.noreply.github.com>
2026-03-10 11:22:39 -04:00
Kili
d34a33e980 Validate MS Graph certificate auth inputs (#693)
* Validate MS Graph certificate auth inputs

* Fix MS Graph shared scope detection without username
2026-03-10 11:22:09 -04:00
Kili
9040a38842 Refine MS Graph well-known folder fallback (#694)
* Refine MS Graph well-known folder fallback

* Make MS Graph retry test doubles method-aware
2026-03-10 11:20:43 -04:00
Kili
ea0e3b11c1 Add MS Graph certificate authentication support (#692)
* Add MS Graph certificate authentication support

* Preserve MS Graph constructor compatibility

---------

Co-authored-by: Sean Whalen <44679+seanthegeek@users.noreply.github.com>
2026-03-10 09:30:39 -04:00
Kili
199b782191 Add MS Graph well-known folder fallback for root listing failures (#689)
* Add MS Graph well-known folder fallback for root listing failures

* Resolve test merge cleanup for MS Graph folder fallback
2026-03-10 09:25:37 -04:00
Kili
25f3c3e1d0 Add security policy (#688)
* Add security policy

* Update SECURITY.md for vulnerability reporting clarity

Clarified instructions for reporting vulnerabilities and updated language regarding security fixes.

---------

Co-authored-by: Sean Whalen <44679+seanthegeek@users.noreply.github.com>
2026-03-09 18:24:16 -04:00
Kili
a14ff66f5a Add GitHub issue templates (#686) 2026-03-09 18:17:06 -04:00
Kili
fb738bf9c4 Add contributing guide (#685) 2026-03-09 18:16:47 -04:00
Kili
0e811fe0ff Add pull request template (#687) 2026-03-09 18:15:40 -04:00
Kili
56eb565ad2 Accept pathlib.Path in report parsing APIs (#680)
* Accept pathlib.Path in report parsing APIs

* Polish PathLike typing and test names
2026-03-09 18:08:57 -04:00
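The PathLike change above (#680) is essentially boundary normalization of input paths. A minimal stdlib sketch — the helper name here is illustrative, not parsedmarc's actual code:

```python
import os
from pathlib import Path

def normalize_report_path(source):
    """Accept either a str or a pathlib.Path transparently (illustrative)."""
    if isinstance(source, os.PathLike):
        return os.fspath(source)  # Path -> str without touching the filesystem
    return source

# Both forms now reach the parser as a plain string path.
as_path = normalize_report_path(Path("samples") / "report.xml")
as_str = normalize_report_path("report.xml")
```

`os.fspath()` is the canonical way to accept `PathLike` objects without special-casing every call site.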
Kili
2c3abb3e8c Retry transient MS Graph request errors (#679)
* Retry transient MS Graph request errors

* Handle zero MS Graph retry attempts explicitly
2026-03-09 17:56:22 -04:00
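Retrying transient request errors (#679) generally follows the pattern below; the status-code set, backoff, and names are assumptions for illustration, not parsedmarc's implementation. Note the explicit rejection of zero attempts, matching the second commit in the PR:

```python
import time

TRANSIENT_STATUSES = {429, 502, 503, 504}  # assumption: typical transient HTTP codes

class _Response:
    def __init__(self, status_code: int):
        self.status_code = status_code

def request_with_retry(send, attempts: int = 3, base_delay: float = 1.0,
                       sleep=time.sleep):
    """Retry send() on transient HTTP status codes with exponential backoff.

    `send` is any zero-argument callable returning an object with a
    `status_code` attribute. Zero attempts is rejected explicitly rather
    than silently looping zero times.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        response = send()
        if response.status_code not in TRANSIENT_STATUSES:
            return response
        if attempt < attempts - 1:
            sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...
    return response

# Demo: first call returns 503, the retry succeeds.
_results = iter([_Response(503), _Response(200)])
final = request_with_retry(lambda: next(_results), sleep=lambda _s: None)
```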
Kili
326e630f50 Add performance tuning guidance for large mailbox runs (#677) 2026-03-09 17:44:42 -04:00
Kili
cdc30e6780 Tune Codecov statuses for small PRs (#678) 2026-03-09 17:43:34 -04:00
Kili
f2febf21d3 Add fail_on_output_error CLI option for sink failures (#672)
* Add fail-on-output-error option and CLI regression test

* Broaden fail_on_output_error coverage for disabled and multi-sink paths
2026-03-09 17:35:38 -04:00
Kili
79f47121a4 Pass mailbox since filter through watch_inbox callback (#670)
* Pass mailbox since through watch loop and add regression test

* Add CLI regression test for mailbox since in watch mode
2026-03-09 17:33:42 -04:00
Kili
6e6c90e19b Add IMAP move/delete compatibility fallbacks (#671)
* Add IMAP move/delete compatibility fallbacks with tests

* Expand IMAP fallback tests for success and error paths
2026-03-09 17:29:01 -04:00
Kili
c4d7455839 Add OpenSearch AWS SigV4 authentication support (#673)
* Add OpenSearch AWS SigV4 authentication support

* Increase SigV4 coverage for auth validation and CLI config wiring

* Update parsedmarc/opensearch.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update docs/source/usage.md

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Sean Whalen <44679+seanthegeek@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-09 17:21:22 -04:00
Kili
95e6fb85a1 Fix Gmail delete_message to execute API request (#668)
* Fix Gmail delete to execute request and add regression test

* Fix duplicate GmailConnection import in tests
2026-03-09 17:11:35 -04:00
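The bug class behind #668: google-api-python-client request builders are lazy, so constructing a delete request sends nothing until `.execute()` is called. A stand-in sketch (the classes below are fakes for illustration, not the Gmail API):

```python
class FakeLazyRequest:
    """Stand-in for a googleapiclient HttpRequest: building it sends nothing."""
    def __init__(self):
        self.sent = False
    def execute(self):
        self.sent = True
        return {}

def delete_message_buggy(request: FakeLazyRequest) -> None:
    request  # request object created, but the HTTP call never happens

def delete_message_fixed(request: FakeLazyRequest) -> dict:
    return request.execute()  # .execute() actually performs the API call

buggy = FakeLazyRequest()
delete_message_buggy(buggy)
fixed = FakeLazyRequest()
delete_message_fixed(fixed)
# buggy.sent is False; fixed.sent is True
```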
Kili
298d5b6e6e CI: split lint/docs/build from integration tests matrix (#669)
* Optimize CI: split lint/docs/build from integration tests

* Trim unnecessary package install from lint job
2026-03-09 17:09:02 -04:00
Kili
a3c5bb906b Add Gmail service account auth mode with delegated user support (#676) 2026-03-09 17:04:30 -04:00
Kili
d49ce6a13f Increase unit test coverage for Gmail/Graph/IMAP connectors (#664)
* Increase coverage for Gmail, Graph, and IMAP mail connectors

* Make testLoadTokenMissing use guaranteed-missing temp path

* Expand coverage for Gmail token refresh and Graph pagination error paths
2026-03-09 11:54:43 -04:00
Sean Whalen
adb0d31382 9.1.2
- Fix duplicate detection for normalized aggregate reports in Elasticsearch/OpenSearch (PR #666 fixes issue #665)
2026-03-06 13:41:33 -05:00
Copilot
ae5d20ecf5 Fix duplicate detection for normalized aggregate reports in Elasticsearch/OpenSearch (#666)
Change date_begin/date_end queries from exact match to range queries
(gte/lte) so that previously saved normalized time buckets are correctly
detected as duplicates within the original report's date range.

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2026-03-06 13:21:54 -05:00
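The commit message above describes swapping exact matches for range queries. A schematic of the query fragments — field names are taken from the commit message, and the real document schema may differ:

```python
def duplicate_date_filters(date_begin: str, date_end: str) -> list[dict]:
    """Build duplicate-detection filters for a normalized aggregate report.

    Before the fix (schematic), exact matches missed normalized time buckets:
        [{"match": {"date_begin": date_begin}},
         {"match": {"date_end": date_end}}]
    After the fix, any saved bucket inside the original report's date range
    is detected as a duplicate.
    """
    return [
        {"range": {"date_begin": {"gte": date_begin}}},
        {"range": {"date_end": {"lte": date_end}}},
    ]

filters = duplicate_date_filters("2026-03-01T00:00:00", "2026-03-02T00:00:00")
```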
Kili
e98fdfa96b Fix Python 3.14 support metadata and require imapclient 3.1.0 (#662) 2026-03-04 12:36:15 -05:00
Sean Whalen
9551c8b467 Add AGENTS.md for AI agent guidance and link from CLAUDE.md 2026-03-03 21:00:55 -05:00
Sean Whalen
d987943c22 Update changelog formatting for version 9.1.1 2026-03-03 11:46:13 -05:00
Sean Whalen
3d8a99b5d3 9.1.1
- Fix the use of Elasticsearch and OpenSearch API keys (PR #660 fixes issue #653)
- Drop support for Python 3.9 (PR #661)
2026-03-03 11:43:53 -05:00
Majid Burney
5aaaedf463 Use correct key names for elasticsearch/opensearch api keys (#660) 2026-03-03 11:35:05 -05:00
Copilot
2e3ee25ec9 Drop Python 3.9 support (#661)
* Initial plan

* Drop Python 3.9 support: update CI matrix, pyproject.toml, docs, and README

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Update Python 3.9 version table entry to note Debian 11/RHEL 9 usage

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2026-03-03 11:34:35 -05:00
Sean Whalen
33eb2aaf62 9.1.0
## Enhancements

- Add TCP and TLS support for syslog output. (#656)
- Skip DNS lookups in GitHub Actions to prevent DNS timeouts during tests. (#657)
- Remove microseconds from DMARC aggregate report time ranges before parsing them.
2026-02-20 14:36:37 -05:00
Sean Whalen
1387fb4899 9.0.11
- Remove microseconds from DMARC aggregate report time ranges before parsing them.
2026-02-20 14:27:51 -05:00
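Stripping microseconds before parsing, as in 9.0.11 above, can be sketched with the stdlib; this is illustrative, not parsedmarc's exact code:

```python
from datetime import datetime

def strip_fractional_seconds(timestamp: str) -> str:
    """Drop a fractional-seconds suffix like '.123456' so strict parsers accept it."""
    date_part, _, _ = timestamp.partition(".")
    return date_part

cleaned = strip_fractional_seconds("2026-02-20T14:27:51.123456")
parsed = datetime.strptime(cleaned, "%Y-%m-%dT%H:%M:%S")
# cleaned == "2026-02-20T14:27:51"
```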
Copilot
4d97bd25aa Skip DNS lookups in GitHub Actions to prevent test timeouts (#657)
* Add offline mode for tests in GitHub Actions to skip DNS lookups

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2026-02-18 18:19:28 -05:00
Copilot
17a612df0c Add TCP and TLS transport support to syslog module (#656)
- Updated parsedmarc/syslog.py to support UDP, TCP, and TLS protocols
- Added protocol parameter with UDP as default for backward compatibility
- Implemented TLS support with CA verification and client certificate auth
- Added retry logic for TCP/TLS connections with configurable attempts and delays
- Updated parsedmarc/cli.py with new config file parsing
- Updated documentation with examples for TCP and TLS configurations

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Remove CLI arguments for syslog options, keep config-file only

Per user request, removed command-line argument options for syslog parameters.
All new syslog options (protocol, TLS settings, timeout, retry) are now only
available via configuration file, consistent with other similar options.

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Fix code review issues: remove trailing whitespace and add cert validation

- Removed trailing whitespace from syslog.py and usage.md
- Added warning when only one of certfile_path/keyfile_path is provided
- Improved error handling for incomplete TLS client certificate configuration

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Set minimum TLS version to 1.2 for enhanced security

Explicitly configured ssl_context.minimum_version = TLSVersion.TLSv1_2
to ensure only secure TLS versions are used for syslog connections.

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2026-02-18 18:12:59 -05:00
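The TLS 1.2 floor described in the last bullet of #656 can be expressed with the stdlib `ssl` module; this is a sketch of the idea, not the syslog module's exact code:

```python
import ssl

# Client-side context with certificate verification enabled by default.
context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
# Refuse TLS 1.0/1.1 handshakes for syslog connections.
context.minimum_version = ssl.TLSVersion.TLSv1_2
# A TCP socket would later be wrapped with:
#   context.wrap_socket(sock, server_hostname=host)
```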
Blackmoon
221bc332ef Fixed a typo in policies.successful_session_count (#654) 2026-02-09 13:57:11 -05:00
Sean Whalen
a2a75f7a81 Fix timestamp parsing in aggregate report by removing fractional seconds 2026-01-21 13:08:48 -05:00
Anael Mobilia
50fcb51577 Update supported Python versions in docs + readme (#652)
* Update README.md

* Update index.md

* Update python-tests.yml
2026-01-19 14:40:01 -05:00
Sean Whalen
dd9ef90773 9.0.10
- Support Python 3.14+
2026-01-17 14:09:18 -05:00
Sean Whalen
0e3a4b0f06 9.0.9
Validate that a string is base64-encoded before trying to base64 decode it. (PRs #648 and #649)
2026-01-08 13:29:23 -05:00
maraspr
343b53ef18 remove newlines before b64decode (#649) 2026-01-08 12:24:20 -05:00
maraspr
792079a3e8 Validate that a string is base64 (#648) 2026-01-08 10:15:27 -05:00
Sean Whalen
1f3a1fc843 Better typing 2025-12-29 17:14:54 -05:00
Sean Whalen
34fa0c145d 9.0.8
- Fix logging configuration not propagating to child parser processes (#646).
- Update `mailsuite` dependency to `>=1.11.1` to solve issues with iCloud IMAP (#493).
2025-12-29 17:07:38 -05:00
Copilot
6719a06388 Fix logging configuration not propagating to child parser processes (#646)
* Initial plan

* Fix logging configuration propagation to child parser processes

- Add _configure_logging() helper function to set up logging in child processes
- Modified cli_parse() to accept log_level and log_file parameters
- Pass current logging configuration from parent to child processes
- Logging warnings/errors from child processes now properly display

Fixes issue where logging handlers in parent process were not inherited by
child processes created via multiprocessing.Process(). Child processes now
configure their own logging with the same settings as the parent.

Tested with sample files and confirmed warnings from DNS exceptions in child
processes are now visible.

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Address code review feedback on logging configuration

- Use exact type check (type(h) is logging.StreamHandler) instead of isinstance
  to avoid confusion with FileHandler subclass
- Catch specific exceptions (IOError, OSError, PermissionError) instead of
  bare Exception when creating FileHandler
- Kept logging.ERROR as default to maintain consistency with existing behavior

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2025-12-29 15:07:22 -05:00
Sean Whalen
eafa435868 Code cleanup 2025-12-29 14:32:05 -05:00
Sean Whalen
5d772c3b36 Bump version to 9.0.7 and update changelog with IMAP since option fix 2025-12-29 14:23:50 -05:00
Copilot
72cabbef23 Fix IMAP SEARCH SINCE date format to RFC 3501 DD-Mon-YYYY (#645)
* Initial plan

* Fix IMAP since option date format to use RFC 3501 compliant DD-Mon-YYYY format

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2025-12-29 14:18:48 -05:00
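RFC 3501 requires `SEARCH SINCE` dates as `DD-Mon-YYYY` with English month abbreviations, so a locale-dependent `strftime("%d-%b-%Y")` is not reliable. A sketch of a compliant formatter (not parsedmarc's exact code):

```python
from datetime import date

# RFC 3501 month names are always English, so avoid the locale-dependent %b.
MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
          "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")

def imap_since(d: date) -> str:
    """Format a date for an IMAP SEARCH SINCE criterion (RFC 3501 DD-Mon-YYYY)."""
    return f"{d.day:02d}-{MONTHS[d.month - 1]}-{d.year}"

stamp = imap_since(date(2025, 12, 29))  # "29-Dec-2025"
```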
Sean Whalen
3d74cd6ac0 Update CHANGELOG with issue reference for email read status
Added a reference to issue #625 regarding email read status.
2025-12-29 12:10:19 -05:00
Tomáš Kováčik
d1ac59a016 fix #641 (#642)
* fix smtptls and forensic reports for GELF

* add policy_domain, policy_type and failed_session_count to record row

* Remove unused import of json in gelf.py

---------

Co-authored-by: Sean Whalen <44679+seanthegeek@users.noreply.github.com>
2025-12-29 12:05:07 -05:00
Anael Mobilia
7fdd53008f Update README.md (#644) 2025-12-29 10:36:21 -05:00
Sean Whalen
35331d4b84 Add parsedmarc.types module to API reference documentation 2025-12-25 17:24:45 -05:00
Sean Whalen
de9edd3590 Add note about email read status in Microsoft 365 to changelog 2025-12-25 17:16:39 -05:00
Sean Whalen
abf4bdba13 Add type annotations for SMTP TLS and forensic report structures 2025-12-25 16:39:33 -05:00
Sean Whalen
7b842740f5 Change file permissions for tests.py to make it executable 2025-12-25 16:02:33 -05:00
Sean Whalen
ebe3ccf40a Update changelog for version 9.0.6 and set version in constants.py 2025-12-25 16:01:25 -05:00
Sean Whalen
808285658f Refactor function parameters to use non-Optional types where applicable 2025-12-25 16:01:12 -05:00
Sean Whalen
bc1dae29bd Update mailsuite dependency version to 1.11.0 2025-12-25 15:32:27 -05:00
Sean Whalen
4b904444e5 Refactor and improve parsing and extraction functions
- Updated `extract_report` to handle various input types more robustly, removing unnecessary complexity and improving error handling.
- Simplified the handling of file-like objects and added checks for binary mode.
- Enhanced the `parse_report_email` function to streamline input processing and improve type handling.
- Introduced TypedDicts for better type safety in `utils.py`, specifically for reverse DNS and IP address information.
- Refined the configuration loading in `cli.py` to ensure boolean values are consistently cast to `bool`.
- Improved overall code readability and maintainability by restructuring and clarifying logic in several functions.
2025-12-25 15:30:20 -05:00
Sean Whalen
3608bce344 Remove unused import of Union and cast from cli.py 2025-12-24 16:53:22 -05:00
Sean Whalen
fe809c4c3f Add type ignore comments for Pyright in elastic.py and opensearch.py 2025-12-24 16:49:42 -05:00
Sean Whalen
a76c2f9621 More code cleanup 2025-12-24 16:36:59 -05:00
Sean Whalen
bb8f4002bf Use literal dicts instead of ordered dicts and other code cleanup 2025-12-24 15:04:10 -05:00
Sean Whalen
b5773c6b4a Fix etree import so type checkers don't complain 2025-12-24 14:37:38 -05:00
Sean Whalen
b99bd67225 Fix get_base_domain() typing 2025-12-24 14:32:05 -05:00
40 changed files with 4061 additions and 938 deletions

72
.github/ISSUE_TEMPLATE/bug_report.yml vendored Normal file

@@ -0,0 +1,72 @@
name: Bug report
description: Report a reproducible parsedmarc bug
title: "[Bug]: "
labels:
  - bug
body:
  - type: input
    id: version
    attributes:
      label: parsedmarc version
      description: Include the parsedmarc version or commit if known.
      placeholder: 9.x.x
    validations:
      required: true
  - type: dropdown
    id: input_backend
    attributes:
      label: Input backend
      description: Which input path or mailbox backend is involved?
      options:
        - IMAP
        - MS Graph
        - Gmail API
        - Maildir
        - mbox
        - Local file / direct parse
        - Other
    validations:
      required: true
  - type: textarea
    id: environment
    attributes:
      label: Environment
      description: Runtime, container image, OS, Python version, or deployment details.
      placeholder: Docker on Debian, Python 3.12, parsedmarc installed from PyPI
    validations:
      required: true
  - type: textarea
    id: config
    attributes:
      label: Sanitized config
      description: Include the relevant config fragment with secrets removed.
      render: ini
  - type: textarea
    id: steps
    attributes:
      label: Steps to reproduce
      description: Describe the smallest reproducible sequence you can.
      placeholder: |
        1. Configure parsedmarc with ...
        2. Run ...
        3. Observe ...
    validations:
      required: true
  - type: textarea
    id: expected_actual
    attributes:
      label: Expected vs actual behavior
      description: What did you expect, and what happened instead?
    validations:
      required: true
  - type: textarea
    id: logs
    attributes:
      label: Logs or traceback
      description: Paste sanitized logs or a traceback if available.
      render: text
  - type: textarea
    id: samples
    attributes:
      label: Sample report availability
      description: If you can share a sanitized sample report or message, note that here.

5
.github/ISSUE_TEMPLATE/config.yml vendored Normal file

@@ -0,0 +1,5 @@
blank_issues_enabled: true
contact_links:
  - name: Security issue
    url: https://github.com/domainaware/parsedmarc/security/policy
    about: Please use the security policy and avoid filing public issues for undisclosed vulnerabilities.


@@ -0,0 +1,30 @@
name: Feature request
description: Suggest a new feature or behavior change
title: "[Feature]: "
labels:
  - enhancement
body:
  - type: textarea
    id: problem
    attributes:
      label: Problem statement
      description: What workflow or limitation are you trying to solve?
    validations:
      required: true
  - type: textarea
    id: proposal
    attributes:
      label: Proposed behavior
      description: Describe the feature or behavior you want.
    validations:
      required: true
  - type: textarea
    id: alternatives
    attributes:
      label: Alternatives considered
      description: Describe workarounds or alternative approaches you considered.
  - type: textarea
    id: impact
    attributes:
      label: Compatibility or operational impact
      description: Note config, output, performance, or deployment implications if relevant.

24
.github/pull_request_template.md vendored Normal file

@@ -0,0 +1,24 @@
## Summary
-
## Why
-
## Testing
-
## Backward Compatibility / Risk
-
## Related Issue
- Closes #
## Checklist
- [ ] Tests added or updated if behavior changed
- [ ] Docs updated if config or user-facing behavior changed


@@ -10,7 +10,32 @@ on:
     branches: [ master ]
 jobs:
-  build:
+  lint-docs-build:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v5
+      - name: Set up Python
+        uses: actions/setup-python@v6
+        with:
+          python-version: "3.13"
+      - name: Install Python dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install .[build]
+      - name: Check code style
+        run: |
+          ruff check .
+      - name: Test building documentation
+        run: |
+          cd docs
+          make html
+      - name: Test building packages
+        run: |
+          hatch build
+  test:
+    needs: lint-docs-build
     runs-on: ubuntu-latest
     services:
@@ -30,7 +55,7 @@
     strategy:
       fail-fast: false
       matrix:
-        python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
+        python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
     steps:
       - uses: actions/checkout@v5
@@ -46,13 +71,6 @@
         run: |
           python -m pip install --upgrade pip
           pip install .[build]
-      - name: Test building documentation
-        run: |
-          cd docs
-          make html
-      - name: Check code style
-        run: |
-          ruff check .
       - name: Run unit tests
         run: |
           pytest --cov --cov-report=xml tests.py
@@ -61,9 +79,6 @@
           pip install -e .
           parsedmarc --debug -c ci.ini samples/aggregate/*
           parsedmarc --debug -c ci.ini samples/forensic/*
-      - name: Test building packages
-        run: |
-          hatch build
       - name: Upload coverage to Codecov
         uses: codecov/codecov-action@v5
         with:

292
.vscode/settings.json vendored

@@ -13,148 +13,154 @@
     "MD024": false
   },
   "cSpell.words": [
     ...
     "httplib",
+    "ifhost",
     "IMAP",
     ...
     "journalctl",
+    "kafkaclient",
     "keepalive",
     ...
     "LISTSERV",
+    "loganalytics",
     "lxml",
     ...
     "mfrom",
+    "mhdw",
     "michaeldavie",
     ...
     "passsword",
+    "pbar",
     "Postorius",
     "premade",
+    "privatesuffix",
     "procs",
     ...
     "zscholl"
   ],
 }

64
AGENTS.md Normal file

@@ -0,0 +1,64 @@
# AGENTS.md
This file provides guidance to AI agents when working with code in this repository.
## Project Overview
parsedmarc is a Python module and CLI utility for parsing DMARC aggregate (RUA), forensic (RUF), and SMTP TLS reports. It reads reports from IMAP, Microsoft Graph, Gmail API, Maildir, mbox files, or direct file paths, and outputs to JSON/CSV, Elasticsearch, OpenSearch, Splunk, Kafka, S3, Azure Log Analytics, syslog, or webhooks.
## Common Commands
```bash
# Install with dev/build dependencies
pip install .[build]
# Run all tests with coverage
pytest --cov --cov-report=xml tests.py
# Run a single test
pytest tests.py::Test::testAggregateSamples
# Lint and format
ruff check .
ruff format .
# Test CLI with sample reports
parsedmarc --debug -c ci.ini samples/aggregate/*
parsedmarc --debug -c ci.ini samples/forensic/*
# Build docs
cd docs && make html
# Build distribution
hatch build
```
To skip DNS lookups during testing, set `GITHUB_ACTIONS=true`.
## Architecture
**Data flow:** Input sources → CLI (`cli.py:_main`) → Parse (`__init__.py`) → Enrich (DNS/GeoIP via `utils.py`) → Output integrations
### Key modules
- `parsedmarc/__init__.py` — Core parsing logic. Main functions: `parse_report_file()`, `parse_report_email()`, `parse_aggregate_report_xml()`, `parse_forensic_report()`, `parse_smtp_tls_report_json()`, `get_dmarc_reports_from_mailbox()`, `watch_inbox()`
- `parsedmarc/cli.py` — CLI entry point (`_main`), config file parsing, output orchestration
- `parsedmarc/types.py` — TypedDict definitions for all report types (`AggregateReport`, `ForensicReport`, `SMTPTLSReport`, `ParsingResults`)
- `parsedmarc/utils.py` — IP/DNS/GeoIP enrichment, base64 decoding, compression handling
- `parsedmarc/mail/` — Polymorphic mail connections: `IMAPConnection`, `GmailConnection`, `MSGraphConnection`, `MaildirConnection`
- `parsedmarc/{elastic,opensearch,splunk,kafkaclient,loganalytics,syslog,s3,webhook,gelf}.py` — Output integrations
### Report type system
`ReportType = Literal["aggregate", "forensic", "smtp_tls"]`. Exception hierarchy: `ParserError` → `InvalidDMARCReport` → `InvalidAggregateReport`, `InvalidForensicReport`, and `InvalidSMTPTLSReport`.
### Caching
IP address info cached for 4 hours, seen aggregate report IDs cached for 1 hour (via `ExpiringDict`).
## Code Style
- Ruff for formatting and linting (configured in `.vscode/settings.json`)
- TypedDict for structured data, type hints throughout
- Python ≥3.10 required
- Tests are in a single `tests.py` file using unittest; sample reports live in `samples/`


@@ -1,5 +1,98 @@
# Changelog
## 9.2.1
### Added
- Better checking of `msgraph` configuration (PR #695)
### Changed
- Updated `dbip-country-lite` database to version `2026-03`
- DNS query error logging level from `warning` to `debug`
## 9.2.0
### Added
- OpenSearch AWS SigV4 authentication support (PR #673)
- IMAP move/delete compatibility fallbacks (PR #671)
- `fail_on_output_error` CLI option for sink failures (PR #672)
- Gmail service account auth mode for non-interactive runs (PR #676)
- Microsoft Graph certificate authentication support (PRs #692 and #693)
- Microsoft Graph well-known folder fallback for root listing failures (PRs #618 and #684 close #609)
### Fixed
- Pass mailbox since filter through `watch_inbox` callback (PR #670 closes issue #581)
- `parsedmarc.mail.gmail.GmailConnection.delete_message` now properly calls the Gmail API (PR #668)
- Avoid extra mailbox fetch in batch and test mode (PR #691 closes #533)
## 9.1.2
### Fixes
- Fix duplicate detection for normalized aggregate reports in Elasticsearch/OpenSearch (PR #666 fixes issue #665)
## 9.1.1
### Fixes
- Fix the use of Elasticsearch and OpenSearch API keys (PR #660 fixes issue #653)
### Changes
- Drop support for Python 3.9 (PR #661)
## 9.1.0
## Enhancements
- Add TCP and TLS support for syslog output. (#656)
- Skip DNS lookups in GitHub Actions to prevent DNS timeouts during tests. (#657)
- Remove microseconds from DMARC aggregate report time ranges before parsing them.
## 9.0.10
- Support Python 3.14+
## 9.0.9
### Fixes
- Validate that a string is base64-encoded before trying to base64 decode it. (PRs #648 and #649)
## 9.0.8
### Fixes
- Fix logging configuration not propagating to child parser processes (#646).
- Update `mailsuite` dependency to `>=1.11.1` to solve issues with iCloud IMAP (#493).
## 9.0.7
## Fixes
- Fix IMAP `since` option (PR #645 closes issues #581 and #643).
## 9.0.6
### Fixes
- Fix #638.
- Fix/clarify report extraction and parsing behavior for multiple input types (bytes, base64 strings, and file-like objects).
- Fix type mismatches that could cause runtime issues in SMTP emailing and CLI option handling.
### Improvements
- Improve type hints across the library (Pylance/Pyright friendliness) and reduce false-positive linter errors.
- Emails in Microsoft 365 are now marked as read as they are processed. This provides consistency with other mailbox types and gives you an indication of progress when emails are processed in batches. (Closes #625)
### Compatibility / Dependencies
- Set Python requirement to `>=3.9,<3.14`.
- Bump `mailsuite` requirement to `>=1.11.0`.
## 9.0.5
## Fixes

3
CLAUDE.md Normal file

@@ -0,0 +1,3 @@
# CLAUDE.md
@AGENTS.md

78
CONTRIBUTING.md Normal file

@@ -0,0 +1,78 @@
# Contributing
Thanks for contributing to parsedmarc.
## Local setup
Use a virtual environment for local development.
```bash
python3 -m venv .venv
. .venv/bin/activate
python -m pip install --upgrade pip
pip install .[build]
```
## Before opening a pull request
Run the checks that match your change:
```bash
ruff check .
pytest --cov --cov-report=xml tests.py
```
If you changed documentation:
```bash
cd docs
make html
```
If you changed CLI behavior or parsing logic, it is also useful to exercise the
sample reports:
```bash
parsedmarc --debug -c ci.ini samples/aggregate/*
parsedmarc --debug -c ci.ini samples/forensic/*
```
To skip DNS lookups during tests, set:
```bash
GITHUB_ACTIONS=true
```
## Pull request guidelines
- Keep pull requests small and focused. Separate bug fixes, docs updates, and
repo-maintenance changes where practical.
- Add or update tests when behavior changes.
- Update docs when configuration or user-facing behavior changes.
- Include a short summary, the reason for the change, and the testing you ran.
- Link the related issue when there is one.
## Branch maintenance
Upstream `master` may move quickly. Before asking for review or after another PR
lands, rebase your branch onto the current upstream branch and force-push with
lease if needed:
```bash
git fetch upstream
git rebase upstream/master
git push --force-with-lease
```
## CI and coverage
GitHub Actions is the source of truth for linting, docs, and test status.
Codecov patch coverage is usually the most relevant signal for small PRs. Project
coverage can be noisier when the base comparison is stale, so interpret it in
the context of the actual diff.
## Questions
Use GitHub issues for bugs and feature requests. If you are not sure whether a
change is wanted, opening an issue first is usually the safest path.


@@ -56,9 +56,9 @@ for RHEL or Debian.
| 3.6 | ❌ | Used in RHEL 8, but not supported by project dependencies | | 3.6 | ❌ | Used in RHEL 8, but not supported by project dependencies |
| 3.7 | ❌ | End of Life (EOL) | | 3.7 | ❌ | End of Life (EOL) |
| 3.8 | ❌ | End of Life (EOL) | | 3.8 | ❌ | End of Life (EOL) |
| 3.9 | | Supported until August 2026 (Debian 11); May 2032 (RHEL 9) | | 3.9 | | Used in Debian 11 and RHEL 9, but not supported by project dependencies |
| 3.10 | ✅ | Actively maintained | | 3.10 | ✅ | Actively maintained |
| 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) | | 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) |
| 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) | | 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) |
| 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) | | 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) |
| 3.14 | | Not currently supported due to [this imapclient bug](https://github.com/mjs/imapclient/issues/618)| | 3.14 | | Supported (requires `imapclient>=3.1.0`) |

SECURITY.md Normal file

@@ -0,0 +1,29 @@
# Security Policy
## Reporting a vulnerability
Please do not open a public GitHub issue for an undisclosed security
vulnerability. Use GitHub private vulnerability reporting in the Security tab of this project instead.
When reporting a vulnerability, include:
- the affected parsedmarc version or commit
- the component or integration involved
- clear reproduction details if available
- potential impact
- any suggested mitigation or workaround
## Supported versions
Security fixes will be applied to the latest released version and
the current `master` branch.
Older versions will not receive backported fixes.
## Disclosure process
After a report is received, maintainers will validate the issue, assess its
impact, and coordinate a fix before public disclosure.
Please avoid publishing proof-of-concept details until maintainers have had a
reasonable opportunity to investigate and release a fix or mitigation.

ci.ini

@@ -3,6 +3,7 @@ save_aggregate = True
save_forensic = True save_forensic = True
save_smtp_tls = True save_smtp_tls = True
debug = True debug = True
offline = True
[elasticsearch] [elasticsearch]
hosts = http://localhost:9200 hosts = http://localhost:9200

codecov.yml Normal file

@@ -0,0 +1,11 @@
codecov:
  require_ci_to_pass: true
coverage:
  status:
    project:
      default:
        informational: true
    patch:
      default:
        informational: false


@@ -28,6 +28,13 @@
:members: :members:
``` ```
## parsedmarc.types
```{eval-rst}
.. automodule:: parsedmarc.types
:members:
```
## parsedmarc.utils ## parsedmarc.utils
```{eval-rst} ```{eval-rst}


@@ -56,12 +56,12 @@ for RHEL or Debian.
| 3.6 | ❌ | Used in RHEL 8, but not supported by project dependencies | | 3.6 | ❌ | Used in RHEL 8, but not supported by project dependencies |
| 3.7 | ❌ | End of Life (EOL) | | 3.7 | ❌ | End of Life (EOL) |
| 3.8 | ❌ | End of Life (EOL) | | 3.8 | ❌ | End of Life (EOL) |
| 3.9 | | Supported until August 2026 (Debian 11); May 2032 (RHEL 9) | | 3.9 | | Used in Debian 11 and RHEL 9, but not supported by project dependencies |
| 3.10 | ✅ | Actively maintained | | 3.10 | ✅ | Actively maintained |
| 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) | | 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) |
| 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) | | 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) |
| 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) | | 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) |
| 3.14 | | Not currently supported due to [this imapclient bug](https://github.com/mjs/imapclient/issues/618)| | 3.14 | | Supported (requires `imapclient>=3.1.0`) |
```{toctree} ```{toctree}
:caption: 'Contents' :caption: 'Contents'


@@ -162,10 +162,10 @@ sudo -u parsedmarc virtualenv /opt/parsedmarc/venv
``` ```
CentOS/RHEL 8 systems use Python 3.6 by default, so on those systems CentOS/RHEL 8 systems use Python 3.6 by default, so on those systems
explicitly tell `virtualenv` to use `python3.9` instead explicitly tell `virtualenv` to use `python3.10` instead
```bash ```bash
sudo -u parsedmarc virtualenv -p python3.9 /opt/parsedmarc/venv sudo -u parsedmarc virtualenv -p python3.10 /opt/parsedmarc/venv
``` ```
Activate the virtualenv Activate the virtualenv


@@ -146,6 +146,9 @@ The full set of configuration options are:
- `dns_timeout` - float: DNS timeout period - `dns_timeout` - float: DNS timeout period
- `debug` - bool: Print debugging messages - `debug` - bool: Print debugging messages
- `silent` - bool: Only print errors (Default: `True`) - `silent` - bool: Only print errors (Default: `True`)
- `fail_on_output_error` - bool: Exit with a non-zero status code if
any configured output destination fails while saving/publishing
reports (Default: `False`)
- `log_file` - str: Write log messages to a file at this path - `log_file` - str: Write log messages to a file at this path
- `n_procs` - int: Number of processes to run in parallel when - `n_procs` - int: Number of processes to run in parallel when
parsing in CLI mode (Default: `1`) parsing in CLI mode (Default: `1`)
@@ -171,8 +174,8 @@ The full set of configuration options are:
- `check_timeout` - int: Number of seconds to wait for an IMAP - `check_timeout` - int: Number of seconds to wait for an IMAP
IDLE response or the number of seconds until the next IDLE response or the number of seconds until the next
mail check (Default: `30`) mail check (Default: `30`)
- `since` - str: Search for messages since a certain time. (Examples: `5m|3h|2d|1w`) - `since` - str: Search for messages since a certain time. (Examples: `5m|3h|2d|1w`)
Acceptable units - {"m":"minutes", "h":"hours", "d":"days", "w":"weeks"}. Acceptable units - {"m":"minutes", "h":"hours", "d":"days", "w":"weeks"}.
Defaults to `1d` if incorrect value is provided. Defaults to `1d` if incorrect value is provided.
- `imap` - `imap`
- `host` - str: The IMAP server hostname or IP address - `host` - str: The IMAP server hostname or IP address
@@ -200,7 +203,7 @@ The full set of configuration options are:
- `password` - str: The IMAP password - `password` - str: The IMAP password
- `msgraph` - `msgraph`
- `auth_method` - str: Authentication method, valid types are - `auth_method` - str: Authentication method, valid types are
`UsernamePassword`, `DeviceCode`, or `ClientSecret` `UsernamePassword`, `DeviceCode`, `ClientSecret`, or `Certificate`
(Default: `UsernamePassword`). (Default: `UsernamePassword`).
- `user` - str: The M365 user, required when the auth method is - `user` - str: The M365 user, required when the auth method is
UsernamePassword UsernamePassword
@@ -208,6 +211,11 @@ The full set of configuration options are:
method is UsernamePassword method is UsernamePassword
- `client_id` - str: The app registration's client ID - `client_id` - str: The app registration's client ID
- `client_secret` - str: The app registration's secret - `client_secret` - str: The app registration's secret
- `certificate_path` - str: Path to a PEM or PKCS12 certificate
including the private key. Required when the auth method is
`Certificate`
- `certificate_password` - str: Optional password for the
certificate file when using `Certificate` auth
- `tenant_id` - str: The Azure AD tenant ID. This is required - `tenant_id` - str: The Azure AD tenant ID. This is required
for all auth methods except UsernamePassword. for all auth methods except UsernamePassword.
- `mailbox` - str: The mailbox name. This defaults to the - `mailbox` - str: The mailbox name. This defaults to the
@@ -240,11 +248,14 @@ The full set of configuration options are:
group and use that as the group id. group and use that as the group id.
```powershell ```powershell
New-ApplicationAccessPolicy -AccessRight RestrictAccess New-ApplicationAccessPolicy -AccessRight RestrictAccess
-AppId "<CLIENT_ID>" -PolicyScopeGroupId "<MAILBOX>" -AppId "<CLIENT_ID>" -PolicyScopeGroupId "<MAILBOX>"
-Description "Restrict access to dmarc reports mailbox." -Description "Restrict access to dmarc reports mailbox."
``` ```
The same application permission and mailbox scoping guidance
applies to the `Certificate` auth method.
::: :::
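Putting the `Certificate` options above together, a minimal sketch (section and option names come from the list above; the client ID, tenant ID, mailbox, and paths are placeholders):

```ini
[msgraph]
auth_method = Certificate
client_id = <CLIENT_ID>
tenant_id = <TENANT_ID>
certificate_path = /path/to/certificate.pem
# certificate_password is only needed when the certificate file is encrypted
# certificate_password = <PASSWORD>
mailbox = dmarc-reports@example.com
```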
- `elasticsearch` - `elasticsearch`
- `hosts` - str: A comma separated list of hostnames and ports - `hosts` - str: A comma separated list of hostnames and ports
@@ -281,6 +292,10 @@ The full set of configuration options are:
- `user` - str: Basic auth username - `user` - str: Basic auth username
- `password` - str: Basic auth password - `password` - str: Basic auth password
- `api_key` - str: API key - `api_key` - str: API key
- `auth_type` - str: Authentication type: `basic` (default) or `awssigv4` (the key `authentication_type` is accepted as an alias for this option)
- `aws_region` - str: AWS region for SigV4 authentication
(required when `auth_type = awssigv4`)
- `aws_service` - str: AWS service for SigV4 signing (Default: `es`)
- `ssl` - bool: Use an encrypted SSL/TLS connection - `ssl` - bool: Use an encrypted SSL/TLS connection
(Default: `True`) (Default: `True`)
- `timeout` - float: Timeout in seconds (Default: 60) - `timeout` - float: Timeout in seconds (Default: 60)
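A sketch of the SigV4 options above (the endpoint and region are placeholders; AWS credentials are assumed to be resolved through the usual mechanisms such as environment variables or an instance profile):

```ini
[elasticsearch]
hosts = https://my-domain.us-east-1.es.amazonaws.com
auth_type = awssigv4
aws_region = us-east-1
# aws_service = es is the default and usually does not need to be set
```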
@@ -336,16 +351,77 @@ The full set of configuration options are:
- `secret_access_key` - str: The secret access key (Optional) - `secret_access_key` - str: The secret access key (Optional)
- `syslog` - `syslog`
- `server` - str: The Syslog server name or IP address - `server` - str: The Syslog server name or IP address
- `port` - int: The UDP port to use (Default: `514`) - `port` - int: The port to use (Default: `514`)
- `protocol` - str: The protocol to use: `udp`, `tcp`, or `tls` (Default: `udp`)
- `cafile_path` - str: Path to CA certificate file for TLS server verification (Optional)
- `certfile_path` - str: Path to client certificate file for TLS authentication (Optional)
- `keyfile_path` - str: Path to client private key file for TLS authentication (Optional)
- `timeout` - float: Connection timeout in seconds for TCP/TLS (Default: `5.0`)
- `retry_attempts` - int: Number of retry attempts for failed connections (Default: `3`)
- `retry_delay` - int: Delay in seconds between retry attempts (Default: `5`)
**Example UDP configuration (default):**
```ini
[syslog]
server = syslog.example.com
port = 514
```
**Example TCP configuration:**
```ini
[syslog]
server = syslog.example.com
port = 6514
protocol = tcp
timeout = 10.0
retry_attempts = 5
```
**Example TLS configuration with server verification:**
```ini
[syslog]
server = syslog.example.com
port = 6514
protocol = tls
cafile_path = /path/to/ca-cert.pem
timeout = 10.0
```
**Example TLS configuration with mutual authentication:**
```ini
[syslog]
server = syslog.example.com
port = 6514
protocol = tls
cafile_path = /path/to/ca-cert.pem
certfile_path = /path/to/client-cert.pem
keyfile_path = /path/to/client-key.pem
timeout = 10.0
retry_attempts = 3
retry_delay = 5
```
- `gmail_api` - `gmail_api`
- `credentials_file` - str: Path to file containing the - `credentials_file` - str: Path to file containing the
credentials, None to disable (Default: `None`) credentials, None to disable (Default: `None`)
- `token_file` - str: Path to save the token file - `token_file` - str: Path to save the token file
(Default: `.token`) (Default: `.token`)
- `auth_mode` - str: Authentication mode, `installed_app` (default)
or `service_account`
- `service_account_user` - str: Delegated mailbox user for Gmail
service account auth (required for domain-wide delegation). Also
accepted as `delegated_user` for backward compatibility.
:::{note} :::{note}
credentials_file and token_file can be obtained by following the [quickstart](https://developers.google.com/gmail/api/quickstart/python). Please change the scope to `https://www.googleapis.com/auth/gmail.modify`. credentials_file and token_file can be obtained by following the [quickstart](https://developers.google.com/gmail/api/quickstart/python). Please change the scope to `https://www.googleapis.com/auth/gmail.modify`.
::: :::
:::{note}
When `auth_mode = service_account`, `credentials_file` must point to a
Google service account key JSON file, and `token_file` is not used.
:::
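Combining the service-account options above, a minimal sketch (the key path and delegated mailbox user are placeholders):

```ini
[gmail_api]
auth_mode = service_account
credentials_file = /path/to/service-account-key.json
service_account_user = dmarc-reports@example.com
```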
- `include_spam_trash` - bool: Include messages in Spam and - `include_spam_trash` - bool: Include messages in Spam and
Trash when searching reports (Default: `False`) Trash when searching reports (Default: `False`)
- `scopes` - str: Comma separated list of scopes to use when - `scopes` - str: Comma separated list of scopes to use when
@@ -442,7 +518,7 @@ Update the limit to 2k per example:
PUT _cluster/settings PUT _cluster/settings
{ {
"persistent" : { "persistent" : {
"cluster.max_shards_per_node" : 2000 "cluster.max_shards_per_node" : 2000
} }
} }
``` ```
@@ -450,6 +526,33 @@ PUT _cluster/settings
Increasing this value increases resource usage. Increasing this value increases resource usage.
::: :::
## Performance tuning
For large mailbox imports or backfills, parsedmarc can consume a noticeable amount
of memory, especially when it runs on the same host as Elasticsearch or
OpenSearch. The following settings can reduce peak memory usage and make long
imports more predictable:
- Reduce `mailbox.batch_size` to smaller values such as `100-500` instead of
processing a very large message set at once. Smaller batches trade throughput
for lower peak memory use and less sink pressure.
- Keep `n_procs` low for mailbox-heavy runs. In practice, `1-2` workers is often
a safer starting point for large backfills than aggressive parallelism.
- Use `mailbox.since` to process reports in smaller time windows such as `1d`,
`7d`, or another interval that fits the backlog. This makes it easier to catch
up incrementally instead of loading an entire mailbox history in one run.
- Set `strip_attachment_payloads = True` when forensic reports contain large
attachments and you do not need to retain the raw payloads in the parsed
output.
- Prefer running parsedmarc separately from Elasticsearch or OpenSearch, or
reserve enough RAM for both services if they must share a host.
- For very large imports, prefer incremental supervised runs, such as a
scheduler or systemd service, over infrequent massive backfills.
These are operational tuning recommendations rather than hard requirements, but
they are often enough to avoid memory pressure and reduce failures during
high-volume mailbox processing.
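As one illustrative starting point for a large backfill, combining the suggestions above (the values are examples rather than universal recommendations, and the option placement assumes the sections documented earlier on this page):

```ini
[general]
n_procs = 1
strip_attachment_payloads = True

[mailbox]
batch_size = 250
since = 7d
```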
## Multi-tenant support ## Multi-tenant support
Starting in `8.19.0`, ParseDMARC provides multi-tenant support by placing data into separate OpenSearch or Elasticsearch index prefixes. To set this up, create a YAML file in which each key is a tenant name and each value is a list of that tenant's domains (not including subdomains), like this: Starting in `8.19.0`, ParseDMARC provides multi-tenant support by placing data into separate OpenSearch or Elasticsearch index prefixes. To set this up, create a YAML file in which each key is a tenant name and each value is a list of that tenant's domains (not including subdomains), like this:
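For example, a tenant map with two tenants might look like this (the tenant and domain names are illustrative):

```yaml
tenant_a:
  - example.com
  - example.net
tenant_b:
  - example.org
```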

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large

File diff suppressed because it is too large


@@ -1,3 +1,3 @@
__version__ = "9.0.5" __version__ = "9.2.1"
USER_AGENT = f"parsedmarc/{__version__}" USER_AGENT = f"parsedmarc/{__version__}"


@@ -2,30 +2,28 @@
from __future__ import annotations from __future__ import annotations
from typing import Optional, Union, Any from typing import Any, Optional, Union
from collections import OrderedDict from elasticsearch.helpers import reindex
from elasticsearch_dsl.search import Q
from elasticsearch_dsl import ( from elasticsearch_dsl import (
connections, Boolean,
Object, Date,
Document, Document,
Index, Index,
Nested,
InnerDoc, InnerDoc,
Integer, Integer,
Text,
Boolean,
Ip, Ip,
Date, Nested,
Object,
Search, Search,
Text,
connections,
) )
from elasticsearch.helpers import reindex from elasticsearch_dsl.search import Q
from parsedmarc import InvalidForensicReport
from parsedmarc.log import logger from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime from parsedmarc.utils import human_timestamp_to_datetime
from parsedmarc import InvalidForensicReport
class ElasticsearchError(Exception): class ElasticsearchError(Exception):
@@ -94,17 +92,17 @@ class _AggregateReportDoc(Document):
spf_results = Nested(_SPFResult) spf_results = Nested(_SPFResult)
def add_policy_override(self, type_: str, comment: str): def add_policy_override(self, type_: str, comment: str):
self.policy_overrides.append(_PolicyOverride(type=type_, comment=comment)) self.policy_overrides.append(_PolicyOverride(type=type_, comment=comment)) # pyright: ignore[reportCallIssue]
def add_dkim_result(self, domain: str, selector: str, result: _DKIMResult): def add_dkim_result(self, domain: str, selector: str, result: _DKIMResult):
self.dkim_results.append( self.dkim_results.append(
_DKIMResult(domain=domain, selector=selector, result=result) _DKIMResult(domain=domain, selector=selector, result=result)
) ) # pyright: ignore[reportCallIssue]
def add_spf_result(self, domain: str, scope: str, result: _SPFResult): def add_spf_result(self, domain: str, scope: str, result: _SPFResult):
self.spf_results.append(_SPFResult(domain=domain, scope=scope, result=result)) self.spf_results.append(_SPFResult(domain=domain, scope=scope, result=result)) # pyright: ignore[reportCallIssue]
def save(self, **kwargs): def save(self, **kwargs): # pyright: ignore[reportIncompatibleMethodOverride]
self.passed_dmarc = False self.passed_dmarc = False
self.passed_dmarc = self.spf_aligned or self.dkim_aligned self.passed_dmarc = self.spf_aligned or self.dkim_aligned
@@ -138,25 +136,25 @@ class _ForensicSampleDoc(InnerDoc):
attachments = Nested(_EmailAttachmentDoc) attachments = Nested(_EmailAttachmentDoc)
def add_to(self, display_name: str, address: str): def add_to(self, display_name: str, address: str):
self.to.append(_EmailAddressDoc(display_name=display_name, address=address)) self.to.append(_EmailAddressDoc(display_name=display_name, address=address)) # pyright: ignore[reportCallIssue]
def add_reply_to(self, display_name: str, address: str): def add_reply_to(self, display_name: str, address: str):
self.reply_to.append( self.reply_to.append(
_EmailAddressDoc(display_name=display_name, address=address) _EmailAddressDoc(display_name=display_name, address=address)
) ) # pyright: ignore[reportCallIssue]
def add_cc(self, display_name: str, address: str): def add_cc(self, display_name: str, address: str):
self.cc.append(_EmailAddressDoc(display_name=display_name, address=address)) self.cc.append(_EmailAddressDoc(display_name=display_name, address=address)) # pyright: ignore[reportCallIssue]
def add_bcc(self, display_name: str, address: str): def add_bcc(self, display_name: str, address: str):
self.bcc.append(_EmailAddressDoc(display_name=display_name, address=address)) self.bcc.append(_EmailAddressDoc(display_name=display_name, address=address)) # pyright: ignore[reportCallIssue]
def add_attachment(self, filename: str, content_type: str, sha256: str): def add_attachment(self, filename: str, content_type: str, sha256: str):
self.attachments.append( self.attachments.append(
_EmailAttachmentDoc( _EmailAttachmentDoc(
filename=filename, content_type=content_type, sha256=sha256 filename=filename, content_type=content_type, sha256=sha256
) )
) ) # pyright: ignore[reportCallIssue]
class _ForensicReportDoc(Document): class _ForensicReportDoc(Document):
@@ -224,7 +222,7 @@ class _SMTPTLSPolicyDoc(InnerDoc):
additional_information=additional_information_uri, additional_information=additional_information_uri,
failure_reason_code=failure_reason_code, failure_reason_code=failure_reason_code,
) )
self.failure_details.append(_details) self.failure_details.append(_details) # pyright: ignore[reportCallIssue]
class _SMTPTLSReportDoc(Document): class _SMTPTLSReportDoc(Document):
@@ -258,7 +256,7 @@ class _SMTPTLSReportDoc(Document):
policy_string=policy_string, policy_string=policy_string,
mx_host_patterns=mx_host_patterns, mx_host_patterns=mx_host_patterns,
failure_details=failure_details, failure_details=failure_details,
) ) # pyright: ignore[reportCallIssue]
class AlreadySaved(ValueError): class AlreadySaved(ValueError):
@@ -268,12 +266,12 @@ class AlreadySaved(ValueError):
def set_hosts( def set_hosts(
hosts: Union[str, list[str]], hosts: Union[str, list[str]],
*, *,
use_ssl: Optional[bool] = False, use_ssl: bool = False,
ssl_cert_path: Optional[str] = None, ssl_cert_path: Optional[str] = None,
username: Optional[str] = None, username: Optional[str] = None,
password: Optional[str] = None, password: Optional[str] = None,
api_key: Optional[str] = None, api_key: Optional[str] = None,
timeout: Optional[float] = 60.0, timeout: float = 60.0,
): ):
""" """
Sets the Elasticsearch hosts to use Sets the Elasticsearch hosts to use
@@ -369,7 +367,7 @@ def migrate_indexes(
} }
Index(new_index_name).create() Index(new_index_name).create()
Index(new_index_name).put_mapping(doc_type=doc, body=body) Index(new_index_name).put_mapping(doc_type=doc, body=body)
reindex(connections.get_connection(), aggregate_index_name, new_index_name) reindex(connections.get_connection(), aggregate_index_name, new_index_name) # pyright: ignore[reportArgumentType]
Index(aggregate_index_name).delete() Index(aggregate_index_name).delete()
for forensic_index in forensic_indexes: for forensic_index in forensic_indexes:
@@ -377,18 +375,18 @@ def migrate_indexes(
def save_aggregate_report_to_elasticsearch( def save_aggregate_report_to_elasticsearch(
aggregate_report: OrderedDict[str, Any], aggregate_report: dict[str, Any],
index_suffix: Optional[str] = None, index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None, index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False, monthly_indexes: Optional[bool] = False,
number_of_shards: Optional[int] = 1, number_of_shards: int = 1,
number_of_replicas: Optional[int] = 0, number_of_replicas: int = 0,
): ):
""" """
Saves a parsed DMARC aggregate report to Elasticsearch Saves a parsed DMARC aggregate report to Elasticsearch
Args: Args:
aggregate_report (OrderedDict): A parsed forensic report aggregate_report (dict): A parsed forensic report
index_suffix (str): The suffix of the name of the index to save to index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily indexes monthly_indexes (bool): Use monthly indexes instead of daily indexes
@@ -412,11 +410,11 @@ def save_aggregate_report_to_elasticsearch(
else: else:
index_date = begin_date.strftime("%Y-%m-%d") index_date = begin_date.strftime("%Y-%m-%d")
org_name_query = Q(dict(match_phrase=dict(org_name=org_name))) org_name_query = Q(dict(match_phrase=dict(org_name=org_name))) # type: ignore
report_id_query = Q(dict(match_phrase=dict(report_id=report_id))) report_id_query = Q(dict(match_phrase=dict(report_id=report_id))) # pyright: ignore[reportArgumentType]
domain_query = Q(dict(match_phrase={"published_policy.domain": domain})) domain_query = Q(dict(match_phrase={"published_policy.domain": domain})) # pyright: ignore[reportArgumentType]
begin_date_query = Q(dict(match=dict(date_begin=begin_date))) begin_date_query = Q(dict(range=dict(date_begin=dict(gte=begin_date)))) # pyright: ignore[reportArgumentType]
end_date_query = Q(dict(match=dict(date_end=end_date))) end_date_query = Q(dict(range=dict(date_end=dict(lte=end_date)))) # pyright: ignore[reportArgumentType]
if index_suffix is not None: if index_suffix is not None:
search_index = "dmarc_aggregate_{0}*".format(index_suffix) search_index = "dmarc_aggregate_{0}*".format(index_suffix)
@@ -428,13 +426,12 @@ def save_aggregate_report_to_elasticsearch(
query = org_name_query & report_id_query & domain_query query = org_name_query & report_id_query & domain_query
query = query & begin_date_query & end_date_query query = query & begin_date_query & end_date_query
search.query = query search.query = query
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
try: try:
existing = search.execute() existing = search.execute()
except Exception as error_: except Exception as error_:
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
raise ElasticsearchError( raise ElasticsearchError(
"Elasticsearch's search for existing report \ "Elasticsearch's search for existing report \
error: {}".format(error_.__str__()) error: {}".format(error_.__str__())
@@ -530,7 +527,7 @@ def save_aggregate_report_to_elasticsearch(
number_of_shards=number_of_shards, number_of_replicas=number_of_replicas number_of_shards=number_of_shards, number_of_replicas=number_of_replicas
) )
create_indexes([index], index_settings) create_indexes([index], index_settings)
agg_doc.meta.index = index agg_doc.meta.index = index # pyright: ignore[reportOptionalMemberAccess, reportAttributeAccessIssue]
try: try:
agg_doc.save() agg_doc.save()
@@ -539,7 +536,7 @@ def save_aggregate_report_to_elasticsearch(
def save_forensic_report_to_elasticsearch( def save_forensic_report_to_elasticsearch(
forensic_report: OrderedDict[str, Any], forensic_report: dict[str, Any],
index_suffix: Optional[Any] = None, index_suffix: Optional[Any] = None,
index_prefix: Optional[str] = None, index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False, monthly_indexes: Optional[bool] = False,
@@ -550,7 +547,7 @@ def save_forensic_report_to_elasticsearch(
Saves a parsed DMARC forensic report to Elasticsearch Saves a parsed DMARC forensic report to Elasticsearch
Args: Args:
forensic_report (OrderedDict): A parsed forensic report forensic_report (dict): A parsed forensic report
index_suffix (str): The suffix of the name of the index to save to index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily monthly_indexes (bool): Use monthly indexes instead of daily
@@ -570,7 +567,7 @@ def save_forensic_report_to_elasticsearch(
sample_date = forensic_report["parsed_sample"]["date"] sample_date = forensic_report["parsed_sample"]["date"]
sample_date = human_timestamp_to_datetime(sample_date) sample_date = human_timestamp_to_datetime(sample_date)
original_headers = forensic_report["parsed_sample"]["headers"] original_headers = forensic_report["parsed_sample"]["headers"]
headers = OrderedDict() headers: dict[str, Any] = {}
for original_header in original_headers: for original_header in original_headers:
headers[original_header.lower()] = original_headers[original_header] headers[original_header.lower()] = original_headers[original_header]
@@ -584,7 +581,7 @@ def save_forensic_report_to_elasticsearch(
if index_prefix is not None: if index_prefix is not None:
search_index = "{0}{1}".format(index_prefix, search_index) search_index = "{0}{1}".format(index_prefix, search_index)
search = Search(index=search_index) search = Search(index=search_index)
q = Q(dict(match=dict(arrival_date=arrival_date_epoch_milliseconds))) q = Q(dict(match=dict(arrival_date=arrival_date_epoch_milliseconds))) # pyright: ignore[reportArgumentType]
from_ = None from_ = None
to_ = None to_ = None
@@ -599,7 +596,7 @@ def save_forensic_report_to_elasticsearch(
from_ = dict() from_ = dict()
from_["sample.headers.from"] = headers["from"] from_["sample.headers.from"] = headers["from"]
from_query = Q(dict(match_phrase=from_)) from_query = Q(dict(match_phrase=from_)) # pyright: ignore[reportArgumentType]
q = q & from_query q = q & from_query
if "to" in headers: if "to" in headers:
# We convert the TO header from a string list to a flat string. # We convert the TO header from a string list to a flat string.
@@ -611,12 +608,12 @@ def save_forensic_report_to_elasticsearch(
to_ = dict() to_ = dict()
to_["sample.headers.to"] = headers["to"] to_["sample.headers.to"] = headers["to"]
to_query = Q(dict(match_phrase=to_)) to_query = Q(dict(match_phrase=to_)) # pyright: ignore[reportArgumentType]
q = q & to_query q = q & to_query
if "subject" in headers: if "subject" in headers:
subject = headers["subject"] subject = headers["subject"]
subject_query = {"match_phrase": {"sample.headers.subject": subject}} subject_query = {"match_phrase": {"sample.headers.subject": subject}}
q = q & Q(subject_query) q = q & Q(subject_query) # pyright: ignore[reportArgumentType]
search.query = q search.query = q
existing = search.execute() existing = search.execute()
@@ -694,7 +691,7 @@ def save_forensic_report_to_elasticsearch(
number_of_shards=number_of_shards, number_of_replicas=number_of_replicas number_of_shards=number_of_shards, number_of_replicas=number_of_replicas
) )
create_indexes([index], index_settings) create_indexes([index], index_settings)
forensic_doc.meta.index = index forensic_doc.meta.index = index # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
try: try:
forensic_doc.save() forensic_doc.save()
except Exception as e: except Exception as e:
@@ -706,18 +703,18 @@ def save_forensic_report_to_elasticsearch(
def save_smtp_tls_report_to_elasticsearch( def save_smtp_tls_report_to_elasticsearch(
report: OrderedDict[str, Any], report: dict[str, Any],
index_suffix: Optional[str] = None, index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None, index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False, monthly_indexes: bool = False,
-    number_of_shards: Optional[int] = 1,
-    number_of_replicas: Optional[int] = 0,
+    number_of_shards: int = 1,
+    number_of_replicas: int = 0,
 ):
     """
     Saves a parsed SMTP TLS report to Elasticsearch

     Args:
-        report (OrderedDict): A parsed SMTP TLS report
+        report (dict): A parsed SMTP TLS report
         index_suffix (str): The suffix of the name of the index to save to
         index_prefix (str): The prefix of the name of the index to save to
         monthly_indexes (bool): Use monthly indexes instead of daily indexes
@@ -741,10 +738,10 @@ def save_smtp_tls_report_to_elasticsearch(
     report["begin_date"] = begin_date
     report["end_date"] = end_date
-    org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
-    report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
-    begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
-    end_date_query = Q(dict(match=dict(date_end=end_date)))
+    org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))  # pyright: ignore[reportArgumentType]
+    report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))  # pyright: ignore[reportArgumentType]
+    begin_date_query = Q(dict(match=dict(date_begin=begin_date)))  # pyright: ignore[reportArgumentType]
+    end_date_query = Q(dict(match=dict(date_end=end_date)))  # pyright: ignore[reportArgumentType]
     if index_suffix is not None:
         search_index = "smtp_tls_{0}*".format(index_suffix)
@@ -845,10 +842,10 @@ def save_smtp_tls_report_to_elasticsearch(
             additional_information_uri=additional_information_uri,
             failure_reason_code=failure_reason_code,
         )
-        smtp_tls_doc.policies.append(policy_doc)
+        smtp_tls_doc.policies.append(policy_doc)  # pyright: ignore[reportCallIssue]
     create_indexes([index], index_settings)
-    smtp_tls_doc.meta.index = index
+    smtp_tls_doc.meta.index = index  # pyright: ignore[reportOptionalMemberAccess, reportAttributeAccessIssue]
     try:
         smtp_tls_doc.save()


@@ -2,21 +2,18 @@
 from __future__ import annotations
-from typing import Any
 import logging
 import logging.handlers
-import json
 import threading
-from collections import OrderedDict
+from typing import Any

+from pygelf import GelfTcpHandler, GelfTlsHandler, GelfUdpHandler
+
 from parsedmarc import (
     parsed_aggregate_reports_to_csv_rows,
     parsed_forensic_reports_to_csv_rows,
     parsed_smtp_tls_reports_to_csv_rows,
 )
-from pygelf import GelfTcpHandler, GelfUdpHandler, GelfTlsHandler

 log_context_data = threading.local()
@@ -53,9 +50,7 @@ class GelfClient(object):
         )
         self.logger.addHandler(self.handler)

-    def save_aggregate_report_to_gelf(
-        self, aggregate_reports: list[OrderedDict[str, Any]]
-    ):
+    def save_aggregate_report_to_gelf(self, aggregate_reports: list[dict[str, Any]]):
         rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
         for row in rows:
             log_context_data.parsedmarc = row
@@ -63,14 +58,14 @@ class GelfClient(object):
         log_context_data.parsedmarc = None

-    def save_forensic_report_to_gelf(
-        self, forensic_reports: list[OrderedDict[str, Any]]
-    ):
+    def save_forensic_report_to_gelf(self, forensic_reports: list[dict[str, Any]]):
         rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
         for row in rows:
-            self.logger.info(json.dumps(row))
+            log_context_data.parsedmarc = row
+            self.logger.info("parsedmarc forensic report")

-    def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: OrderedDict[str, Any]):
+    def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: dict[str, Any]):
         rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
         for row in rows:
-            self.logger.info(json.dumps(row))
+            log_context_data.parsedmarc = row
+            self.logger.info("parsedmarc smtptls report")

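The rewritten GELF sink stops serializing rows with `json.dumps` and instead parks each CSV row in a `threading.local` so a logging filter can attach it to every record as structured fields. A minimal stdlib sketch of that pattern (`ContextFilter` and `CaptureHandler` are hypothetical stand-ins for the pygelf handlers):

```python
import logging
import threading

log_context_data = threading.local()


class ContextFilter(logging.Filter):
    # Copy the per-thread report row onto each record; GELF handlers
    # then forward record attributes as structured fields.
    def filter(self, record):
        record.parsedmarc = getattr(log_context_data, "parsedmarc", None)
        return True


captured = []


class CaptureHandler(logging.Handler):
    def emit(self, record):
        captured.append(record.parsedmarc)


logger = logging.getLogger("gelf_sketch")
logger.setLevel(logging.INFO)
handler = CaptureHandler()
handler.addFilter(ContextFilter())
logger.addHandler(handler)

log_context_data.parsedmarc = {"org_name": "example.net", "report_id": "abc"}
logger.info("parsedmarc forensic report")
log_context_data.parsedmarc = None
```

The message text stays constant; the report data rides along on the record, which is why the diff sets `log_context_data.parsedmarc` per row instead of dumping JSON into the message.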

@@ -2,19 +2,16 @@
 from __future__ import annotations
-from typing import Any, Optional, Union
-from ssl import SSLContext
 import json
-from ssl import create_default_context
+from ssl import SSLContext, create_default_context
+from typing import Any, Optional, Union

 from kafka import KafkaProducer
 from kafka.errors import NoBrokersAvailable, UnknownTopicOrPartitionError
-from collections import OrderedDict
-from parsedmarc.utils import human_timestamp_to_datetime
 from parsedmarc import __version__
 from parsedmarc.log import logger
+from parsedmarc.utils import human_timestamp_to_datetime


 class KafkaError(RuntimeError):
@@ -49,7 +46,7 @@ class KafkaClient(object):
         ``$ConnectionString``, and the password is the
         Azure Event Hub connection string.
         """
-        config = dict(
+        config: dict[str, Any] = dict(
             value_serializer=lambda v: json.dumps(v).encode("utf-8"),
             bootstrap_servers=kafka_hosts,
             client_id="parsedmarc-{0}".format(__version__),
@@ -66,7 +63,7 @@ class KafkaClient(object):
             raise KafkaError("No Kafka brokers available")

     @staticmethod
-    def strip_metadata(report: OrderedDict[str, Any]):
+    def strip_metadata(report: dict[str, Any]):
         """
         Duplicates org_name, org_email and report_id into JSON root
         and removes report_metadata key to bring it more inline
@@ -80,7 +77,7 @@ class KafkaClient(object):
         return report

     @staticmethod
-    def generate_date_range(report: OrderedDict[str, Any]):
+    def generate_date_range(report: dict[str, Any]):
         """
         Creates a date_range timestamp with format YYYY-MM-DD-T-HH:MM:SS
         based on begin and end dates for easier parsing in Kibana.
@@ -99,7 +96,7 @@ class KafkaClient(object):
     def save_aggregate_reports_to_kafka(
         self,
-        aggregate_reports: Union[OrderedDict[str, Any], list[OrderedDict[str, Any]]],
+        aggregate_reports: Union[dict[str, Any], list[dict[str, Any]]],
         aggregate_topic: str,
     ):
         """
@@ -111,9 +108,7 @@ class KafkaClient(object):
             aggregate_topic (str): The name of the Kafka topic
         """
-        if isinstance(aggregate_reports, dict) or isinstance(
-            aggregate_reports, OrderedDict
-        ):
+        if isinstance(aggregate_reports, dict):
             aggregate_reports = [aggregate_reports]

         if len(aggregate_reports) < 1:
@@ -146,7 +141,7 @@ class KafkaClient(object):
     def save_forensic_reports_to_kafka(
         self,
-        forensic_reports: Union[OrderedDict[str, Any], list[OrderedDict[str, Any]]],
+        forensic_reports: Union[dict[str, Any], list[dict[str, Any]]],
         forensic_topic: str,
     ):
         """
@@ -180,7 +175,7 @@ class KafkaClient(object):
     def save_smtp_tls_reports_to_kafka(
         self,
-        smtp_tls_reports: Union[list[OrderedDict[str, Any]], OrderedDict[str, Any]],
+        smtp_tls_reports: Union[list[dict[str, Any]], dict[str, Any]],
         smtp_tls_topic: str,
     ):
         """

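The collapsed `isinstance` check above is safe because `OrderedDict` subclasses `dict`, so a single test covers both. A quick sketch (`ensure_report_list` is a hypothetical stand-in for the normalization inside the `save_*_to_kafka` methods):

```python
from collections import OrderedDict


def ensure_report_list(reports):
    # One isinstance(dict) check covers OrderedDict too,
    # since OrderedDict is a dict subclass.
    if isinstance(reports, dict):
        reports = [reports]
    return reports


assert issubclass(OrderedDict, dict)
assert ensure_report_list(OrderedDict(org_name="example.com")) == [
    OrderedDict(org_name="example.com")
]
```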

@@ -3,13 +3,13 @@
 from __future__ import annotations
 from typing import Any
-from collections import OrderedDict
-from parsedmarc.log import logger

 from azure.core.exceptions import HttpResponseError
 from azure.identity import ClientSecretCredential
 from azure.monitor.ingestion import LogsIngestionClient
+from parsedmarc.log import logger


 class LogAnalyticsException(Exception):
     """Raised when an Elasticsearch error occurs"""
@@ -133,7 +133,7 @@ class LogAnalyticsClient(object):
     def publish_results(
         self,
-        results: OrderedDict[str, OrderedDict[str, Any]],
+        results: dict[str, Any],
         save_aggregate: bool,
         save_forensic: bool,
         save_smtp_tls: bool,


@@ -10,6 +10,7 @@ from typing import List
 from google.auth.transport.requests import Request
 from google.oauth2.credentials import Credentials
+from google.oauth2 import service_account
 from google_auth_oauthlib.flow import InstalledAppFlow
 from googleapiclient.discovery import build
 from googleapiclient.errors import HttpError
@@ -18,7 +19,29 @@ from parsedmarc.log import logger
 from parsedmarc.mail.mailbox_connection import MailboxConnection

-def _get_creds(token_file, credentials_file, scopes, oauth2_port):
+def _get_creds(
+    token_file,
+    credentials_file,
+    scopes,
+    oauth2_port,
+    auth_mode="installed_app",
+    service_account_user=None,
+):
+    normalized_auth_mode = (auth_mode or "installed_app").strip().lower()
+    if normalized_auth_mode == "service_account":
+        creds = service_account.Credentials.from_service_account_file(
+            credentials_file,
+            scopes=scopes,
+        )
+        if service_account_user:
+            creds = creds.with_subject(service_account_user)
+        return creds
+    if normalized_auth_mode != "installed_app":
+        raise ValueError(
+            f"Unsupported Gmail auth_mode '{auth_mode}'. "
+            "Expected 'installed_app' or 'service_account'."
+        )
     creds = None
     if Path(token_file).exists():
@@ -47,8 +70,17 @@ class GmailConnection(MailboxConnection):
         reports_folder: str,
         oauth2_port: int,
         paginate_messages: bool,
+        auth_mode: str = "installed_app",
+        service_account_user: str | None = None,
     ):
-        creds = _get_creds(token_file, credentials_file, scopes, oauth2_port)
+        creds = _get_creds(
+            token_file,
+            credentials_file,
+            scopes,
+            oauth2_port,
+            auth_mode=auth_mode,
+            service_account_user=service_account_user,
+        )
         self.service = build("gmail", "v1", credentials=creds)
         self.include_spam_trash = include_spam_trash
         self.reports_label_id = self._find_label_id_for_label(reports_folder)
@@ -116,17 +148,17 @@ class GmailConnection(MailboxConnection):
         else:
             return [id for id in self._fetch_all_message_ids(reports_label_id)]

-    def fetch_message(self, message_id):
+    def fetch_message(self, message_id) -> str:
         msg = (
             self.service.users()
             .messages()
             .get(userId="me", id=message_id, format="raw")
             .execute()
         )
-        return urlsafe_b64decode(msg["raw"])
+        return urlsafe_b64decode(msg["raw"]).decode(errors="replace")

     def delete_message(self, message_id: str):
-        self.service.users().messages().delete(userId="me", id=message_id)
+        self.service.users().messages().delete(userId="me", id=message_id).execute()

     def move_message(self, message_id: str, folder_name: str):
         label_id = self._find_label_id_for_label(folder_name)

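The new Gmail `auth_mode` handling normalizes case and whitespace before dispatching, and fails fast on anything it does not recognize. A stdlib-only sketch of just that validation step (`normalize_auth_mode` is hypothetical; the real `_get_creds` goes on to build Google credentials):

```python
def normalize_auth_mode(auth_mode):
    # Mirror the diff's behavior: None falls back to the interactive
    # default, and the comparison is case/whitespace tolerant.
    normalized = (auth_mode or "installed_app").strip().lower()
    if normalized not in ("installed_app", "service_account"):
        raise ValueError(
            f"Unsupported Gmail auth_mode '{auth_mode}'. "
            "Expected 'installed_app' or 'service_account'."
        )
    return normalized


assert normalize_auth_mode(None) == "installed_app"
assert normalize_auth_mode(" Service_Account ") == "service_account"
```

Normalizing before the comparison means a config value like `Service_Account` still selects the non-interactive path instead of silently falling through.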

@@ -6,29 +6,35 @@ from enum import Enum
 from functools import lru_cache
 from pathlib import Path
 from time import sleep
-from typing import List, Optional
+from typing import Any, List, Optional, Union

 from azure.identity import (
     UsernamePasswordCredential,
     DeviceCodeCredential,
     ClientSecretCredential,
+    CertificateCredential,
     TokenCachePersistenceOptions,
     AuthenticationRecord,
 )
 from msgraph.core import GraphClient
+from requests.exceptions import RequestException

 from parsedmarc.log import logger
 from parsedmarc.mail.mailbox_connection import MailboxConnection

+GRAPH_REQUEST_RETRY_ATTEMPTS = 3
+GRAPH_REQUEST_RETRY_DELAY_SECONDS = 5
+

 class AuthMethod(Enum):
     DeviceCode = 1
     UsernamePassword = 2
     ClientSecret = 3
+    Certificate = 4


 def _get_cache_args(token_path: Path, allow_unencrypted_storage):
-    cache_args = {
+    cache_args: dict[str, Any] = {
         "cache_persistence_options": TokenCachePersistenceOptions(
             name="parsedmarc", allow_unencrypted_storage=allow_unencrypted_storage
         )
@@ -83,30 +89,55 @@ def _generate_credential(auth_method: str, token_path: Path, **kwargs):
             tenant_id=kwargs["tenant_id"],
             client_secret=kwargs["client_secret"],
         )
+    elif auth_method == AuthMethod.Certificate.name:
+        cert_path = kwargs.get("certificate_path")
+        if not cert_path:
+            raise ValueError(
+                "certificate_path is required when auth_method is 'Certificate'"
+            )
+        credential = CertificateCredential(
+            client_id=kwargs["client_id"],
+            tenant_id=kwargs["tenant_id"],
+            certificate_path=cert_path,
+            password=kwargs.get("certificate_password"),
+        )
     else:
         raise RuntimeError(f"Auth method {auth_method} not found")
     return credential


 class MSGraphConnection(MailboxConnection):
+    _WELL_KNOWN_FOLDERS = {
+        "inbox": "inbox",
+        "archive": "archive",
+        "drafts": "drafts",
+        "sentitems": "sentitems",
+        "deleteditems": "deleteditems",
+        "junkemail": "junkemail",
+    }
+
     def __init__(
         self,
         auth_method: str,
         mailbox: str,
         graph_url: str,
         client_id: str,
-        client_secret: str,
-        username: str,
-        password: str,
+        client_secret: Optional[str],
+        username: Optional[str],
+        password: Optional[str],
         tenant_id: str,
         token_file: str,
         allow_unencrypted_storage: bool,
+        certificate_path: Optional[str] = None,
+        certificate_password: Optional[Union[str, bytes]] = None,
     ):
         token_path = Path(token_file)
         credential = _generate_credential(
             auth_method,
             client_id=client_id,
             client_secret=client_secret,
+            certificate_path=certificate_path,
+            certificate_password=certificate_password,
             username=username,
             password=password,
             tenant_id=tenant_id,
@@ -117,10 +148,10 @@ class MSGraphConnection(MailboxConnection):
             "credential": credential,
             "cloud": graph_url,
         }
-        if not isinstance(credential, ClientSecretCredential):
+        if not isinstance(credential, (ClientSecretCredential, CertificateCredential)):
             scopes = ["Mail.ReadWrite"]
             # Detect if mailbox is shared
-            if mailbox and username != mailbox:
+            if mailbox and username and username != mailbox:
                 scopes = ["Mail.ReadWrite.Shared"]
             auth_record = credential.authenticate(scopes=scopes)
             _cache_auth_record(auth_record, token_path)
@@ -129,6 +160,23 @@ class MSGraphConnection(MailboxConnection):
         self._client = GraphClient(**client_params)
         self.mailbox_name = mailbox

+    def _request_with_retries(self, method_name: str, *args, **kwargs):
+        for attempt in range(1, GRAPH_REQUEST_RETRY_ATTEMPTS + 1):
+            try:
+                return getattr(self._client, method_name)(*args, **kwargs)
+            except RequestException as error:
+                if attempt == GRAPH_REQUEST_RETRY_ATTEMPTS:
+                    raise
+                logger.warning(
+                    "Transient MS Graph %s error on attempt %s/%s: %s",
+                    method_name.upper(),
+                    attempt,
+                    GRAPH_REQUEST_RETRY_ATTEMPTS,
+                    error,
+                )
+                sleep(GRAPH_REQUEST_RETRY_DELAY_SECONDS)
+        raise RuntimeError("no retry attempts configured")

     def create_folder(self, folder_name: str):
         sub_url = ""
         path_parts = folder_name.split("/")
@@ -143,7 +191,7 @@ class MSGraphConnection(MailboxConnection):
         request_body = {"displayName": folder_name}
         request_url = f"/users/{self.mailbox_name}/mailFolders{sub_url}"
-        resp = self._client.post(request_url, json=request_body)
+        resp = self._request_with_retries("post", request_url, json=request_body)
         if resp.status_code == 409:
             logger.debug(f"Folder {folder_name} already exists, skipping creation")
         elif resp.status_code == 201:
@@ -151,9 +199,9 @@ class MSGraphConnection(MailboxConnection):
         else:
             logger.warning(f"Unknown response {resp.status_code} {resp.json()}")

-    def fetch_messages(self, folder_name: str, **kwargs) -> List[str]:
+    def fetch_messages(self, reports_folder: str, **kwargs) -> List[str]:
         """Returns a list of message UIDs in the specified folder"""
-        folder_id = self._find_folder_id_from_folder_path(folder_name)
+        folder_id = self._find_folder_id_from_folder_path(reports_folder)
         url = f"/users/{self.mailbox_name}/mailFolders/{folder_id}/messages"
         since = kwargs.get("since")
         if not since:
@@ -166,14 +214,14 @@ class MSGraphConnection(MailboxConnection):

     def _get_all_messages(self, url, batch_size, since):
         messages: list
-        params = {"$select": "id"}
+        params: dict[str, Union[str, int]] = {"$select": "id"}
         if since:
             params["$filter"] = f"receivedDateTime ge {since}"
         if batch_size and batch_size > 0:
             params["$top"] = batch_size
         else:
             params["$top"] = 100
-        result = self._client.get(url, params=params)
+        result = self._request_with_retries("get", url, params=params)
         if result.status_code != 200:
             raise RuntimeError(f"Failed to fetch messages {result.text}")
         messages = result.json()["value"]
@@ -181,7 +229,7 @@ class MSGraphConnection(MailboxConnection):
         while "@odata.nextLink" in result.json() and (
             since is not None or (batch_size == 0 or batch_size - len(messages) > 0)
         ):
-            result = self._client.get(result.json()["@odata.nextLink"])
+            result = self._request_with_retries("get", result.json()["@odata.nextLink"])
             if result.status_code != 200:
                 raise RuntimeError(f"Failed to fetch messages {result.text}")
             messages.extend(result.json()["value"])
@@ -190,7 +238,7 @@ class MSGraphConnection(MailboxConnection):
     def mark_message_read(self, message_id: str):
         """Marks a message as read"""
         url = f"/users/{self.mailbox_name}/messages/{message_id}"
-        resp = self._client.patch(url, json={"isRead": "true"})
+        resp = self._request_with_retries("patch", url, json={"isRead": "true"})
         if resp.status_code != 200:
             raise RuntimeWarning(
                 f"Failed to mark message read{resp.status_code}: {resp.json()}"
@@ -198,7 +246,7 @@ class MSGraphConnection(MailboxConnection):
     def fetch_message(self, message_id: str, **kwargs):
         url = f"/users/{self.mailbox_name}/messages/{message_id}/$value"
-        result = self._client.get(url)
+        result = self._request_with_retries("get", url)
         if result.status_code != 200:
             raise RuntimeWarning(
                 f"Failed to fetch message{result.status_code}: {result.json()}"
@@ -210,7 +258,7 @@ class MSGraphConnection(MailboxConnection):
     def delete_message(self, message_id: str):
         url = f"/users/{self.mailbox_name}/messages/{message_id}"
-        resp = self._client.delete(url)
+        resp = self._request_with_retries("delete", url)
         if resp.status_code != 204:
             raise RuntimeWarning(
                 f"Failed to delete message {resp.status_code}: {resp.json()}"
@@ -220,7 +268,7 @@ class MSGraphConnection(MailboxConnection):
         folder_id = self._find_folder_id_from_folder_path(folder_name)
         request_body = {"destinationId": folder_id}
         url = f"/users/{self.mailbox_name}/messages/{message_id}/move"
-        resp = self._client.post(url, json=request_body)
+        resp = self._request_with_retries("post", url, json=request_body)
         if resp.status_code != 201:
             raise RuntimeWarning(
                 f"Failed to move message {resp.status_code}: {resp.json()}"
@@ -248,6 +296,19 @@ class MSGraphConnection(MailboxConnection):
         else:
             return self._find_folder_id_with_parent(folder_name, None)

+    def _get_well_known_folder_id(self, folder_name: str) -> Optional[str]:
+        folder_key = folder_name.lower().replace(" ", "").replace("-", "")
+        alias = self._WELL_KNOWN_FOLDERS.get(folder_key)
+        if alias is None:
+            return None
+        url = f"/users/{self.mailbox_name}/mailFolders/{alias}?$select=id,displayName"
+        folder_resp = self._request_with_retries("get", url)
+        if folder_resp.status_code != 200:
+            return None
+        payload = folder_resp.json()
+        return payload.get("id")
+
     def _find_folder_id_with_parent(
         self, folder_name: str, parent_folder_id: Optional[str]
     ):
@@ -256,8 +317,12 @@ class MSGraphConnection(MailboxConnection):
             sub_url = f"/{parent_folder_id}/childFolders"
         url = f"/users/{self.mailbox_name}/mailFolders{sub_url}"
         filter = f"?$filter=displayName eq '{folder_name}'"
-        folders_resp = self._client.get(url + filter)
+        folders_resp = self._request_with_retries("get", url + filter)
         if folders_resp.status_code != 200:
+            if parent_folder_id is None:
+                well_known_folder_id = self._get_well_known_folder_id(folder_name)
+                if well_known_folder_id:
+                    return well_known_folder_id
             raise RuntimeWarning(f"Failed to list folders.{folders_resp.json()}")
         folders: list = folders_resp.json()["value"]
         matched_folders = [

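The `_request_with_retries` wrapper added to the MS Graph connection can be sketched without the Graph client: call a function, retry on transient errors up to a fixed limit, and re-raise the last failure so persistent errors still surface. `TransientError` stands in for `requests.exceptions.RequestException`, and the delay is zeroed for the demo (the real code waits 5 seconds between attempts):

```python
import time

RETRY_ATTEMPTS = 3
RETRY_DELAY_SECONDS = 0  # the real connection sleeps 5 seconds between tries


class TransientError(Exception):
    """Stand-in for requests.exceptions.RequestException."""


def request_with_retries(send):
    # Retry up to RETRY_ATTEMPTS times; re-raise on the final attempt
    # so callers still see persistent failures.
    for attempt in range(1, RETRY_ATTEMPTS + 1):
        try:
            return send()
        except TransientError:
            if attempt == RETRY_ATTEMPTS:
                raise
            time.sleep(RETRY_DELAY_SECONDS)


attempts = {"count": 0}


def flaky_request():
    # Fails twice, then succeeds, like a dropped connection that recovers.
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise TransientError("connection reset")
    return 200


assert request_with_retries(flaky_request) == 200
assert attempts["count"] == 3
```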

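The MS Graph well-known folder fallback keys into Graph's folder aliases by stripping spaces and hyphens from the lower-cased display name, so localized-looking names like "Deleted Items" still resolve. A minimal sketch of that normalization:

```python
WELL_KNOWN_FOLDERS = {
    "inbox": "inbox",
    "archive": "archive",
    "drafts": "drafts",
    "sentitems": "sentitems",
    "deleteditems": "deleteditems",
    "junkemail": "junkemail",
}


def well_known_alias(display_name):
    # Same normalization as _get_well_known_folder_id: lower-case,
    # drop spaces and hyphens, then look up the Graph alias.
    key = display_name.lower().replace(" ", "").replace("-", "")
    return WELL_KNOWN_FOLDERS.get(key)


assert well_known_alias("Deleted Items") == "deleteditems"
assert well_known_alias("Junk-Email") == "junkemail"
assert well_known_alias("DMARC Reports") is None
```

Returning `None` for unknown names lets the caller fall through to the original "failed to list folders" error instead of guessing.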
@@ -2,7 +2,7 @@
 from __future__ import annotations
-from typing import Optional
+from typing import cast

 from time import sleep
@@ -17,15 +17,14 @@ from parsedmarc.mail.mailbox_connection import MailboxConnection
 class IMAPConnection(MailboxConnection):
     def __init__(
         self,
-        host: Optional[str] = None,
-        *,
-        user: Optional[str] = None,
-        password: Optional[str] = None,
-        port: Optional[str] = None,
-        ssl: Optional[bool] = True,
-        verify: Optional[bool] = True,
-        timeout: Optional[int] = 30,
-        max_retries: Optional[int] = 4,
+        host: str,
+        user: str,
+        password: str,
+        port: int = 993,
+        ssl: bool = True,
+        verify: bool = True,
+        timeout: int = 30,
+        max_retries: int = 4,
     ):
         self._username = user
         self._password = password
@@ -47,19 +46,37 @@ class IMAPConnection(MailboxConnection):
     def fetch_messages(self, reports_folder: str, **kwargs):
         self._client.select_folder(reports_folder)
         since = kwargs.get("since")
-        if since:
-            return self._client.search(["SINCE", since])
+        if since is not None:
+            return self._client.search(f"SINCE {since}")
         else:
             return self._client.search()

     def fetch_message(self, message_id: int):
-        return self._client.fetch_message(message_id, parse=False)
+        return cast(str, self._client.fetch_message(message_id, parse=False))

     def delete_message(self, message_id: int):
-        self._client.delete_messages([message_id])
+        try:
+            self._client.delete_messages([message_id])
+        except IMAPClientError as error:
+            logger.warning(
+                "IMAP delete fallback for message %s due to server error: %s",
+                message_id,
+                error,
+            )
+            self._client.add_flags([message_id], [r"\Deleted"], silent=True)
+            self._client.expunge()

     def move_message(self, message_id: int, folder_name: str):
-        self._client.move_messages([message_id], folder_name)
+        try:
+            self._client.move_messages([message_id], folder_name)
+        except IMAPClientError as error:
+            logger.warning(
+                "IMAP move fallback for message %s due to server error: %s",
+                message_id,
+                error,
+            )
+            self._client.copy([message_id], folder_name)
+            self.delete_message(message_id)

     def keepalive(self):
         self._client.noop()

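The IMAP move fallback above degrades to COPY plus delete when the server rejects MOVE. A sketch of the same control flow against a stub client (`MoveUnsupported` and `StubIMAP` are hypothetical stand-ins for imapclient and its error type):

```python
class MoveUnsupported(Exception):
    """Stand-in for imapclient's IMAPClientError."""


class StubIMAP:
    # Minimal stand-in server: supports COPY and delete,
    # but raises on MOVE, like servers without the MOVE capability.
    def __init__(self):
        self.folders = {"INBOX": ["msg-1"], "Archive": []}

    def move_messages(self, ids, folder):
        raise MoveUnsupported("server lacks MOVE")

    def copy(self, ids, folder):
        self.folders[folder].extend(ids)

    def delete_messages(self, ids):
        self.folders["INBOX"] = [m for m in self.folders["INBOX"] if m not in ids]


def move_message(client, message_id, folder_name):
    # Same shape as the fallback above: try MOVE first,
    # fall back to COPY + delete when it fails.
    try:
        client.move_messages([message_id], folder_name)
    except MoveUnsupported:
        client.copy([message_id], folder_name)
        client.delete_messages([message_id])


imap = StubIMAP()
move_message(imap, "msg-1", "Archive")
assert imap.folders == {"INBOX": [], "Archive": ["msg-1"]}
```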

@@ -13,16 +13,16 @@ class MailboxConnection(ABC):
     def create_folder(self, folder_name: str):
         raise NotImplementedError

-    def fetch_messages(self, reports_folder: str, **kwargs) -> list[str]:
+    def fetch_messages(self, reports_folder: str, **kwargs):
         raise NotImplementedError

     def fetch_message(self, message_id) -> str:
         raise NotImplementedError

-    def delete_message(self, message_id: str):
+    def delete_message(self, message_id):
         raise NotImplementedError

-    def move_message(self, message_id: str, folder_name: str):
+    def move_message(self, message_id, folder_name: str):
         raise NotImplementedError

     def keepalive(self):


@@ -2,21 +2,20 @@
 from __future__ import annotations
-from typing import Optional
+import mailbox
+import os
 from time import sleep
+from typing import Dict

 from parsedmarc.log import logger
 from parsedmarc.mail.mailbox_connection import MailboxConnection
-import mailbox
-import os


 class MaildirConnection(MailboxConnection):
     def __init__(
         self,
-        maildir_path: Optional[bool] = None,
-        maildir_create: Optional[bool] = False,
+        maildir_path: str,
+        maildir_create: bool = False,
     ):
         self._maildir_path = maildir_path
         self._maildir_create = maildir_create
@@ -33,27 +32,31 @@ class MaildirConnection(MailboxConnection):
             )
             raise Exception(ex)
         self._client = mailbox.Maildir(maildir_path, create=maildir_create)
-        self._subfolder_client = {}
+        self._subfolder_client: Dict[str, mailbox.Maildir] = {}

     def create_folder(self, folder_name: str):
         self._subfolder_client[folder_name] = self._client.add_folder(folder_name)
-        self._client.add_folder(folder_name)

     def fetch_messages(self, reports_folder: str, **kwargs):
         return self._client.keys()

-    def fetch_message(self, message_id: str):
-        return self._client.get(message_id).as_string()
+    def fetch_message(self, message_id: str) -> str:
+        msg = self._client.get(message_id)
+        if msg is not None:
+            msg = msg.as_string()
+        if msg is not None:
+            return msg
+        return ""

     def delete_message(self, message_id: str):
         self._client.remove(message_id)

     def move_message(self, message_id: str, folder_name: str):
         message_data = self._client.get(message_id)
-        if folder_name not in self._subfolder_client.keys():
-            self._subfolder_client = mailbox.Maildir(
-                os.join(self.maildir_path, folder_name), create=self.maildir_create
-            )
+        if message_data is None:
+            return
+        if folder_name not in self._subfolder_client:
+            self._subfolder_client[folder_name] = self._client.add_folder(folder_name)
         self._subfolder_client[folder_name].add(message_data)
         self._client.remove(message_id)

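The fixed `move_message` adds the message to a cached subfolder and then removes it from the parent, guarding against a missing key. The same flow is runnable with only the stdlib `mailbox` module:

```python
import email.message
import mailbox
import os
import tempfile

root = tempfile.mkdtemp()
inbox = mailbox.Maildir(os.path.join(root, "reports"), create=True)

msg = email.message.Message()
msg["Subject"] = "Report Domain: example.com"
msg.set_payload("aggregate report body")
key = inbox.add(msg)

# Move = add to the subfolder, then remove from the parent,
# skipping the move entirely if the key no longer resolves.
archive = inbox.add_folder("Archive")
data = inbox.get(key)
if data is not None:
    archive.add(data)
    inbox.remove(key)

assert len(list(inbox.keys())) == 0
assert len(list(archive.keys())) == 1
```

Note `Maildir.get` returns `None` for an unknown key rather than raising, which is why the `None` guard replaces the old unconditional `.as_string()` chain.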

@@ -2,30 +2,31 @@
from __future__ import annotations from __future__ import annotations
from typing import Optional, Union, Any from typing import Any, Optional, Union
from collections import OrderedDict
import boto3
from opensearchpy import ( from opensearchpy import (
Q, AWSV4SignerAuth,
connections, Boolean,
Object, Date,
Document, Document,
Index, Index,
Nested,
InnerDoc, InnerDoc,
Integer, Integer,
Text,
Boolean,
Ip, Ip,
Date, Nested,
Object,
Q,
RequestsHttpConnection,
Search, Search,
Text,
connections,
) )
from opensearchpy.helpers import reindex from opensearchpy.helpers import reindex
from parsedmarc import InvalidForensicReport
from parsedmarc.log import logger from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime from parsedmarc.utils import human_timestamp_to_datetime
from parsedmarc import InvalidForensicReport
class OpenSearchError(Exception): class OpenSearchError(Exception):
@@ -104,7 +105,7 @@ class _AggregateReportDoc(Document):
def add_spf_result(self, domain: str, scope: str, result: _SPFResult): def add_spf_result(self, domain: str, scope: str, result: _SPFResult):
self.spf_results.append(_SPFResult(domain=domain, scope=scope, result=result)) self.spf_results.append(_SPFResult(domain=domain, scope=scope, result=result))
def save(self, **kwargs): def save(self, **kwargs): # pyright: ignore[reportIncompatibleMethodOverride]
self.passed_dmarc = False self.passed_dmarc = False
self.passed_dmarc = self.spf_aligned or self.dkim_aligned self.passed_dmarc = self.spf_aligned or self.dkim_aligned
@@ -274,6 +275,9 @@ def set_hosts(
password: Optional[str] = None, password: Optional[str] = None,
api_key: Optional[str] = None, api_key: Optional[str] = None,
timeout: Optional[float] = 60.0, timeout: Optional[float] = 60.0,
auth_type: str = "basic",
aws_region: Optional[str] = None,
aws_service: str = "es",
): ):
""" """
Sets the OpenSearch hosts to use Sets the OpenSearch hosts to use
@@ -286,6 +290,9 @@ def set_hosts(
password (str): The password to use for authentication password (str): The password to use for authentication
api_key (str): The Base64 encoded API key to use for authentication api_key (str): The Base64 encoded API key to use for authentication
timeout (float): Timeout in seconds timeout (float): Timeout in seconds
auth_type (str): OpenSearch auth mode: basic (default) or awssigv4
aws_region (str): AWS region for SigV4 auth (required for awssigv4)
aws_service (str): AWS service for SigV4 signing (default: es)
""" """
if not isinstance(hosts, list): if not isinstance(hosts, list):
hosts = [hosts] hosts = [hosts]
@@ -297,10 +304,30 @@ def set_hosts(
conn_params["ca_certs"] = ssl_cert_path conn_params["ca_certs"] = ssl_cert_path
else: else:
conn_params["verify_certs"] = False conn_params["verify_certs"] = False
if username and password: normalized_auth_type = (auth_type or "basic").strip().lower()
conn_params["http_auth"] = username + ":" + password if normalized_auth_type == "awssigv4":
if api_key: if not aws_region:
conn_params["api_key"] = api_key raise OpenSearchError(
"OpenSearch AWS SigV4 auth requires 'aws_region' to be set"
)
session = boto3.Session()
credentials = session.get_credentials()
if credentials is None:
raise OpenSearchError(
"Unable to load AWS credentials for OpenSearch SigV4 authentication"
)
conn_params["http_auth"] = AWSV4SignerAuth(credentials, aws_region, aws_service)
conn_params["connection_class"] = RequestsHttpConnection
elif normalized_auth_type == "basic":
if username and password:
conn_params["http_auth"] = username + ":" + password
if api_key:
conn_params["api_key"] = api_key
else:
raise OpenSearchError(
f"Unsupported OpenSearch auth_type '{auth_type}'. "
"Expected 'basic' or 'awssigv4'."
)
connections.create_connection(**conn_params) connections.create_connection(**conn_params)
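The new `awssigv4` branch normalizes the configured auth mode and fails fast when SigV4 is requested without a region. A minimal standalone sketch of that validation logic (without the boto3/opensearchpy calls, and raising `ValueError` instead of `OpenSearchError`) might look like:

```python
def normalize_auth_type(auth_type, aws_region):
    # Mirror the diff above: treat a missing auth_type as "basic",
    # normalize case/whitespace, and reject SigV4 without a region.
    normalized = (auth_type or "basic").strip().lower()
    if normalized == "awssigv4" and not aws_region:
        raise ValueError("OpenSearch AWS SigV4 auth requires 'aws_region' to be set")
    if normalized not in ("basic", "awssigv4"):
        raise ValueError(
            f"Unsupported OpenSearch auth_type '{auth_type}'. "
            "Expected 'basic' or 'awssigv4'."
        )
    return normalized
```

In the real code path, an `awssigv4` result then builds `AWSV4SignerAuth(credentials, aws_region, aws_service)` and forces `RequestsHttpConnection`, since the signer is applied per-request.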
@@ -377,18 +404,18 @@ def migrate_indexes(
def save_aggregate_report_to_opensearch( def save_aggregate_report_to_opensearch(
aggregate_report: OrderedDict[str, Any], aggregate_report: dict[str, Any],
index_suffix: Optional[str] = None, index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None, index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False, monthly_indexes: bool = False,
number_of_shards: Optional[int] = 1, number_of_shards: int = 1,
number_of_replicas: Optional[int] = 0, number_of_replicas: int = 0,
): ):
""" """
Saves a parsed DMARC aggregate report to OpenSearch Saves a parsed DMARC aggregate report to OpenSearch
Args: Args:
aggregate_report (OrderedDict): A parsed forensic report aggregate_report (dict): A parsed forensic report
index_suffix (str): The suffix of the name of the index to save to index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily indexes monthly_indexes (bool): Use monthly indexes instead of daily indexes
@@ -415,8 +442,8 @@ def save_aggregate_report_to_opensearch(
org_name_query = Q(dict(match_phrase=dict(org_name=org_name))) org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
report_id_query = Q(dict(match_phrase=dict(report_id=report_id))) report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
domain_query = Q(dict(match_phrase={"published_policy.domain": domain})) domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
begin_date_query = Q(dict(match=dict(date_begin=begin_date))) begin_date_query = Q(dict(range=dict(date_begin=dict(gte=begin_date))))
end_date_query = Q(dict(match=dict(date_end=end_date))) end_date_query = Q(dict(range=dict(date_end=dict(lte=end_date))))
if index_suffix is not None: if index_suffix is not None:
search_index = "dmarc_aggregate_{0}*".format(index_suffix) search_index = "dmarc_aggregate_{0}*".format(index_suffix)
@@ -428,13 +455,12 @@ def save_aggregate_report_to_opensearch(
query = org_name_query & report_id_query & domain_query query = org_name_query & report_id_query & domain_query
query = query & begin_date_query & end_date_query query = query & begin_date_query & end_date_query
search.query = query search.query = query
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
try: try:
existing = search.execute() existing = search.execute()
except Exception as error_: except Exception as error_:
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
raise OpenSearchError( raise OpenSearchError(
"OpenSearch's search for existing report \ "OpenSearch's search for existing report \
error: {}".format(error_.__str__()) error: {}".format(error_.__str__())
@@ -539,10 +565,10 @@ def save_aggregate_report_to_opensearch(
def save_forensic_report_to_opensearch( def save_forensic_report_to_opensearch(
forensic_report: OrderedDict[str, Any], forensic_report: dict[str, Any],
index_suffix: Optional[str] = None, index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None, index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False, monthly_indexes: bool = False,
number_of_shards: int = 1, number_of_shards: int = 1,
number_of_replicas: int = 0, number_of_replicas: int = 0,
): ):
@@ -550,7 +576,7 @@ def save_forensic_report_to_opensearch(
Saves a parsed DMARC forensic report to OpenSearch Saves a parsed DMARC forensic report to OpenSearch
Args: Args:
forensic_report (OrderedDict): A parsed forensic report forensic_report (dict): A parsed forensic report
index_suffix (str): The suffix of the name of the index to save to index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily monthly_indexes (bool): Use monthly indexes instead of daily
@@ -570,7 +596,7 @@ def save_forensic_report_to_opensearch(
sample_date = forensic_report["parsed_sample"]["date"] sample_date = forensic_report["parsed_sample"]["date"]
sample_date = human_timestamp_to_datetime(sample_date) sample_date = human_timestamp_to_datetime(sample_date)
original_headers = forensic_report["parsed_sample"]["headers"] original_headers = forensic_report["parsed_sample"]["headers"]
headers = OrderedDict() headers: dict[str, Any] = {}
for original_header in original_headers: for original_header in original_headers:
headers[original_header.lower()] = original_headers[original_header] headers[original_header.lower()] = original_headers[original_header]
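The loop above copies the sample's headers into a plain dict keyed by lowercased header names (previously an `OrderedDict`), so later lookups are case-insensitive. In isolation:

```python
def lowercase_headers(original_headers):
    # Re-key headers by lowercased name, as in
    # save_forensic_report_to_opensearch above.
    headers = {}
    for original_header in original_headers:
        headers[original_header.lower()] = original_headers[original_header]
    return headers
```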
@@ -706,18 +732,18 @@ def save_forensic_report_to_opensearch(
def save_smtp_tls_report_to_opensearch( def save_smtp_tls_report_to_opensearch(
report: OrderedDict[str, Any], report: dict[str, Any],
index_suffix: Optional[str] = None, index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None, index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False, monthly_indexes: bool = False,
number_of_shards: Optional[int] = 1, number_of_shards: int = 1,
number_of_replicas: Optional[int] = 0, number_of_replicas: int = 0,
): ):
""" """
Saves a parsed SMTP TLS report to OpenSearch Saves a parsed SMTP TLS report to OpenSearch
Args: Args:
report (OrderedDict): A parsed SMTP TLS report report (dict): A parsed SMTP TLS report
index_suffix (str): The suffix of the name of the index to save to index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily indexes monthly_indexes (bool): Use monthly indexes instead of daily indexes
@@ -2,13 +2,11 @@
from __future__ import annotations from __future__ import annotations
import json
from typing import Any from typing import Any
import json
import boto3 import boto3
from collections import OrderedDict
from parsedmarc.log import logger from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime from parsedmarc.utils import human_timestamp_to_datetime
@@ -55,16 +53,16 @@ class S3Client(object):
) )
self.bucket = self.s3.Bucket(self.bucket_name) # type: ignore self.bucket = self.s3.Bucket(self.bucket_name) # type: ignore
def save_aggregate_report_to_s3(self, report: OrderedDict[str, Any]): def save_aggregate_report_to_s3(self, report: dict[str, Any]):
self.save_report_to_s3(report, "aggregate") self.save_report_to_s3(report, "aggregate")
def save_forensic_report_to_s3(self, report: OrderedDict[str, Any]): def save_forensic_report_to_s3(self, report: dict[str, Any]):
self.save_report_to_s3(report, "forensic") self.save_report_to_s3(report, "forensic")
def save_smtp_tls_report_to_s3(self, report: OrderedDict[str, Any]): def save_smtp_tls_report_to_s3(self, report: dict[str, Any]):
self.save_report_to_s3(report, "smtp_tls") self.save_report_to_s3(report, "smtp_tls")
def save_report_to_s3(self, report: OrderedDict[str, Any], report_type: str): def save_report_to_s3(self, report: dict[str, Any], report_type: str):
if report_type == "smtp_tls": if report_type == "smtp_tls":
report_date = report["begin_date"] report_date = report["begin_date"]
report_id = report["report_id"] report_id = report["report_id"]
@@ -2,16 +2,13 @@
from __future__ import annotations from __future__ import annotations
from typing import Any, Union
from collections import OrderedDict
from urllib.parse import urlparse
import socket
import json import json
import socket
from typing import Any, Union
from urllib.parse import urlparse
import urllib3
import requests import requests
import urllib3
from parsedmarc.constants import USER_AGENT from parsedmarc.constants import USER_AGENT
from parsedmarc.log import logger from parsedmarc.log import logger
@@ -73,7 +70,7 @@ class HECClient(object):
def save_aggregate_reports_to_splunk( def save_aggregate_reports_to_splunk(
self, self,
aggregate_reports: Union[list[OrderedDict[str, Any]], OrderedDict[str, Any]], aggregate_reports: Union[list[dict[str, Any]], dict[str, Any]],
): ):
""" """
Saves aggregate DMARC reports to Splunk Saves aggregate DMARC reports to Splunk
@@ -139,7 +136,7 @@ class HECClient(object):
def save_forensic_reports_to_splunk( def save_forensic_reports_to_splunk(
self, self,
forensic_reports: Union[list[OrderedDict[str, Any]], OrderedDict[str, Any]], forensic_reports: Union[list[dict[str, Any]], dict[str, Any]],
): ):
""" """
Saves forensic DMARC reports to Splunk Saves forensic DMARC reports to Splunk
@@ -175,7 +172,7 @@ class HECClient(object):
raise SplunkError(response["text"]) raise SplunkError(response["text"])
def save_smtp_tls_reports_to_splunk( def save_smtp_tls_reports_to_splunk(
self, reports: Union[list[OrderedDict[str, Any]], OrderedDict[str, Any]] self, reports: Union[list[dict[str, Any]], dict[str, Any]]
): ):
""" """
Saves aggregate DMARC reports to Splunk Saves aggregate DMARC reports to Splunk
@@ -3,14 +3,13 @@
from __future__ import annotations from __future__ import annotations
import json
import logging import logging
import logging.handlers import logging.handlers
import socket
from typing import Any import ssl
import time
from collections import OrderedDict from typing import Any, Optional
import json
from parsedmarc import ( from parsedmarc import (
parsed_aggregate_reports_to_csv_rows, parsed_aggregate_reports_to_csv_rows,
@@ -22,37 +21,161 @@ from parsedmarc import (
class SyslogClient(object): class SyslogClient(object):
"""A client for Syslog""" """A client for Syslog"""
def __init__(self, server_name: str, server_port: int): def __init__(
self,
server_name: str,
server_port: int,
protocol: str = "udp",
cafile_path: Optional[str] = None,
certfile_path: Optional[str] = None,
keyfile_path: Optional[str] = None,
timeout: float = 5.0,
retry_attempts: int = 3,
retry_delay: int = 5,
):
""" """
Initializes the SyslogClient Initializes the SyslogClient
Args: Args:
server_name (str): The Syslog server server_name (str): The Syslog server
server_port (int): The Syslog UDP port server_port (int): The Syslog port
protocol (str): The protocol to use: "udp", "tcp", or "tls" (Default: "udp")
cafile_path (str): Path to CA certificate file for TLS server verification (Optional)
certfile_path (str): Path to client certificate file for TLS authentication (Optional)
keyfile_path (str): Path to client private key file for TLS authentication (Optional)
timeout (float): Connection timeout in seconds for TCP/TLS (Default: 5.0)
retry_attempts (int): Number of retry attempts for failed connections (Default: 3)
retry_delay (int): Delay in seconds between retry attempts (Default: 5)
""" """
self.server_name = server_name self.server_name = server_name
self.server_port = server_port self.server_port = server_port
self.protocol = protocol.lower()
self.timeout = timeout
self.retry_attempts = retry_attempts
self.retry_delay = retry_delay
self.logger = logging.getLogger("parsedmarc_syslog") self.logger = logging.getLogger("parsedmarc_syslog")
self.logger.setLevel(logging.INFO) self.logger.setLevel(logging.INFO)
log_handler = logging.handlers.SysLogHandler(address=(server_name, server_port))
# Create the appropriate syslog handler based on protocol
log_handler = self._create_syslog_handler(
server_name,
server_port,
self.protocol,
cafile_path,
certfile_path,
keyfile_path,
timeout,
retry_attempts,
retry_delay,
)
self.logger.addHandler(log_handler) self.logger.addHandler(log_handler)
def save_aggregate_report_to_syslog( def _create_syslog_handler(
self, aggregate_reports: list[OrderedDict[str, Any]] self,
): server_name: str,
server_port: int,
protocol: str,
cafile_path: Optional[str],
certfile_path: Optional[str],
keyfile_path: Optional[str],
timeout: float,
retry_attempts: int,
retry_delay: int,
) -> logging.handlers.SysLogHandler:
"""
Creates a SysLogHandler with the specified protocol and TLS settings
"""
if protocol == "udp":
# UDP protocol (default, backward compatible)
return logging.handlers.SysLogHandler(
address=(server_name, server_port),
socktype=socket.SOCK_DGRAM,
)
elif protocol in ["tcp", "tls"]:
# TCP or TLS protocol with retry logic
for attempt in range(1, retry_attempts + 1):
try:
if protocol == "tcp":
# TCP without TLS
handler = logging.handlers.SysLogHandler(
address=(server_name, server_port),
socktype=socket.SOCK_STREAM,
)
# Set timeout on the socket
if hasattr(handler, "socket") and handler.socket:
handler.socket.settimeout(timeout)
return handler
else:
# TLS protocol
# Create SSL context with secure defaults
ssl_context = ssl.create_default_context()
# Explicitly set minimum TLS version to 1.2 for security
ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
# Configure server certificate verification
if cafile_path:
ssl_context.load_verify_locations(cafile=cafile_path)
# Configure client certificate authentication
if certfile_path and keyfile_path:
ssl_context.load_cert_chain(
certfile=certfile_path,
keyfile=keyfile_path,
)
elif certfile_path or keyfile_path:
# Warn if only one of the two required parameters is provided
self.logger.warning(
"Both certfile_path and keyfile_path are required for "
"client certificate authentication. Client authentication "
"will not be used."
)
# Create TCP handler first
handler = logging.handlers.SysLogHandler(
address=(server_name, server_port),
socktype=socket.SOCK_STREAM,
)
# Wrap socket with TLS
if hasattr(handler, "socket") and handler.socket:
handler.socket = ssl_context.wrap_socket(
handler.socket,
server_hostname=server_name,
)
handler.socket.settimeout(timeout)
return handler
except Exception as e:
if attempt < retry_attempts:
self.logger.warning(
f"Syslog connection attempt {attempt}/{retry_attempts} failed: {e}. "
f"Retrying in {retry_delay} seconds..."
)
time.sleep(retry_delay)
else:
self.logger.error(
f"Syslog connection failed after {retry_attempts} attempts: {e}"
)
raise
else:
raise ValueError(
f"Invalid protocol '{protocol}'. Must be 'udp', 'tcp', or 'tls'."
)
def save_aggregate_report_to_syslog(self, aggregate_reports: list[dict[str, Any]]):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports) rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows: for row in rows:
self.logger.info(json.dumps(row)) self.logger.info(json.dumps(row))
def save_forensic_report_to_syslog( def save_forensic_report_to_syslog(self, forensic_reports: list[dict[str, Any]]):
self, forensic_reports: list[OrderedDict[str, Any]]
):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports) rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows: for row in rows:
self.logger.info(json.dumps(row)) self.logger.info(json.dumps(row))
def save_smtp_tls_report_to_syslog( def save_smtp_tls_report_to_syslog(self, smtp_tls_reports: list[dict[str, Any]]):
self, smtp_tls_reports: list[OrderedDict[str, Any]]
):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports) rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
for row in rows: for row in rows:
self.logger.info(json.dumps(row)) self.logger.info(json.dumps(row))
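The TLS branch of `_create_syslog_handler` builds an `ssl.SSLContext` with secure defaults and an explicit TLS 1.2 floor before wrapping the handler's socket. That context setup can be sketched on its own (the `*_path` arguments are placeholders for real certificate files):

```python
import ssl


def build_syslog_tls_context(cafile_path=None, certfile_path=None, keyfile_path=None):
    # Secure defaults (hostname checking, certificate verification) plus an
    # explicit TLS 1.2 minimum, matching SyslogClient._create_syslog_handler.
    ctx = ssl.create_default_context()
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    if cafile_path:
        # Trust a custom CA for server verification
        ctx.load_verify_locations(cafile=cafile_path)
    if certfile_path and keyfile_path:
        # Optional mutual-TLS client authentication
        ctx.load_cert_chain(certfile=certfile_path, keyfile=keyfile_path)
    return ctx
```

In the handler itself, this context wraps the `SysLogHandler`'s TCP socket via `ssl_context.wrap_socket(handler.socket, server_hostname=server_name)`.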

parsedmarc/types.py (new file, 220 lines)

@@ -0,0 +1,220 @@
from __future__ import annotations
from typing import Any, Dict, List, Literal, Optional, TypedDict, Union
# NOTE: This module is intentionally Python 3.10 compatible.
# - No PEP 604 unions (A | B)
# - No typing.NotRequired / Required (3.11+) to avoid an extra dependency.
# For optional keys, use total=False TypedDicts.
ReportType = Literal["aggregate", "forensic", "smtp_tls"]
class AggregateReportMetadata(TypedDict):
org_name: str
org_email: str
org_extra_contact_info: Optional[str]
report_id: str
begin_date: str
end_date: str
timespan_requires_normalization: bool
original_timespan_seconds: int
errors: List[str]
class AggregatePolicyPublished(TypedDict):
domain: str
adkim: str
aspf: str
p: str
sp: str
pct: str
fo: str
class IPSourceInfo(TypedDict):
ip_address: str
country: Optional[str]
reverse_dns: Optional[str]
base_domain: Optional[str]
name: Optional[str]
type: Optional[str]
class AggregateAlignment(TypedDict):
spf: bool
dkim: bool
dmarc: bool
class AggregateIdentifiers(TypedDict):
header_from: str
envelope_from: Optional[str]
envelope_to: Optional[str]
class AggregatePolicyOverrideReason(TypedDict):
type: Optional[str]
comment: Optional[str]
class AggregateAuthResultDKIM(TypedDict):
domain: str
result: str
selector: str
class AggregateAuthResultSPF(TypedDict):
domain: str
result: str
scope: str
class AggregateAuthResults(TypedDict):
dkim: List[AggregateAuthResultDKIM]
spf: List[AggregateAuthResultSPF]
class AggregatePolicyEvaluated(TypedDict):
disposition: str
dkim: str
spf: str
policy_override_reasons: List[AggregatePolicyOverrideReason]
class AggregateRecord(TypedDict):
interval_begin: str
interval_end: str
source: IPSourceInfo
count: int
alignment: AggregateAlignment
policy_evaluated: AggregatePolicyEvaluated
disposition: str
identifiers: AggregateIdentifiers
auth_results: AggregateAuthResults
class AggregateReport(TypedDict):
xml_schema: str
report_metadata: AggregateReportMetadata
policy_published: AggregatePolicyPublished
records: List[AggregateRecord]
class EmailAddress(TypedDict):
display_name: Optional[str]
address: str
local: Optional[str]
domain: Optional[str]
class EmailAttachment(TypedDict, total=False):
filename: Optional[str]
mail_content_type: Optional[str]
sha256: Optional[str]
ParsedEmail = TypedDict(
"ParsedEmail",
{
# This is a lightly-specified version of mailsuite/mailparser JSON.
# It focuses on the fields parsedmarc uses in forensic handling.
"headers": Dict[str, Any],
"subject": Optional[str],
"filename_safe_subject": Optional[str],
"date": Optional[str],
"from": EmailAddress,
"to": List[EmailAddress],
"cc": List[EmailAddress],
"bcc": List[EmailAddress],
"attachments": List[EmailAttachment],
"body": Optional[str],
"has_defects": bool,
"defects": Any,
"defects_categories": Any,
},
total=False,
)
class ForensicReport(TypedDict):
feedback_type: Optional[str]
user_agent: Optional[str]
version: Optional[str]
original_envelope_id: Optional[str]
original_mail_from: Optional[str]
original_rcpt_to: Optional[str]
arrival_date: str
arrival_date_utc: str
authentication_results: Optional[str]
delivery_result: Optional[str]
auth_failure: List[str]
authentication_mechanisms: List[str]
dkim_domain: Optional[str]
reported_domain: str
sample_headers_only: bool
source: IPSourceInfo
sample: str
parsed_sample: ParsedEmail
class SMTPTLSFailureDetails(TypedDict):
result_type: str
failed_session_count: int
class SMTPTLSFailureDetailsOptional(SMTPTLSFailureDetails, total=False):
sending_mta_ip: str
receiving_ip: str
receiving_mx_hostname: str
receiving_mx_helo: str
additional_info_uri: str
failure_reason_code: str
ip_address: str
class SMTPTLSPolicySummary(TypedDict):
policy_domain: str
policy_type: str
successful_session_count: int
failed_session_count: int
class SMTPTLSPolicy(SMTPTLSPolicySummary, total=False):
policy_strings: List[str]
mx_host_patterns: List[str]
failure_details: List[SMTPTLSFailureDetailsOptional]
class SMTPTLSReport(TypedDict):
organization_name: str
begin_date: str
end_date: str
contact_info: Union[str, List[str]]
report_id: str
policies: List[SMTPTLSPolicy]
class AggregateParsedReport(TypedDict):
report_type: Literal["aggregate"]
report: AggregateReport
class ForensicParsedReport(TypedDict):
report_type: Literal["forensic"]
report: ForensicReport
class SMTPTLSParsedReport(TypedDict):
report_type: Literal["smtp_tls"]
report: SMTPTLSReport
ParsedReport = Union[AggregateParsedReport, ForensicParsedReport, SMTPTLSParsedReport]
class ParsingResults(TypedDict):
aggregate_reports: List[AggregateReport]
forensic_reports: List[ForensicReport]
smtp_tls_reports: List[SMTPTLSReport]
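As the module's header note says, optional keys are expressed with `total=False` TypedDict subclasses rather than `NotRequired` (3.11+), keeping the module Python 3.10 compatible. A trimmed example using two of the classes above:

```python
from typing import List, Optional, TypedDict


class SMTPTLSPolicySummary(TypedDict):
    policy_domain: str
    policy_type: str
    successful_session_count: int
    failed_session_count: int


class SMTPTLSPolicy(SMTPTLSPolicySummary, total=False):
    # Optional keys live on a total=False subclass instead of NotRequired.
    policy_strings: List[str]
    mx_host_patterns: List[str]


# A valid SMTPTLSPolicy may omit the total=False keys entirely:
policy: SMTPTLSPolicy = {
    "policy_domain": "example.com",
    "policy_type": "sts",
    "successful_session_count": 10,
    "failed_session_count": 0,
}
```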
@@ -4,26 +4,23 @@
from __future__ import annotations from __future__ import annotations
from typing import Optional, Union
import logging
import os
from datetime import datetime
from datetime import timezone
from datetime import timedelta
from collections import OrderedDict
from expiringdict import ExpiringDict
import tempfile
import subprocess
import shutil
import mailparser
import json
import hashlib
import base64 import base64
import mailbox
import re
import csv import csv
import hashlib
import io import io
import json
import logging
import mailbox
import os
import re
import shutil
import subprocess
import tempfile
from datetime import datetime, timedelta, timezone
from typing import Optional, TypedDict, Union, cast
import mailparser
from expiringdict import ExpiringDict
try: try:
from importlib.resources import files from importlib.resources import files
@@ -32,19 +29,19 @@ except ImportError:
from importlib.resources import files from importlib.resources import files
from dateutil.parser import parse as parse_date
import dns.reversename
import dns.resolver
import dns.exception import dns.exception
import dns.resolver
import dns.reversename
import geoip2.database import geoip2.database
import geoip2.errors import geoip2.errors
import publicsuffixlist import publicsuffixlist
import requests import requests
from dateutil.parser import parse as parse_date
from parsedmarc.log import logger
import parsedmarc.resources.dbip import parsedmarc.resources.dbip
import parsedmarc.resources.maps import parsedmarc.resources.maps
from parsedmarc.constants import USER_AGENT from parsedmarc.constants import USER_AGENT
from parsedmarc.log import logger
parenthesis_regex = re.compile(r"\s*\(.*\)\s*") parenthesis_regex = re.compile(r"\s*\(.*\)\s*")
@@ -67,7 +64,24 @@ class DownloadError(RuntimeError):
"""Raised when an error occurs when downloading a file""" """Raised when an error occurs when downloading a file"""
def decode_base64(data) -> bytes: class ReverseDNSService(TypedDict):
name: str
type: Optional[str]
ReverseDNSMap = dict[str, ReverseDNSService]
class IPAddressInfo(TypedDict):
ip_address: str
reverse_dns: Optional[str]
country: Optional[str]
base_domain: Optional[str]
name: Optional[str]
type: Optional[str]
def decode_base64(data: str) -> bytes:
""" """
Decodes a base64 string, with padding being optional Decodes a base64 string, with padding being optional
@@ -78,14 +92,14 @@ def decode_base64(data) -> bytes:
bytes: The decoded bytes bytes: The decoded bytes
""" """
data = bytes(data, encoding="ascii") data_bytes = bytes(data, encoding="ascii")
missing_padding = len(data) % 4 missing_padding = len(data_bytes) % 4
if missing_padding != 0: if missing_padding != 0:
data += b"=" * (4 - missing_padding) data_bytes += b"=" * (4 - missing_padding)
return base64.b64decode(data) return base64.b64decode(data_bytes)
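The reworked `decode_base64` restores optional `=` padding before decoding, now via a separate `data_bytes` variable so the `str` parameter annotation stays honest. The logic in full:

```python
import base64


def decode_base64(data: str) -> bytes:
    # Restore optional '=' padding before decoding, as in parsedmarc.utils.
    data_bytes = bytes(data, encoding="ascii")
    missing_padding = len(data_bytes) % 4
    if missing_padding != 0:
        data_bytes += b"=" * (4 - missing_padding)
    return base64.b64decode(data_bytes)
```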
def get_base_domain(domain: str) -> str: def get_base_domain(domain: str) -> Optional[str]:
""" """
Gets the base domain name for the given domain Gets the base domain name for the given domain
@@ -114,8 +128,8 @@ def query_dns(
record_type: str, record_type: str,
*, *,
cache: Optional[ExpiringDict] = None, cache: Optional[ExpiringDict] = None,
nameservers: list[str] = None, nameservers: Optional[list[str]] = None,
timeout: int = 2.0, timeout: float = 2.0,
) -> list[str]: ) -> list[str]:
""" """
Queries DNS Queries DNS
@@ -135,9 +149,9 @@ def query_dns(
record_type = record_type.upper() record_type = record_type.upper()
cache_key = "{0}_{1}".format(domain, record_type) cache_key = "{0}_{1}".format(domain, record_type)
if cache: if cache:
records = cache.get(cache_key, None) cached_records = cache.get(cache_key, None)
if records: if isinstance(cached_records, list):
return records return cast(list[str], cached_records)
resolver = dns.resolver.Resolver() resolver = dns.resolver.Resolver()
timeout = float(timeout) timeout = float(timeout)
@@ -151,26 +165,12 @@ def query_dns(
resolver.nameservers = nameservers resolver.nameservers = nameservers
resolver.timeout = timeout resolver.timeout = timeout
resolver.lifetime = timeout resolver.lifetime = timeout
if record_type == "TXT": records = list(
resource_records = list( map(
map( lambda r: r.to_text().replace('"', "").rstrip("."),
lambda r: r.strings, resolver.resolve(domain, record_type, lifetime=timeout),
resolver.resolve(domain, record_type, lifetime=timeout),
)
)
_resource_record = [
resource_record[0][:0].join(resource_record)
for resource_record in resource_records
if resource_record
]
records = [r.decode() for r in _resource_record]
else:
records = list(
map(
lambda r: r.to_text().replace('"', "").rstrip("."),
resolver.resolve(domain, record_type, lifetime=timeout),
)
) )
)
if cache: if cache:
cache[cache_key] = records cache[cache_key] = records
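The simplified `query_dns` drops the TXT-specific chunk-joining branch and runs every record through the same normalization: dnspython's `to_text()` output has its surrounding quotes stripped and any trailing root dot removed. That post-processing step in isolation:

```python
def normalize_record_text(text: str) -> str:
    # The per-record normalization applied in the simplified query_dns:
    # strip dnspython's quoting on TXT strings and trailing root dots.
    return text.replace('"', "").rstrip(".")
```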
@@ -181,9 +181,9 @@ def get_reverse_dns(
ip_address, ip_address,
*, *,
cache: Optional[ExpiringDict] = None, cache: Optional[ExpiringDict] = None,
nameservers: list[str] = None, nameservers: Optional[list[str]] = None,
timeout: int = 2.0, timeout: float = 2.0,
) -> str: ) -> Optional[str]:
""" """
Resolves an IP address to a hostname using a reverse DNS query Resolves an IP address to a hostname using a reverse DNS query
@@ -201,12 +201,11 @@ def get_reverse_dns(
try: try:
address = dns.reversename.from_address(ip_address) address = dns.reversename.from_address(ip_address)
hostname = query_dns( hostname = query_dns(
address, "PTR", cache=cache, nameservers=nameservers, timeout=timeout str(address), "PTR", cache=cache, nameservers=nameservers, timeout=timeout
)[0] )[0]
except dns.exception.DNSException as e: except dns.exception.DNSException as e:
logger.warning(f"get_reverse_dns({ip_address}) exception: {e}") logger.debug(f"get_reverse_dns({ip_address}) exception: {e}")
pass
return hostname return hostname
@@ -238,7 +237,7 @@ def timestamp_to_human(timestamp: int) -> str:
def human_timestamp_to_datetime( def human_timestamp_to_datetime(
human_timestamp: str, *, to_utc: Optional[bool] = False human_timestamp: str, *, to_utc: bool = False
) -> datetime: ) -> datetime:
""" """
Converts a human-readable timestamp into a Python ``datetime`` object Converts a human-readable timestamp into a Python ``datetime`` object
@@ -269,10 +268,12 @@ def human_timestamp_to_unix_timestamp(human_timestamp: str) -> int:
float: The converted timestamp float: The converted timestamp
""" """
human_timestamp = human_timestamp.replace("T", " ") human_timestamp = human_timestamp.replace("T", " ")
return human_timestamp_to_datetime(human_timestamp).timestamp() return int(human_timestamp_to_datetime(human_timestamp).timestamp())
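`human_timestamp_to_unix_timestamp` now truncates to an `int` instead of returning the float from `datetime.timestamp()`. The conversion step, sketched with a known epoch offset:

```python
from datetime import datetime, timezone


def datetime_to_unix_int(dt: datetime) -> int:
    # Truncate to an integer Unix timestamp, matching the new return type.
    return int(dt.timestamp())
```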
def get_ip_address_country(ip_address: str, *, db_path: Optional[str] = None) -> str: def get_ip_address_country(
ip_address: str, *, db_path: Optional[str] = None
) -> Optional[str]:
""" """
Returns the ISO code for the country associated Returns the ISO code for the country associated
with the given IPv4 or IPv6 address with the given IPv4 or IPv6 address
@@ -337,12 +338,12 @@ def get_ip_address_country(ip_address: str, *, db_path: Optional[str] = None) ->
def get_service_from_reverse_dns_base_domain( def get_service_from_reverse_dns_base_domain(
base_domain, base_domain,
*, *,
always_use_local_file: Optional[bool] = False, always_use_local_file: bool = False,
local_file_path: Optional[bool] = None, local_file_path: Optional[str] = None,
url: Optional[bool] = None, url: Optional[str] = None,
offline: Optional[bool] = False, offline: bool = False,
reverse_dns_map: Optional[bool] = None, reverse_dns_map: Optional[ReverseDNSMap] = None,
) -> str: ) -> ReverseDNSService:
""" """
Returns the service name of a given base domain name from reverse DNS. Returns the service name of a given base domain name from reverse DNS.
@@ -359,12 +360,6 @@ def get_service_from_reverse_dns_base_domain(
the supplied reverse_dns_base_domain and the type will be None the supplied reverse_dns_base_domain and the type will be None
""" """
def load_csv(_csv_file):
reader = csv.DictReader(_csv_file)
for row in reader:
key = row["base_reverse_dns"].lower().strip()
reverse_dns_map[key] = dict(name=row["name"], type=row["type"])
base_domain = base_domain.lower().strip() base_domain = base_domain.lower().strip()
if url is None: if url is None:
url = ( url = (
@@ -372,11 +367,24 @@ def get_service_from_reverse_dns_base_domain(
"/parsedmarc/master/parsedmarc/" "/parsedmarc/master/parsedmarc/"
"resources/maps/base_reverse_dns_map.csv" "resources/maps/base_reverse_dns_map.csv"
) )
reverse_dns_map_value: ReverseDNSMap
if reverse_dns_map is None: if reverse_dns_map is None:
reverse_dns_map = dict() reverse_dns_map_value = {}
else:
reverse_dns_map_value = reverse_dns_map
def load_csv(_csv_file):
reader = csv.DictReader(_csv_file)
for row in reader:
key = row["base_reverse_dns"].lower().strip()
reverse_dns_map_value[key] = {
"name": row["name"],
"type": row["type"],
}
csv_file = io.StringIO() csv_file = io.StringIO()
if not (offline or always_use_local_file) and len(reverse_dns_map) == 0: if not (offline or always_use_local_file) and len(reverse_dns_map_value) == 0:
try: try:
logger.debug(f"Trying to fetch reverse DNS map from {url}...") logger.debug(f"Trying to fetch reverse DNS map from {url}...")
headers = {"User-Agent": USER_AGENT} headers = {"User-Agent": USER_AGENT}
@@ -393,7 +401,7 @@ def get_service_from_reverse_dns_base_domain(
logging.debug("Response body:") logging.debug("Response body:")
logger.debug(csv_file.read()) logger.debug(csv_file.read())
if len(reverse_dns_map) == 0: if len(reverse_dns_map_value) == 0:
logger.info("Loading included reverse DNS map...") logger.info("Loading included reverse DNS map...")
path = str( path = str(
files(parsedmarc.resources.maps).joinpath("base_reverse_dns_map.csv") files(parsedmarc.resources.maps).joinpath("base_reverse_dns_map.csv")
@@ -402,10 +410,11 @@ def get_service_from_reverse_dns_base_domain(
path = local_file_path path = local_file_path
with open(path) as csv_file: with open(path) as csv_file:
load_csv(csv_file) load_csv(csv_file)
service: ReverseDNSService
try: try:
service = reverse_dns_map[base_domain] service = reverse_dns_map_value[base_domain]
except KeyError: except KeyError:
service = dict(name=base_domain, type=None) service = {"name": base_domain, "type": None}
return service return service
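The final lookup now returns a typed `ReverseDNSService` literal instead of `dict(...)`, falling back to the bare base domain with `type=None` when the map has no entry. The fallback pattern on its own:

```python
from typing import Optional, TypedDict


class ReverseDNSService(TypedDict):
    name: str
    type: Optional[str]


def lookup_service(reverse_dns_map, base_domain: str) -> ReverseDNSService:
    # Fall back to the bare base domain with type None when the map has
    # no entry, as get_service_from_reverse_dns_base_domain does above.
    try:
        return reverse_dns_map[base_domain]
    except KeyError:
        return {"name": base_domain, "type": None}
```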
@@ -415,14 +424,14 @@ def get_ip_address_info(
     *,
     ip_db_path: Optional[str] = None,
     reverse_dns_map_path: Optional[str] = None,
-    always_use_local_files: Optional[bool] = False,
+    always_use_local_files: bool = False,
     reverse_dns_map_url: Optional[str] = None,
     cache: Optional[ExpiringDict] = None,
-    reverse_dns_map: Optional[dict] = None,
+    reverse_dns_map: Optional[ReverseDNSMap] = None,
-    offline: Optional[bool] = False,
+    offline: bool = False,
     nameservers: Optional[list[str]] = None,
-    timeout: Optional[float] = 2.0,
+    timeout: float = 2.0,
-) -> OrderedDict[str, str]:
+) -> IPAddressInfo:
     """
     Returns reverse DNS and country information for the given IP address
@@ -440,17 +449,27 @@ def get_ip_address_info(
     timeout (float): Sets the DNS timeout in seconds
     Returns:
-        OrderedDict: ``ip_address``, ``reverse_dns``, ``country``
+        dict: ``ip_address``, ``reverse_dns``, ``country``
     """
     ip_address = ip_address.lower()
     if cache is not None:
-        info = cache.get(ip_address, None)
-        if info:
+        cached_info = cache.get(ip_address, None)
+        if (
+            cached_info
+            and isinstance(cached_info, dict)
+            and "ip_address" in cached_info
+        ):
             logger.debug(f"IP address {ip_address} was found in cache")
-            return info
+            return cast(IPAddressInfo, cached_info)
-    info = OrderedDict()
-    info["ip_address"] = ip_address
+    info: IPAddressInfo = {
+        "ip_address": ip_address,
+        "reverse_dns": None,
+        "country": None,
+        "base_domain": None,
+        "name": None,
+        "type": None,
+    }
     if offline:
         reverse_dns = None
     else:
@@ -460,9 +479,6 @@ def get_ip_address_info(
     country = get_ip_address_country(ip_address, db_path=ip_db_path)
     info["country"] = country
     info["reverse_dns"] = reverse_dns
-    info["base_domain"] = None
-    info["name"] = None
-    info["type"] = None
     if reverse_dns is not None:
         base_domain = get_base_domain(reverse_dns)
         if base_domain is not None:
@@ -487,7 +503,7 @@ def get_ip_address_info(
     return info
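The hunk above also hardens the cache lookup: instead of returning any truthy cached value, it only trusts a hit that is a dict containing the expected key. A minimal sketch of that guard pattern, with a plain dict standing in for `ExpiringDict` (the helper name here is hypothetical, not part of parsedmarc):

```python
from typing import Any, Optional, cast

def get_cached_info(cache: dict[str, Any], ip_address: str) -> Optional[dict]:
    # Only trust a cache hit that is actually a dict with the expected key,
    # mirroring the isinstance/"ip_address" guard in the diff above.
    cached = cache.get(ip_address)
    if cached and isinstance(cached, dict) and "ip_address" in cached:
        return cast(dict, cached)
    return None

cache = {"192.0.2.1": {"ip_address": "192.0.2.1", "country": "US"}}
print(get_cached_info(cache, "192.0.2.1"))  # the cached dict
print(get_cached_info(cache, "198.51.100.7"))  # None (cache miss)
```

This keeps a stale or malformed cache entry (for example, one written by an older parsedmarc version with a different shape) from being returned as if it matched the new `IPAddressInfo` structure.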
-def parse_email_address(original_address: str) -> OrderedDict[str, str]:
+def parse_email_address(original_address: str) -> dict[str, Optional[str]]:
     if original_address[0] == "":
         display_name = None
     else:
@@ -500,14 +516,12 @@ def parse_email_address(original_address: str) -> OrderedDict[str, str]:
     local = address_parts[0].lower()
     domain = address_parts[-1].lower()
-    return OrderedDict(
-        [
-            ("display_name", display_name),
-            ("address", address),
-            ("local", local),
-            ("domain", domain),
-        ]
-    )
+    return {
+        "display_name": display_name,
+        "address": address,
+        "local": local,
+        "domain": domain,
+    }

 def get_filename_safe_string(string: str) -> str:
@@ -568,7 +582,7 @@ def is_outlook_msg(content) -> bool:
     )
-def convert_outlook_msg(msg_bytes: bytes) -> str:
+def convert_outlook_msg(msg_bytes: bytes) -> bytes:
     """
     Uses the ``msgconvert`` Perl utility to convert an Outlook MS file to
     standard RFC 822 format
@@ -577,7 +591,7 @@ def convert_outlook_msg(msg_bytes: bytes) -> str:
     msg_bytes (bytes): the content of the .msg file
     Returns:
-        A RFC 822 string
+        A RFC 822 bytes payload
     """
     if not is_outlook_msg(msg_bytes):
         raise ValueError("The supplied bytes are not an Outlook MSG file")
@@ -605,8 +619,8 @@ def convert_outlook_msg(msg_bytes: bytes) -> str:
 def parse_email(
-    data: Union[bytes, str], *, strip_attachment_payloads: Optional[bool] = False
+    data: Union[bytes, str], *, strip_attachment_payloads: bool = False
-):
+) -> dict:
     """
     A simplified email parser
pyproject.toml
@@ -2,7 +2,7 @@
 requires = [
     "hatchling>=1.27.0",
 ]
-requires_python = ">=3.9,<3.14"
+requires_python = ">=3.10,<3.15"
 build-backend = "hatchling.build"

 [project]
@@ -29,7 +29,7 @@ classifiers = [
     "Operating System :: OS Independent",
     "Programming Language :: Python :: 3"
 ]
-requires-python = ">=3.9, <3.14"
+requires-python = ">=3.10"
 dependencies = [
     "azure-identity>=1.8.0",
     "azure-monitor-ingestion>=1.0.0",
@@ -45,10 +45,10 @@ dependencies = [
     "google-auth-httplib2>=0.1.0",
     "google-auth-oauthlib>=0.4.6",
     "google-auth>=2.3.3",
-    "imapclient>=2.1.0",
+    "imapclient>=3.1.0",
     "kafka-python-ng>=2.2.2",
     "lxml>=4.4.0",
-    "mailsuite>=1.9.18",
+    "mailsuite>=1.11.2",
     "msgraph-core==0.2.2",
     "opensearch-py>=2.4.2,<=3.0.0",
     "publicsuffixlist>=0.10.0",

tests.py — 1,769 lines changed (Normal file → Executable file); diff suppressed because it is too large.