Compare commits

..

7 Commits

Author SHA1 Message Date
Sean Whalen
ff0ca6538c 9.5.0
Add environment variable configuration support and update documentation

- Introduced support for configuration via environment variables using the `PARSEDMARC_{SECTION}_{KEY}` format.
- Added `PARSEDMARC_CONFIG_FILE` variable to specify the config file path.
- Enabled env-only mode for file-less Docker deployments.
- Implemented explicit read permission checks on config files.
- Updated changelog and usage documentation to reflect these changes.
2026-03-25 19:25:21 -04:00
Sean Whalen
2032438d3b 9.4.0
### Added

- Extracted `load_reverse_dns_map()` utility function in `utils.py` for loading the reverse DNS map independently of individual IP lookups.
- SIGHUP reload now re-downloads/reloads the reverse DNS map, so changes take effect without restarting.
- Add premade OpenSearch index patterns, visualizations, and dashboards

### Changed

- When `index_prefix_domain_map` is configured, SMTP TLS reports for domains not in the map are now silently dropped instead of being output. Unlike DMARC, TLS-RPT has no DNS authorization records, so this filtering prevents processing reports for unrelated domains.
- Bump OpenSearch support to `< 4`

### Fixed

- Fixed `get_index_prefix` using wrong key (`domain` instead of `policy_domain`) for SMTP TLS reports, which prevented domain map matching from working for TLS reports.
- Domain matching in `get_index_prefix` now lowercases the domain for case-insensitive comparison.
2026-03-23 17:08:26 -04:00
Sean Whalen
1e95c5d30b 9.3.1
Elasticsearch and OpenSearch now verify SSL certificates by default when `ssl = True`, even without a `cert_path`
- Added `skip_certificate_verification` option to the `elasticsearch` and `opensearch` configuration sections for consistency with `splunk_hec`
- Splunk HEC `skip_certificate_verification` now works correctly with self-signed certificates
- SMTP TLS reports no longer fail when saving to multiple output targets (e.g. Elasticsearch and OpenSearch) due to in-place mutation of the report dict
- Output client initialization errors now identify which module failed (e.g. "OpenSearch: ConnectionError..." instead of generic "Output client error")
- Enhanced error handling for output client initialization
2026-03-22 14:38:32 -04:00
Sean Whalen
cb2384be83 Copy report before modifying begin_date and end_date in save_smtp_tls_report functions 2026-03-22 13:13:21 -04:00
Sean Whalen
9a5b5310fa Update Grafana and Splunk environment variables in docker-compose for consistency 2026-03-22 12:40:42 -04:00
Sean Whalen
9849598100 Formatting 2026-03-21 16:17:35 -04:00
Sean Whalen
e82f3e58a1 SIGHUP-based configuration reload for watch mode (#697)
* Enhance mailbox connection watch method to support reload functionality

- Updated the `watch` method in `GmailConnection`, `MSGraphConnection`, `IMAPConnection`, `MaildirConnection`, and the abstract `MailboxConnection` class to accept an optional `should_reload` parameter. This allows the method to check if a reload is necessary and exit the loop if so.
- Modified related tests to accommodate the new method signature.
- Changed logger calls from `critical` to `error` for consistency in logging severity.
- Added a new settings file for Claude with specific permissions for testing and code checks.

* Update parsedmarc/cli.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update parsedmarc/cli.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* [WIP] SIGHUP-based configuration reload for watch mode (#698)

* Initial plan

* Fix reload state consistency, resource leaks, stale opts; add tests

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/3c2e0bb9-7e2d-4efa-aef6-d2b98478b921

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* [WIP] SIGHUP-based configuration reload for watch mode (#699)

* Initial plan

* Fix review comments: ConfigurationError wrapping, duplicate parse args, bool parsing, Kafka required topics, should_reload kwarg, SIGHUP test skips

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/0779003c-ccbe-4d76-9748-801dbc238b96

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* SIGHUP-based configuration reload: address review feedback (#700)

* Initial plan

* Address review feedback: kafka_ssl, duplicate silent, exception chain, log file reload, should_reload timing

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/a8a43c55-23fa-4471-abe6-7ac966f381f9

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Update parsedmarc/cli.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Best-effort initialization for optional output clients in watch mode (#701)

* Initial plan

* Wrap optional output client init in try/except for best-effort initialization

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/59241d4e-1b05-4a92-b2d2-e6d13d10a4fd

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Fix SIGHUP reload tight-loop in watch mode (#702)

* Initial plan

* Fix _reload_requested tight-loop: reset flag before reload to capture concurrent SIGHUPs

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/879d0bb1-9037-41f7-bc89-f59611956d2e

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Update parsedmarc/cli.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Fix resource leak when HEC config is invalid in `_init_output_clients()` (#703)

* Initial plan

* Fix resource leak: validate HEC settings before creating any output clients

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/38c73e09-789d-4d41-b75e-bbc61418859d

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Ensure SIGHUP never triggers a new email batch across all watch() implementations (#704)

* Initial plan

* Ensure SIGHUP never starts a new email batch in any watch() implementation

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/45d5be30-8f6b-4200-9bdd-15c655033f17

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* SIGHUP-based config reload for watch mode: address review feedback (#705)

* Initial plan

* Address review feedback: Kafka SSL context, SIGHUP handler safety, test formatting

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/8f2fd48f-32a4-4258-9a89-06f7c7ac29bf

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Reverted changes by copilot that turned errors into warnings

* Enhance usage documentation for config reload: clarify behavior on successful reload and error handling

* Update CHANGELOG.md to reflect config reload enhancements

* Add pytest command to settings for silent output during testing

* Enhance resource management: add close methods for S3Client and HECClient, and improve IMAP connection handling during IDLE. Update CHANGELOG.md for config reload improvements and bug fixes.

* Update changelog to not include fixes within the same unreleased version

* Refactor changelog entries for clarity and consistency in configuration reload section

* Fix changelog entry for msgraph configuration check

* Update CHANGELOG..md

* make single list items on one line in the changelog instead of doing hard wraps

* Remove incorrect IMAP changes

* Rename 'should_reload' parameter to 'config_reloading' in mailbox connection methods for clarity

* Restore startup configuration checks

* Improve error logging for Elasticsearch and OpenSearch exceptions

* Bump version to 9.3.0 in constants.py

* Refactor GelfClient methods to use specific report types instead of generic dicts

* Refactor tests to use assertions consistently and improve type hints

---------

Co-authored-by: Sean Whalen <seanthegeek@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
2026-03-21 16:14:48 -04:00
21 changed files with 1269 additions and 330 deletions

View File

@@ -7,7 +7,8 @@
"Bash(GITHUB_ACTIONS=true pytest --cov tests.py)",
"Bash(ls tests*)",
"Bash(GITHUB_ACTIONS=true python -m pytest --cov tests.py -x)",
"Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)"
"Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)",
"Bash(python -m pytest tests.py --no-header -q)"
],
"additionalDirectories": [
"/tmp"

View File

@@ -1,23 +1,68 @@
# Changelog
## 9.5.0
### Added
- Environment variable configuration support: any config option can now be set via `PARSEDMARC_{SECTION}_{KEY}` environment variables (e.g. `PARSEDMARC_IMAP_PASSWORD`, `PARSEDMARC_SPLUNK_HEC_TOKEN`). Environment variables override config file values but are overridden by CLI arguments.
- `PARSEDMARC_CONFIG_FILE` environment variable to specify the config file path without the `-c` flag.
- Env-only mode: parsedmarc can now run without a config file when `PARSEDMARC_*` environment variables are set, enabling fully file-less Docker deployments.
- Explicit read permission check on config file, giving a clear error message when the container UID cannot read the file (e.g. `chmod 600` with a UID mismatch).
## 9.4.0
### Added
- Extracted `load_reverse_dns_map()` utility function in `utils.py` for loading the reverse DNS map independently of individual IP lookups.
- SIGHUP reload now re-downloads/reloads the reverse DNS map, so changes take effect without restarting.
- Add premade OpenSearch index patterns, visualizations, and dashboards
### Changed
- When `index_prefix_domain_map` is configured, SMTP TLS reports for domains not in the map are now silently dropped instead of being output. Unlike DMARC, TLS-RPT has no DNS authorization records, so this filtering prevents processing reports for unrelated domains.
- Bump OpenSearch support to `< 4`
### Fixed
- Fixed `get_index_prefix` using wrong key (`domain` instead of `policy_domain`) for SMTP TLS reports, which prevented domain map matching from working for TLS reports.
- Domain matching in `get_index_prefix` now lowercases the domain for case-insensitive comparison.
## 9.3.1
### Breaking changes
- Elasticsearch and OpenSearch now verify SSL certificates by default when `ssl = True`, even without a `cert_path`
- Added `skip_certificate_verification` option to the `elasticsearch` and `opensearch` configuration sections for consistency with `splunk_hec`
### Fixed
- Splunk HEC `skip_certificate_verification` now works correctly
- SMTP TLS reports no longer fail when saving to multiple output targets (e.g. Elasticsearch and OpenSearch) due to in-place mutation of the report dict
- Output client initialization errors now identify which module failed (e.g. "OpenSearch: ConnectionError..." instead of generic "Output client error")
## 9.3.0
### Added
- SIGHUP-based configuration reload for watch mode — update output
destinations, DNS/GeoIP settings, processing flags, and log level
without restarting the service or interrupting in-progress report
processing. Use `systemctl reload parsedmarc` when running under
systemd.
- Extracted `_parse_config_file()` and `_init_output_clients()` from
`_main()` in `cli.py` to support config reload and reduce code
duplication.
- SIGHUP-based configuration reload for watch mode — update output destinations, DNS/GeoIP settings, processing flags, and log level without restarting the service or interrupting in-progress report processing.
- Use `systemctl reload parsedmarc` when running under `systemd`.
- On a successful reload, old output clients are closed and recreated.
- On a failed reload, the previous configuration remains fully active.
- `close()` methods on `GelfClient`, `KafkaClient`, `SyslogClient`, `WebhookClient`, HECClient, and `S3Client` for clean resource teardown on reload.
- `config_reloading` parameter on all `MailboxConnection.watch()` implementations and `watch_inbox()` to ensure SIGHUP never triggers a new email batch mid-reload.
- Elasticsearch and OpenSearch connections are now tracked and cleaned up on reload via `_close_output_clients()`.
- Extracted `_parse_config_file()` and `_init_output_clients()` from `_main()` in `cli.py` to support config reload and reduce code duplication.
### Fixed
- `get_index_prefix()` crashed on forensic reports with `TypeError` due to `report()` instead of `report[]` dict access.
- Missing `exit(1)` after IMAP user/password validation failure allowed execution to continue with `None` credentials.
## 9.2.1
### Added
- Better checking of `msconfig` configuration (PR #695)
- Better checking of `msgraph` configuration (PR #695)
### Changed

View File

@@ -15,7 +15,7 @@ services:
condition: service_healthy
opensearch-dashboards:
image: opensearchproject/opensearch-dashboards:2
image: opensearchproject/opensearch-dashboards:3
environment:
- OPENSEARCH_HOSTS=["https://opensearch:9200"]
ports:
@@ -27,7 +27,7 @@ services:
grafana:
image: grafana/grafana:latest
environment:
- GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
- GRAFANA_PASSWORD=${GRAFANA_PASSWORD}
- GF_INSTALL_PLUGINS=grafana-piechart-panel,grafana-worldmap-panel
ports:
- "127.0.0.1:3000:3000"
@@ -41,5 +41,7 @@ services:
- SPLUNK_START_ARGS=--accept-license
- "SPLUNK_GENERAL_TERMS=--accept-sgt-current-at-splunk-com"
- SPLUNK_PASSWORD=${SPLUNK_PASSWORD}
- SPLUNK_HEC_TOKEN=${SPLUNK_HEC_TOKEN}
ports:
- "127.0.0.1:8000:8000"
- "127.0.0.1:8088:8088"

View File

@@ -273,6 +273,8 @@ The full set of configuration options are:
(Default: `True`)
- `timeout` - float: Timeout in seconds (Default: 60)
- `cert_path` - str: Path to a trusted certificates
- `skip_certificate_verification` - bool: Skip certificate
verification (not recommended)
- `index_suffix` - str: A suffix to apply to the index names
- `index_prefix` - str: A prefix to apply to the index names
- `monthly_indexes` - bool: Use monthly indexes instead of daily indexes
@@ -300,6 +302,8 @@ The full set of configuration options are:
(Default: `True`)
- `timeout` - float: Timeout in seconds (Default: 60)
- `cert_path` - str: Path to a trusted certificates
- `skip_certificate_verification` - bool: Skip certificate
verification (not recommended)
- `index_suffix` - str: A suffix to apply to the index names
- `index_prefix` - str: A prefix to apply to the index names
- `monthly_indexes` - bool: Use monthly indexes instead of daily indexes
@@ -527,6 +531,96 @@ PUT _cluster/settings
Increasing this value increases resource usage.
:::
## Environment variable configuration
Any configuration option can be set via environment variables using the
naming convention `PARSEDMARC_{SECTION}_{KEY}` (uppercase). This is
especially useful for Docker deployments where file permissions make it
difficult to use config files for secrets.
**Priority order:** CLI arguments > environment variables > config file > defaults
### Examples
```bash
# Set IMAP credentials via env vars
export PARSEDMARC_IMAP_HOST=imap.example.com
export PARSEDMARC_IMAP_USER=dmarc@example.com
export PARSEDMARC_IMAP_PASSWORD=secret
# Elasticsearch
export PARSEDMARC_ELASTICSEARCH_HOSTS=http://localhost:9200
export PARSEDMARC_ELASTICSEARCH_SSL=false
# Splunk HEC (note: section name splunk_hec becomes SPLUNK_HEC)
export PARSEDMARC_SPLUNK_HEC_URL=https://splunk.example.com
export PARSEDMARC_SPLUNK_HEC_TOKEN=my-hec-token
export PARSEDMARC_SPLUNK_HEC_INDEX=email
# General settings
export PARSEDMARC_GENERAL_SAVE_AGGREGATE=true
export PARSEDMARC_GENERAL_DEBUG=true
```
### Specifying the config file via environment variable
```bash
export PARSEDMARC_CONFIG_FILE=/etc/parsedmarc.ini
parsedmarc
```
### Running without a config file (env-only mode)
When no config file is given (neither `-c` flag nor `PARSEDMARC_CONFIG_FILE`),
parsedmarc will still pick up any `PARSEDMARC_*` environment variables. This
enables fully file-less deployments:
```bash
export PARSEDMARC_GENERAL_SAVE_AGGREGATE=true
export PARSEDMARC_GENERAL_OFFLINE=true
export PARSEDMARC_ELASTICSEARCH_HOSTS=http://elasticsearch:9200
parsedmarc /path/to/reports/*
```
### Docker Compose example
```yaml
services:
parsedmarc:
image: parsedmarc:latest
environment:
PARSEDMARC_IMAP_HOST: imap.example.com
PARSEDMARC_IMAP_USER: dmarc@example.com
PARSEDMARC_IMAP_PASSWORD: ${IMAP_PASSWORD}
PARSEDMARC_MAILBOX_WATCH: "true"
PARSEDMARC_ELASTICSEARCH_HOSTS: http://elasticsearch:9200
PARSEDMARC_GENERAL_SAVE_AGGREGATE: "true"
PARSEDMARC_GENERAL_SAVE_FORENSIC: "true"
```
### Section name mapping
For sections with underscores in the name, the full section name is used:
| Section | Env var prefix |
|------------------|-------------------------------|
| `general` | `PARSEDMARC_GENERAL_` |
| `mailbox` | `PARSEDMARC_MAILBOX_` |
| `imap` | `PARSEDMARC_IMAP_` |
| `msgraph` | `PARSEDMARC_MSGRAPH_` |
| `elasticsearch` | `PARSEDMARC_ELASTICSEARCH_` |
| `opensearch` | `PARSEDMARC_OPENSEARCH_` |
| `splunk_hec` | `PARSEDMARC_SPLUNK_HEC_` |
| `kafka` | `PARSEDMARC_KAFKA_` |
| `smtp` | `PARSEDMARC_SMTP_` |
| `s3` | `PARSEDMARC_S3_` |
| `syslog` | `PARSEDMARC_SYSLOG_` |
| `gmail_api` | `PARSEDMARC_GMAIL_API_` |
| `maildir` | `PARSEDMARC_MAILDIR_` |
| `log_analytics` | `PARSEDMARC_LOG_ANALYTICS_` |
| `gelf` | `PARSEDMARC_GELF_` |
| `webhook` | `PARSEDMARC_WEBHOOK_` |
## Performance tuning
For large mailbox imports or backfills, parsedmarc can consume a noticeable amount
@@ -666,8 +760,15 @@ Mailbox connection settings (IMAP host/credentials, Microsoft Graph,
Gmail API, Maildir path) are **not** reloaded — changing those still
requires a full restart.
If the new configuration file contains errors, the reload is aborted
and the previous configuration remains active. Check the logs for
On a **successful** reload, existing output client connections are
closed and new ones are created from the updated configuration. The
service then resumes watching with the new settings.
If the new configuration file contains errors (missing required
settings, unreachable output destinations, etc.), the **entire reload
is aborted** — no output clients are replaced and the previous
configuration remains fully active. This means a typo in one section
will not take down an otherwise working setup. Check the logs for
details:
```bash

File diff suppressed because one or more lines are too long

View File

@@ -2195,7 +2195,7 @@ def watch_inbox(
batch_size: int = 10,
since: Optional[Union[datetime, date, str]] = None,
normalize_timespan_threshold_hours: float = 24,
should_reload: Optional[Callable] = None,
config_reloading: Optional[Callable] = None,
):
"""
Watches the mailbox for new messages and
@@ -2223,7 +2223,7 @@ def watch_inbox(
batch_size (int): Number of messages to read and process before saving
since: Search for messages since certain time
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
should_reload: Optional callable that returns True when a config
config_reloading: Optional callable that returns True when a config
reload has been requested (e.g. via SIGHUP)
"""
@@ -2253,8 +2253,8 @@ def watch_inbox(
"check_callback": check_callback,
"check_timeout": check_timeout,
}
if should_reload is not None:
watch_kwargs["should_reload"] = should_reload
if config_reloading is not None:
watch_kwargs["config_reloading"] = config_reloading
mailbox_connection.watch(**watch_kwargs)

View File

@@ -19,6 +19,7 @@ import yaml
from tqdm import tqdm
from parsedmarc import (
REVERSE_DNS_MAP,
SEEN_AGGREGATE_REPORT_IDS,
InvalidDMARCReport,
ParserError,
@@ -48,7 +49,12 @@ from parsedmarc.mail import (
)
from parsedmarc.mail.graph import AuthMethod
from parsedmarc.types import ParsingResults
from parsedmarc.utils import get_base_domain, get_reverse_dns, is_mbox
from parsedmarc.utils import (
get_base_domain,
get_reverse_dns,
is_mbox,
load_reverse_dns_map,
)
# Increase the max header limit for very large emails. `_MAXHEADERS` is a
# private stdlib attribute and may not exist in type stubs.
@@ -69,6 +75,79 @@ def _str_to_list(s):
return list(map(lambda i: i.lstrip(), _list))
# All known INI config section names, used for env var resolution.
_KNOWN_SECTIONS = frozenset(
{
"general",
"mailbox",
"imap",
"msgraph",
"elasticsearch",
"opensearch",
"splunk_hec",
"kafka",
"smtp",
"s3",
"syslog",
"gmail_api",
"maildir",
"log_analytics",
"gelf",
"webhook",
}
)
def _resolve_section_key(suffix: str) -> tuple:
"""Resolve an env var suffix like ``IMAP_PASSWORD`` to ``('imap', 'password')``.
Uses longest-prefix matching against known section names so that
multi-word sections like ``splunk_hec`` are handled correctly.
Returns ``(None, None)`` when no known section matches.
"""
suffix_lower = suffix.lower()
best_section = None
best_key = None
for section in _KNOWN_SECTIONS:
section_prefix = section + "_"
if suffix_lower.startswith(section_prefix):
key = suffix_lower[len(section_prefix) :]
if key and (best_section is None or len(section) > len(best_section)):
best_section = section
best_key = key
return best_section, best_key
def _apply_env_overrides(config: ConfigParser) -> None:
"""Inject ``PARSEDMARC_*`` environment variables into *config*.
Environment variables matching ``PARSEDMARC_{SECTION}_{KEY}`` override
(or create) the corresponding config-file values. Sections are created
automatically when they do not yet exist.
"""
prefix = "PARSEDMARC_"
for env_key, env_value in os.environ.items():
if not env_key.startswith(prefix) or env_key == "PARSEDMARC_CONFIG_FILE":
continue
suffix = env_key[len(prefix) :]
section, key = _resolve_section_key(suffix)
if section is None:
logger.debug("Ignoring unrecognized env var: %s", env_key)
continue
if not config.has_section(section):
config.add_section(section)
config.set(section, key, env_value)
logger.debug("Config override from env: [%s] %s", section, key)
def _configure_logging(log_level, log_file=None):
"""
Configure logging for the current process.
@@ -172,12 +251,39 @@ class ConfigurationError(Exception):
pass
def _parse_config_file(config_file, opts):
"""Parse a config file and update opts in place.
def _load_config(config_file: str | None = None) -> ConfigParser:
"""Load configuration from an INI file and/or environment variables.
Args:
config_file: Path to the .ini config file
opts: Namespace object to update with parsed values
config_file: Optional path to an .ini config file.
Returns:
A ``ConfigParser`` populated from the file (if given) and from any
``PARSEDMARC_*`` environment variables.
Raises:
ConfigurationError: If *config_file* is given but does not exist.
"""
config = ConfigParser()
if config_file is not None:
abs_path = os.path.abspath(config_file)
if not os.path.exists(abs_path):
raise ConfigurationError("A file does not exist at {0}".format(abs_path))
if not os.access(abs_path, os.R_OK):
raise ConfigurationError(
"Unable to read {0} — check file permissions".format(abs_path)
)
config.read(config_file)
_apply_env_overrides(config)
return config
def _parse_config(config: ConfigParser, opts):
"""Apply a loaded ``ConfigParser`` to *opts* in place.
Args:
config: A ``ConfigParser`` (from ``_load_config``).
opts: Namespace object to update with parsed values.
Returns:
index_prefix_domain_map or None
@@ -185,13 +291,8 @@ def _parse_config_file(config_file, opts):
Raises:
ConfigurationError: If required settings are missing or invalid.
"""
abs_path = os.path.abspath(config_file)
if not os.path.exists(abs_path):
raise ConfigurationError("A file does not exist at {0}".format(abs_path))
opts.silent = True
config = ConfigParser()
index_prefix_domain_map = None
config.read(config_file)
if "general" in config.sections():
general_config = config["general"]
if "silent" in general_config:
@@ -201,20 +302,8 @@ def _parse_config_file(config_file, opts):
"normalize_timespan_threshold_hours"
)
if "index_prefix_domain_map" in general_config:
map_path = general_config["index_prefix_domain_map"]
try:
with open(map_path) as f:
index_prefix_domain_map = yaml.safe_load(f)
except OSError as exc:
raise ConfigurationError(
"Failed to read index_prefix_domain_map file "
"'{0}': {1}".format(map_path, exc)
) from exc
except yaml.YAMLError as exc:
raise ConfigurationError(
"Failed to parse YAML in index_prefix_domain_map "
"file '{0}': {1}".format(map_path, exc)
) from exc
with open(general_config["index_prefix_domain_map"]) as f:
index_prefix_domain_map = yaml.safe_load(f)
if "offline" in general_config:
opts.offline = bool(general_config.getboolean("offline"))
if "strip_attachment_payloads" in general_config:
@@ -254,7 +343,7 @@ def _parse_config_file(config_file, opts):
except Exception as ns_error:
raise ConfigurationError(
"DNS pre-flight check failed: {}".format(ns_error)
)
) from ns_error
if not dummy_hostname:
raise ConfigurationError(
"DNS pre-flight check failed: no PTR record for {} from {}".format(
@@ -271,8 +360,6 @@ def _parse_config_file(config_file, opts):
opts.debug = bool(general_config.getboolean("debug"))
if "verbose" in general_config:
opts.verbose = bool(general_config.getboolean("verbose"))
if "silent" in general_config:
opts.silent = bool(general_config.getboolean("silent"))
if "warnings" in general_config:
opts.warnings = bool(general_config.getboolean("warnings"))
if "fail_on_output_error" in general_config:
@@ -519,6 +606,10 @@ def _parse_config_file(config_file, opts):
opts.elasticsearch_ssl = bool(elasticsearch_config.getboolean("ssl"))
if "cert_path" in elasticsearch_config:
opts.elasticsearch_ssl_cert_path = elasticsearch_config["cert_path"]
if "skip_certificate_verification" in elasticsearch_config:
opts.elasticsearch_skip_certificate_verification = bool(
elasticsearch_config.getboolean("skip_certificate_verification")
)
if "user" in elasticsearch_config:
opts.elasticsearch_username = elasticsearch_config["user"]
if "password" in elasticsearch_config:
@@ -558,6 +649,10 @@ def _parse_config_file(config_file, opts):
opts.opensearch_ssl = bool(opensearch_config.getboolean("ssl"))
if "cert_path" in opensearch_config:
opts.opensearch_ssl_cert_path = opensearch_config["cert_path"]
if "skip_certificate_verification" in opensearch_config:
opts.opensearch_skip_certificate_verification = bool(
opensearch_config.getboolean("skip_certificate_verification")
)
if "user" in opensearch_config:
opts.opensearch_username = opensearch_config["user"]
if "password" in opensearch_config:
@@ -761,15 +856,15 @@ def _parse_config_file(config_file, opts):
if "oauth2_port" in gmail_api_config:
opts.gmail_api_oauth2_port = gmail_api_config.getint("oauth2_port", 8080)
if "auth_mode" in gmail_api_config:
opts.gmail_api_auth_mode = gmail_api_config.get("auth_mode").strip()
opts.gmail_api_auth_mode = gmail_api_config["auth_mode"].strip()
if "service_account_user" in gmail_api_config:
opts.gmail_api_service_account_user = gmail_api_config.get(
opts.gmail_api_service_account_user = gmail_api_config[
"service_account_user"
).strip()
].strip()
elif "delegated_user" in gmail_api_config:
opts.gmail_api_service_account_user = gmail_api_config.get(
opts.gmail_api_service_account_user = gmail_api_config[
"delegated_user"
).strip()
].strip()
if "maildir" in config.sections():
maildir_api_config = config["maildir"]
@@ -824,6 +919,38 @@ def _parse_config_file(config_file, opts):
return index_prefix_domain_map
class _ElasticsearchHandle:
"""Sentinel so Elasticsearch participates in _close_output_clients."""
def close(self):
try:
conn = elastic.connections.get_connection()
if not isinstance(conn, str):
conn.close()
except Exception:
pass
try:
elastic.connections.remove_connection("default")
except Exception:
pass
class _OpenSearchHandle:
"""Sentinel so OpenSearch participates in _close_output_clients."""
def close(self):
try:
conn = opensearch.connections.get_connection()
if not isinstance(conn, str):
conn.close()
except Exception:
pass
try:
opensearch.connections.remove_connection("default")
except Exception:
pass
def _init_output_clients(opts):
"""Create output clients based on current opts.
@@ -835,147 +962,180 @@ def _init_output_clients(opts):
"""
clients = {}
if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls:
if opts.elasticsearch_hosts:
es_aggregate_index = "dmarc_aggregate"
es_forensic_index = "dmarc_forensic"
es_smtp_tls_index = "smtp_tls"
if opts.elasticsearch_index_suffix:
suffix = opts.elasticsearch_index_suffix
es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix)
es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix)
es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix)
if opts.elasticsearch_index_prefix:
prefix = opts.elasticsearch_index_prefix
es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index)
es_forensic_index = "{0}{1}".format(prefix, es_forensic_index)
es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index)
elastic_timeout_value = (
float(opts.elasticsearch_timeout)
if opts.elasticsearch_timeout is not None
else 60.0
)
elastic.set_hosts(
opts.elasticsearch_hosts,
use_ssl=opts.elasticsearch_ssl,
ssl_cert_path=opts.elasticsearch_ssl_cert_path,
username=opts.elasticsearch_username,
password=opts.elasticsearch_password,
api_key=opts.elasticsearch_api_key,
timeout=elastic_timeout_value,
)
elastic.migrate_indexes(
aggregate_indexes=[es_aggregate_index],
forensic_indexes=[es_forensic_index],
try:
if opts.s3_bucket:
clients["s3_client"] = s3.S3Client(
bucket_name=opts.s3_bucket,
bucket_path=opts.s3_path,
region_name=opts.s3_region_name,
endpoint_url=opts.s3_endpoint_url,
access_key_id=opts.s3_access_key_id,
secret_access_key=opts.s3_secret_access_key,
)
except Exception as e:
raise RuntimeError(f"S3: {e}") from e
if opts.opensearch_hosts:
os_aggregate_index = "dmarc_aggregate"
os_forensic_index = "dmarc_forensic"
os_smtp_tls_index = "smtp_tls"
if opts.opensearch_index_suffix:
suffix = opts.opensearch_index_suffix
os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix)
os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix)
os_smtp_tls_index = "{0}_{1}".format(os_smtp_tls_index, suffix)
if opts.opensearch_index_prefix:
prefix = opts.opensearch_index_prefix
os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index)
os_forensic_index = "{0}{1}".format(prefix, os_forensic_index)
os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index)
opensearch_timeout_value = (
float(opts.opensearch_timeout)
if opts.opensearch_timeout is not None
else 60.0
try:
if opts.syslog_server:
clients["syslog_client"] = syslog.SyslogClient(
server_name=opts.syslog_server,
server_port=int(opts.syslog_port),
protocol=opts.syslog_protocol or "udp",
cafile_path=opts.syslog_cafile_path,
certfile_path=opts.syslog_certfile_path,
keyfile_path=opts.syslog_keyfile_path,
timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
retry_attempts=opts.syslog_retry_attempts
if opts.syslog_retry_attempts is not None
else 3,
retry_delay=opts.syslog_retry_delay
if opts.syslog_retry_delay is not None
else 5,
)
opensearch.set_hosts(
opts.opensearch_hosts,
use_ssl=opts.opensearch_ssl,
ssl_cert_path=opts.opensearch_ssl_cert_path,
username=opts.opensearch_username,
password=opts.opensearch_password,
api_key=opts.opensearch_api_key,
timeout=opensearch_timeout_value,
auth_type=opts.opensearch_auth_type,
aws_region=opts.opensearch_aws_region,
aws_service=opts.opensearch_aws_service,
)
opensearch.migrate_indexes(
aggregate_indexes=[os_aggregate_index],
forensic_indexes=[os_forensic_index],
)
if opts.s3_bucket:
clients["s3_client"] = s3.S3Client(
bucket_name=opts.s3_bucket,
bucket_path=opts.s3_path,
region_name=opts.s3_region_name,
endpoint_url=opts.s3_endpoint_url,
access_key_id=opts.s3_access_key_id,
secret_access_key=opts.s3_secret_access_key,
)
if opts.syslog_server:
clients["syslog_client"] = syslog.SyslogClient(
server_name=opts.syslog_server,
server_port=int(opts.syslog_port),
protocol=opts.syslog_protocol or "udp",
cafile_path=opts.syslog_cafile_path,
certfile_path=opts.syslog_certfile_path,
keyfile_path=opts.syslog_keyfile_path,
timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
retry_attempts=opts.syslog_retry_attempts
if opts.syslog_retry_attempts is not None
else 3,
retry_delay=opts.syslog_retry_delay
if opts.syslog_retry_delay is not None
else 5,
)
except Exception as e:
raise RuntimeError(f"Syslog: {e}") from e
if opts.hec:
if opts.hec_token is None or opts.hec_index is None:
raise ConfigurationError(
"HEC token and HEC index are required when using HEC URL"
)
verify = True
if opts.hec_skip_certificate_verification:
verify = False
clients["hec_client"] = splunk.HECClient(
opts.hec, opts.hec_token, opts.hec_index, verify=verify
)
try:
verify = True
if opts.hec_skip_certificate_verification:
verify = False
clients["hec_client"] = splunk.HECClient(
opts.hec, opts.hec_token, opts.hec_index, verify=verify
)
except Exception as e:
raise RuntimeError(f"Splunk HEC: {e}") from e
if opts.kafka_hosts:
ssl_context = None
if opts.kafka_skip_certificate_verification:
logger.debug("Skipping Kafka certificate verification")
ssl_context = create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = CERT_NONE
clients["kafka_client"] = kafkaclient.KafkaClient(
opts.kafka_hosts,
username=opts.kafka_username,
password=opts.kafka_password,
ssl_context=ssl_context,
)
try:
if opts.kafka_hosts:
ssl_context = None
if opts.kafka_skip_certificate_verification:
logger.debug("Skipping Kafka certificate verification")
ssl_context = create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = CERT_NONE
clients["kafka_client"] = kafkaclient.KafkaClient(
opts.kafka_hosts,
username=opts.kafka_username,
password=opts.kafka_password,
ssl_context=ssl_context,
)
except Exception as e:
raise RuntimeError(f"Kafka: {e}") from e
if opts.gelf_host:
clients["gelf_client"] = gelf.GelfClient(
host=opts.gelf_host,
port=int(opts.gelf_port),
mode=opts.gelf_mode,
)
try:
if opts.gelf_host:
clients["gelf_client"] = gelf.GelfClient(
host=opts.gelf_host,
port=int(opts.gelf_port),
mode=opts.gelf_mode,
)
except Exception as e:
raise RuntimeError(f"GELF: {e}") from e
if (
opts.webhook_aggregate_url
or opts.webhook_forensic_url
or opts.webhook_smtp_tls_url
):
clients["webhook_client"] = webhook.WebhookClient(
aggregate_url=opts.webhook_aggregate_url,
forensic_url=opts.webhook_forensic_url,
smtp_tls_url=opts.webhook_smtp_tls_url,
timeout=opts.webhook_timeout,
)
try:
if (
opts.webhook_aggregate_url
or opts.webhook_forensic_url
or opts.webhook_smtp_tls_url
):
clients["webhook_client"] = webhook.WebhookClient(
aggregate_url=opts.webhook_aggregate_url,
forensic_url=opts.webhook_forensic_url,
smtp_tls_url=opts.webhook_smtp_tls_url,
timeout=opts.webhook_timeout,
)
except Exception as e:
raise RuntimeError(f"Webhook: {e}") from e
# Elasticsearch and OpenSearch mutate module-level global state via
# connections.create_connection(), which cannot be rolled back if a later
# step fails. Initialise them last so that all other clients are created
# successfully first; this minimises the window for partial-init problems
# during config reload.
if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls:
try:
if opts.elasticsearch_hosts:
es_aggregate_index = "dmarc_aggregate"
es_forensic_index = "dmarc_forensic"
es_smtp_tls_index = "smtp_tls"
if opts.elasticsearch_index_suffix:
suffix = opts.elasticsearch_index_suffix
es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix)
es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix)
es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix)
if opts.elasticsearch_index_prefix:
prefix = opts.elasticsearch_index_prefix
es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index)
es_forensic_index = "{0}{1}".format(prefix, es_forensic_index)
es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index)
elastic_timeout_value = (
float(opts.elasticsearch_timeout)
if opts.elasticsearch_timeout is not None
else 60.0
)
elastic.set_hosts(
opts.elasticsearch_hosts,
use_ssl=opts.elasticsearch_ssl,
ssl_cert_path=opts.elasticsearch_ssl_cert_path,
skip_certificate_verification=opts.elasticsearch_skip_certificate_verification,
username=opts.elasticsearch_username,
password=opts.elasticsearch_password,
api_key=opts.elasticsearch_api_key,
timeout=elastic_timeout_value,
)
elastic.migrate_indexes(
aggregate_indexes=[es_aggregate_index],
forensic_indexes=[es_forensic_index],
)
clients["elasticsearch"] = _ElasticsearchHandle()
except Exception as e:
raise RuntimeError(f"Elasticsearch: {e}") from e
try:
if opts.opensearch_hosts:
os_aggregate_index = "dmarc_aggregate"
os_forensic_index = "dmarc_forensic"
os_smtp_tls_index = "smtp_tls"
if opts.opensearch_index_suffix:
suffix = opts.opensearch_index_suffix
os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix)
os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix)
os_smtp_tls_index = "{0}_{1}".format(os_smtp_tls_index, suffix)
if opts.opensearch_index_prefix:
prefix = opts.opensearch_index_prefix
os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index)
os_forensic_index = "{0}{1}".format(prefix, os_forensic_index)
os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index)
opensearch_timeout_value = (
float(opts.opensearch_timeout)
if opts.opensearch_timeout is not None
else 60.0
)
opensearch.set_hosts(
opts.opensearch_hosts,
use_ssl=opts.opensearch_ssl,
ssl_cert_path=opts.opensearch_ssl_cert_path,
skip_certificate_verification=opts.opensearch_skip_certificate_verification,
username=opts.opensearch_username,
password=opts.opensearch_password,
api_key=opts.opensearch_api_key,
timeout=opensearch_timeout_value,
auth_type=opts.opensearch_auth_type,
aws_region=opts.opensearch_aws_region,
aws_service=opts.opensearch_aws_service,
)
opensearch.migrate_indexes(
aggregate_indexes=[os_aggregate_index],
forensic_indexes=[os_forensic_index],
)
clients["opensearch"] = _OpenSearchHandle()
except Exception as e:
raise RuntimeError(f"OpenSearch: {e}") from e
return clients
@@ -1007,22 +1167,24 @@ def _main():
if "policy_published" in report:
domain = report["policy_published"]["domain"]
elif "reported_domain" in report:
domain = report("reported_domain")
domain = report["reported_domain"]
elif "policies" in report:
domain = report["policies"][0]["domain"]
domain = report["policies"][0]["policy_domain"]
if domain:
domain = get_base_domain(domain)
for prefix in index_prefix_domain_map:
if domain in index_prefix_domain_map[prefix]:
prefix = (
prefix.lower()
.strip()
.strip("_")
.replace(" ", "_")
.replace("-", "_")
)
prefix = f"{prefix}_"
return prefix
if domain:
domain = domain.lower()
for prefix in index_prefix_domain_map:
if domain in index_prefix_domain_map[prefix]:
prefix = (
prefix.lower()
.strip()
.strip("_")
.replace(" ", "_")
.replace("-", "_")
)
prefix = f"{prefix}_"
return prefix
return None
def process_reports(reports_):
@@ -1033,6 +1195,22 @@ def _main():
logger.error(message)
output_errors.append(message)
if index_prefix_domain_map is not None:
filtered_tls = []
for report in reports_.get("smtp_tls_reports", []):
if get_index_prefix(report) is not None:
filtered_tls.append(report)
else:
domain = "unknown"
if "policies" in report and report["policies"]:
domain = report["policies"][0].get("policy_domain", "unknown")
logger.debug(
"Ignoring SMTP TLS report for domain not in "
"index_prefix_domain_map: %s",
domain,
)
reports_["smtp_tls_reports"] = filtered_tls
indent_value = 2 if opts.prettify_json else None
output_str = "{0}\n".format(
json.dumps(reports_, ensure_ascii=False, indent=indent_value)
@@ -1504,6 +1682,7 @@ def _main():
elasticsearch_index_prefix=None,
elasticsearch_ssl=True,
elasticsearch_ssl_cert_path=None,
elasticsearch_skip_certificate_verification=False,
elasticsearch_monthly_indexes=False,
elasticsearch_username=None,
elasticsearch_password=None,
@@ -1516,6 +1695,7 @@ def _main():
opensearch_index_prefix=None,
opensearch_ssl=True,
opensearch_ssl_cert_path=None,
opensearch_skip_certificate_verification=False,
opensearch_monthly_indexes=False,
opensearch_username=None,
opensearch_password=None,
@@ -1598,11 +1778,18 @@ def _main():
index_prefix_domain_map = None
if args.config_file:
config_file = args.config_file or os.environ.get("PARSEDMARC_CONFIG_FILE")
has_env_config = any(
k.startswith("PARSEDMARC_") and k != "PARSEDMARC_CONFIG_FILE"
for k in os.environ
)
if config_file or has_env_config:
try:
index_prefix_domain_map = _parse_config_file(args.config_file, opts)
config = _load_config(config_file)
index_prefix_domain_map = _parse_config(config, opts)
except ConfigurationError as e:
logger.error(str(e))
logger.critical(str(e))
exit(-1)
logger.setLevel(logging.ERROR)
@@ -1624,6 +1811,8 @@ def _main():
except Exception as error:
logger.warning("Unable to write to log file: {}".format(error))
opts.active_log_file = opts.log_file
if (
opts.imap_host is None
and opts.graph_client_id is None
@@ -1639,14 +1828,8 @@ def _main():
# Initialize output clients
try:
clients = _init_output_clients(opts)
except elastic.ElasticsearchError:
logger.exception("Elasticsearch Error")
exit(1)
except opensearch.OpenSearchError:
logger.exception("OpenSearch Error")
exit(1)
except ConfigurationError as e:
logger.error(str(e))
logger.critical(str(e))
exit(1)
except Exception as error_:
logger.error("Output client error: {0}".format(error_))
@@ -1782,8 +1965,9 @@ def _main():
try:
if opts.imap_user is None or opts.imap_password is None:
logger.error(
"IMAP user and password must be specified ifhost is specified"
"IMAP user and password must be specified if host is specified"
)
exit(1)
ssl = True
verify = True
@@ -1960,8 +2144,9 @@ def _main():
def _handle_sighup(signum, frame):
nonlocal _reload_requested
# Logging is not async-signal-safe; only set the flag here.
# The log message is emitted from the main loop when the flag is read.
_reload_requested = True
logger.info("SIGHUP received, config will reload after current batch")
if hasattr(signal, "SIGHUP"):
signal.signal(signal.SIGHUP, _handle_sighup)
@@ -1970,7 +2155,12 @@ def _main():
logger.info("Watching for email - Quit with ctrl-c")
while True:
_reload_requested = False
# Re-check mailbox_watch in case a config reload disabled watch mode
if not opts.mailbox_watch:
logger.info(
"Mailbox watch disabled in reloaded configuration, stopping watcher"
)
break
try:
watch_inbox(
mailbox_connection=mailbox_connection,
@@ -1991,7 +2181,7 @@ def _main():
reverse_dns_map_url=opts.reverse_dns_map_url,
offline=opts.offline,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
should_reload=lambda: _reload_requested,
config_reloading=lambda: _reload_requested,
)
except FileExistsError as error:
logger.error("{0}".format(error.__str__()))
@@ -2003,21 +2193,36 @@ def _main():
if not _reload_requested:
break
# Reload configuration
# Reload configuration — emit the log message here (not in the
# signal handler, which is not async-signal-safe), then clear the
# flag so that any new SIGHUP arriving while we reload will be
# captured for the next iteration rather than being silently dropped.
logger.info("SIGHUP received, config will reload after current batch")
_reload_requested = False
logger.info("Reloading configuration...")
try:
# Build a fresh opts starting from CLI-only defaults so that
# sections removed from the config file actually take effect.
new_opts = Namespace(**vars(opts_from_cli))
new_index_prefix_domain_map = _parse_config_file(
args.config_file, new_opts
)
new_config = _load_config(config_file)
new_index_prefix_domain_map = _parse_config(new_config, new_opts)
new_clients = _init_output_clients(new_opts)
# All steps succeeded — commit the changes atomically.
_close_output_clients(clients)
clients = new_clients
index_prefix_domain_map = new_index_prefix_domain_map
# Reload the reverse DNS map so changes to the
# map path/URL in the config take effect.
load_reverse_dns_map(
REVERSE_DNS_MAP,
always_use_local_file=new_opts.always_use_local_files,
local_file_path=new_opts.reverse_dns_map_path,
url=new_opts.reverse_dns_map_url,
offline=new_opts.offline,
)
for k, v in vars(new_opts).items():
setattr(opts, k, v)
@@ -2047,6 +2252,31 @@ def _main():
if opts.debug:
logger.setLevel(logging.DEBUG)
# Refresh FileHandler if log_file changed
old_log_file = getattr(opts, "active_log_file", None)
new_log_file = opts.log_file
if old_log_file != new_log_file:
# Remove old FileHandlers
for h in list(logger.handlers):
if isinstance(h, logging.FileHandler):
h.close()
logger.removeHandler(h)
# Add new FileHandler if configured
if new_log_file:
try:
fh = logging.FileHandler(new_log_file, "a")
file_formatter = logging.Formatter(
"%(asctime)s - %(levelname)s"
" - [%(filename)s:%(lineno)d] - %(message)s"
)
fh.setFormatter(file_formatter)
logger.addHandler(fh)
except Exception as log_error:
logger.warning(
"Unable to write to log file: {}".format(log_error)
)
opts.active_log_file = new_log_file
logger.info("Configuration reloaded successfully")
except Exception:
logger.exception(

View File

@@ -1,3 +1,3 @@
__version__ = "9.2.1"
__version__ = "9.5.0"
USER_AGENT = f"parsedmarc/{__version__}"

View File

@@ -268,6 +268,7 @@ def set_hosts(
*,
use_ssl: bool = False,
ssl_cert_path: Optional[str] = None,
skip_certificate_verification: bool = False,
username: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
@@ -280,6 +281,7 @@ def set_hosts(
hosts (str | list[str]): A single hostname or URL, or list of hostnames or URLs
use_ssl (bool): Use an HTTPS connection to the server
ssl_cert_path (str): Path to the certificate chain
skip_certificate_verification (bool): Skip certificate verification
username (str): The username to use for authentication
password (str): The password to use for authentication
api_key (str): The Base64 encoded API key to use for authentication
@@ -291,10 +293,11 @@ def set_hosts(
if use_ssl:
conn_params["use_ssl"] = True
if ssl_cert_path:
conn_params["verify_certs"] = True
conn_params["ca_certs"] = ssl_cert_path
else:
if skip_certificate_verification:
conn_params["verify_certs"] = False
else:
conn_params["verify_certs"] = True
if username and password:
conn_params["http_auth"] = username + ":" + password
if api_key:
@@ -735,6 +738,7 @@ def save_smtp_tls_report_to_elasticsearch(
index_date = begin_date.strftime("%Y-%m")
else:
index_date = begin_date.strftime("%Y-%m-%d")
report = report.copy()
report["begin_date"] = begin_date
report["end_date"] = end_date

View File

@@ -3,9 +3,7 @@
from __future__ import annotations
import logging
import logging.handlers
import threading
from typing import Any
from pygelf import GelfTcpHandler, GelfTlsHandler, GelfUdpHandler
@@ -14,6 +12,7 @@ from parsedmarc import (
parsed_forensic_reports_to_csv_rows,
parsed_smtp_tls_reports_to_csv_rows,
)
from parsedmarc.types import AggregateReport, ForensicReport, SMTPTLSReport
log_context_data = threading.local()
@@ -37,7 +36,7 @@ class GelfClient(object):
"""
self.host = host
self.port = port
self.logger = logging.getLogger("parsedmarc_syslog")
self.logger = logging.getLogger("parsedmarc_gelf")
self.logger.setLevel(logging.INFO)
self.logger.addFilter(ContextFilter())
self.gelf_mode = {
@@ -50,7 +49,7 @@ class GelfClient(object):
)
self.logger.addHandler(self.handler)
def save_aggregate_report_to_gelf(self, aggregate_reports: list[dict[str, Any]]):
def save_aggregate_report_to_gelf(self, aggregate_reports: list[AggregateReport]):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows:
log_context_data.parsedmarc = row
@@ -58,13 +57,13 @@ class GelfClient(object):
log_context_data.parsedmarc = None
def save_forensic_report_to_gelf(self, forensic_reports: list[dict[str, Any]]):
def save_forensic_report_to_gelf(self, forensic_reports: list[ForensicReport]):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows:
log_context_data.parsedmarc = row
self.logger.info("parsedmarc forensic report")
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: dict[str, Any]):
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: SMTPTLSReport):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
for row in rows:
log_context_data.parsedmarc = row

View File

@@ -175,13 +175,15 @@ class GmailConnection(MailboxConnection):
# Not needed
pass
def watch(self, check_callback, check_timeout, should_reload=None):
def watch(self, check_callback, check_timeout, config_reloading=None):
"""Checks the mailbox for new messages every n seconds"""
while True:
sleep(check_timeout)
check_callback(self)
if should_reload and should_reload():
if config_reloading and config_reloading():
return
sleep(check_timeout)
if config_reloading and config_reloading():
return
check_callback(self)
@lru_cache(maxsize=10)
def _find_label_id_for_label(self, label_name: str) -> str:

View File

@@ -278,13 +278,15 @@ class MSGraphConnection(MailboxConnection):
# Not needed
pass
def watch(self, check_callback, check_timeout, should_reload=None):
def watch(self, check_callback, check_timeout, config_reloading=None):
"""Checks the mailbox for new messages every n seconds"""
while True:
sleep(check_timeout)
check_callback(self)
if should_reload and should_reload():
if config_reloading and config_reloading():
return
sleep(check_timeout)
if config_reloading and config_reloading():
return
check_callback(self)
@lru_cache(maxsize=10)
def _find_folder_id_from_folder_path(self, folder_name: str) -> str:

View File

@@ -81,7 +81,7 @@ class IMAPConnection(MailboxConnection):
def keepalive(self):
self._client.noop()
def watch(self, check_callback, check_timeout, should_reload=None):
def watch(self, check_callback, check_timeout, config_reloading=None):
"""
Use an IDLE IMAP connection to parse incoming emails,
and pass the results to a callback function
@@ -94,6 +94,8 @@ class IMAPConnection(MailboxConnection):
check_callback(self)
while True:
if config_reloading and config_reloading():
return
try:
IMAPClient(
host=self._client.host,
@@ -111,5 +113,5 @@ class IMAPConnection(MailboxConnection):
except Exception as e:
logger.warning("IMAP connection error. {0}. Reconnecting...".format(e))
sleep(check_timeout)
if should_reload and should_reload():
if config_reloading and config_reloading():
return

View File

@@ -28,5 +28,5 @@ class MailboxConnection(ABC):
def keepalive(self):
raise NotImplementedError
def watch(self, check_callback, check_timeout, should_reload=None):
def watch(self, check_callback, check_timeout, config_reloading=None):
raise NotImplementedError

View File

@@ -63,12 +63,14 @@ class MaildirConnection(MailboxConnection):
def keepalive(self):
return
def watch(self, check_callback, check_timeout, should_reload=None):
def watch(self, check_callback, check_timeout, config_reloading=None):
while True:
if config_reloading and config_reloading():
return
try:
check_callback(self)
except Exception as e:
logger.warning("Maildir init error. {0}".format(e))
if should_reload and should_reload():
if config_reloading and config_reloading():
return
sleep(check_timeout)

View File

@@ -271,6 +271,7 @@ def set_hosts(
*,
use_ssl: Optional[bool] = False,
ssl_cert_path: Optional[str] = None,
skip_certificate_verification: bool = False,
username: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
@@ -286,6 +287,7 @@ def set_hosts(
hosts (str|list[str]): A single hostname or URL, or list of hostnames or URLs
use_ssl (bool): Use an HTTPS connection to the server
ssl_cert_path (str): Path to the certificate chain
skip_certificate_verification (bool): Skip certificate verification
username (str): The username to use for authentication
password (str): The password to use for authentication
api_key (str): The Base64 encoded API key to use for authentication
@@ -300,10 +302,11 @@ def set_hosts(
if use_ssl:
conn_params["use_ssl"] = True
if ssl_cert_path:
conn_params["verify_certs"] = True
conn_params["ca_certs"] = ssl_cert_path
else:
if skip_certificate_verification:
conn_params["verify_certs"] = False
else:
conn_params["verify_certs"] = True
normalized_auth_type = (auth_type or "basic").strip().lower()
if normalized_auth_type == "awssigv4":
if not aws_region:
@@ -764,6 +767,7 @@ def save_smtp_tls_report_to_opensearch(
index_date = begin_date.strftime("%Y-%m")
else:
index_date = begin_date.strftime("%Y-%m-%d")
report = report.copy()
report["begin_date"] = begin_date
report["end_date"] = end_date

View File

@@ -93,3 +93,11 @@ class S3Client(object):
self.bucket.put_object(
Body=json.dumps(report), Key=object_path, Metadata=object_metadata
)
def close(self):
"""Clean up the boto3 resource."""
try:
if self.s3.meta is not None:
self.s3.meta.client.close()
except Exception:
pass

View File

@@ -58,7 +58,7 @@ class HECClient(object):
self.source = source
self.session = requests.Session()
self.timeout = timeout
self.session.verify = verify
self.verify = verify
self._common_data: dict[str, Union[str, int, float, dict]] = dict(
host=self.host, source=self.source, index=self.index
)
@@ -124,10 +124,12 @@ class HECClient(object):
data["event"] = new_report.copy()
json_str += "{0}\n".format(json.dumps(data))
if not self.session.verify:
if not self.verify:
logger.debug("Skipping certificate verification for Splunk HEC")
try:
response = self.session.post(self.url, data=json_str, timeout=self.timeout)
response = self.session.post(
self.url, data=json_str, verify=self.verify, timeout=self.timeout
)
response = response.json()
except Exception as e:
raise SplunkError(e.__str__())
@@ -161,10 +163,12 @@ class HECClient(object):
data["event"] = report.copy()
json_str += "{0}\n".format(json.dumps(data))
if not self.session.verify:
if not self.verify:
logger.debug("Skipping certificate verification for Splunk HEC")
try:
response = self.session.post(self.url, data=json_str, timeout=self.timeout)
response = self.session.post(
self.url, data=json_str, verify=self.verify, timeout=self.timeout
)
response = response.json()
except Exception as e:
raise SplunkError(e.__str__())
@@ -198,12 +202,18 @@ class HECClient(object):
data["event"] = report.copy()
json_str += "{0}\n".format(json.dumps(data))
if not self.session.verify:
if not self.verify:
logger.debug("Skipping certificate verification for Splunk HEC")
try:
response = self.session.post(self.url, data=json_str, timeout=self.timeout)
response = self.session.post(
self.url, data=json_str, verify=self.verify, timeout=self.timeout
)
response = response.json()
except Exception as e:
raise SplunkError(e.__str__())
if response["code"] != 0:
raise SplunkError(response["text"])
def close(self):
"""Close the underlying HTTP session."""
self.session.close()

View File

@@ -335,6 +335,76 @@ def get_ip_address_country(
return country
def load_reverse_dns_map(
reverse_dns_map: ReverseDNSMap,
*,
always_use_local_file: bool = False,
local_file_path: Optional[str] = None,
url: Optional[str] = None,
offline: bool = False,
) -> None:
"""
Loads the reverse DNS map from a URL or local file.
Clears and repopulates the given map dict in place. If the map is
fetched from a URL, that is tried first; on failure (or if offline/local
mode is selected) the bundled CSV is used as a fallback.
Args:
reverse_dns_map (dict): The map dict to populate (modified in place)
always_use_local_file (bool): Always use a local map file
local_file_path (str): Path to a local map file
url (str): URL to a reverse DNS map
offline (bool): Use the built-in copy of the reverse DNS map
"""
if url is None:
url = (
"https://raw.githubusercontent.com/domainaware"
"/parsedmarc/master/parsedmarc/"
"resources/maps/base_reverse_dns_map.csv"
)
reverse_dns_map.clear()
def load_csv(_csv_file):
reader = csv.DictReader(_csv_file)
for row in reader:
key = row["base_reverse_dns"].lower().strip()
reverse_dns_map[key] = {
"name": row["name"].strip(),
"type": row["type"].strip(),
}
csv_file = io.StringIO()
if not (offline or always_use_local_file):
try:
logger.debug(f"Trying to fetch reverse DNS map from {url}...")
headers = {"User-Agent": USER_AGENT}
response = requests.get(url, headers=headers)
response.raise_for_status()
csv_file.write(response.text)
csv_file.seek(0)
load_csv(csv_file)
except requests.exceptions.RequestException as e:
logger.warning(f"Failed to fetch reverse DNS map: {e}")
except Exception:
logger.warning("Not a valid CSV file")
csv_file.seek(0)
logging.debug("Response body:")
logger.debug(csv_file.read())
if len(reverse_dns_map) == 0:
logger.info("Loading included reverse DNS map...")
path = str(
files(parsedmarc.resources.maps).joinpath("base_reverse_dns_map.csv")
)
if local_file_path is not None:
path = local_file_path
with open(path) as csv_file:
load_csv(csv_file)
def get_service_from_reverse_dns_base_domain(
base_domain,
*,
@@ -361,55 +431,21 @@ def get_service_from_reverse_dns_base_domain(
"""
base_domain = base_domain.lower().strip()
if url is None:
url = (
"https://raw.githubusercontent.com/domainaware"
"/parsedmarc/master/parsedmarc/"
"resources/maps/base_reverse_dns_map.csv"
)
reverse_dns_map_value: ReverseDNSMap
if reverse_dns_map is None:
reverse_dns_map_value = {}
else:
reverse_dns_map_value = reverse_dns_map
def load_csv(_csv_file):
reader = csv.DictReader(_csv_file)
for row in reader:
key = row["base_reverse_dns"].lower().strip()
reverse_dns_map_value[key] = {
"name": row["name"],
"type": row["type"],
}
csv_file = io.StringIO()
if not (offline or always_use_local_file) and len(reverse_dns_map_value) == 0:
try:
logger.debug(f"Trying to fetch reverse DNS map from {url}...")
headers = {"User-Agent": USER_AGENT}
response = requests.get(url, headers=headers)
response.raise_for_status()
csv_file.write(response.text)
csv_file.seek(0)
load_csv(csv_file)
except requests.exceptions.RequestException as e:
logger.warning(f"Failed to fetch reverse DNS map: {e}")
except Exception:
logger.warning("Not a valid CSV file")
csv_file.seek(0)
logging.debug("Response body:")
logger.debug(csv_file.read())
if len(reverse_dns_map_value) == 0:
logger.info("Loading included reverse DNS map...")
path = str(
files(parsedmarc.resources.maps).joinpath("base_reverse_dns_map.csv")
load_reverse_dns_map(
reverse_dns_map_value,
always_use_local_file=always_use_local_file,
local_file_path=local_file_path,
url=url,
offline=offline,
)
if local_file_path is not None:
path = local_file_path
with open(path) as csv_file:
load_csv(csv_file)
service: ReverseDNSService
try:
service = reverse_dns_map_value[base_domain]

View File

@@ -50,7 +50,7 @@ dependencies = [
"lxml>=4.4.0",
"mailsuite>=1.11.2",
"msgraph-core==0.2.2",
"opensearch-py>=2.4.2,<=3.0.0",
"opensearch-py>=2.4.2,<=4.0.0",
"publicsuffixlist>=0.10.0",
"pygelf>=0.4.2",
"requests>=2.22.0",

557
tests.py
View File

@@ -3,19 +3,23 @@
from __future__ import absolute_import, print_function, unicode_literals
import io
import json
import os
import signal
import sys
import tempfile
import unittest
from base64 import urlsafe_b64encode
from configparser import ConfigParser
from glob import glob
from pathlib import Path
from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import cast
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
from lxml import etree
from lxml import etree # type: ignore[import-untyped]
from googleapiclient.errors import HttpError
from httplib2 import Response
from imapclient.exceptions import IMAPClientError
@@ -32,6 +36,7 @@ from parsedmarc.mail.imap import IMAPConnection
import parsedmarc.mail.gmail as gmail_module
import parsedmarc.mail.graph as graph_module
import parsedmarc.mail.imap as imap_module
import parsedmarc.elastic
import parsedmarc.opensearch as opensearch_module
import parsedmarc.utils
@@ -154,7 +159,7 @@ class Test(unittest.TestCase):
report_path,
offline=True,
)
self.assertEqual(result["report_type"], "aggregate")
assert result["report_type"] == "aggregate"
self.assertEqual(result["report"]["report_metadata"]["org_name"], "outlook.com")
def testParseReportFileAcceptsPathForEmail(self):
@@ -165,7 +170,7 @@ class Test(unittest.TestCase):
report_path,
offline=True,
)
self.assertEqual(result["report_type"], "aggregate")
assert result["report_type"] == "aggregate"
self.assertEqual(result["report"]["report_metadata"]["org_name"], "google.com")
def testAggregateSamples(self):
@@ -176,10 +181,11 @@ class Test(unittest.TestCase):
if os.path.isdir(sample_path):
continue
print("Testing {0}: ".format(sample_path), end="")
parsed_report = parsedmarc.parse_report_file(
result = parsedmarc.parse_report_file(
sample_path, always_use_local_files=True, offline=OFFLINE_MODE
)["report"]
parsedmarc.parsed_aggregate_reports_to_csv(parsed_report)
)
assert result["report_type"] == "aggregate"
parsedmarc.parsed_aggregate_reports_to_csv(result["report"])
print("Passed!")
def testEmptySample(self):
@@ -195,13 +201,13 @@ class Test(unittest.TestCase):
print("Testing {0}: ".format(sample_path), end="")
with open(sample_path) as sample_file:
sample_content = sample_file.read()
parsed_report = parsedmarc.parse_report_email(
email_result = parsedmarc.parse_report_email(
sample_content, offline=OFFLINE_MODE
)["report"]
parsed_report = parsedmarc.parse_report_file(
sample_path, offline=OFFLINE_MODE
)["report"]
parsedmarc.parsed_forensic_reports_to_csv(parsed_report)
)
assert email_result["report_type"] == "forensic"
result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)
assert result["report_type"] == "forensic"
parsedmarc.parsed_forensic_reports_to_csv(result["report"])
print("Passed!")
def testSmtpTlsSamples(self):
@@ -212,10 +218,9 @@ class Test(unittest.TestCase):
if os.path.isdir(sample_path):
continue
print("Testing {0}: ".format(sample_path), end="")
parsed_report = parsedmarc.parse_report_file(
sample_path, offline=OFFLINE_MODE
)["report"]
parsedmarc.parsed_smtp_tls_reports_to_csv(parsed_report)
result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)
assert result["report_type"] == "smtp_tls"
parsedmarc.parsed_smtp_tls_reports_to_csv(result["report"])
print("Passed!")
def testOpenSearchSigV4RequiresRegion(self):
@@ -1275,10 +1280,26 @@ class TestImapFallbacks(unittest.TestCase):
class TestMailboxWatchSince(unittest.TestCase):
def setUp(self):
from parsedmarc.log import logger as _logger
_logger.disabled = True
self._stdout_patch = patch("sys.stdout", new_callable=io.StringIO)
self._stderr_patch = patch("sys.stderr", new_callable=io.StringIO)
self._stdout_patch.start()
self._stderr_patch.start()
def tearDown(self):
from parsedmarc.log import logger as _logger
_logger.disabled = False
self._stderr_patch.stop()
self._stdout_patch.stop()
def testWatchInboxPassesSinceToMailboxFetch(self):
mailbox_connection = SimpleNamespace()
def fake_watch(check_callback, check_timeout, should_reload=None):
def fake_watch(check_callback, check_timeout, config_reloading=None):
check_callback(mailbox_connection)
raise _BreakLoop()
@@ -1289,7 +1310,9 @@ class TestMailboxWatchSince(unittest.TestCase):
) as mocked:
with self.assertRaises(_BreakLoop):
parsedmarc.watch_inbox(
mailbox_connection=mailbox_connection,
mailbox_connection=cast(
parsedmarc.MailboxConnection, mailbox_connection
),
callback=callback,
check_timeout=1,
batch_size=10,
@@ -1337,34 +1360,50 @@ since = 2d
self.assertEqual(mock_watch_inbox.call_args.kwargs.get("since"), "2d")
class _DummyMailboxConnection:
class _DummyMailboxConnection(parsedmarc.MailboxConnection):
def __init__(self):
self.fetch_calls = []
self.fetch_calls: list[dict[str, object]] = []
def create_folder(self, folder_name):
def create_folder(self, folder_name: str):
return None
def fetch_messages(self, reports_folder, **kwargs):
def fetch_messages(self, reports_folder: str, **kwargs):
self.fetch_calls.append({"reports_folder": reports_folder, **kwargs})
return []
def fetch_message(self, message_id, **kwargs):
def fetch_message(self, message_id) -> str:
return ""
def delete_message(self, message_id):
return None
def move_message(self, message_id, folder_name):
def move_message(self, message_id, folder_name: str):
return None
def keepalive(self):
return None
def watch(self, check_callback, check_timeout):
def watch(self, check_callback, check_timeout, config_reloading=None):
return None
class TestMailboxPerformance(unittest.TestCase):
def setUp(self):
from parsedmarc.log import logger as _logger
_logger.disabled = True
self._stdout_patch = patch("sys.stdout", new_callable=io.StringIO)
self._stderr_patch = patch("sys.stderr", new_callable=io.StringIO)
self._stdout_patch.start()
self._stderr_patch.start()
def tearDown(self):
from parsedmarc.log import logger as _logger
_logger.disabled = False
self._stderr_patch.stop()
self._stdout_patch.stop()
def testBatchModeAvoidsExtraFullFetch(self):
connection = _DummyMailboxConnection()
parsedmarc.get_dmarc_reports_from_mailbox(
@@ -1446,7 +1485,7 @@ mailbox = shared@example.com
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"certificate_path setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1518,7 +1557,7 @@ user = owner@example.com
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"password setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1559,7 +1598,7 @@ class TestMSGraphFolderFallback(unittest.TestCase):
def testWellKnownFolderFallback(self):
connection = MSGraphConnection.__new__(MSGraphConnection)
connection.mailbox_name = "shared@example.com"
connection._client = _FakeGraphClient()
connection._client = _FakeGraphClient() # type: ignore[assignment]
connection._request_with_retries = MagicMock(
side_effect=lambda method_name, *args, **kwargs: getattr(
connection._client, method_name
@@ -1579,7 +1618,7 @@ class TestMSGraphFolderFallback(unittest.TestCase):
def testUnknownFolderStillFails(self):
connection = MSGraphConnection.__new__(MSGraphConnection)
connection.mailbox_name = "shared@example.com"
connection._client = _FakeGraphClient()
connection._client = _FakeGraphClient() # type: ignore[assignment]
connection._request_with_retries = MagicMock(
side_effect=lambda method_name, *args, **kwargs: getattr(
connection._client, method_name
@@ -1675,7 +1714,7 @@ mailbox = shared@example.com
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"client_secret setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1707,7 +1746,7 @@ mailbox = shared@example.com
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"tenant_id setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1739,7 +1778,7 @@ tenant_id = tenant-id
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"mailbox setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1809,7 +1848,7 @@ mailbox = shared@example.com
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"tenant_id setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1840,7 +1879,7 @@ tenant_id = tenant-id
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"mailbox setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1872,7 +1911,7 @@ certificate_path = /tmp/msgraph-cert.pem
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"tenant_id setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1904,7 +1943,7 @@ certificate_path = /tmp/msgraph-cert.pem
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"mailbox setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1914,6 +1953,22 @@ certificate_path = /tmp/msgraph-cert.pem
class TestSighupReload(unittest.TestCase):
"""Tests for SIGHUP-driven configuration reload in watch mode."""
def setUp(self):
from parsedmarc.log import logger as _logger
_logger.disabled = True
self._stdout_patch = patch("sys.stdout", new_callable=io.StringIO)
self._stderr_patch = patch("sys.stderr", new_callable=io.StringIO)
self._stdout_patch.start()
self._stderr_patch.start()
def tearDown(self):
from parsedmarc.log import logger as _logger
_logger.disabled = False
self._stderr_patch.stop()
self._stdout_patch.stop()
_BASE_CONFIG = """[general]
silent = true
@@ -1926,9 +1981,13 @@ password = pass
watch = true
"""
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
@unittest.skipUnless(
hasattr(signal, "SIGHUP"),
"SIGHUP not available on this platform",
)
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config_file")
@patch("parsedmarc.cli._parse_config")
@patch("parsedmarc.cli._load_config")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.watch_inbox")
@patch("parsedmarc.cli.IMAPConnection")
@@ -1937,6 +1996,7 @@ watch = true
mock_imap,
mock_watch,
mock_get_reports,
mock_load_config,
mock_parse_config,
mock_init_clients,
):
@@ -1950,7 +2010,9 @@ watch = true
"smtp_tls_reports": [],
}
def parse_side_effect(config_file, opts):
mock_load_config.return_value = ConfigParser()
def parse_side_effect(config, opts):
opts.imap_host = "imap.example.com"
opts.imap_user = "user"
opts.imap_password = "pass"
@@ -1989,12 +2051,16 @@ watch = true
self.assertEqual(cm.exception.code, 1)
# watch_inbox was called twice: initial run + after reload
self.assertEqual(mock_watch.call_count, 2)
# _parse_config_file called for initial load + reload
# _parse_config called for initial load + reload
self.assertGreaterEqual(mock_parse_config.call_count, 2)
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
@unittest.skipUnless(
hasattr(signal, "SIGHUP"),
"SIGHUP not available on this platform",
)
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config_file")
@patch("parsedmarc.cli._parse_config")
@patch("parsedmarc.cli._load_config")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.watch_inbox")
@patch("parsedmarc.cli.IMAPConnection")
@@ -2003,6 +2069,7 @@ watch = true
mock_imap,
mock_watch,
mock_get_reports,
mock_load_config,
mock_parse_config,
mock_init_clients,
):
@@ -2016,11 +2083,13 @@ watch = true
"smtp_tls_reports": [],
}
mock_load_config.return_value = ConfigParser()
# Initial parse sets required opts; reload parse raises
initial_map = {"prefix_": ["example.com"]}
call_count = [0]
def parse_side_effect(config_file, opts):
def parse_side_effect(config, opts):
call_count[0] += 1
opts.imap_host = "imap.example.com"
opts.imap_user = "user"
@@ -2065,9 +2134,13 @@ watch = true
# The failed reload must not have closed the original clients
initial_clients["s3_client"].close.assert_not_called()
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
@unittest.skipUnless(
hasattr(signal, "SIGHUP"),
"SIGHUP not available on this platform",
)
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config_file")
@patch("parsedmarc.cli._parse_config")
@patch("parsedmarc.cli._load_config")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.watch_inbox")
@patch("parsedmarc.cli.IMAPConnection")
@@ -2076,6 +2149,7 @@ watch = true
mock_imap,
mock_watch,
mock_get_reports,
mock_load_config,
mock_parse_config,
mock_init_clients,
):
@@ -2089,7 +2163,9 @@ watch = true
"smtp_tls_reports": [],
}
def parse_side_effect(config_file, opts):
mock_load_config.return_value = ConfigParser()
def parse_side_effect(config, opts):
opts.imap_host = "imap.example.com"
opts.imap_user = "user"
opts.imap_password = "pass"
@@ -2137,7 +2213,10 @@ watch = true
# Old client must have been closed when reload succeeded
old_client.close.assert_called_once()
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
@unittest.skipUnless(
hasattr(signal, "SIGHUP"),
"SIGHUP not available on this platform",
)
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.watch_inbox")
@@ -2213,6 +2292,390 @@ watch = true
# Second init (after reload with v2 config): kafka_hosts should be None
self.assertIsNone(init_opts_captures[1].kafka_hosts)
@unittest.skipUnless(
hasattr(signal, "SIGHUP"),
"SIGHUP not available on this platform",
)
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config")
@patch("parsedmarc.cli._load_config")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.watch_inbox")
@patch("parsedmarc.cli.IMAPConnection")
def testReloadRefreshesReverseDnsMap(
self,
mock_imap,
mock_watch,
mock_get_reports,
mock_load_config,
mock_parse_config,
mock_init_clients,
):
"""SIGHUP reload repopulates the reverse DNS map so lookups still work."""
import signal as signal_module
from parsedmarc import REVERSE_DNS_MAP
mock_imap.return_value = object()
mock_get_reports.return_value = {
"aggregate_reports": [],
"forensic_reports": [],
"smtp_tls_reports": [],
}
mock_load_config.return_value = ConfigParser()
def parse_side_effect(config, opts):
opts.imap_host = "imap.example.com"
opts.imap_user = "user"
opts.imap_password = "pass"
opts.mailbox_watch = True
return None
mock_parse_config.side_effect = parse_side_effect
mock_init_clients.return_value = {}
# Snapshot the map state after each watch_inbox call
map_snapshots = []
watch_calls = [0]
def watch_side_effect(*args, **kwargs):
watch_calls[0] += 1
if watch_calls[0] == 1:
if hasattr(signal_module, "SIGHUP"):
import os
os.kill(os.getpid(), signal_module.SIGHUP)
return
else:
# Capture the map state after reload, before we stop the loop
map_snapshots.append(dict(REVERSE_DNS_MAP))
raise FileExistsError("stop")
mock_watch.side_effect = watch_side_effect
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
cfg.write(self._BASE_CONFIG)
cfg_path = cfg.name
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
# Pre-populate the map so we can verify it gets refreshed
REVERSE_DNS_MAP.clear()
REVERSE_DNS_MAP["stale.example.com"] = {
"name": "Stale",
"type": "stale",
}
original_contents = dict(REVERSE_DNS_MAP)
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
with self.assertRaises(SystemExit):
parsedmarc.cli._main()
self.assertEqual(mock_watch.call_count, 2)
# The map should have been repopulated (not empty, not the stale data)
self.assertEqual(len(map_snapshots), 1)
refreshed = map_snapshots[0]
self.assertGreater(len(refreshed), 0, "Map should not be empty after reload")
self.assertNotEqual(
refreshed,
original_contents,
"Map should have been refreshed, not kept stale data",
)
self.assertNotIn(
"stale.example.com",
refreshed,
"Stale entry should have been cleared by reload",
)
class TestIndexPrefixDomainMapTlsFiltering(unittest.TestCase):
"""Tests that SMTP TLS reports for unmapped domains are filtered out
when index_prefix_domain_map is configured."""
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.IMAPConnection")
def testTlsReportsFilteredByDomainMap(
self,
mock_imap_connection,
mock_get_reports,
):
"""TLS reports for domains not in the map should be silently dropped."""
mock_imap_connection.return_value = object()
mock_get_reports.return_value = {
"aggregate_reports": [],
"forensic_reports": [],
"smtp_tls_reports": [
{
"organization_name": "Allowed Org",
"begin_date": "2024-01-01T00:00:00Z",
"end_date": "2024-01-01T23:59:59Z",
"report_id": "allowed-1",
"contact_info": "tls@allowed.example.com",
"policies": [
{
"policy_domain": "allowed.example.com",
"policy_type": "sts",
"successful_session_count": 1,
"failed_session_count": 0,
}
],
},
{
"organization_name": "Unmapped Org",
"begin_date": "2024-01-01T00:00:00Z",
"end_date": "2024-01-01T23:59:59Z",
"report_id": "unmapped-1",
"contact_info": "tls@unmapped.example.net",
"policies": [
{
"policy_domain": "unmapped.example.net",
"policy_type": "sts",
"successful_session_count": 5,
"failed_session_count": 0,
}
],
},
{
"organization_name": "Mixed Case Org",
"begin_date": "2024-01-01T00:00:00Z",
"end_date": "2024-01-01T23:59:59Z",
"report_id": "mixed-case-1",
"contact_info": "tls@mixedcase.example.com",
"policies": [
{
"policy_domain": "MixedCase.Example.Com",
"policy_type": "sts",
"successful_session_count": 2,
"failed_session_count": 0,
}
],
},
],
}
domain_map = {"tenant_a": ["example.com"]}
with NamedTemporaryFile("w", suffix=".yaml", delete=False) as map_file:
import yaml
yaml.dump(domain_map, map_file)
map_path = map_file.name
self.addCleanup(lambda: os.path.exists(map_path) and os.remove(map_path))
config = f"""[general]
save_smtp_tls = true
silent = false
index_prefix_domain_map = {map_path}
[imap]
host = imap.example.com
user = test-user
password = test-password
"""
with NamedTemporaryFile("w", suffix=".ini", delete=False) as config_file:
config_file.write(config)
config_path = config_file.name
self.addCleanup(lambda: os.path.exists(config_path) and os.remove(config_path))
captured = io.StringIO()
with patch.object(sys, "argv", ["parsedmarc", "-c", config_path]):
with patch("sys.stdout", captured):
parsedmarc.cli._main()
output = json.loads(captured.getvalue())
tls_reports = output["smtp_tls_reports"]
self.assertEqual(len(tls_reports), 2)
report_ids = {r["report_id"] for r in tls_reports}
self.assertIn("allowed-1", report_ids)
self.assertIn("mixed-case-1", report_ids)
self.assertNotIn("unmapped-1", report_ids)
class TestEnvVarConfig(unittest.TestCase):
"""Tests for environment variable configuration support."""
def test_resolve_section_key_simple(self):
"""Simple section names resolve correctly."""
from parsedmarc.cli import _resolve_section_key
self.assertEqual(_resolve_section_key("IMAP_PASSWORD"), ("imap", "password"))
self.assertEqual(_resolve_section_key("GENERAL_DEBUG"), ("general", "debug"))
self.assertEqual(_resolve_section_key("S3_BUCKET"), ("s3", "bucket"))
self.assertEqual(_resolve_section_key("GELF_HOST"), ("gelf", "host"))
def test_resolve_section_key_underscore_sections(self):
"""Multi-word section names (splunk_hec, gmail_api, etc.) resolve correctly."""
from parsedmarc.cli import _resolve_section_key
self.assertEqual(
_resolve_section_key("SPLUNK_HEC_TOKEN"), ("splunk_hec", "token")
)
self.assertEqual(
_resolve_section_key("GMAIL_API_CREDENTIALS_FILE"),
("gmail_api", "credentials_file"),
)
self.assertEqual(
_resolve_section_key("LOG_ANALYTICS_CLIENT_ID"),
("log_analytics", "client_id"),
)
def test_resolve_section_key_unknown(self):
"""Unknown prefixes return (None, None)."""
from parsedmarc.cli import _resolve_section_key
self.assertEqual(_resolve_section_key("UNKNOWN_FOO"), (None, None))
# Just a section name with no key should not match
self.assertEqual(_resolve_section_key("IMAP"), (None, None))
def test_apply_env_overrides_injects_values(self):
"""Env vars are injected into an existing ConfigParser."""
from configparser import ConfigParser
from parsedmarc.cli import _apply_env_overrides
config = ConfigParser()
config.add_section("imap")
config.set("imap", "host", "original.example.com")
env = {
"PARSEDMARC_IMAP_HOST": "new.example.com",
"PARSEDMARC_IMAP_PASSWORD": "secret123",
}
with patch.dict(os.environ, env, clear=False):
_apply_env_overrides(config)
self.assertEqual(config.get("imap", "host"), "new.example.com")
self.assertEqual(config.get("imap", "password"), "secret123")
def test_apply_env_overrides_creates_sections(self):
"""Env vars create new sections when they don't exist."""
from configparser import ConfigParser
from parsedmarc.cli import _apply_env_overrides
config = ConfigParser()
env = {"PARSEDMARC_ELASTICSEARCH_HOSTS": "http://localhost:9200"}
with patch.dict(os.environ, env, clear=False):
_apply_env_overrides(config)
self.assertTrue(config.has_section("elasticsearch"))
self.assertEqual(config.get("elasticsearch", "hosts"), "http://localhost:9200")
def test_apply_env_overrides_ignores_config_file_var(self):
"""PARSEDMARC_CONFIG_FILE is not injected as a config key."""
from configparser import ConfigParser
from parsedmarc.cli import _apply_env_overrides
config = ConfigParser()
env = {"PARSEDMARC_CONFIG_FILE": "/some/path.ini"}
with patch.dict(os.environ, env, clear=False):
_apply_env_overrides(config)
self.assertEqual(config.sections(), [])
def test_load_config_with_file_and_env_override(self):
"""Env vars override values from an INI file."""
from parsedmarc.cli import _load_config
with NamedTemporaryFile(mode="w", suffix=".ini", delete=False) as f:
f.write(
"[imap]\nhost = file.example.com\nuser = alice\npassword = fromfile\n"
)
f.flush()
config_path = f.name
try:
env = {"PARSEDMARC_IMAP_PASSWORD": "fromenv"}
with patch.dict(os.environ, env, clear=False):
config = _load_config(config_path)
self.assertEqual(config.get("imap", "host"), "file.example.com")
self.assertEqual(config.get("imap", "user"), "alice")
self.assertEqual(config.get("imap", "password"), "fromenv")
finally:
os.unlink(config_path)
def test_load_config_env_only(self):
"""Config can be loaded purely from env vars with no file."""
from parsedmarc.cli import _load_config
env = {
"PARSEDMARC_GENERAL_DEBUG": "true",
"PARSEDMARC_ELASTICSEARCH_HOSTS": "http://localhost:9200",
}
with patch.dict(os.environ, env, clear=False):
config = _load_config(None)
self.assertEqual(config.get("general", "debug"), "true")
self.assertEqual(config.get("elasticsearch", "hosts"), "http://localhost:9200")
def test_parse_config_from_env(self):
"""Full round-trip: env vars -> ConfigParser -> opts."""
from argparse import Namespace
from parsedmarc.cli import _load_config, _parse_config
env = {
"PARSEDMARC_GENERAL_DEBUG": "true",
"PARSEDMARC_GENERAL_SAVE_AGGREGATE": "true",
"PARSEDMARC_GENERAL_OFFLINE": "true",
}
with patch.dict(os.environ, env, clear=False):
config = _load_config(None)
opts = Namespace()
_parse_config(config, opts)
self.assertTrue(opts.debug)
self.assertTrue(opts.save_aggregate)
self.assertTrue(opts.offline)
def test_config_file_env_var(self):
"""PARSEDMARC_CONFIG_FILE env var specifies the config file path."""
from argparse import Namespace
from parsedmarc.cli import _load_config, _parse_config
with NamedTemporaryFile(mode="w", suffix=".ini", delete=False) as f:
f.write("[general]\ndebug = true\noffline = true\n")
f.flush()
config_path = f.name
try:
env = {"PARSEDMARC_CONFIG_FILE": config_path}
with patch.dict(os.environ, env, clear=False):
config = _load_config(os.environ.get("PARSEDMARC_CONFIG_FILE"))
opts = Namespace()
_parse_config(config, opts)
self.assertTrue(opts.debug)
self.assertTrue(opts.offline)
finally:
os.unlink(config_path)
def test_boolean_values_from_env(self):
"""Various boolean string representations work through ConfigParser."""
from configparser import ConfigParser
from parsedmarc.cli import _apply_env_overrides
for true_val in ("true", "yes", "1", "on", "True", "YES"):
config = ConfigParser()
env = {"PARSEDMARC_GENERAL_DEBUG": true_val}
with patch.dict(os.environ, env, clear=False):
_apply_env_overrides(config)
self.assertTrue(
config.getboolean("general", "debug"),
f"Expected truthy for {true_val!r}",
)
for false_val in ("false", "no", "0", "off", "False", "NO"):
config = ConfigParser()
env = {"PARSEDMARC_GENERAL_DEBUG": false_val}
with patch.dict(os.environ, env, clear=False):
_apply_env_overrides(config)
self.assertFalse(
config.getboolean("general", "debug"),
f"Expected falsy for {false_val!r}",
)
if __name__ == "__main__":
unittest.main(verbosity=2)