Compare commits

..

13 Commits

Author SHA1 Message Date
Sean Whalen
1542936468 Bump version to 9.5.4, enhance Maildir folder handling, and add config key aliases for environment variable compatibility 2026-03-25 23:22:46 -04:00
Sean Whalen
fb3c38a8b8 9.5.3
- Fixed `FileNotFoundError` when using Maildir with Docker volume mounts. Python's `mailbox.Maildir(create=True)` only creates `cur/new/tmp` subdirectories when the top-level directory doesn't exist; Docker volume mounts pre-create the directory as empty, skipping subdirectory creation. parsedmarc now explicitly creates the subdirectories when `maildir_create` is enabled.
- Maildir UID mismatch no longer crashes the process. In Docker containers where volume ownership differs from the container UID, parsedmarc now logs a warning instead of raising an exception. Also handles `os.setuid` failures gracefully in containers without `CAP_SETUID`.
- Token file writes (MS Graph and Gmail) now create parent directories automatically, preventing `FileNotFoundError` when the token path points to a directory that doesn't yet exist.
- File paths from config (`token_file`, `credentials_file`, `cert_path`, `log_file`, `output`, `ip_db_path`, `maildir_path`, syslog cert paths, etc.) now expand `~` and `$VAR` references via `os.path.expanduser`/`os.path
2026-03-25 21:29:08 -04:00
Sean Whalen
c9a6145505 9.5.3
- Fixed `FileNotFoundError` when using Maildir with Docker volume mounts. Python's `mailbox.Maildir(create=True)` only creates `cur/new/tmp` subdirectories when the top-level directory doesn't exist; Docker volume mounts pre-create the directory as empty, skipping subdirectory creation. parsedmarc now explicitly creates the subdirectories when `maildir_create` is enabled.
2026-03-25 21:13:34 -04:00
Sean Whalen
e1bdbeb257 Bump version to 9.5.2 and fix interpolation issues in config parser 2026-03-25 20:21:08 -04:00
Sean Whalen
12c4676b79 9.5.1
- Correct ISO format for MSGraphConnection timestamps (PR #706)
2026-03-25 19:43:24 -04:00
mihugo
cda039ee27 Correct ISO format for MSGraphConnection timestamps (#706)
Fix formatting of ISO 8601 date strings for MSGraphConnection.  format yyyy-dd-mmThh:MM:SS.zzzzzz+00:00 already has a timezone indicated. The extra Z is invalid in this format.  specifying a "since" in config file causes msgraph to error due to invalid time stamp.
2026-03-25 19:38:23 -04:00
Sean Whalen
ff0ca6538c 9.5.0
Add environment variable configuration support and update documentation

- Introduced support for configuration via environment variables using the `PARSEDMARC_{SECTION}_{KEY}` format.
- Added `PARSEDMARC_CONFIG_FILE` variable to specify the config file path.
- Enabled env-only mode for file-less Docker deployments.
- Implemented explicit read permission checks on config files.
- Updated changelog and usage documentation to reflect these changes.
2026-03-25 19:25:21 -04:00
Sean Whalen
2032438d3b 9.4.0
### Added

- Extracted `load_reverse_dns_map()` utility function in `utils.py` for loading the reverse DNS map independently of individual IP lookups.
- SIGHUP reload now re-downloads/reloads the reverse DNS map, so changes take effect without restarting.
- Add premade OpenSearch index patterns, visualizations, and dashboards

### Changed

- When `index_prefix_domain_map` is configured, SMTP TLS reports for domains not in the map are now silently dropped instead of being output. Unlike DMARC, TLS-RPT has no DNS authorization records, so this filtering prevents processing reports for unrelated domains.
- Bump OpenSearch support to `< 4`

### Fixed

- Fixed `get_index_prefix` using wrong key (`domain` instead of `policy_domain`) for SMTP TLS reports, which prevented domain map matching from working for TLS reports.
- Domain matching in `get_index_prefix` now lowercases the domain for case-insensitive comparison.
2026-03-23 17:08:26 -04:00
Sean Whalen
1e95c5d30b 9.3.1
Elasticsearch and OpenSearch now verify SSL certificates by default when `ssl = True`, even without a `cert_path`
- Added `skip_certificate_verification` option to the `elasticsearch` and `opensearch` configuration sections for consistency with `splunk_hec`
- Splunk HEC `skip_certificate_verification` now works correctly with self-signed certificates
- SMTP TLS reports no longer fail when saving to multiple output targets (e.g. Elasticsearch and OpenSearch) due to in-place mutation of the report dict
- Output client initialization errors now identify which module failed (e.g. "OpenSearch: ConnectionError..." instead of generic "Output client error")
- Enhanced error handling for output client initialization
2026-03-22 14:38:32 -04:00
Sean Whalen
cb2384be83 Copy report before modifying begin_date and end_date in save_smtp_tls_report functions 2026-03-22 13:13:21 -04:00
Sean Whalen
9a5b5310fa Update Grafana and Splunk environment variables in docker-compose for consistency 2026-03-22 12:40:42 -04:00
Sean Whalen
9849598100 Formatting 2026-03-21 16:17:35 -04:00
Sean Whalen
e82f3e58a1 SIGHUP-based configuration reload for watch mode (#697)
* Enhance mailbox connection watch method to support reload functionality

- Updated the `watch` method in `GmailConnection`, `MSGraphConnection`, `IMAPConnection`, `MaildirConnection`, and the abstract `MailboxConnection` class to accept an optional `should_reload` parameter. This allows the method to check if a reload is necessary and exit the loop if so.
- Modified related tests to accommodate the new method signature.
- Changed logger calls from `critical` to `error` for consistency in logging severity.
- Added a new settings file for Claude with specific permissions for testing and code checks.

* Update parsedmarc/cli.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update parsedmarc/cli.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* [WIP] SIGHUP-based configuration reload for watch mode (#698)

* Initial plan

* Fix reload state consistency, resource leaks, stale opts; add tests

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/3c2e0bb9-7e2d-4efa-aef6-d2b98478b921

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* [WIP] SIGHUP-based configuration reload for watch mode (#699)

* Initial plan

* Fix review comments: ConfigurationError wrapping, duplicate parse args, bool parsing, Kafka required topics, should_reload kwarg, SIGHUP test skips

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/0779003c-ccbe-4d76-9748-801dbc238b96

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* SIGHUP-based configuration reload: address review feedback (#700)

* Initial plan

* Address review feedback: kafka_ssl, duplicate silent, exception chain, log file reload, should_reload timing

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/a8a43c55-23fa-4471-abe6-7ac966f381f9

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Update parsedmarc/cli.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Best-effort initialization for optional output clients in watch mode (#701)

* Initial plan

* Wrap optional output client init in try/except for best-effort initialization

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/59241d4e-1b05-4a92-b2d2-e6d13d10a4fd

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Fix SIGHUP reload tight-loop in watch mode (#702)

* Initial plan

* Fix _reload_requested tight-loop: reset flag before reload to capture concurrent SIGHUPs

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/879d0bb1-9037-41f7-bc89-f59611956d2e

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Update parsedmarc/cli.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Fix resource leak when HEC config is invalid in `_init_output_clients()` (#703)

* Initial plan

* Fix resource leak: validate HEC settings before creating any output clients

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/38c73e09-789d-4d41-b75e-bbc61418859d

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Ensure SIGHUP never triggers a new email batch across all watch() implementations (#704)

* Initial plan

* Ensure SIGHUP never starts a new email batch in any watch() implementation

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/45d5be30-8f6b-4200-9bdd-15c655033f17

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* SIGHUP-based config reload for watch mode: address review feedback (#705)

* Initial plan

* Address review feedback: Kafka SSL context, SIGHUP handler safety, test formatting

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/8f2fd48f-32a4-4258-9a89-06f7c7ac29bf

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Reverted changes by copilot that turned errors into warnings

* Enhance usage documentation for config reload: clarify behavior on successful reload and error handling

* Update CHANGELOG.md to reflect config reload enhancements

* Add pytest command to settings for silent output during testing

* Enhance resource management: add close methods for S3Client and HECClient, and improve IMAP connection handling during IDLE. Update CHANGELOG.md for config reload improvements and bug fixes.

* Update changelog to not include fixes within the same unreleased version

* Refactor changelog entries for clarity and consistency in configuration reload section

* Fix changelog entry for msgraph configuration check

* Update CHANGELOG..md

* make single list items on one line in the changelog instead of doing hard wraps

* Remove incorrect IMAP changes

* Rename 'should_reload' parameter to 'config_reloading' in mailbox connection methods for clarity

* Restore startup configuration checks

* Improve error logging for Elasticsearch and OpenSearch exceptions

* Bump version to 9.3.0 in constants.py

* Refactor GelfClient methods to use specific report types instead of generic dicts

* Refactor tests to use assertions consistently and improve type hints

---------

Co-authored-by: Sean Whalen <seanthegeek@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
2026-03-21 16:14:48 -04:00
23 changed files with 1670 additions and 345 deletions

View File

@@ -7,7 +7,8 @@
"Bash(GITHUB_ACTIONS=true pytest --cov tests.py)",
"Bash(ls tests*)",
"Bash(GITHUB_ACTIONS=true python -m pytest --cov tests.py -x)",
"Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)"
"Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)",
"Bash(python -m pytest tests.py --no-header -q)"
],
"additionalDirectories": [
"/tmp"

View File

@@ -52,6 +52,7 @@
"geoipupdate",
"Geolite",
"geolocation",
"getuid",
"githubpages",
"Grafana",
"hostnames",
@@ -75,6 +76,7 @@
"LISTSERV",
"loganalytics",
"lxml",
"Maildir",
"mailparser",
"mailrelay",
"mailsuite",

View File

@@ -42,7 +42,7 @@ To skip DNS lookups during testing, set `GITHUB_ACTIONS=true`.
### Key modules
- `parsedmarc/__init__.py` — Core parsing logic. Main functions: `parse_report_file()`, `parse_report_email()`, `parse_aggregate_report_xml()`, `parse_forensic_report()`, `parse_smtp_tls_report_json()`, `get_dmarc_reports_from_mailbox()`, `watch_inbox()`
- `parsedmarc/cli.py` — CLI entry point (`_main`), config file parsing, output orchestration
- `parsedmarc/cli.py` — CLI entry point (`_main`), config file parsing (`_load_config` + `_parse_config`), output orchestration. Supports configuration via INI files, `PARSEDMARC_{SECTION}_{KEY}` environment variables, or both (env vars override file values).
- `parsedmarc/types.py` — TypedDict definitions for all report types (`AggregateReport`, `ForensicReport`, `SMTPTLSReport`, `ParsingResults`)
- `parsedmarc/utils.py` — IP/DNS/GeoIP enrichment, base64 decoding, compression handling
- `parsedmarc/mail/` — Polymorphic mail connections: `IMAPConnection`, `GmailConnection`, `MSGraphConnection`, `MaildirConnection`
@@ -52,6 +52,10 @@ To skip DNS lookups during testing, set `GITHUB_ACTIONS=true`.
`ReportType = Literal["aggregate", "forensic", "smtp_tls"]`. Exception hierarchy: `ParserError``InvalidDMARCReport``InvalidAggregateReport`/`InvalidForensicReport`, and `InvalidSMTPTLSReport`.
### Configuration
Config priority: CLI args > env vars > config file > defaults. Env var naming: `PARSEDMARC_{SECTION}_{KEY}` (e.g. `PARSEDMARC_IMAP_PASSWORD`). Section names with underscores use longest-prefix matching (`PARSEDMARC_SPLUNK_HEC_TOKEN``[splunk_hec] token`). Some INI keys have short aliases for env var friendliness (e.g. `[maildir] create` for `maildir_create`). File path values are expanded via `os.path.expanduser`/`os.path.expandvars`. Config can be loaded purely from env vars with no file (`PARSEDMARC_CONFIG_FILE` sets the file path).
### Caching
IP address info cached for 4 hours, seen aggregate report IDs cached for 1 hour (via `ExpiringDict`).
@@ -62,3 +66,6 @@ IP address info cached for 4 hours, seen aggregate report IDs cached for 1 hour
- TypedDict for structured data, type hints throughout
- Python ≥3.10 required
- Tests are in a single `tests.py` file using unittest; sample reports live in `samples/`
- File path config values must be wrapped with `_expand_path()` in `cli.py`
- Maildir UID checks are intentionally relaxed (warn, don't crash) for Docker compatibility
- Token file writes must create parent directories before opening for write

View File

@@ -1,23 +1,96 @@
# Changelog
## 9.5.4
### Fixed
- Maildir `fetch_messages` now respects the `reports_folder` argument. Previously it always read from the top-level Maildir, ignoring the configured reports folder. `fetch_message`, `delete_message`, and `move_message` now also operate on the correct active folder.
- Config key aliases for env var compatibility: `[maildir] create` and `path` are now accepted as aliases for `maildir_create` and `maildir_path`, and `[msgraph] url` for `graph_url`. This allows natural env var names like `PARSEDMARC_MAILDIR_CREATE` to work without the redundant `PARSEDMARC_MAILDIR_MAILDIR_CREATE`.
## 9.5.3
### Fixed
- Fixed `FileNotFoundError` when using Maildir with Docker volume mounts. Python's `mailbox.Maildir(create=True)` only creates `cur/new/tmp` subdirectories when the top-level directory doesn't exist; Docker volume mounts pre-create the directory as empty, skipping subdirectory creation. parsedmarc now explicitly creates the subdirectories when `maildir_create` is enabled.
- Maildir UID mismatch no longer crashes the process. In Docker containers where volume ownership differs from the container UID, parsedmarc now logs a warning instead of raising an exception. Also handles `os.setuid` failures gracefully in containers without `CAP_SETUID`.
- Token file writes (MS Graph and Gmail) now create parent directories automatically, preventing `FileNotFoundError` when the token path points to a directory that doesn't yet exist.
- File paths from config (`token_file`, `credentials_file`, `cert_path`, `log_file`, `output`, `ip_db_path`, `maildir_path`, syslog cert paths, etc.) now expand `~` and `$VAR` references via `os.path.expanduser`/`os.path.expandvars`.
## 9.5.2
### Fixed
- Fixed `ValueError: invalid interpolation syntax` when config values (from env vars or INI files) contain `%` characters, such as in passwords. Disabled ConfigParser's `%`-based string interpolation.
## 9.5.1
### Changes
- Correct ISO format for MSGraphConnection timestamps (PR #706)
## 9.5.0
### Added
- Environment variable configuration support: any config option can now be set via `PARSEDMARC_{SECTION}_{KEY}` environment variables (e.g. `PARSEDMARC_IMAP_PASSWORD`, `PARSEDMARC_SPLUNK_HEC_TOKEN`). Environment variables override config file values but are overridden by CLI arguments.
- `PARSEDMARC_CONFIG_FILE` environment variable to specify the config file path without the `-c` flag.
- Env-only mode: parsedmarc can now run without a config file when `PARSEDMARC_*` environment variables are set, enabling fully file-less Docker deployments.
- Explicit read permission check on config file, giving a clear error message when the container UID cannot read the file (e.g. `chmod 600` with a UID mismatch).
## 9.4.0
### Added
- Extracted `load_reverse_dns_map()` utility function in `utils.py` for loading the reverse DNS map independently of individual IP lookups.
- SIGHUP reload now re-downloads/reloads the reverse DNS map, so changes take effect without restarting.
- Add premade OpenSearch index patterns, visualizations, and dashboards
### Changed
- When `index_prefix_domain_map` is configured, SMTP TLS reports for domains not in the map are now silently dropped instead of being output. Unlike DMARC, TLS-RPT has no DNS authorization records, so this filtering prevents processing reports for unrelated domains.
- Bump OpenSearch support to `< 4`
### Fixed
- Fixed `get_index_prefix` using wrong key (`domain` instead of `policy_domain`) for SMTP TLS reports, which prevented domain map matching from working for TLS reports.
- Domain matching in `get_index_prefix` now lowercases the domain for case-insensitive comparison.
## 9.3.1
### Breaking changes
- Elasticsearch and OpenSearch now verify SSL certificates by default when `ssl = True`, even without a `cert_path`
- Added `skip_certificate_verification` option to the `elasticsearch` and `opensearch` configuration sections for consistency with `splunk_hec`
### Fixed
- Splunk HEC `skip_certificate_verification` now works correctly
- SMTP TLS reports no longer fail when saving to multiple output targets (e.g. Elasticsearch and OpenSearch) due to in-place mutation of the report dict
- Output client initialization errors now identify which module failed (e.g. "OpenSearch: ConnectionError..." instead of generic "Output client error")
## 9.3.0
### Added
- SIGHUP-based configuration reload for watch mode — update output
destinations, DNS/GeoIP settings, processing flags, and log level
without restarting the service or interrupting in-progress report
processing. Use `systemctl reload parsedmarc` when running under
systemd.
- Extracted `_parse_config_file()` and `_init_output_clients()` from
`_main()` in `cli.py` to support config reload and reduce code
duplication.
- SIGHUP-based configuration reload for watch mode — update output destinations, DNS/GeoIP settings, processing flags, and log level without restarting the service or interrupting in-progress report processing.
- Use `systemctl reload parsedmarc` when running under `systemd`.
- On a successful reload, old output clients are closed and recreated.
- On a failed reload, the previous configuration remains fully active.
- `close()` methods on `GelfClient`, `KafkaClient`, `SyslogClient`, `WebhookClient`, HECClient, and `S3Client` for clean resource teardown on reload.
- `config_reloading` parameter on all `MailboxConnection.watch()` implementations and `watch_inbox()` to ensure SIGHUP never triggers a new email batch mid-reload.
- Elasticsearch and OpenSearch connections are now tracked and cleaned up on reload via `_close_output_clients()`.
- Extracted `_parse_config_file()` and `_init_output_clients()` from `_main()` in `cli.py` to support config reload and reduce code duplication.
### Fixed
- `get_index_prefix()` crashed on forensic reports with `TypeError` due to `report()` instead of `report[]` dict access.
- Missing `exit(1)` after IMAP user/password validation failure allowed execution to continue with `None` credentials.
## 9.2.1
### Added
- Better checking of `msconfig` configuration (PR #695)
- Better checking of `msgraph` configuration (PR #695)
### Changed

View File

@@ -15,7 +15,7 @@ services:
condition: service_healthy
opensearch-dashboards:
image: opensearchproject/opensearch-dashboards:2
image: opensearchproject/opensearch-dashboards:3
environment:
- OPENSEARCH_HOSTS=["https://opensearch:9200"]
ports:
@@ -27,7 +27,7 @@ services:
grafana:
image: grafana/grafana:latest
environment:
- GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
- GRAFANA_PASSWORD=${GRAFANA_PASSWORD}
- GF_INSTALL_PLUGINS=grafana-piechart-panel,grafana-worldmap-panel
ports:
- "127.0.0.1:3000:3000"
@@ -41,5 +41,7 @@ services:
- SPLUNK_START_ARGS=--accept-license
- "SPLUNK_GENERAL_TERMS=--accept-sgt-current-at-splunk-com"
- SPLUNK_PASSWORD=${SPLUNK_PASSWORD}
- SPLUNK_HEC_TOKEN=${SPLUNK_HEC_TOKEN}
ports:
- "127.0.0.1:8000:8000"
- "127.0.0.1:8088:8088"

View File

@@ -273,6 +273,8 @@ The full set of configuration options are:
(Default: `True`)
- `timeout` - float: Timeout in seconds (Default: 60)
- `cert_path` - str: Path to a trusted certificates
- `skip_certificate_verification` - bool: Skip certificate
verification (not recommended)
- `index_suffix` - str: A suffix to apply to the index names
- `index_prefix` - str: A prefix to apply to the index names
- `monthly_indexes` - bool: Use monthly indexes instead of daily indexes
@@ -300,6 +302,8 @@ The full set of configuration options are:
(Default: `True`)
- `timeout` - float: Timeout in seconds (Default: 60)
- `cert_path` - str: Path to a trusted certificates
- `skip_certificate_verification` - bool: Skip certificate
verification (not recommended)
- `index_suffix` - str: A suffix to apply to the index names
- `index_prefix` - str: A prefix to apply to the index names
- `monthly_indexes` - bool: Use monthly indexes instead of daily indexes
@@ -527,6 +531,96 @@ PUT _cluster/settings
Increasing this value increases resource usage.
:::
## Environment variable configuration
Any configuration option can be set via environment variables using the
naming convention `PARSEDMARC_{SECTION}_{KEY}` (uppercase). This is
especially useful for Docker deployments where file permissions make it
difficult to use config files for secrets.
**Priority order:** CLI arguments > environment variables > config file > defaults
### Examples
```bash
# Set IMAP credentials via env vars
export PARSEDMARC_IMAP_HOST=imap.example.com
export PARSEDMARC_IMAP_USER=dmarc@example.com
export PARSEDMARC_IMAP_PASSWORD=secret
# Elasticsearch
export PARSEDMARC_ELASTICSEARCH_HOSTS=http://localhost:9200
export PARSEDMARC_ELASTICSEARCH_SSL=false
# Splunk HEC (note: section name splunk_hec becomes SPLUNK_HEC)
export PARSEDMARC_SPLUNK_HEC_URL=https://splunk.example.com
export PARSEDMARC_SPLUNK_HEC_TOKEN=my-hec-token
export PARSEDMARC_SPLUNK_HEC_INDEX=email
# General settings
export PARSEDMARC_GENERAL_SAVE_AGGREGATE=true
export PARSEDMARC_GENERAL_DEBUG=true
```
### Specifying the config file via environment variable
```bash
export PARSEDMARC_CONFIG_FILE=/etc/parsedmarc.ini
parsedmarc
```
### Running without a config file (env-only mode)
When no config file is given (neither `-c` flag nor `PARSEDMARC_CONFIG_FILE`),
parsedmarc will still pick up any `PARSEDMARC_*` environment variables. This
enables fully file-less deployments:
```bash
export PARSEDMARC_GENERAL_SAVE_AGGREGATE=true
export PARSEDMARC_GENERAL_OFFLINE=true
export PARSEDMARC_ELASTICSEARCH_HOSTS=http://elasticsearch:9200
parsedmarc /path/to/reports/*
```
### Docker Compose example
```yaml
services:
parsedmarc:
image: parsedmarc:latest
environment:
PARSEDMARC_IMAP_HOST: imap.example.com
PARSEDMARC_IMAP_USER: dmarc@example.com
PARSEDMARC_IMAP_PASSWORD: ${IMAP_PASSWORD}
PARSEDMARC_MAILBOX_WATCH: "true"
PARSEDMARC_ELASTICSEARCH_HOSTS: http://elasticsearch:9200
PARSEDMARC_GENERAL_SAVE_AGGREGATE: "true"
PARSEDMARC_GENERAL_SAVE_FORENSIC: "true"
```
### Section name mapping
For sections with underscores in the name, the full section name is used:
| Section | Env var prefix |
|------------------|-------------------------------|
| `general` | `PARSEDMARC_GENERAL_` |
| `mailbox` | `PARSEDMARC_MAILBOX_` |
| `imap` | `PARSEDMARC_IMAP_` |
| `msgraph` | `PARSEDMARC_MSGRAPH_` |
| `elasticsearch` | `PARSEDMARC_ELASTICSEARCH_` |
| `opensearch` | `PARSEDMARC_OPENSEARCH_` |
| `splunk_hec` | `PARSEDMARC_SPLUNK_HEC_` |
| `kafka` | `PARSEDMARC_KAFKA_` |
| `smtp` | `PARSEDMARC_SMTP_` |
| `s3` | `PARSEDMARC_S3_` |
| `syslog` | `PARSEDMARC_SYSLOG_` |
| `gmail_api` | `PARSEDMARC_GMAIL_API_` |
| `maildir` | `PARSEDMARC_MAILDIR_` |
| `log_analytics` | `PARSEDMARC_LOG_ANALYTICS_` |
| `gelf` | `PARSEDMARC_GELF_` |
| `webhook` | `PARSEDMARC_WEBHOOK_` |
## Performance tuning
For large mailbox imports or backfills, parsedmarc can consume a noticeable amount
@@ -666,8 +760,15 @@ Mailbox connection settings (IMAP host/credentials, Microsoft Graph,
Gmail API, Maildir path) are **not** reloaded — changing those still
requires a full restart.
If the new configuration file contains errors, the reload is aborted
and the previous configuration remains active. Check the logs for
On a **successful** reload, existing output client connections are
closed and new ones are created from the updated configuration. The
service then resumes watching with the new settings.
If the new configuration file contains errors (missing required
settings, unreachable output destinations, etc.), the **entire reload
is aborted** — no output clients are replaced and the previous
configuration remains fully active. This means a typo in one section
will not take down an otherwise working setup. Check the logs for
details:
```bash

File diff suppressed because one or more lines are too long

View File

@@ -1955,9 +1955,7 @@ def get_dmarc_reports_from_mailbox(
)
current_time = datetime.now(timezone.utc).strftime("%d-%b-%Y")
elif isinstance(connection, MSGraphConnection):
since = (
datetime.now(timezone.utc) - timedelta(minutes=_since)
).isoformat() + "Z"
since = (datetime.now(timezone.utc) - timedelta(minutes=_since)).isoformat()
current_time = datetime.now(timezone.utc).isoformat() + "Z"
elif isinstance(connection, GmailConnection):
since = (datetime.now(timezone.utc) - timedelta(minutes=_since)).strftime(
@@ -2195,7 +2193,7 @@ def watch_inbox(
batch_size: int = 10,
since: Optional[Union[datetime, date, str]] = None,
normalize_timespan_threshold_hours: float = 24,
should_reload: Optional[Callable] = None,
config_reloading: Optional[Callable] = None,
):
"""
Watches the mailbox for new messages and
@@ -2223,7 +2221,7 @@ def watch_inbox(
batch_size (int): Number of messages to read and process before saving
since: Search for messages since certain time
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
should_reload: Optional callable that returns True when a config
config_reloading: Optional callable that returns True when a config
reload has been requested (e.g. via SIGHUP)
"""
@@ -2253,8 +2251,8 @@ def watch_inbox(
"check_callback": check_callback,
"check_timeout": check_timeout,
}
if should_reload is not None:
watch_kwargs["should_reload"] = should_reload
if config_reloading is not None:
watch_kwargs["config_reloading"] = config_reloading
mailbox_connection.watch(**watch_kwargs)

View File

@@ -19,6 +19,7 @@ import yaml
from tqdm import tqdm
from parsedmarc import (
REVERSE_DNS_MAP,
SEEN_AGGREGATE_REPORT_IDS,
InvalidDMARCReport,
ParserError,
@@ -48,7 +49,12 @@ from parsedmarc.mail import (
)
from parsedmarc.mail.graph import AuthMethod
from parsedmarc.types import ParsingResults
from parsedmarc.utils import get_base_domain, get_reverse_dns, is_mbox
from parsedmarc.utils import (
get_base_domain,
get_reverse_dns,
is_mbox,
load_reverse_dns_map,
)
# Increase the max header limit for very large emails. `_MAXHEADERS` is a
# private stdlib attribute and may not exist in type stubs.
@@ -69,6 +75,84 @@ def _str_to_list(s):
return list(map(lambda i: i.lstrip(), _list))
def _expand_path(p: str) -> str:
"""Expand ``~`` and ``$VAR`` references in a file path."""
return os.path.expanduser(os.path.expandvars(p))
# All known INI config section names, used for env var resolution.
_KNOWN_SECTIONS = frozenset(
{
"general",
"mailbox",
"imap",
"msgraph",
"elasticsearch",
"opensearch",
"splunk_hec",
"kafka",
"smtp",
"s3",
"syslog",
"gmail_api",
"maildir",
"log_analytics",
"gelf",
"webhook",
}
)
def _resolve_section_key(suffix: str) -> tuple:
"""Resolve an env var suffix like ``IMAP_PASSWORD`` to ``('imap', 'password')``.
Uses longest-prefix matching against known section names so that
multi-word sections like ``splunk_hec`` are handled correctly.
Returns ``(None, None)`` when no known section matches.
"""
suffix_lower = suffix.lower()
best_section = None
best_key = None
for section in _KNOWN_SECTIONS:
section_prefix = section + "_"
if suffix_lower.startswith(section_prefix):
key = suffix_lower[len(section_prefix) :]
if key and (best_section is None or len(section) > len(best_section)):
best_section = section
best_key = key
return best_section, best_key
def _apply_env_overrides(config: ConfigParser) -> None:
"""Inject ``PARSEDMARC_*`` environment variables into *config*.
Environment variables matching ``PARSEDMARC_{SECTION}_{KEY}`` override
(or create) the corresponding config-file values. Sections are created
automatically when they do not yet exist.
"""
prefix = "PARSEDMARC_"
for env_key, env_value in os.environ.items():
if not env_key.startswith(prefix) or env_key == "PARSEDMARC_CONFIG_FILE":
continue
suffix = env_key[len(prefix) :]
section, key = _resolve_section_key(suffix)
if section is None:
logger.debug("Ignoring unrecognized env var: %s", env_key)
continue
if not config.has_section(section):
config.add_section(section)
config.set(section, key, env_value)
logger.debug("Config override from env: [%s] %s", section, key)
def _configure_logging(log_level, log_file=None):
"""
Configure logging for the current process.
@@ -172,12 +256,39 @@ class ConfigurationError(Exception):
pass
def _parse_config_file(config_file, opts):
"""Parse a config file and update opts in place.
def _load_config(config_file: str | None = None) -> ConfigParser:
"""Load configuration from an INI file and/or environment variables.
Args:
config_file: Path to the .ini config file
opts: Namespace object to update with parsed values
config_file: Optional path to an .ini config file.
Returns:
A ``ConfigParser`` populated from the file (if given) and from any
``PARSEDMARC_*`` environment variables.
Raises:
ConfigurationError: If *config_file* is given but does not exist.
"""
config = ConfigParser(interpolation=None)
if config_file is not None:
abs_path = os.path.abspath(config_file)
if not os.path.exists(abs_path):
raise ConfigurationError("A file does not exist at {0}".format(abs_path))
if not os.access(abs_path, os.R_OK):
raise ConfigurationError(
"Unable to read {0} — check file permissions".format(abs_path)
)
config.read(config_file)
_apply_env_overrides(config)
return config
def _parse_config(config: ConfigParser, opts):
"""Apply a loaded ``ConfigParser`` to *opts* in place.
Args:
config: A ``ConfigParser`` (from ``_load_config``).
opts: Namespace object to update with parsed values.
Returns:
index_prefix_domain_map or None
@@ -185,13 +296,8 @@ def _parse_config_file(config_file, opts):
Raises:
ConfigurationError: If required settings are missing or invalid.
"""
abs_path = os.path.abspath(config_file)
if not os.path.exists(abs_path):
raise ConfigurationError("A file does not exist at {0}".format(abs_path))
opts.silent = True
config = ConfigParser()
index_prefix_domain_map = None
config.read(config_file)
if "general" in config.sections():
general_config = config["general"]
if "silent" in general_config:
@@ -201,21 +307,8 @@ def _parse_config_file(config_file, opts):
"normalize_timespan_threshold_hours"
)
if "index_prefix_domain_map" in general_config:
map_path = general_config["index_prefix_domain_map"]
try:
with open(map_path) as f:
index_prefix_domain_map = yaml.safe_load(f)
except OSError as exc:
raise ConfigurationError(
"Failed to read index_prefix_domain_map file '{0}': {1}".format(
map_path, exc
)
) from exc
except yaml.YAMLError as exc:
raise ConfigurationError(
"Failed to parse YAML in index_prefix_domain_map "
"file '{0}': {1}".format(map_path, exc)
) from exc
with open(_expand_path(general_config["index_prefix_domain_map"])) as f:
index_prefix_domain_map = yaml.safe_load(f)
if "offline" in general_config:
opts.offline = bool(general_config.getboolean("offline"))
if "strip_attachment_payloads" in general_config:
@@ -223,7 +316,7 @@ def _parse_config_file(config_file, opts):
general_config.getboolean("strip_attachment_payloads")
)
if "output" in general_config:
opts.output = general_config["output"]
opts.output = _expand_path(general_config["output"])
if "aggregate_json_filename" in general_config:
opts.aggregate_json_filename = general_config["aggregate_json_filename"]
if "forensic_json_filename" in general_config:
@@ -279,11 +372,11 @@ def _parse_config_file(config_file, opts):
general_config.getboolean("fail_on_output_error")
)
if "log_file" in general_config:
opts.log_file = general_config["log_file"]
opts.log_file = _expand_path(general_config["log_file"])
if "n_procs" in general_config:
opts.n_procs = general_config.getint("n_procs")
if "ip_db_path" in general_config:
opts.ip_db_path = general_config["ip_db_path"]
opts.ip_db_path = _expand_path(general_config["ip_db_path"])
else:
opts.ip_db_path = None
if "always_use_local_files" in general_config:
@@ -291,7 +384,9 @@ def _parse_config_file(config_file, opts):
general_config.getboolean("always_use_local_files")
)
if "local_reverse_dns_map_path" in general_config:
opts.reverse_dns_map_path = general_config["local_reverse_dns_map_path"]
opts.reverse_dns_map_path = _expand_path(
general_config["local_reverse_dns_map_path"]
)
if "reverse_dns_map_url" in general_config:
opts.reverse_dns_map_url = general_config["reverse_dns_map_url"]
if "prettify_json" in general_config:
@@ -406,7 +501,7 @@ def _parse_config_file(config_file, opts):
if "msgraph" in config.sections():
graph_config = config["msgraph"]
opts.graph_token_file = graph_config.get("token_file", ".token")
opts.graph_token_file = _expand_path(graph_config.get("token_file", ".token"))
if "auth_method" not in graph_config:
logger.info(
@@ -460,7 +555,9 @@ def _parse_config_file(config_file, opts):
if opts.graph_auth_method == AuthMethod.Certificate.name:
if "certificate_path" in graph_config:
opts.graph_certificate_path = graph_config["certificate_path"]
opts.graph_certificate_path = _expand_path(
graph_config["certificate_path"]
)
else:
raise ConfigurationError(
"certificate_path setting missing from the msgraph config section"
@@ -484,6 +581,8 @@ def _parse_config_file(config_file, opts):
if "graph_url" in graph_config:
opts.graph_url = graph_config["graph_url"]
elif "url" in graph_config:
opts.graph_url = graph_config["url"]
if "allow_unencrypted_storage" in graph_config:
opts.graph_allow_unencrypted_storage = bool(
@@ -517,7 +616,13 @@ def _parse_config_file(config_file, opts):
if "ssl" in elasticsearch_config:
opts.elasticsearch_ssl = bool(elasticsearch_config.getboolean("ssl"))
if "cert_path" in elasticsearch_config:
opts.elasticsearch_ssl_cert_path = elasticsearch_config["cert_path"]
opts.elasticsearch_ssl_cert_path = _expand_path(
elasticsearch_config["cert_path"]
)
if "skip_certificate_verification" in elasticsearch_config:
opts.elasticsearch_skip_certificate_verification = bool(
elasticsearch_config.getboolean("skip_certificate_verification")
)
if "user" in elasticsearch_config:
opts.elasticsearch_username = elasticsearch_config["user"]
if "password" in elasticsearch_config:
@@ -556,7 +661,11 @@ def _parse_config_file(config_file, opts):
if "ssl" in opensearch_config:
opts.opensearch_ssl = bool(opensearch_config.getboolean("ssl"))
if "cert_path" in opensearch_config:
opts.opensearch_ssl_cert_path = opensearch_config["cert_path"]
opts.opensearch_ssl_cert_path = _expand_path(opensearch_config["cert_path"])
if "skip_certificate_verification" in opensearch_config:
opts.opensearch_skip_certificate_verification = bool(
opensearch_config.getboolean("skip_certificate_verification")
)
if "user" in opensearch_config:
opts.opensearch_username = opensearch_config["user"]
if "password" in opensearch_config:
@@ -679,7 +788,7 @@ def _parse_config_file(config_file, opts):
if "subject" in smtp_config:
opts.smtp_subject = smtp_config["subject"]
if "attachment" in smtp_config:
opts.smtp_attachment = smtp_config["attachment"]
opts.smtp_attachment = _expand_path(smtp_config["attachment"])
if "message" in smtp_config:
opts.smtp_message = smtp_config["message"]
@@ -726,11 +835,11 @@ def _parse_config_file(config_file, opts):
else:
opts.syslog_protocol = "udp"
if "cafile_path" in syslog_config:
opts.syslog_cafile_path = syslog_config["cafile_path"]
opts.syslog_cafile_path = _expand_path(syslog_config["cafile_path"])
if "certfile_path" in syslog_config:
opts.syslog_certfile_path = syslog_config["certfile_path"]
opts.syslog_certfile_path = _expand_path(syslog_config["certfile_path"])
if "keyfile_path" in syslog_config:
opts.syslog_keyfile_path = syslog_config["keyfile_path"]
opts.syslog_keyfile_path = _expand_path(syslog_config["keyfile_path"])
if "timeout" in syslog_config:
opts.syslog_timeout = float(syslog_config["timeout"])
else:
@@ -746,8 +855,13 @@ def _parse_config_file(config_file, opts):
if "gmail_api" in config.sections():
gmail_api_config = config["gmail_api"]
opts.gmail_api_credentials_file = gmail_api_config.get("credentials_file")
opts.gmail_api_token_file = gmail_api_config.get("token_file", ".token")
gmail_creds = gmail_api_config.get("credentials_file")
opts.gmail_api_credentials_file = (
_expand_path(gmail_creds) if gmail_creds else gmail_creds
)
opts.gmail_api_token_file = _expand_path(
gmail_api_config.get("token_file", ".token")
)
opts.gmail_api_include_spam_trash = bool(
gmail_api_config.getboolean("include_spam_trash", False)
)
@@ -760,21 +874,27 @@ def _parse_config_file(config_file, opts):
if "oauth2_port" in gmail_api_config:
opts.gmail_api_oauth2_port = gmail_api_config.getint("oauth2_port", 8080)
if "auth_mode" in gmail_api_config:
opts.gmail_api_auth_mode = gmail_api_config.get("auth_mode").strip()
opts.gmail_api_auth_mode = gmail_api_config["auth_mode"].strip()
if "service_account_user" in gmail_api_config:
opts.gmail_api_service_account_user = gmail_api_config.get(
opts.gmail_api_service_account_user = gmail_api_config[
"service_account_user"
).strip()
].strip()
elif "delegated_user" in gmail_api_config:
opts.gmail_api_service_account_user = gmail_api_config.get(
opts.gmail_api_service_account_user = gmail_api_config[
"delegated_user"
).strip()
].strip()
if "maildir" in config.sections():
maildir_api_config = config["maildir"]
opts.maildir_path = maildir_api_config.get("maildir_path")
maildir_p = maildir_api_config.get(
"maildir_path", maildir_api_config.get("path")
)
opts.maildir_path = _expand_path(maildir_p) if maildir_p else maildir_p
opts.maildir_create = bool(
maildir_api_config.getboolean("maildir_create", fallback=False)
maildir_api_config.getboolean(
"maildir_create",
fallback=maildir_api_config.getboolean("create", fallback=False),
)
)
if "log_analytics" in config.sections():
@@ -823,6 +943,38 @@ def _parse_config_file(config_file, opts):
return index_prefix_domain_map
class _ElasticsearchHandle:
"""Sentinel so Elasticsearch participates in _close_output_clients."""
def close(self):
try:
conn = elastic.connections.get_connection()
if not isinstance(conn, str):
conn.close()
except Exception:
pass
try:
elastic.connections.remove_connection("default")
except Exception:
pass
class _OpenSearchHandle:
"""Sentinel so OpenSearch participates in _close_output_clients."""
def close(self):
try:
conn = opensearch.connections.get_connection()
if not isinstance(conn, str):
conn.close()
except Exception:
pass
try:
opensearch.connections.remove_connection("default")
except Exception:
pass
def _init_output_clients(opts):
"""Create output clients based on current opts.
@@ -834,78 +986,8 @@ def _init_output_clients(opts):
"""
clients = {}
if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls:
if opts.elasticsearch_hosts:
es_aggregate_index = "dmarc_aggregate"
es_forensic_index = "dmarc_forensic"
es_smtp_tls_index = "smtp_tls"
if opts.elasticsearch_index_suffix:
suffix = opts.elasticsearch_index_suffix
es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix)
es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix)
es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix)
if opts.elasticsearch_index_prefix:
prefix = opts.elasticsearch_index_prefix
es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index)
es_forensic_index = "{0}{1}".format(prefix, es_forensic_index)
es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index)
elastic_timeout_value = (
float(opts.elasticsearch_timeout)
if opts.elasticsearch_timeout is not None
else 60.0
)
elastic.set_hosts(
opts.elasticsearch_hosts,
use_ssl=opts.elasticsearch_ssl,
ssl_cert_path=opts.elasticsearch_ssl_cert_path,
username=opts.elasticsearch_username,
password=opts.elasticsearch_password,
api_key=opts.elasticsearch_api_key,
timeout=elastic_timeout_value,
)
elastic.migrate_indexes(
aggregate_indexes=[es_aggregate_index],
forensic_indexes=[es_forensic_index],
)
if opts.opensearch_hosts:
os_aggregate_index = "dmarc_aggregate"
os_forensic_index = "dmarc_forensic"
os_smtp_tls_index = "smtp_tls"
if opts.opensearch_index_suffix:
suffix = opts.opensearch_index_suffix
os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix)
os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix)
os_smtp_tls_index = "{0}_{1}".format(os_smtp_tls_index, suffix)
if opts.opensearch_index_prefix:
prefix = opts.opensearch_index_prefix
os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index)
os_forensic_index = "{0}{1}".format(prefix, os_forensic_index)
os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index)
opensearch_timeout_value = (
float(opts.opensearch_timeout)
if opts.opensearch_timeout is not None
else 60.0
)
opensearch.set_hosts(
opts.opensearch_hosts,
use_ssl=opts.opensearch_ssl,
ssl_cert_path=opts.opensearch_ssl_cert_path,
username=opts.opensearch_username,
password=opts.opensearch_password,
api_key=opts.opensearch_api_key,
timeout=opensearch_timeout_value,
auth_type=opts.opensearch_auth_type,
aws_region=opts.opensearch_aws_region,
aws_service=opts.opensearch_aws_service,
)
opensearch.migrate_indexes(
aggregate_indexes=[os_aggregate_index],
forensic_indexes=[os_forensic_index],
)
if opts.s3_bucket:
try:
try:
if opts.s3_bucket:
clients["s3_client"] = s3.S3Client(
bucket_name=opts.s3_bucket,
bucket_path=opts.s3_path,
@@ -914,11 +996,11 @@ def _init_output_clients(opts):
access_key_id=opts.s3_access_key_id,
secret_access_key=opts.s3_secret_access_key,
)
except Exception:
logger.warning("Failed to initialize S3 client; skipping", exc_info=True)
except Exception as e:
raise RuntimeError(f"S3: {e}") from e
if opts.syslog_server:
try:
try:
if opts.syslog_server:
clients["syslog_client"] = syslog.SyslogClient(
server_name=opts.syslog_server,
server_port=int(opts.syslog_port),
@@ -934,73 +1016,150 @@ def _init_output_clients(opts):
if opts.syslog_retry_delay is not None
else 5,
)
except Exception:
logger.warning(
"Failed to initialize syslog client; skipping", exc_info=True
)
except Exception as e:
raise RuntimeError(f"Syslog: {e}") from e
if opts.hec:
if opts.hec_token is None or opts.hec_index is None:
raise ConfigurationError(
"HEC token and HEC index are required when using HEC URL"
)
verify = True
if opts.hec_skip_certificate_verification:
verify = False
try:
verify = True
if opts.hec_skip_certificate_verification:
verify = False
clients["hec_client"] = splunk.HECClient(
opts.hec, opts.hec_token, opts.hec_index, verify=verify
)
except Exception:
logger.warning(
"Failed to initialize Splunk HEC client; skipping", exc_info=True
)
except Exception as e:
raise RuntimeError(f"Splunk HEC: {e}") from e
if opts.kafka_hosts:
ssl_context = None
if opts.kafka_ssl:
ssl_context = create_default_context()
try:
if opts.kafka_hosts:
ssl_context = None
if opts.kafka_skip_certificate_verification:
logger.debug("Skipping Kafka certificate verification")
ssl_context = create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = CERT_NONE
try:
clients["kafka_client"] = kafkaclient.KafkaClient(
opts.kafka_hosts,
username=opts.kafka_username,
password=opts.kafka_password,
ssl=opts.kafka_ssl,
ssl_context=ssl_context,
)
except Exception:
logger.warning("Failed to initialize Kafka client; skipping", exc_info=True)
except Exception as e:
raise RuntimeError(f"Kafka: {e}") from e
if opts.gelf_host:
try:
try:
if opts.gelf_host:
clients["gelf_client"] = gelf.GelfClient(
host=opts.gelf_host,
port=int(opts.gelf_port),
mode=opts.gelf_mode,
)
except Exception:
logger.warning("Failed to initialize GELF client; skipping", exc_info=True)
except Exception as e:
raise RuntimeError(f"GELF: {e}") from e
if (
opts.webhook_aggregate_url
or opts.webhook_forensic_url
or opts.webhook_smtp_tls_url
):
try:
try:
if (
opts.webhook_aggregate_url
or opts.webhook_forensic_url
or opts.webhook_smtp_tls_url
):
clients["webhook_client"] = webhook.WebhookClient(
aggregate_url=opts.webhook_aggregate_url,
forensic_url=opts.webhook_forensic_url,
smtp_tls_url=opts.webhook_smtp_tls_url,
timeout=opts.webhook_timeout,
)
except Exception:
logger.warning(
"Failed to initialize webhook client; skipping", exc_info=True
)
except Exception as e:
raise RuntimeError(f"Webhook: {e}") from e
# Elasticsearch and OpenSearch mutate module-level global state via
# connections.create_connection(), which cannot be rolled back if a later
# step fails. Initialise them last so that all other clients are created
# successfully first; this minimises the window for partial-init problems
# during config reload.
if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls:
try:
if opts.elasticsearch_hosts:
es_aggregate_index = "dmarc_aggregate"
es_forensic_index = "dmarc_forensic"
es_smtp_tls_index = "smtp_tls"
if opts.elasticsearch_index_suffix:
suffix = opts.elasticsearch_index_suffix
es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix)
es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix)
es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix)
if opts.elasticsearch_index_prefix:
prefix = opts.elasticsearch_index_prefix
es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index)
es_forensic_index = "{0}{1}".format(prefix, es_forensic_index)
es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index)
elastic_timeout_value = (
float(opts.elasticsearch_timeout)
if opts.elasticsearch_timeout is not None
else 60.0
)
elastic.set_hosts(
opts.elasticsearch_hosts,
use_ssl=opts.elasticsearch_ssl,
ssl_cert_path=opts.elasticsearch_ssl_cert_path,
skip_certificate_verification=opts.elasticsearch_skip_certificate_verification,
username=opts.elasticsearch_username,
password=opts.elasticsearch_password,
api_key=opts.elasticsearch_api_key,
timeout=elastic_timeout_value,
)
elastic.migrate_indexes(
aggregate_indexes=[es_aggregate_index],
forensic_indexes=[es_forensic_index],
)
clients["elasticsearch"] = _ElasticsearchHandle()
except Exception as e:
raise RuntimeError(f"Elasticsearch: {e}") from e
try:
if opts.opensearch_hosts:
os_aggregate_index = "dmarc_aggregate"
os_forensic_index = "dmarc_forensic"
os_smtp_tls_index = "smtp_tls"
if opts.opensearch_index_suffix:
suffix = opts.opensearch_index_suffix
os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix)
os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix)
os_smtp_tls_index = "{0}_{1}".format(os_smtp_tls_index, suffix)
if opts.opensearch_index_prefix:
prefix = opts.opensearch_index_prefix
os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index)
os_forensic_index = "{0}{1}".format(prefix, os_forensic_index)
os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index)
opensearch_timeout_value = (
float(opts.opensearch_timeout)
if opts.opensearch_timeout is not None
else 60.0
)
opensearch.set_hosts(
opts.opensearch_hosts,
use_ssl=opts.opensearch_ssl,
ssl_cert_path=opts.opensearch_ssl_cert_path,
skip_certificate_verification=opts.opensearch_skip_certificate_verification,
username=opts.opensearch_username,
password=opts.opensearch_password,
api_key=opts.opensearch_api_key,
timeout=opensearch_timeout_value,
auth_type=opts.opensearch_auth_type,
aws_region=opts.opensearch_aws_region,
aws_service=opts.opensearch_aws_service,
)
opensearch.migrate_indexes(
aggregate_indexes=[os_aggregate_index],
forensic_indexes=[os_forensic_index],
)
clients["opensearch"] = _OpenSearchHandle()
except Exception as e:
raise RuntimeError(f"OpenSearch: {e}") from e
return clients
@@ -1032,22 +1191,24 @@ def _main():
if "policy_published" in report:
domain = report["policy_published"]["domain"]
elif "reported_domain" in report:
domain = report("reported_domain")
domain = report["reported_domain"]
elif "policies" in report:
domain = report["policies"][0]["domain"]
domain = report["policies"][0]["policy_domain"]
if domain:
domain = get_base_domain(domain)
for prefix in index_prefix_domain_map:
if domain in index_prefix_domain_map[prefix]:
prefix = (
prefix.lower()
.strip()
.strip("_")
.replace(" ", "_")
.replace("-", "_")
)
prefix = f"{prefix}_"
return prefix
if domain:
domain = domain.lower()
for prefix in index_prefix_domain_map:
if domain in index_prefix_domain_map[prefix]:
prefix = (
prefix.lower()
.strip()
.strip("_")
.replace(" ", "_")
.replace("-", "_")
)
prefix = f"{prefix}_"
return prefix
return None
def process_reports(reports_):
@@ -1058,6 +1219,22 @@ def _main():
logger.error(message)
output_errors.append(message)
if index_prefix_domain_map is not None:
filtered_tls = []
for report in reports_.get("smtp_tls_reports", []):
if get_index_prefix(report) is not None:
filtered_tls.append(report)
else:
domain = "unknown"
if "policies" in report and report["policies"]:
domain = report["policies"][0].get("policy_domain", "unknown")
logger.debug(
"Ignoring SMTP TLS report for domain not in "
"index_prefix_domain_map: %s",
domain,
)
reports_["smtp_tls_reports"] = filtered_tls
indent_value = 2 if opts.prettify_json else None
output_str = "{0}\n".format(
json.dumps(reports_, ensure_ascii=False, indent=indent_value)
@@ -1529,6 +1706,7 @@ def _main():
elasticsearch_index_prefix=None,
elasticsearch_ssl=True,
elasticsearch_ssl_cert_path=None,
elasticsearch_skip_certificate_verification=False,
elasticsearch_monthly_indexes=False,
elasticsearch_username=None,
elasticsearch_password=None,
@@ -1541,6 +1719,7 @@ def _main():
opensearch_index_prefix=None,
opensearch_ssl=True,
opensearch_ssl_cert_path=None,
opensearch_skip_certificate_verification=False,
opensearch_monthly_indexes=False,
opensearch_username=None,
opensearch_password=None,
@@ -1623,11 +1802,18 @@ def _main():
index_prefix_domain_map = None
if args.config_file:
config_file = args.config_file or os.environ.get("PARSEDMARC_CONFIG_FILE")
has_env_config = any(
k.startswith("PARSEDMARC_") and k != "PARSEDMARC_CONFIG_FILE"
for k in os.environ
)
if config_file or has_env_config:
try:
index_prefix_domain_map = _parse_config_file(args.config_file, opts)
config = _load_config(config_file)
index_prefix_domain_map = _parse_config(config, opts)
except ConfigurationError as e:
logger.error(str(e))
logger.critical(str(e))
exit(-1)
logger.setLevel(logging.ERROR)
@@ -1649,6 +1835,8 @@ def _main():
except Exception as error:
logger.warning("Unable to write to log file: {}".format(error))
opts.active_log_file = opts.log_file
if (
opts.imap_host is None
and opts.graph_client_id is None
@@ -1664,14 +1852,8 @@ def _main():
# Initialize output clients
try:
clients = _init_output_clients(opts)
except elastic.ElasticsearchError:
logger.exception("Elasticsearch Error")
exit(1)
except opensearch.OpenSearchError:
logger.exception("OpenSearch Error")
exit(1)
except ConfigurationError as e:
logger.error(str(e))
logger.critical(str(e))
exit(1)
except Exception as error_:
logger.error("Output client error: {0}".format(error_))
@@ -1807,8 +1989,9 @@ def _main():
try:
if opts.imap_user is None or opts.imap_password is None:
logger.error(
"IMAP user and password must be specified ifhost is specified"
"IMAP user and password must be specified if host is specified"
)
exit(1)
ssl = True
verify = True
@@ -1985,8 +2168,9 @@ def _main():
def _handle_sighup(signum, frame):
nonlocal _reload_requested
# Logging is not async-signal-safe; only set the flag here.
# The log message is emitted from the main loop when the flag is read.
_reload_requested = True
logger.info("SIGHUP received, config will reload after current batch")
if hasattr(signal, "SIGHUP"):
signal.signal(signal.SIGHUP, _handle_sighup)
@@ -1995,6 +2179,12 @@ def _main():
logger.info("Watching for email - Quit with ctrl-c")
while True:
# Re-check mailbox_watch in case a config reload disabled watch mode
if not opts.mailbox_watch:
logger.info(
"Mailbox watch disabled in reloaded configuration, stopping watcher"
)
break
try:
watch_inbox(
mailbox_connection=mailbox_connection,
@@ -2015,7 +2205,7 @@ def _main():
reverse_dns_map_url=opts.reverse_dns_map_url,
offline=opts.offline,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
should_reload=lambda: _reload_requested,
config_reloading=lambda: _reload_requested,
)
except FileExistsError as error:
logger.error("{0}".format(error.__str__()))
@@ -2027,24 +2217,36 @@ def _main():
if not _reload_requested:
break
# Reload configuration — clear the flag first so that any new
# SIGHUP arriving while we reload will be captured for the next
# iteration rather than being silently dropped.
# Reload configuration — emit the log message here (not in the
# signal handler, which is not async-signal-safe), then clear the
# flag so that any new SIGHUP arriving while we reload will be
# captured for the next iteration rather than being silently dropped.
logger.info("SIGHUP received, config will reload after current batch")
_reload_requested = False
logger.info("Reloading configuration...")
try:
# Build a fresh opts starting from CLI-only defaults so that
# sections removed from the config file actually take effect.
new_opts = Namespace(**vars(opts_from_cli))
new_index_prefix_domain_map = _parse_config_file(
args.config_file, new_opts
)
new_config = _load_config(config_file)
new_index_prefix_domain_map = _parse_config(new_config, new_opts)
new_clients = _init_output_clients(new_opts)
# All steps succeeded — commit the changes atomically.
_close_output_clients(clients)
clients = new_clients
index_prefix_domain_map = new_index_prefix_domain_map
# Reload the reverse DNS map so changes to the
# map path/URL in the config take effect.
load_reverse_dns_map(
REVERSE_DNS_MAP,
always_use_local_file=new_opts.always_use_local_files,
local_file_path=new_opts.reverse_dns_map_path,
url=new_opts.reverse_dns_map_url,
offline=new_opts.offline,
)
for k, v in vars(new_opts).items():
setattr(opts, k, v)

View File

@@ -1,3 +1,3 @@
__version__ = "9.2.1"
__version__ = "9.5.4"
USER_AGENT = f"parsedmarc/{__version__}"

View File

@@ -268,6 +268,7 @@ def set_hosts(
*,
use_ssl: bool = False,
ssl_cert_path: Optional[str] = None,
skip_certificate_verification: bool = False,
username: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
@@ -280,6 +281,7 @@ def set_hosts(
hosts (str | list[str]): A single hostname or URL, or list of hostnames or URLs
use_ssl (bool): Use an HTTPS connection to the server
ssl_cert_path (str): Path to the certificate chain
skip_certificate_verification (bool): Skip certificate verification
username (str): The username to use for authentication
password (str): The password to use for authentication
api_key (str): The Base64 encoded API key to use for authentication
@@ -291,10 +293,11 @@ def set_hosts(
if use_ssl:
conn_params["use_ssl"] = True
if ssl_cert_path:
conn_params["verify_certs"] = True
conn_params["ca_certs"] = ssl_cert_path
else:
if skip_certificate_verification:
conn_params["verify_certs"] = False
else:
conn_params["verify_certs"] = True
if username and password:
conn_params["http_auth"] = username + ":" + password
if api_key:
@@ -735,6 +738,7 @@ def save_smtp_tls_report_to_elasticsearch(
index_date = begin_date.strftime("%Y-%m")
else:
index_date = begin_date.strftime("%Y-%m-%d")
report = report.copy()
report["begin_date"] = begin_date
report["end_date"] = end_date

View File

@@ -3,9 +3,7 @@
from __future__ import annotations
import logging
import logging.handlers
import threading
from typing import Any
from pygelf import GelfTcpHandler, GelfTlsHandler, GelfUdpHandler
@@ -14,6 +12,7 @@ from parsedmarc import (
parsed_forensic_reports_to_csv_rows,
parsed_smtp_tls_reports_to_csv_rows,
)
from parsedmarc.types import AggregateReport, ForensicReport, SMTPTLSReport
log_context_data = threading.local()
@@ -37,7 +36,7 @@ class GelfClient(object):
"""
self.host = host
self.port = port
self.logger = logging.getLogger("parsedmarc_syslog")
self.logger = logging.getLogger("parsedmarc_gelf")
self.logger.setLevel(logging.INFO)
self.logger.addFilter(ContextFilter())
self.gelf_mode = {
@@ -50,7 +49,7 @@ class GelfClient(object):
)
self.logger.addHandler(self.handler)
def save_aggregate_report_to_gelf(self, aggregate_reports: list[dict[str, Any]]):
def save_aggregate_report_to_gelf(self, aggregate_reports: list[AggregateReport]):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows:
log_context_data.parsedmarc = row
@@ -58,13 +57,13 @@ class GelfClient(object):
log_context_data.parsedmarc = None
def save_forensic_report_to_gelf(self, forensic_reports: list[dict[str, Any]]):
def save_forensic_report_to_gelf(self, forensic_reports: list[ForensicReport]):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows:
log_context_data.parsedmarc = row
self.logger.info("parsedmarc forensic report")
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: dict[str, Any]):
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: SMTPTLSReport):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
for row in rows:
log_context_data.parsedmarc = row

View File

@@ -55,6 +55,7 @@ def _get_creds(
flow = InstalledAppFlow.from_client_secrets_file(credentials_file, scopes)
creds = flow.run_local_server(open_browser=False, oauth2_port=oauth2_port)
# Save the credentials for the next run
Path(token_file).parent.mkdir(parents=True, exist_ok=True)
with Path(token_file).open("w") as token:
token.write(creds.to_json())
return creds
@@ -175,13 +176,13 @@ class GmailConnection(MailboxConnection):
# Not needed
pass
def watch(self, check_callback, check_timeout, should_reload=None):
def watch(self, check_callback, check_timeout, config_reloading=None):
"""Checks the mailbox for new messages every n seconds"""
while True:
if should_reload and should_reload():
if config_reloading and config_reloading():
return
sleep(check_timeout)
if should_reload and should_reload():
if config_reloading and config_reloading():
return
check_callback(self)

View File

@@ -56,6 +56,7 @@ def _load_token(token_path: Path) -> Optional[str]:
def _cache_auth_record(record: AuthenticationRecord, token_path: Path):
token = record.serialize()
token_path.parent.mkdir(parents=True, exist_ok=True)
with token_path.open("w") as token_file:
token_file.write(token)
@@ -278,12 +279,14 @@ class MSGraphConnection(MailboxConnection):
# Not needed
pass
def watch(self, check_callback, check_timeout, should_reload=None):
def watch(self, check_callback, check_timeout, config_reloading=None):
"""Checks the mailbox for new messages every n seconds"""
while True:
if should_reload and should_reload():
if config_reloading and config_reloading():
return
sleep(check_timeout)
if config_reloading and config_reloading():
return
check_callback(self)
@lru_cache(maxsize=10)

View File

@@ -81,7 +81,7 @@ class IMAPConnection(MailboxConnection):
def keepalive(self):
self._client.noop()
def watch(self, check_callback, check_timeout, should_reload=None):
def watch(self, check_callback, check_timeout, config_reloading=None):
"""
Use an IDLE IMAP connection to parse incoming emails,
and pass the results to a callback function
@@ -94,6 +94,8 @@ class IMAPConnection(MailboxConnection):
check_callback(self)
while True:
if config_reloading and config_reloading():
return
try:
IMAPClient(
host=self._client.host,
@@ -111,5 +113,5 @@ class IMAPConnection(MailboxConnection):
except Exception as e:
logger.warning("IMAP connection error. {0}. Reconnecting...".format(e))
sleep(check_timeout)
if should_reload and should_reload():
if config_reloading and config_reloading():
return

View File

@@ -28,5 +28,5 @@ class MailboxConnection(ABC):
def keepalive(self):
raise NotImplementedError
def watch(self, check_callback, check_timeout, should_reload=None):
def watch(self, check_callback, check_timeout, config_reloading=None):
raise NotImplementedError

View File

@@ -19,29 +19,54 @@ class MaildirConnection(MailboxConnection):
):
self._maildir_path = maildir_path
self._maildir_create = maildir_create
maildir_owner = os.stat(maildir_path).st_uid
if os.getuid() != maildir_owner:
if os.getuid() == 0:
logger.warning(
"Switching uid to {} to access Maildir".format(maildir_owner)
)
os.setuid(maildir_owner)
try:
maildir_owner = os.stat(maildir_path).st_uid
except OSError:
maildir_owner = None
current_uid = os.getuid()
if maildir_owner is not None and current_uid != maildir_owner:
if current_uid == 0:
try:
logger.warning(
"Switching uid to {} to access Maildir".format(maildir_owner)
)
os.setuid(maildir_owner)
except OSError as e:
logger.warning(
"Failed to switch uid to {}: {}".format(maildir_owner, e)
)
else:
ex = "runtime uid {} differ from maildir {} owner {}".format(
os.getuid(), maildir_path, maildir_owner
logger.warning(
"Runtime uid {} differs from maildir {} owner {}. "
"Access may fail if permissions are insufficient.".format(
current_uid, maildir_path, maildir_owner
)
)
raise Exception(ex)
if maildir_create:
for subdir in ("cur", "new", "tmp"):
os.makedirs(os.path.join(maildir_path, subdir), exist_ok=True)
self._client = mailbox.Maildir(maildir_path, create=maildir_create)
self._active_folder: mailbox.Maildir = self._client
self._subfolder_client: Dict[str, mailbox.Maildir] = {}
def _get_folder(self, folder_name: str) -> mailbox.Maildir:
"""Return a cached subfolder handle, creating it if needed."""
if folder_name not in self._subfolder_client:
self._subfolder_client[folder_name] = self._client.add_folder(folder_name)
return self._subfolder_client[folder_name]
def create_folder(self, folder_name: str):
self._subfolder_client[folder_name] = self._client.add_folder(folder_name)
self._get_folder(folder_name)
def fetch_messages(self, reports_folder: str, **kwargs):
return self._client.keys()
if reports_folder and reports_folder != "INBOX":
self._active_folder = self._get_folder(reports_folder)
else:
self._active_folder = self._client
return self._active_folder.keys()
def fetch_message(self, message_id: str) -> str:
msg = self._client.get(message_id)
msg = self._active_folder.get(message_id)
if msg is not None:
msg = msg.as_string()
if msg is not None:
@@ -49,26 +74,27 @@ class MaildirConnection(MailboxConnection):
return ""
def delete_message(self, message_id: str):
self._client.remove(message_id)
self._active_folder.remove(message_id)
def move_message(self, message_id: str, folder_name: str):
message_data = self._client.get(message_id)
message_data = self._active_folder.get(message_id)
if message_data is None:
return
if folder_name not in self._subfolder_client:
self._subfolder_client[folder_name] = self._client.add_folder(folder_name)
self._subfolder_client[folder_name].add(message_data)
self._client.remove(message_id)
dest = self._get_folder(folder_name)
dest.add(message_data)
self._active_folder.remove(message_id)
def keepalive(self):
return
def watch(self, check_callback, check_timeout, should_reload=None):
def watch(self, check_callback, check_timeout, config_reloading=None):
while True:
if config_reloading and config_reloading():
return
try:
check_callback(self)
except Exception as e:
logger.warning("Maildir init error. {0}".format(e))
if should_reload and should_reload():
if config_reloading and config_reloading():
return
sleep(check_timeout)

View File

@@ -271,6 +271,7 @@ def set_hosts(
*,
use_ssl: Optional[bool] = False,
ssl_cert_path: Optional[str] = None,
skip_certificate_verification: bool = False,
username: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
@@ -286,6 +287,7 @@ def set_hosts(
hosts (str|list[str]): A single hostname or URL, or list of hostnames or URLs
use_ssl (bool): Use an HTTPS connection to the server
ssl_cert_path (str): Path to the certificate chain
skip_certificate_verification (bool): Skip certificate verification
username (str): The username to use for authentication
password (str): The password to use for authentication
api_key (str): The Base64 encoded API key to use for authentication
@@ -300,10 +302,11 @@ def set_hosts(
if use_ssl:
conn_params["use_ssl"] = True
if ssl_cert_path:
conn_params["verify_certs"] = True
conn_params["ca_certs"] = ssl_cert_path
else:
if skip_certificate_verification:
conn_params["verify_certs"] = False
else:
conn_params["verify_certs"] = True
normalized_auth_type = (auth_type or "basic").strip().lower()
if normalized_auth_type == "awssigv4":
if not aws_region:
@@ -764,6 +767,7 @@ def save_smtp_tls_report_to_opensearch(
index_date = begin_date.strftime("%Y-%m")
else:
index_date = begin_date.strftime("%Y-%m-%d")
report = report.copy()
report["begin_date"] = begin_date
report["end_date"] = end_date

View File

@@ -93,3 +93,11 @@ class S3Client(object):
self.bucket.put_object(
Body=json.dumps(report), Key=object_path, Metadata=object_metadata
)
def close(self):
"""Clean up the boto3 resource."""
try:
if self.s3.meta is not None:
self.s3.meta.client.close()
except Exception:
pass

View File

@@ -58,7 +58,7 @@ class HECClient(object):
self.source = source
self.session = requests.Session()
self.timeout = timeout
self.session.verify = verify
self.verify = verify
self._common_data: dict[str, Union[str, int, float, dict]] = dict(
host=self.host, source=self.source, index=self.index
)
@@ -124,10 +124,12 @@ class HECClient(object):
data["event"] = new_report.copy()
json_str += "{0}\n".format(json.dumps(data))
if not self.session.verify:
if not self.verify:
logger.debug("Skipping certificate verification for Splunk HEC")
try:
response = self.session.post(self.url, data=json_str, timeout=self.timeout)
response = self.session.post(
self.url, data=json_str, verify=self.verify, timeout=self.timeout
)
response = response.json()
except Exception as e:
raise SplunkError(e.__str__())
@@ -161,10 +163,12 @@ class HECClient(object):
data["event"] = report.copy()
json_str += "{0}\n".format(json.dumps(data))
if not self.session.verify:
if not self.verify:
logger.debug("Skipping certificate verification for Splunk HEC")
try:
response = self.session.post(self.url, data=json_str, timeout=self.timeout)
response = self.session.post(
self.url, data=json_str, verify=self.verify, timeout=self.timeout
)
response = response.json()
except Exception as e:
raise SplunkError(e.__str__())
@@ -198,12 +202,18 @@ class HECClient(object):
data["event"] = report.copy()
json_str += "{0}\n".format(json.dumps(data))
if not self.session.verify:
if not self.verify:
logger.debug("Skipping certificate verification for Splunk HEC")
try:
response = self.session.post(self.url, data=json_str, timeout=self.timeout)
response = self.session.post(
self.url, data=json_str, verify=self.verify, timeout=self.timeout
)
response = response.json()
except Exception as e:
raise SplunkError(e.__str__())
if response["code"] != 0:
raise SplunkError(response["text"])
def close(self):
"""Close the underlying HTTP session."""
self.session.close()

View File

@@ -335,6 +335,76 @@ def get_ip_address_country(
return country
def load_reverse_dns_map(
reverse_dns_map: ReverseDNSMap,
*,
always_use_local_file: bool = False,
local_file_path: Optional[str] = None,
url: Optional[str] = None,
offline: bool = False,
) -> None:
"""
Loads the reverse DNS map from a URL or local file.
Clears and repopulates the given map dict in place. If the map is
fetched from a URL, that is tried first; on failure (or if offline/local
mode is selected) the bundled CSV is used as a fallback.
Args:
reverse_dns_map (dict): The map dict to populate (modified in place)
always_use_local_file (bool): Always use a local map file
local_file_path (str): Path to a local map file
url (str): URL to a reverse DNS map
offline (bool): Use the built-in copy of the reverse DNS map
"""
if url is None:
url = (
"https://raw.githubusercontent.com/domainaware"
"/parsedmarc/master/parsedmarc/"
"resources/maps/base_reverse_dns_map.csv"
)
reverse_dns_map.clear()
def load_csv(_csv_file):
reader = csv.DictReader(_csv_file)
for row in reader:
key = row["base_reverse_dns"].lower().strip()
reverse_dns_map[key] = {
"name": row["name"].strip(),
"type": row["type"].strip(),
}
csv_file = io.StringIO()
if not (offline or always_use_local_file):
try:
logger.debug(f"Trying to fetch reverse DNS map from {url}...")
headers = {"User-Agent": USER_AGENT}
response = requests.get(url, headers=headers)
response.raise_for_status()
csv_file.write(response.text)
csv_file.seek(0)
load_csv(csv_file)
except requests.exceptions.RequestException as e:
logger.warning(f"Failed to fetch reverse DNS map: {e}")
except Exception:
logger.warning("Not a valid CSV file")
csv_file.seek(0)
logging.debug("Response body:")
logger.debug(csv_file.read())
if len(reverse_dns_map) == 0:
logger.info("Loading included reverse DNS map...")
path = str(
files(parsedmarc.resources.maps).joinpath("base_reverse_dns_map.csv")
)
if local_file_path is not None:
path = local_file_path
with open(path) as csv_file:
load_csv(csv_file)
def get_service_from_reverse_dns_base_domain(
base_domain,
*,
@@ -361,55 +431,21 @@ def get_service_from_reverse_dns_base_domain(
"""
base_domain = base_domain.lower().strip()
if url is None:
url = (
"https://raw.githubusercontent.com/domainaware"
"/parsedmarc/master/parsedmarc/"
"resources/maps/base_reverse_dns_map.csv"
)
reverse_dns_map_value: ReverseDNSMap
if reverse_dns_map is None:
reverse_dns_map_value = {}
else:
reverse_dns_map_value = reverse_dns_map
def load_csv(_csv_file):
reader = csv.DictReader(_csv_file)
for row in reader:
key = row["base_reverse_dns"].lower().strip()
reverse_dns_map_value[key] = {
"name": row["name"],
"type": row["type"],
}
csv_file = io.StringIO()
if not (offline or always_use_local_file) and len(reverse_dns_map_value) == 0:
try:
logger.debug(f"Trying to fetch reverse DNS map from {url}...")
headers = {"User-Agent": USER_AGENT}
response = requests.get(url, headers=headers)
response.raise_for_status()
csv_file.write(response.text)
csv_file.seek(0)
load_csv(csv_file)
except requests.exceptions.RequestException as e:
logger.warning(f"Failed to fetch reverse DNS map: {e}")
except Exception:
logger.warning("Not a valid CSV file")
csv_file.seek(0)
logging.debug("Response body:")
logger.debug(csv_file.read())
if len(reverse_dns_map_value) == 0:
logger.info("Loading included reverse DNS map...")
path = str(
files(parsedmarc.resources.maps).joinpath("base_reverse_dns_map.csv")
load_reverse_dns_map(
reverse_dns_map_value,
always_use_local_file=always_use_local_file,
local_file_path=local_file_path,
url=url,
offline=offline,
)
if local_file_path is not None:
path = local_file_path
with open(path) as csv_file:
load_csv(csv_file)
service: ReverseDNSService
try:
service = reverse_dns_map_value[base_domain]

View File

@@ -50,7 +50,7 @@ dependencies = [
"lxml>=4.4.0",
"mailsuite>=1.11.2",
"msgraph-core==0.2.2",
"opensearch-py>=2.4.2,<=3.0.0",
"opensearch-py>=2.4.2,<=4.0.0",
"publicsuffixlist>=0.10.0",
"pygelf>=0.4.2",
"requests>=2.22.0",

912
tests.py

File diff suppressed because it is too large Load Diff