Compare commits

..

30 Commits

Author SHA1 Message Date
Sean Whalen
2b10adaaf4 Refactor tests to use assertions consistently and improve type hints 2026-03-21 16:06:41 -04:00
Sean Whalen
49edcb98ec Refactor GelfClient methods to use specific report types instead of generic dicts 2026-03-21 15:57:08 -04:00
Sean Whalen
b0fc433599 Bump version to 9.3.0 in constants.py 2026-03-21 15:52:34 -04:00
Sean Whalen
97ca618d55 Improve error logging for Elasticsearch and OpenSearch exceptions 2026-03-21 15:38:56 -04:00
Sean Whalen
253695b30c Restore startup configuration checks 2026-03-21 15:17:25 -04:00
Sean Whalen
1035983f4b Rename 'should_reload' parameter to 'config_reloading' in mailbox connection methods for clarity 2026-03-21 15:08:27 -04:00
Sean Whalen
5607cd9411 Remove incorrect IMAP changes 2026-03-21 14:57:14 -04:00
Sean Whalen
860cfdd148 make single list items on one line in the changelog instead of doing hard wraps 2026-03-21 14:31:46 -04:00
Sean Whalen
7b6fcb19da Update CHANGELOG..md 2026-03-21 14:27:20 -04:00
Sean Whalen
ef51d6e2f9 Fix changelog entry for msgraph configuration check 2026-03-21 12:00:19 -04:00
Sean Whalen
ea225e2340 Refactor changelog entries for clarity and consistency in configuration reload section 2026-03-21 11:44:47 -04:00
Sean Whalen
7ec02137b8 Update changelog to not include fixes within the same unreleased version 2026-03-21 11:30:07 -04:00
Sean Whalen
8567c73358 Enhance resource management: add close methods for S3Client and HECClient, and improve IMAP connection handling during IDLE. Update CHANGELOG.md for config reload improvements and bug fixes. 2026-03-20 22:19:28 -04:00
Sean Whalen
4e716a6087 Add pytest command to settings for silent output during testing 2026-03-20 21:44:48 -04:00
Sean Whalen
955b098ef1 Update CHANGELOG.md to reflect config reload enhancements 2026-03-20 21:44:31 -04:00
Sean Whalen
ab89b1654e Enhance usage documentation for config reload: clarify behavior on successful reload and error handling 2026-03-20 20:12:25 -04:00
Sean Whalen
9d5a9a728d Reverted changes by copilot that turned errors into warnings 2026-03-20 19:57:07 -04:00
Copilot
051dd75b40 SIGHUP-based config reload for watch mode: address review feedback (#705)
* Initial plan

* Address review feedback: Kafka SSL context, SIGHUP handler safety, test formatting

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/8f2fd48f-32a4-4258-9a89-06f7c7ac29bf

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2026-03-20 19:31:16 -04:00
Copilot
d2a0f85303 Ensure SIGHUP never triggers a new email batch across all watch() implementations (#704)
* Initial plan

* Ensure SIGHUP never starts a new email batch in any watch() implementation

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/45d5be30-8f6b-4200-9bdd-15c655033f17

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2026-03-20 19:12:46 -04:00
Copilot
565c415280 Fix resource leak when HEC config is invalid in _init_output_clients() (#703)
* Initial plan

* Fix resource leak: validate HEC settings before creating any output clients

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/38c73e09-789d-4d41-b75e-bbc61418859d

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2026-03-20 18:54:06 -04:00
Sean Whalen
7688d00226 Update parsedmarc/cli.py
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-20 18:49:09 -04:00
Copilot
8796c7e3bd Fix SIGHUP reload tight-loop in watch mode (#702)
* Initial plan

* Fix _reload_requested tight-loop: reset flag before reload to capture concurrent SIGHUPs

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/879d0bb1-9037-41f7-bc89-f59611956d2e

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2026-03-20 17:31:45 -04:00
Copilot
a05eb0807a Best-effort initialization for optional output clients in watch mode (#701)
* Initial plan

* Wrap optional output client init in try/except for best-effort initialization

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/59241d4e-1b05-4a92-b2d2-e6d13d10a4fd

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2026-03-20 17:18:08 -04:00
Sean Whalen
6fceb3e2ce Update parsedmarc/cli.py
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-20 17:07:20 -04:00
Copilot
510d5d05a9 SIGHUP-based configuration reload: address review feedback (#700)
* Initial plan

* Address review feedback: kafka_ssl, duplicate silent, exception chain, log file reload, should_reload timing

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/a8a43c55-23fa-4471-abe6-7ac966f381f9

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2026-03-20 16:57:00 -04:00
Copilot
3445438684 [WIP] SIGHUP-based configuration reload for watch mode (#699)
* Initial plan

* Fix review comments: ConfigurationError wrapping, duplicate parse args, bool parsing, Kafka required topics, should_reload kwarg, SIGHUP test skips

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/0779003c-ccbe-4d76-9748-801dbc238b96

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2026-03-20 15:54:40 -04:00
Copilot
17defb75b0 [WIP] SIGHUP-based configuration reload for watch mode (#698)
* Initial plan

* Fix reload state consistency, resource leaks, stale opts; add tests

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/3c2e0bb9-7e2d-4efa-aef6-d2b98478b921

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2026-03-20 15:40:44 -04:00
Sean Whalen
893d0a4f03 Update parsedmarc/cli.py
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-20 15:28:38 -04:00
Sean Whalen
58e07140a8 Update parsedmarc/cli.py
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-20 15:27:22 -04:00
Sean Whalen
dfdffe4947 Enhance mailbox connection watch method to support reload functionality
- Updated the `watch` method in `GmailConnection`, `MSGraphConnection`, `IMAPConnection`, `MaildirConnection`, and the abstract `MailboxConnection` class to accept an optional `should_reload` parameter. This allows the method to check if a reload is necessary and exit the loop if so.
- Modified related tests to accommodate the new method signature.
- Changed logger calls from `critical` to `error` for consistency in logging severity.
- Added a new settings file for Claude with specific permissions for testing and code checks.
2026-03-20 15:00:21 -04:00
17 changed files with 265 additions and 1341 deletions

View File

@@ -52,7 +52,6 @@
"geoipupdate",
"Geolite",
"geolocation",
"getuid",
"githubpages",
"Grafana",
"hostnames",
@@ -76,7 +75,6 @@
"LISTSERV",
"loganalytics",
"lxml",
"Maildir",
"mailparser",
"mailrelay",
"mailsuite",

View File

@@ -1,66 +1,5 @@
# Changelog
## 9.5.3
### Fixed
- Fixed `FileNotFoundError` when using Maildir with Docker volume mounts. Python's `mailbox.Maildir(create=True)` only creates `cur/new/tmp` subdirectories when the top-level directory doesn't exist; Docker volume mounts pre-create the directory as empty, skipping subdirectory creation. parsedmarc now explicitly creates the subdirectories when `maildir_create` is enabled.
- Maildir UID mismatch no longer crashes the process. In Docker containers where volume ownership differs from the container UID, parsedmarc now logs a warning instead of raising an exception. Also handles `os.setuid` failures gracefully in containers without `CAP_SETUID`.
- Token file writes (MS Graph and Gmail) now create parent directories automatically, preventing `FileNotFoundError` when the token path points to a directory that doesn't yet exist.
- File paths from config (`token_file`, `credentials_file`, `cert_path`, `log_file`, `output`, `ip_db_path`, `maildir_path`, syslog cert paths, etc.) now expand `~` and `$VAR` references via `os.path.expanduser`/`os.path.expandvars`.
## 9.5.2
### Fixed
- Fixed `ValueError: invalid interpolation syntax` when config values (from env vars or INI files) contain `%` characters, such as in passwords. Disabled ConfigParser's `%`-based string interpolation.
## 9.5.1
### Changes
- Correct ISO format for MSGraphConnection timestamps (PR #706)
## 9.5.0
### Added
- Environment variable configuration support: any config option can now be set via `PARSEDMARC_{SECTION}_{KEY}` environment variables (e.g. `PARSEDMARC_IMAP_PASSWORD`, `PARSEDMARC_SPLUNK_HEC_TOKEN`). Environment variables override config file values but are overridden by CLI arguments.
- `PARSEDMARC_CONFIG_FILE` environment variable to specify the config file path without the `-c` flag.
- Env-only mode: parsedmarc can now run without a config file when `PARSEDMARC_*` environment variables are set, enabling fully file-less Docker deployments.
- Explicit read permission check on config file, giving a clear error message when the container UID cannot read the file (e.g. `chmod 600` with a UID mismatch).
## 9.4.0
### Added
- Extracted `load_reverse_dns_map()` utility function in `utils.py` for loading the reverse DNS map independently of individual IP lookups.
- SIGHUP reload now re-downloads/reloads the reverse DNS map, so changes take effect without restarting.
- Add premade OpenSearch index patterns, visualizations, and dashboards
### Changed
- When `index_prefix_domain_map` is configured, SMTP TLS reports for domains not in the map are now silently dropped instead of being output. Unlike DMARC, TLS-RPT has no DNS authorization records, so this filtering prevents processing reports for unrelated domains.
- Bump OpenSearch support to `< 4`
### Fixed
- Fixed `get_index_prefix` using wrong key (`domain` instead of `policy_domain`) for SMTP TLS reports, which prevented domain map matching from working for TLS reports.
- Domain matching in `get_index_prefix` now lowercases the domain for case-insensitive comparison.
## 9.3.1
### Breaking changes
- Elasticsearch and OpenSearch now verify SSL certificates by default when `ssl = True`, even without a `cert_path`
- Added `skip_certificate_verification` option to the `elasticsearch` and `opensearch` configuration sections for consistency with `splunk_hec`
### Fixed
- Splunk HEC `skip_certificate_verification` now works correctly
- SMTP TLS reports no longer fail when saving to multiple output targets (e.g. Elasticsearch and OpenSearch) due to in-place mutation of the report dict
- Output client initialization errors now identify which module failed (e.g. "OpenSearch: ConnectionError..." instead of generic "Output client error")
## 9.3.0
### Added

View File

@@ -15,7 +15,7 @@ services:
condition: service_healthy
opensearch-dashboards:
image: opensearchproject/opensearch-dashboards:3
image: opensearchproject/opensearch-dashboards:2
environment:
- OPENSEARCH_HOSTS=["https://opensearch:9200"]
ports:
@@ -27,7 +27,7 @@ services:
grafana:
image: grafana/grafana:latest
environment:
- GRAFANA_PASSWORD=${GRAFANA_PASSWORD}
- GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
- GF_INSTALL_PLUGINS=grafana-piechart-panel,grafana-worldmap-panel
ports:
- "127.0.0.1:3000:3000"
@@ -41,7 +41,5 @@ services:
- SPLUNK_START_ARGS=--accept-license
- "SPLUNK_GENERAL_TERMS=--accept-sgt-current-at-splunk-com"
- SPLUNK_PASSWORD=${SPLUNK_PASSWORD}
- SPLUNK_HEC_TOKEN=${SPLUNK_HEC_TOKEN}
ports:
- "127.0.0.1:8000:8000"
- "127.0.0.1:8088:8088"

View File

@@ -273,8 +273,6 @@ The full set of configuration options are:
(Default: `True`)
- `timeout` - float: Timeout in seconds (Default: 60)
- `cert_path` - str: Path to a trusted certificates
- `skip_certificate_verification` - bool: Skip certificate
verification (not recommended)
- `index_suffix` - str: A suffix to apply to the index names
- `index_prefix` - str: A prefix to apply to the index names
- `monthly_indexes` - bool: Use monthly indexes instead of daily indexes
@@ -302,8 +300,6 @@ The full set of configuration options are:
(Default: `True`)
- `timeout` - float: Timeout in seconds (Default: 60)
- `cert_path` - str: Path to a trusted certificates
- `skip_certificate_verification` - bool: Skip certificate
verification (not recommended)
- `index_suffix` - str: A suffix to apply to the index names
- `index_prefix` - str: A prefix to apply to the index names
- `monthly_indexes` - bool: Use monthly indexes instead of daily indexes
@@ -531,96 +527,6 @@ PUT _cluster/settings
Increasing this value increases resource usage.
:::
## Environment variable configuration
Any configuration option can be set via environment variables using the
naming convention `PARSEDMARC_{SECTION}_{KEY}` (uppercase). This is
especially useful for Docker deployments where file permissions make it
difficult to use config files for secrets.
**Priority order:** CLI arguments > environment variables > config file > defaults
### Examples
```bash
# Set IMAP credentials via env vars
export PARSEDMARC_IMAP_HOST=imap.example.com
export PARSEDMARC_IMAP_USER=dmarc@example.com
export PARSEDMARC_IMAP_PASSWORD=secret
# Elasticsearch
export PARSEDMARC_ELASTICSEARCH_HOSTS=http://localhost:9200
export PARSEDMARC_ELASTICSEARCH_SSL=false
# Splunk HEC (note: section name splunk_hec becomes SPLUNK_HEC)
export PARSEDMARC_SPLUNK_HEC_URL=https://splunk.example.com
export PARSEDMARC_SPLUNK_HEC_TOKEN=my-hec-token
export PARSEDMARC_SPLUNK_HEC_INDEX=email
# General settings
export PARSEDMARC_GENERAL_SAVE_AGGREGATE=true
export PARSEDMARC_GENERAL_DEBUG=true
```
### Specifying the config file via environment variable
```bash
export PARSEDMARC_CONFIG_FILE=/etc/parsedmarc.ini
parsedmarc
```
### Running without a config file (env-only mode)
When no config file is given (neither `-c` flag nor `PARSEDMARC_CONFIG_FILE`),
parsedmarc will still pick up any `PARSEDMARC_*` environment variables. This
enables fully file-less deployments:
```bash
export PARSEDMARC_GENERAL_SAVE_AGGREGATE=true
export PARSEDMARC_GENERAL_OFFLINE=true
export PARSEDMARC_ELASTICSEARCH_HOSTS=http://elasticsearch:9200
parsedmarc /path/to/reports/*
```
### Docker Compose example
```yaml
services:
parsedmarc:
image: parsedmarc:latest
environment:
PARSEDMARC_IMAP_HOST: imap.example.com
PARSEDMARC_IMAP_USER: dmarc@example.com
PARSEDMARC_IMAP_PASSWORD: ${IMAP_PASSWORD}
PARSEDMARC_MAILBOX_WATCH: "true"
PARSEDMARC_ELASTICSEARCH_HOSTS: http://elasticsearch:9200
PARSEDMARC_GENERAL_SAVE_AGGREGATE: "true"
PARSEDMARC_GENERAL_SAVE_FORENSIC: "true"
```
### Section name mapping
For sections with underscores in the name, the full section name is used:
| Section | Env var prefix |
|------------------|-------------------------------|
| `general` | `PARSEDMARC_GENERAL_` |
| `mailbox` | `PARSEDMARC_MAILBOX_` |
| `imap` | `PARSEDMARC_IMAP_` |
| `msgraph` | `PARSEDMARC_MSGRAPH_` |
| `elasticsearch` | `PARSEDMARC_ELASTICSEARCH_` |
| `opensearch` | `PARSEDMARC_OPENSEARCH_` |
| `splunk_hec` | `PARSEDMARC_SPLUNK_HEC_` |
| `kafka` | `PARSEDMARC_KAFKA_` |
| `smtp` | `PARSEDMARC_SMTP_` |
| `s3` | `PARSEDMARC_S3_` |
| `syslog` | `PARSEDMARC_SYSLOG_` |
| `gmail_api` | `PARSEDMARC_GMAIL_API_` |
| `maildir` | `PARSEDMARC_MAILDIR_` |
| `log_analytics` | `PARSEDMARC_LOG_ANALYTICS_` |
| `gelf` | `PARSEDMARC_GELF_` |
| `webhook` | `PARSEDMARC_WEBHOOK_` |
## Performance tuning
For large mailbox imports or backfills, parsedmarc can consume a noticeable amount

File diff suppressed because one or more lines are too long

View File

@@ -1955,7 +1955,9 @@ def get_dmarc_reports_from_mailbox(
)
current_time = datetime.now(timezone.utc).strftime("%d-%b-%Y")
elif isinstance(connection, MSGraphConnection):
since = (datetime.now(timezone.utc) - timedelta(minutes=_since)).isoformat()
since = (
datetime.now(timezone.utc) - timedelta(minutes=_since)
).isoformat() + "Z"
current_time = datetime.now(timezone.utc).isoformat() + "Z"
elif isinstance(connection, GmailConnection):
since = (datetime.now(timezone.utc) - timedelta(minutes=_since)).strftime(

View File

@@ -19,7 +19,6 @@ import yaml
from tqdm import tqdm
from parsedmarc import (
REVERSE_DNS_MAP,
SEEN_AGGREGATE_REPORT_IDS,
InvalidDMARCReport,
ParserError,
@@ -49,12 +48,7 @@ from parsedmarc.mail import (
)
from parsedmarc.mail.graph import AuthMethod
from parsedmarc.types import ParsingResults
from parsedmarc.utils import (
get_base_domain,
get_reverse_dns,
is_mbox,
load_reverse_dns_map,
)
from parsedmarc.utils import get_base_domain, get_reverse_dns, is_mbox
# Increase the max header limit for very large emails. `_MAXHEADERS` is a
# private stdlib attribute and may not exist in type stubs.
@@ -75,84 +69,6 @@ def _str_to_list(s):
return list(map(lambda i: i.lstrip(), _list))
def _expand_path(p: str) -> str:
"""Expand ``~`` and ``$VAR`` references in a file path."""
return os.path.expanduser(os.path.expandvars(p))
# All known INI config section names, used for env var resolution.
_KNOWN_SECTIONS = frozenset(
{
"general",
"mailbox",
"imap",
"msgraph",
"elasticsearch",
"opensearch",
"splunk_hec",
"kafka",
"smtp",
"s3",
"syslog",
"gmail_api",
"maildir",
"log_analytics",
"gelf",
"webhook",
}
)
def _resolve_section_key(suffix: str) -> tuple:
"""Resolve an env var suffix like ``IMAP_PASSWORD`` to ``('imap', 'password')``.
Uses longest-prefix matching against known section names so that
multi-word sections like ``splunk_hec`` are handled correctly.
Returns ``(None, None)`` when no known section matches.
"""
suffix_lower = suffix.lower()
best_section = None
best_key = None
for section in _KNOWN_SECTIONS:
section_prefix = section + "_"
if suffix_lower.startswith(section_prefix):
key = suffix_lower[len(section_prefix) :]
if key and (best_section is None or len(section) > len(best_section)):
best_section = section
best_key = key
return best_section, best_key
def _apply_env_overrides(config: ConfigParser) -> None:
"""Inject ``PARSEDMARC_*`` environment variables into *config*.
Environment variables matching ``PARSEDMARC_{SECTION}_{KEY}`` override
(or create) the corresponding config-file values. Sections are created
automatically when they do not yet exist.
"""
prefix = "PARSEDMARC_"
for env_key, env_value in os.environ.items():
if not env_key.startswith(prefix) or env_key == "PARSEDMARC_CONFIG_FILE":
continue
suffix = env_key[len(prefix) :]
section, key = _resolve_section_key(suffix)
if section is None:
logger.debug("Ignoring unrecognized env var: %s", env_key)
continue
if not config.has_section(section):
config.add_section(section)
config.set(section, key, env_value)
logger.debug("Config override from env: [%s] %s", section, key)
def _configure_logging(log_level, log_file=None):
"""
Configure logging for the current process.
@@ -256,39 +172,12 @@ class ConfigurationError(Exception):
pass
def _load_config(config_file: str | None = None) -> ConfigParser:
"""Load configuration from an INI file and/or environment variables.
def _parse_config_file(config_file, opts):
"""Parse a config file and update opts in place.
Args:
config_file: Optional path to an .ini config file.
Returns:
A ``ConfigParser`` populated from the file (if given) and from any
``PARSEDMARC_*`` environment variables.
Raises:
ConfigurationError: If *config_file* is given but does not exist.
"""
config = ConfigParser(interpolation=None)
if config_file is not None:
abs_path = os.path.abspath(config_file)
if not os.path.exists(abs_path):
raise ConfigurationError("A file does not exist at {0}".format(abs_path))
if not os.access(abs_path, os.R_OK):
raise ConfigurationError(
"Unable to read {0} — check file permissions".format(abs_path)
)
config.read(config_file)
_apply_env_overrides(config)
return config
def _parse_config(config: ConfigParser, opts):
"""Apply a loaded ``ConfigParser`` to *opts* in place.
Args:
config: A ``ConfigParser`` (from ``_load_config``).
opts: Namespace object to update with parsed values.
config_file: Path to the .ini config file
opts: Namespace object to update with parsed values
Returns:
index_prefix_domain_map or None
@@ -296,8 +185,13 @@ def _parse_config(config: ConfigParser, opts):
Raises:
ConfigurationError: If required settings are missing or invalid.
"""
abs_path = os.path.abspath(config_file)
if not os.path.exists(abs_path):
raise ConfigurationError("A file does not exist at {0}".format(abs_path))
opts.silent = True
config = ConfigParser()
index_prefix_domain_map = None
config.read(config_file)
if "general" in config.sections():
general_config = config["general"]
if "silent" in general_config:
@@ -307,7 +201,7 @@ def _parse_config(config: ConfigParser, opts):
"normalize_timespan_threshold_hours"
)
if "index_prefix_domain_map" in general_config:
with open(_expand_path(general_config["index_prefix_domain_map"])) as f:
with open(general_config["index_prefix_domain_map"]) as f:
index_prefix_domain_map = yaml.safe_load(f)
if "offline" in general_config:
opts.offline = bool(general_config.getboolean("offline"))
@@ -316,7 +210,7 @@ def _parse_config(config: ConfigParser, opts):
general_config.getboolean("strip_attachment_payloads")
)
if "output" in general_config:
opts.output = _expand_path(general_config["output"])
opts.output = general_config["output"]
if "aggregate_json_filename" in general_config:
opts.aggregate_json_filename = general_config["aggregate_json_filename"]
if "forensic_json_filename" in general_config:
@@ -372,11 +266,11 @@ def _parse_config(config: ConfigParser, opts):
general_config.getboolean("fail_on_output_error")
)
if "log_file" in general_config:
opts.log_file = _expand_path(general_config["log_file"])
opts.log_file = general_config["log_file"]
if "n_procs" in general_config:
opts.n_procs = general_config.getint("n_procs")
if "ip_db_path" in general_config:
opts.ip_db_path = _expand_path(general_config["ip_db_path"])
opts.ip_db_path = general_config["ip_db_path"]
else:
opts.ip_db_path = None
if "always_use_local_files" in general_config:
@@ -384,9 +278,7 @@ def _parse_config(config: ConfigParser, opts):
general_config.getboolean("always_use_local_files")
)
if "local_reverse_dns_map_path" in general_config:
opts.reverse_dns_map_path = _expand_path(
general_config["local_reverse_dns_map_path"]
)
opts.reverse_dns_map_path = general_config["local_reverse_dns_map_path"]
if "reverse_dns_map_url" in general_config:
opts.reverse_dns_map_url = general_config["reverse_dns_map_url"]
if "prettify_json" in general_config:
@@ -501,7 +393,7 @@ def _parse_config(config: ConfigParser, opts):
if "msgraph" in config.sections():
graph_config = config["msgraph"]
opts.graph_token_file = _expand_path(graph_config.get("token_file", ".token"))
opts.graph_token_file = graph_config.get("token_file", ".token")
if "auth_method" not in graph_config:
logger.info(
@@ -555,9 +447,7 @@ def _parse_config(config: ConfigParser, opts):
if opts.graph_auth_method == AuthMethod.Certificate.name:
if "certificate_path" in graph_config:
opts.graph_certificate_path = _expand_path(
graph_config["certificate_path"]
)
opts.graph_certificate_path = graph_config["certificate_path"]
else:
raise ConfigurationError(
"certificate_path setting missing from the msgraph config section"
@@ -614,13 +504,7 @@ def _parse_config(config: ConfigParser, opts):
if "ssl" in elasticsearch_config:
opts.elasticsearch_ssl = bool(elasticsearch_config.getboolean("ssl"))
if "cert_path" in elasticsearch_config:
opts.elasticsearch_ssl_cert_path = _expand_path(
elasticsearch_config["cert_path"]
)
if "skip_certificate_verification" in elasticsearch_config:
opts.elasticsearch_skip_certificate_verification = bool(
elasticsearch_config.getboolean("skip_certificate_verification")
)
opts.elasticsearch_ssl_cert_path = elasticsearch_config["cert_path"]
if "user" in elasticsearch_config:
opts.elasticsearch_username = elasticsearch_config["user"]
if "password" in elasticsearch_config:
@@ -659,11 +543,7 @@ def _parse_config(config: ConfigParser, opts):
if "ssl" in opensearch_config:
opts.opensearch_ssl = bool(opensearch_config.getboolean("ssl"))
if "cert_path" in opensearch_config:
opts.opensearch_ssl_cert_path = _expand_path(opensearch_config["cert_path"])
if "skip_certificate_verification" in opensearch_config:
opts.opensearch_skip_certificate_verification = bool(
opensearch_config.getboolean("skip_certificate_verification")
)
opts.opensearch_ssl_cert_path = opensearch_config["cert_path"]
if "user" in opensearch_config:
opts.opensearch_username = opensearch_config["user"]
if "password" in opensearch_config:
@@ -786,7 +666,7 @@ def _parse_config(config: ConfigParser, opts):
if "subject" in smtp_config:
opts.smtp_subject = smtp_config["subject"]
if "attachment" in smtp_config:
opts.smtp_attachment = _expand_path(smtp_config["attachment"])
opts.smtp_attachment = smtp_config["attachment"]
if "message" in smtp_config:
opts.smtp_message = smtp_config["message"]
@@ -833,11 +713,11 @@ def _parse_config(config: ConfigParser, opts):
else:
opts.syslog_protocol = "udp"
if "cafile_path" in syslog_config:
opts.syslog_cafile_path = _expand_path(syslog_config["cafile_path"])
opts.syslog_cafile_path = syslog_config["cafile_path"]
if "certfile_path" in syslog_config:
opts.syslog_certfile_path = _expand_path(syslog_config["certfile_path"])
opts.syslog_certfile_path = syslog_config["certfile_path"]
if "keyfile_path" in syslog_config:
opts.syslog_keyfile_path = _expand_path(syslog_config["keyfile_path"])
opts.syslog_keyfile_path = syslog_config["keyfile_path"]
if "timeout" in syslog_config:
opts.syslog_timeout = float(syslog_config["timeout"])
else:
@@ -853,13 +733,8 @@ def _parse_config(config: ConfigParser, opts):
if "gmail_api" in config.sections():
gmail_api_config = config["gmail_api"]
gmail_creds = gmail_api_config.get("credentials_file")
opts.gmail_api_credentials_file = (
_expand_path(gmail_creds) if gmail_creds else gmail_creds
)
opts.gmail_api_token_file = _expand_path(
gmail_api_config.get("token_file", ".token")
)
opts.gmail_api_credentials_file = gmail_api_config.get("credentials_file")
opts.gmail_api_token_file = gmail_api_config.get("token_file", ".token")
opts.gmail_api_include_spam_trash = bool(
gmail_api_config.getboolean("include_spam_trash", False)
)
@@ -884,8 +759,7 @@ def _parse_config(config: ConfigParser, opts):
if "maildir" in config.sections():
maildir_api_config = config["maildir"]
maildir_p = maildir_api_config.get("maildir_path")
opts.maildir_path = _expand_path(maildir_p) if maildir_p else maildir_p
opts.maildir_path = maildir_api_config.get("maildir_path")
opts.maildir_create = bool(
maildir_api_config.getboolean("maildir_create", fallback=False)
)
@@ -979,95 +853,77 @@ def _init_output_clients(opts):
"""
clients = {}
try:
if opts.s3_bucket:
clients["s3_client"] = s3.S3Client(
bucket_name=opts.s3_bucket,
bucket_path=opts.s3_path,
region_name=opts.s3_region_name,
endpoint_url=opts.s3_endpoint_url,
access_key_id=opts.s3_access_key_id,
secret_access_key=opts.s3_secret_access_key,
)
except Exception as e:
raise RuntimeError(f"S3: {e}") from e
if opts.s3_bucket:
clients["s3_client"] = s3.S3Client(
bucket_name=opts.s3_bucket,
bucket_path=opts.s3_path,
region_name=opts.s3_region_name,
endpoint_url=opts.s3_endpoint_url,
access_key_id=opts.s3_access_key_id,
secret_access_key=opts.s3_secret_access_key,
)
try:
if opts.syslog_server:
clients["syslog_client"] = syslog.SyslogClient(
server_name=opts.syslog_server,
server_port=int(opts.syslog_port),
protocol=opts.syslog_protocol or "udp",
cafile_path=opts.syslog_cafile_path,
certfile_path=opts.syslog_certfile_path,
keyfile_path=opts.syslog_keyfile_path,
timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
retry_attempts=opts.syslog_retry_attempts
if opts.syslog_retry_attempts is not None
else 3,
retry_delay=opts.syslog_retry_delay
if opts.syslog_retry_delay is not None
else 5,
)
except Exception as e:
raise RuntimeError(f"Syslog: {e}") from e
if opts.syslog_server:
clients["syslog_client"] = syslog.SyslogClient(
server_name=opts.syslog_server,
server_port=int(opts.syslog_port),
protocol=opts.syslog_protocol or "udp",
cafile_path=opts.syslog_cafile_path,
certfile_path=opts.syslog_certfile_path,
keyfile_path=opts.syslog_keyfile_path,
timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
retry_attempts=opts.syslog_retry_attempts
if opts.syslog_retry_attempts is not None
else 3,
retry_delay=opts.syslog_retry_delay
if opts.syslog_retry_delay is not None
else 5,
)
if opts.hec:
if opts.hec_token is None or opts.hec_index is None:
raise ConfigurationError(
"HEC token and HEC index are required when using HEC URL"
)
try:
verify = True
if opts.hec_skip_certificate_verification:
verify = False
clients["hec_client"] = splunk.HECClient(
opts.hec, opts.hec_token, opts.hec_index, verify=verify
)
except Exception as e:
raise RuntimeError(f"Splunk HEC: {e}") from e
verify = True
if opts.hec_skip_certificate_verification:
verify = False
clients["hec_client"] = splunk.HECClient(
opts.hec, opts.hec_token, opts.hec_index, verify=verify
)
try:
if opts.kafka_hosts:
ssl_context = None
if opts.kafka_skip_certificate_verification:
logger.debug("Skipping Kafka certificate verification")
ssl_context = create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = CERT_NONE
clients["kafka_client"] = kafkaclient.KafkaClient(
opts.kafka_hosts,
username=opts.kafka_username,
password=opts.kafka_password,
ssl_context=ssl_context,
)
except Exception as e:
raise RuntimeError(f"Kafka: {e}") from e
if opts.kafka_hosts:
ssl_context = None
if opts.kafka_skip_certificate_verification:
logger.debug("Skipping Kafka certificate verification")
ssl_context = create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = CERT_NONE
clients["kafka_client"] = kafkaclient.KafkaClient(
opts.kafka_hosts,
username=opts.kafka_username,
password=opts.kafka_password,
ssl_context=ssl_context,
)
try:
if opts.gelf_host:
clients["gelf_client"] = gelf.GelfClient(
host=opts.gelf_host,
port=int(opts.gelf_port),
mode=opts.gelf_mode,
)
except Exception as e:
raise RuntimeError(f"GELF: {e}") from e
if opts.gelf_host:
clients["gelf_client"] = gelf.GelfClient(
host=opts.gelf_host,
port=int(opts.gelf_port),
mode=opts.gelf_mode,
)
try:
if (
opts.webhook_aggregate_url
or opts.webhook_forensic_url
or opts.webhook_smtp_tls_url
):
clients["webhook_client"] = webhook.WebhookClient(
aggregate_url=opts.webhook_aggregate_url,
forensic_url=opts.webhook_forensic_url,
smtp_tls_url=opts.webhook_smtp_tls_url,
timeout=opts.webhook_timeout,
)
except Exception as e:
raise RuntimeError(f"Webhook: {e}") from e
if (
opts.webhook_aggregate_url
or opts.webhook_forensic_url
or opts.webhook_smtp_tls_url
):
clients["webhook_client"] = webhook.WebhookClient(
aggregate_url=opts.webhook_aggregate_url,
forensic_url=opts.webhook_forensic_url,
smtp_tls_url=opts.webhook_smtp_tls_url,
timeout=opts.webhook_timeout,
)
# Elasticsearch and OpenSearch mutate module-level global state via
# connections.create_connection(), which cannot be rolled back if a later
@@ -1075,84 +931,76 @@ def _init_output_clients(opts):
# successfully first; this minimises the window for partial-init problems
# during config reload.
if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls:
try:
if opts.elasticsearch_hosts:
es_aggregate_index = "dmarc_aggregate"
es_forensic_index = "dmarc_forensic"
es_smtp_tls_index = "smtp_tls"
if opts.elasticsearch_index_suffix:
suffix = opts.elasticsearch_index_suffix
es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix)
es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix)
es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix)
if opts.elasticsearch_index_prefix:
prefix = opts.elasticsearch_index_prefix
es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index)
es_forensic_index = "{0}{1}".format(prefix, es_forensic_index)
es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index)
elastic_timeout_value = (
float(opts.elasticsearch_timeout)
if opts.elasticsearch_timeout is not None
else 60.0
)
elastic.set_hosts(
opts.elasticsearch_hosts,
use_ssl=opts.elasticsearch_ssl,
ssl_cert_path=opts.elasticsearch_ssl_cert_path,
skip_certificate_verification=opts.elasticsearch_skip_certificate_verification,
username=opts.elasticsearch_username,
password=opts.elasticsearch_password,
api_key=opts.elasticsearch_api_key,
timeout=elastic_timeout_value,
)
elastic.migrate_indexes(
aggregate_indexes=[es_aggregate_index],
forensic_indexes=[es_forensic_index],
)
clients["elasticsearch"] = _ElasticsearchHandle()
except Exception as e:
raise RuntimeError(f"Elasticsearch: {e}") from e
if opts.elasticsearch_hosts:
es_aggregate_index = "dmarc_aggregate"
es_forensic_index = "dmarc_forensic"
es_smtp_tls_index = "smtp_tls"
if opts.elasticsearch_index_suffix:
suffix = opts.elasticsearch_index_suffix
es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix)
es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix)
es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix)
if opts.elasticsearch_index_prefix:
prefix = opts.elasticsearch_index_prefix
es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index)
es_forensic_index = "{0}{1}".format(prefix, es_forensic_index)
es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index)
elastic_timeout_value = (
float(opts.elasticsearch_timeout)
if opts.elasticsearch_timeout is not None
else 60.0
)
elastic.set_hosts(
opts.elasticsearch_hosts,
use_ssl=opts.elasticsearch_ssl,
ssl_cert_path=opts.elasticsearch_ssl_cert_path,
username=opts.elasticsearch_username,
password=opts.elasticsearch_password,
api_key=opts.elasticsearch_api_key,
timeout=elastic_timeout_value,
)
elastic.migrate_indexes(
aggregate_indexes=[es_aggregate_index],
forensic_indexes=[es_forensic_index],
)
clients["elasticsearch"] = _ElasticsearchHandle()
try:
if opts.opensearch_hosts:
os_aggregate_index = "dmarc_aggregate"
os_forensic_index = "dmarc_forensic"
os_smtp_tls_index = "smtp_tls"
if opts.opensearch_index_suffix:
suffix = opts.opensearch_index_suffix
os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix)
os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix)
os_smtp_tls_index = "{0}_{1}".format(os_smtp_tls_index, suffix)
if opts.opensearch_index_prefix:
prefix = opts.opensearch_index_prefix
os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index)
os_forensic_index = "{0}{1}".format(prefix, os_forensic_index)
os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index)
opensearch_timeout_value = (
float(opts.opensearch_timeout)
if opts.opensearch_timeout is not None
else 60.0
)
opensearch.set_hosts(
opts.opensearch_hosts,
use_ssl=opts.opensearch_ssl,
ssl_cert_path=opts.opensearch_ssl_cert_path,
skip_certificate_verification=opts.opensearch_skip_certificate_verification,
username=opts.opensearch_username,
password=opts.opensearch_password,
api_key=opts.opensearch_api_key,
timeout=opensearch_timeout_value,
auth_type=opts.opensearch_auth_type,
aws_region=opts.opensearch_aws_region,
aws_service=opts.opensearch_aws_service,
)
opensearch.migrate_indexes(
aggregate_indexes=[os_aggregate_index],
forensic_indexes=[os_forensic_index],
)
clients["opensearch"] = _OpenSearchHandle()
except Exception as e:
raise RuntimeError(f"OpenSearch: {e}") from e
if opts.opensearch_hosts:
os_aggregate_index = "dmarc_aggregate"
os_forensic_index = "dmarc_forensic"
os_smtp_tls_index = "smtp_tls"
if opts.opensearch_index_suffix:
suffix = opts.opensearch_index_suffix
os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix)
os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix)
os_smtp_tls_index = "{0}_{1}".format(os_smtp_tls_index, suffix)
if opts.opensearch_index_prefix:
prefix = opts.opensearch_index_prefix
os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index)
os_forensic_index = "{0}{1}".format(prefix, os_forensic_index)
os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index)
opensearch_timeout_value = (
float(opts.opensearch_timeout)
if opts.opensearch_timeout is not None
else 60.0
)
opensearch.set_hosts(
opts.opensearch_hosts,
use_ssl=opts.opensearch_ssl,
ssl_cert_path=opts.opensearch_ssl_cert_path,
username=opts.opensearch_username,
password=opts.opensearch_password,
api_key=opts.opensearch_api_key,
timeout=opensearch_timeout_value,
auth_type=opts.opensearch_auth_type,
aws_region=opts.opensearch_aws_region,
aws_service=opts.opensearch_aws_service,
)
opensearch.migrate_indexes(
aggregate_indexes=[os_aggregate_index],
forensic_indexes=[os_forensic_index],
)
clients["opensearch"] = _OpenSearchHandle()
return clients
@@ -1186,22 +1034,20 @@ def _main():
elif "reported_domain" in report:
domain = report["reported_domain"]
elif "policies" in report:
domain = report["policies"][0]["policy_domain"]
domain = report["policies"][0]["domain"]
if domain:
domain = get_base_domain(domain)
if domain:
domain = domain.lower()
for prefix in index_prefix_domain_map:
if domain in index_prefix_domain_map[prefix]:
prefix = (
prefix.lower()
.strip()
.strip("_")
.replace(" ", "_")
.replace("-", "_")
)
prefix = f"{prefix}_"
return prefix
for prefix in index_prefix_domain_map:
if domain in index_prefix_domain_map[prefix]:
prefix = (
prefix.lower()
.strip()
.strip("_")
.replace(" ", "_")
.replace("-", "_")
)
prefix = f"{prefix}_"
return prefix
return None
def process_reports(reports_):
@@ -1212,22 +1058,6 @@ def _main():
logger.error(message)
output_errors.append(message)
if index_prefix_domain_map is not None:
filtered_tls = []
for report in reports_.get("smtp_tls_reports", []):
if get_index_prefix(report) is not None:
filtered_tls.append(report)
else:
domain = "unknown"
if "policies" in report and report["policies"]:
domain = report["policies"][0].get("policy_domain", "unknown")
logger.debug(
"Ignoring SMTP TLS report for domain not in "
"index_prefix_domain_map: %s",
domain,
)
reports_["smtp_tls_reports"] = filtered_tls
indent_value = 2 if opts.prettify_json else None
output_str = "{0}\n".format(
json.dumps(reports_, ensure_ascii=False, indent=indent_value)
@@ -1699,7 +1529,6 @@ def _main():
elasticsearch_index_prefix=None,
elasticsearch_ssl=True,
elasticsearch_ssl_cert_path=None,
elasticsearch_skip_certificate_verification=False,
elasticsearch_monthly_indexes=False,
elasticsearch_username=None,
elasticsearch_password=None,
@@ -1712,7 +1541,6 @@ def _main():
opensearch_index_prefix=None,
opensearch_ssl=True,
opensearch_ssl_cert_path=None,
opensearch_skip_certificate_verification=False,
opensearch_monthly_indexes=False,
opensearch_username=None,
opensearch_password=None,
@@ -1795,16 +1623,9 @@ def _main():
index_prefix_domain_map = None
config_file = args.config_file or os.environ.get("PARSEDMARC_CONFIG_FILE")
has_env_config = any(
k.startswith("PARSEDMARC_") and k != "PARSEDMARC_CONFIG_FILE"
for k in os.environ
)
if config_file or has_env_config:
if args.config_file:
try:
config = _load_config(config_file)
index_prefix_domain_map = _parse_config(config, opts)
index_prefix_domain_map = _parse_config_file(args.config_file, opts)
except ConfigurationError as e:
logger.critical(str(e))
exit(-1)
@@ -1845,6 +1666,12 @@ def _main():
# Initialize output clients
try:
clients = _init_output_clients(opts)
except elastic.ElasticsearchError as e:
logger.exception("Elasticsearch Error: {0}".format(e))
exit(1)
except opensearch.OpenSearchError as e:
logger.exception("OpenSearch Error: {0}".format(e))
exit(1)
except ConfigurationError as e:
logger.critical(str(e))
exit(1)
@@ -2221,25 +2048,15 @@ def _main():
# Build a fresh opts starting from CLI-only defaults so that
# sections removed from the config file actually take effect.
new_opts = Namespace(**vars(opts_from_cli))
new_config = _load_config(config_file)
new_index_prefix_domain_map = _parse_config(new_config, new_opts)
new_index_prefix_domain_map = _parse_config_file(
args.config_file, new_opts
)
new_clients = _init_output_clients(new_opts)
# All steps succeeded — commit the changes atomically.
_close_output_clients(clients)
clients = new_clients
index_prefix_domain_map = new_index_prefix_domain_map
# Reload the reverse DNS map so changes to the
# map path/URL in the config take effect.
load_reverse_dns_map(
REVERSE_DNS_MAP,
always_use_local_file=new_opts.always_use_local_files,
local_file_path=new_opts.reverse_dns_map_path,
url=new_opts.reverse_dns_map_url,
offline=new_opts.offline,
)
for k, v in vars(new_opts).items():
setattr(opts, k, v)

View File

@@ -1,3 +1,3 @@
__version__ = "9.5.3"
__version__ = "9.3.0"
USER_AGENT = f"parsedmarc/{__version__}"

View File

@@ -268,7 +268,6 @@ def set_hosts(
*,
use_ssl: bool = False,
ssl_cert_path: Optional[str] = None,
skip_certificate_verification: bool = False,
username: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
@@ -281,7 +280,6 @@ def set_hosts(
hosts (str | list[str]): A single hostname or URL, or list of hostnames or URLs
use_ssl (bool): Use an HTTPS connection to the server
ssl_cert_path (str): Path to the certificate chain
skip_certificate_verification (bool): Skip certificate verification
username (str): The username to use for authentication
password (str): The password to use for authentication
api_key (str): The Base64 encoded API key to use for authentication
@@ -293,11 +291,10 @@ def set_hosts(
if use_ssl:
conn_params["use_ssl"] = True
if ssl_cert_path:
conn_params["ca_certs"] = ssl_cert_path
if skip_certificate_verification:
conn_params["verify_certs"] = False
else:
conn_params["verify_certs"] = True
conn_params["ca_certs"] = ssl_cert_path
else:
conn_params["verify_certs"] = False
if username and password:
conn_params["http_auth"] = username + ":" + password
if api_key:
@@ -738,7 +735,6 @@ def save_smtp_tls_report_to_elasticsearch(
index_date = begin_date.strftime("%Y-%m")
else:
index_date = begin_date.strftime("%Y-%m-%d")
report = report.copy()
report["begin_date"] = begin_date
report["end_date"] = end_date

View File

@@ -55,7 +55,6 @@ def _get_creds(
flow = InstalledAppFlow.from_client_secrets_file(credentials_file, scopes)
creds = flow.run_local_server(open_browser=False, oauth2_port=oauth2_port)
# Save the credentials for the next run
Path(token_file).parent.mkdir(parents=True, exist_ok=True)
with Path(token_file).open("w") as token:
token.write(creds.to_json())
return creds

View File

@@ -56,7 +56,6 @@ def _load_token(token_path: Path) -> Optional[str]:
def _cache_auth_record(record: AuthenticationRecord, token_path: Path):
token = record.serialize()
token_path.parent.mkdir(parents=True, exist_ok=True)
with token_path.open("w") as token_file:
token_file.write(token)

View File

@@ -19,32 +19,18 @@ class MaildirConnection(MailboxConnection):
):
self._maildir_path = maildir_path
self._maildir_create = maildir_create
try:
maildir_owner = os.stat(maildir_path).st_uid
except OSError:
maildir_owner = None
current_uid = os.getuid()
if maildir_owner is not None and current_uid != maildir_owner:
if current_uid == 0:
try:
logger.warning(
"Switching uid to {} to access Maildir".format(maildir_owner)
)
os.setuid(maildir_owner)
except OSError as e:
logger.warning(
"Failed to switch uid to {}: {}".format(maildir_owner, e)
)
else:
maildir_owner = os.stat(maildir_path).st_uid
if os.getuid() != maildir_owner:
if os.getuid() == 0:
logger.warning(
"Runtime uid {} differs from maildir {} owner {}. "
"Access may fail if permissions are insufficient.".format(
current_uid, maildir_path, maildir_owner
)
"Switching uid to {} to access Maildir".format(maildir_owner)
)
if maildir_create:
for subdir in ("cur", "new", "tmp"):
os.makedirs(os.path.join(maildir_path, subdir), exist_ok=True)
os.setuid(maildir_owner)
else:
ex = "runtime uid {} differ from maildir {} owner {}".format(
os.getuid(), maildir_path, maildir_owner
)
raise Exception(ex)
self._client = mailbox.Maildir(maildir_path, create=maildir_create)
self._subfolder_client: Dict[str, mailbox.Maildir] = {}

View File

@@ -271,7 +271,6 @@ def set_hosts(
*,
use_ssl: Optional[bool] = False,
ssl_cert_path: Optional[str] = None,
skip_certificate_verification: bool = False,
username: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
@@ -287,7 +286,6 @@ def set_hosts(
hosts (str|list[str]): A single hostname or URL, or list of hostnames or URLs
use_ssl (bool): Use an HTTPS connection to the server
ssl_cert_path (str): Path to the certificate chain
skip_certificate_verification (bool): Skip certificate verification
username (str): The username to use for authentication
password (str): The password to use for authentication
api_key (str): The Base64 encoded API key to use for authentication
@@ -302,11 +300,10 @@ def set_hosts(
if use_ssl:
conn_params["use_ssl"] = True
if ssl_cert_path:
conn_params["ca_certs"] = ssl_cert_path
if skip_certificate_verification:
conn_params["verify_certs"] = False
else:
conn_params["verify_certs"] = True
conn_params["ca_certs"] = ssl_cert_path
else:
conn_params["verify_certs"] = False
normalized_auth_type = (auth_type or "basic").strip().lower()
if normalized_auth_type == "awssigv4":
if not aws_region:
@@ -767,7 +764,6 @@ def save_smtp_tls_report_to_opensearch(
index_date = begin_date.strftime("%Y-%m")
else:
index_date = begin_date.strftime("%Y-%m-%d")
report = report.copy()
report["begin_date"] = begin_date
report["end_date"] = end_date

View File

@@ -58,7 +58,7 @@ class HECClient(object):
self.source = source
self.session = requests.Session()
self.timeout = timeout
self.verify = verify
self.session.verify = verify
self._common_data: dict[str, Union[str, int, float, dict]] = dict(
host=self.host, source=self.source, index=self.index
)
@@ -124,12 +124,10 @@ class HECClient(object):
data["event"] = new_report.copy()
json_str += "{0}\n".format(json.dumps(data))
if not self.verify:
if not self.session.verify:
logger.debug("Skipping certificate verification for Splunk HEC")
try:
response = self.session.post(
self.url, data=json_str, verify=self.verify, timeout=self.timeout
)
response = self.session.post(self.url, data=json_str, timeout=self.timeout)
response = response.json()
except Exception as e:
raise SplunkError(e.__str__())
@@ -163,12 +161,10 @@ class HECClient(object):
data["event"] = report.copy()
json_str += "{0}\n".format(json.dumps(data))
if not self.verify:
if not self.session.verify:
logger.debug("Skipping certificate verification for Splunk HEC")
try:
response = self.session.post(
self.url, data=json_str, verify=self.verify, timeout=self.timeout
)
response = self.session.post(self.url, data=json_str, timeout=self.timeout)
response = response.json()
except Exception as e:
raise SplunkError(e.__str__())
@@ -202,12 +198,10 @@ class HECClient(object):
data["event"] = report.copy()
json_str += "{0}\n".format(json.dumps(data))
if not self.verify:
if not self.session.verify:
logger.debug("Skipping certificate verification for Splunk HEC")
try:
response = self.session.post(
self.url, data=json_str, verify=self.verify, timeout=self.timeout
)
response = self.session.post(self.url, data=json_str, timeout=self.timeout)
response = response.json()
except Exception as e:
raise SplunkError(e.__str__())

View File

@@ -335,76 +335,6 @@ def get_ip_address_country(
return country
def load_reverse_dns_map(
reverse_dns_map: ReverseDNSMap,
*,
always_use_local_file: bool = False,
local_file_path: Optional[str] = None,
url: Optional[str] = None,
offline: bool = False,
) -> None:
"""
Loads the reverse DNS map from a URL or local file.
Clears and repopulates the given map dict in place. If the map is
fetched from a URL, that is tried first; on failure (or if offline/local
mode is selected) the bundled CSV is used as a fallback.
Args:
reverse_dns_map (dict): The map dict to populate (modified in place)
always_use_local_file (bool): Always use a local map file
local_file_path (str): Path to a local map file
url (str): URL to a reverse DNS map
offline (bool): Use the built-in copy of the reverse DNS map
"""
if url is None:
url = (
"https://raw.githubusercontent.com/domainaware"
"/parsedmarc/master/parsedmarc/"
"resources/maps/base_reverse_dns_map.csv"
)
reverse_dns_map.clear()
def load_csv(_csv_file):
reader = csv.DictReader(_csv_file)
for row in reader:
key = row["base_reverse_dns"].lower().strip()
reverse_dns_map[key] = {
"name": row["name"].strip(),
"type": row["type"].strip(),
}
csv_file = io.StringIO()
if not (offline or always_use_local_file):
try:
logger.debug(f"Trying to fetch reverse DNS map from {url}...")
headers = {"User-Agent": USER_AGENT}
response = requests.get(url, headers=headers)
response.raise_for_status()
csv_file.write(response.text)
csv_file.seek(0)
load_csv(csv_file)
except requests.exceptions.RequestException as e:
logger.warning(f"Failed to fetch reverse DNS map: {e}")
except Exception:
logger.warning("Not a valid CSV file")
csv_file.seek(0)
logging.debug("Response body:")
logger.debug(csv_file.read())
if len(reverse_dns_map) == 0:
logger.info("Loading included reverse DNS map...")
path = str(
files(parsedmarc.resources.maps).joinpath("base_reverse_dns_map.csv")
)
if local_file_path is not None:
path = local_file_path
with open(path) as csv_file:
load_csv(csv_file)
def get_service_from_reverse_dns_base_domain(
base_domain,
*,
@@ -431,21 +361,55 @@ def get_service_from_reverse_dns_base_domain(
"""
base_domain = base_domain.lower().strip()
if url is None:
url = (
"https://raw.githubusercontent.com/domainaware"
"/parsedmarc/master/parsedmarc/"
"resources/maps/base_reverse_dns_map.csv"
)
reverse_dns_map_value: ReverseDNSMap
if reverse_dns_map is None:
reverse_dns_map_value = {}
else:
reverse_dns_map_value = reverse_dns_map
if len(reverse_dns_map_value) == 0:
load_reverse_dns_map(
reverse_dns_map_value,
always_use_local_file=always_use_local_file,
local_file_path=local_file_path,
url=url,
offline=offline,
)
def load_csv(_csv_file):
reader = csv.DictReader(_csv_file)
for row in reader:
key = row["base_reverse_dns"].lower().strip()
reverse_dns_map_value[key] = {
"name": row["name"],
"type": row["type"],
}
csv_file = io.StringIO()
if not (offline or always_use_local_file) and len(reverse_dns_map_value) == 0:
try:
logger.debug(f"Trying to fetch reverse DNS map from {url}...")
headers = {"User-Agent": USER_AGENT}
response = requests.get(url, headers=headers)
response.raise_for_status()
csv_file.write(response.text)
csv_file.seek(0)
load_csv(csv_file)
except requests.exceptions.RequestException as e:
logger.warning(f"Failed to fetch reverse DNS map: {e}")
except Exception:
logger.warning("Not a valid CSV file")
csv_file.seek(0)
logging.debug("Response body:")
logger.debug(csv_file.read())
if len(reverse_dns_map_value) == 0:
logger.info("Loading included reverse DNS map...")
path = str(
files(parsedmarc.resources.maps).joinpath("base_reverse_dns_map.csv")
)
if local_file_path is not None:
path = local_file_path
with open(path) as csv_file:
load_csv(csv_file)
service: ReverseDNSService
try:
service = reverse_dns_map_value[base_domain]

View File

@@ -50,7 +50,7 @@ dependencies = [
"lxml>=4.4.0",
"mailsuite>=1.11.2",
"msgraph-core==0.2.2",
"opensearch-py>=2.4.2,<=4.0.0",
"opensearch-py>=2.4.2,<=3.0.0",
"publicsuffixlist>=0.10.0",
"pygelf>=0.4.2",
"requests>=2.22.0",

670
tests.py
View File

@@ -3,15 +3,12 @@
from __future__ import absolute_import, print_function, unicode_literals
import io
import json
import os
import signal
import sys
import tempfile
import unittest
from base64 import urlsafe_b64encode
from configparser import ConfigParser
from glob import glob
from pathlib import Path
from tempfile import NamedTemporaryFile, TemporaryDirectory
@@ -205,7 +202,9 @@ class Test(unittest.TestCase):
sample_content, offline=OFFLINE_MODE
)
assert email_result["report_type"] == "forensic"
result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)
result = parsedmarc.parse_report_file(
sample_path, offline=OFFLINE_MODE
)
assert result["report_type"] == "forensic"
parsedmarc.parsed_forensic_reports_to_csv(result["report"])
print("Passed!")
@@ -218,7 +217,9 @@ class Test(unittest.TestCase):
if os.path.isdir(sample_path):
continue
print("Testing {0}: ".format(sample_path), end="")
result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)
result = parsedmarc.parse_report_file(
sample_path, offline=OFFLINE_MODE
)
assert result["report_type"] == "smtp_tls"
parsedmarc.parsed_smtp_tls_reports_to_csv(result["report"])
print("Passed!")
@@ -1280,22 +1281,6 @@ class TestImapFallbacks(unittest.TestCase):
class TestMailboxWatchSince(unittest.TestCase):
def setUp(self):
from parsedmarc.log import logger as _logger
_logger.disabled = True
self._stdout_patch = patch("sys.stdout", new_callable=io.StringIO)
self._stderr_patch = patch("sys.stderr", new_callable=io.StringIO)
self._stdout_patch.start()
self._stderr_patch.start()
def tearDown(self):
from parsedmarc.log import logger as _logger
_logger.disabled = False
self._stderr_patch.stop()
self._stdout_patch.stop()
def testWatchInboxPassesSinceToMailboxFetch(self):
mailbox_connection = SimpleNamespace()
@@ -1310,9 +1295,7 @@ class TestMailboxWatchSince(unittest.TestCase):
) as mocked:
with self.assertRaises(_BreakLoop):
parsedmarc.watch_inbox(
mailbox_connection=cast(
parsedmarc.MailboxConnection, mailbox_connection
),
mailbox_connection=cast(parsedmarc.MailboxConnection, mailbox_connection),
callback=callback,
check_timeout=1,
batch_size=10,
@@ -1388,22 +1371,6 @@ class _DummyMailboxConnection(parsedmarc.MailboxConnection):
class TestMailboxPerformance(unittest.TestCase):
def setUp(self):
from parsedmarc.log import logger as _logger
_logger.disabled = True
self._stdout_patch = patch("sys.stdout", new_callable=io.StringIO)
self._stderr_patch = patch("sys.stderr", new_callable=io.StringIO)
self._stdout_patch.start()
self._stderr_patch.start()
def tearDown(self):
from parsedmarc.log import logger as _logger
_logger.disabled = False
self._stderr_patch.stop()
self._stdout_patch.stop()
def testBatchModeAvoidsExtraFullFetch(self):
connection = _DummyMailboxConnection()
parsedmarc.get_dmarc_reports_from_mailbox(
@@ -1953,22 +1920,6 @@ certificate_path = /tmp/msgraph-cert.pem
class TestSighupReload(unittest.TestCase):
"""Tests for SIGHUP-driven configuration reload in watch mode."""
def setUp(self):
from parsedmarc.log import logger as _logger
_logger.disabled = True
self._stdout_patch = patch("sys.stdout", new_callable=io.StringIO)
self._stderr_patch = patch("sys.stderr", new_callable=io.StringIO)
self._stdout_patch.start()
self._stderr_patch.start()
def tearDown(self):
from parsedmarc.log import logger as _logger
_logger.disabled = False
self._stderr_patch.stop()
self._stdout_patch.stop()
_BASE_CONFIG = """[general]
silent = true
@@ -1986,8 +1937,7 @@ watch = true
"SIGHUP not available on this platform",
)
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config")
@patch("parsedmarc.cli._load_config")
@patch("parsedmarc.cli._parse_config_file")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.watch_inbox")
@patch("parsedmarc.cli.IMAPConnection")
@@ -1996,7 +1946,6 @@ watch = true
mock_imap,
mock_watch,
mock_get_reports,
mock_load_config,
mock_parse_config,
mock_init_clients,
):
@@ -2010,9 +1959,7 @@ watch = true
"smtp_tls_reports": [],
}
mock_load_config.return_value = ConfigParser()
def parse_side_effect(config, opts):
def parse_side_effect(config_file, opts):
opts.imap_host = "imap.example.com"
opts.imap_user = "user"
opts.imap_password = "pass"
@@ -2051,7 +1998,7 @@ watch = true
self.assertEqual(cm.exception.code, 1)
# watch_inbox was called twice: initial run + after reload
self.assertEqual(mock_watch.call_count, 2)
# _parse_config called for initial load + reload
# _parse_config_file called for initial load + reload
self.assertGreaterEqual(mock_parse_config.call_count, 2)
@unittest.skipUnless(
@@ -2059,8 +2006,7 @@ watch = true
"SIGHUP not available on this platform",
)
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config")
@patch("parsedmarc.cli._load_config")
@patch("parsedmarc.cli._parse_config_file")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.watch_inbox")
@patch("parsedmarc.cli.IMAPConnection")
@@ -2069,7 +2015,6 @@ watch = true
mock_imap,
mock_watch,
mock_get_reports,
mock_load_config,
mock_parse_config,
mock_init_clients,
):
@@ -2083,13 +2028,11 @@ watch = true
"smtp_tls_reports": [],
}
mock_load_config.return_value = ConfigParser()
# Initial parse sets required opts; reload parse raises
initial_map = {"prefix_": ["example.com"]}
call_count = [0]
def parse_side_effect(config, opts):
def parse_side_effect(config_file, opts):
call_count[0] += 1
opts.imap_host = "imap.example.com"
opts.imap_user = "user"
@@ -2139,8 +2082,7 @@ watch = true
"SIGHUP not available on this platform",
)
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config")
@patch("parsedmarc.cli._load_config")
@patch("parsedmarc.cli._parse_config_file")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.watch_inbox")
@patch("parsedmarc.cli.IMAPConnection")
@@ -2149,7 +2091,6 @@ watch = true
mock_imap,
mock_watch,
mock_get_reports,
mock_load_config,
mock_parse_config,
mock_init_clients,
):
@@ -2163,9 +2104,7 @@ watch = true
"smtp_tls_reports": [],
}
mock_load_config.return_value = ConfigParser()
def parse_side_effect(config, opts):
def parse_side_effect(config_file, opts):
opts.imap_host = "imap.example.com"
opts.imap_user = "user"
opts.imap_password = "pass"
@@ -2292,587 +2231,6 @@ watch = true
# Second init (after reload with v2 config): kafka_hosts should be None
self.assertIsNone(init_opts_captures[1].kafka_hosts)
@unittest.skipUnless(
hasattr(signal, "SIGHUP"),
"SIGHUP not available on this platform",
)
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config")
@patch("parsedmarc.cli._load_config")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.watch_inbox")
@patch("parsedmarc.cli.IMAPConnection")
def testReloadRefreshesReverseDnsMap(
self,
mock_imap,
mock_watch,
mock_get_reports,
mock_load_config,
mock_parse_config,
mock_init_clients,
):
"""SIGHUP reload repopulates the reverse DNS map so lookups still work."""
import signal as signal_module
from parsedmarc import REVERSE_DNS_MAP
mock_imap.return_value = object()
mock_get_reports.return_value = {
"aggregate_reports": [],
"forensic_reports": [],
"smtp_tls_reports": [],
}
mock_load_config.return_value = ConfigParser()
def parse_side_effect(config, opts):
opts.imap_host = "imap.example.com"
opts.imap_user = "user"
opts.imap_password = "pass"
opts.mailbox_watch = True
return None
mock_parse_config.side_effect = parse_side_effect
mock_init_clients.return_value = {}
# Snapshot the map state after each watch_inbox call
map_snapshots = []
watch_calls = [0]
def watch_side_effect(*args, **kwargs):
watch_calls[0] += 1
if watch_calls[0] == 1:
if hasattr(signal_module, "SIGHUP"):
import os
os.kill(os.getpid(), signal_module.SIGHUP)
return
else:
# Capture the map state after reload, before we stop the loop
map_snapshots.append(dict(REVERSE_DNS_MAP))
raise FileExistsError("stop")
mock_watch.side_effect = watch_side_effect
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
cfg.write(self._BASE_CONFIG)
cfg_path = cfg.name
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
# Pre-populate the map so we can verify it gets refreshed
REVERSE_DNS_MAP.clear()
REVERSE_DNS_MAP["stale.example.com"] = {
"name": "Stale",
"type": "stale",
}
original_contents = dict(REVERSE_DNS_MAP)
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
with self.assertRaises(SystemExit):
parsedmarc.cli._main()
self.assertEqual(mock_watch.call_count, 2)
# The map should have been repopulated (not empty, not the stale data)
self.assertEqual(len(map_snapshots), 1)
refreshed = map_snapshots[0]
self.assertGreater(len(refreshed), 0, "Map should not be empty after reload")
self.assertNotEqual(
refreshed,
original_contents,
"Map should have been refreshed, not kept stale data",
)
self.assertNotIn(
"stale.example.com",
refreshed,
"Stale entry should have been cleared by reload",
)
class TestIndexPrefixDomainMapTlsFiltering(unittest.TestCase):
"""Tests that SMTP TLS reports for unmapped domains are filtered out
when index_prefix_domain_map is configured."""
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.IMAPConnection")
def testTlsReportsFilteredByDomainMap(
self,
mock_imap_connection,
mock_get_reports,
):
"""TLS reports for domains not in the map should be silently dropped."""
mock_imap_connection.return_value = object()
mock_get_reports.return_value = {
"aggregate_reports": [],
"forensic_reports": [],
"smtp_tls_reports": [
{
"organization_name": "Allowed Org",
"begin_date": "2024-01-01T00:00:00Z",
"end_date": "2024-01-01T23:59:59Z",
"report_id": "allowed-1",
"contact_info": "tls@allowed.example.com",
"policies": [
{
"policy_domain": "allowed.example.com",
"policy_type": "sts",
"successful_session_count": 1,
"failed_session_count": 0,
}
],
},
{
"organization_name": "Unmapped Org",
"begin_date": "2024-01-01T00:00:00Z",
"end_date": "2024-01-01T23:59:59Z",
"report_id": "unmapped-1",
"contact_info": "tls@unmapped.example.net",
"policies": [
{
"policy_domain": "unmapped.example.net",
"policy_type": "sts",
"successful_session_count": 5,
"failed_session_count": 0,
}
],
},
{
"organization_name": "Mixed Case Org",
"begin_date": "2024-01-01T00:00:00Z",
"end_date": "2024-01-01T23:59:59Z",
"report_id": "mixed-case-1",
"contact_info": "tls@mixedcase.example.com",
"policies": [
{
"policy_domain": "MixedCase.Example.Com",
"policy_type": "sts",
"successful_session_count": 2,
"failed_session_count": 0,
}
],
},
],
}
domain_map = {"tenant_a": ["example.com"]}
with NamedTemporaryFile("w", suffix=".yaml", delete=False) as map_file:
import yaml
yaml.dump(domain_map, map_file)
map_path = map_file.name
self.addCleanup(lambda: os.path.exists(map_path) and os.remove(map_path))
config = f"""[general]
save_smtp_tls = true
silent = false
index_prefix_domain_map = {map_path}
[imap]
host = imap.example.com
user = test-user
password = test-password
"""
with NamedTemporaryFile("w", suffix=".ini", delete=False) as config_file:
config_file.write(config)
config_path = config_file.name
self.addCleanup(lambda: os.path.exists(config_path) and os.remove(config_path))
captured = io.StringIO()
with patch.object(sys, "argv", ["parsedmarc", "-c", config_path]):
with patch("sys.stdout", captured):
parsedmarc.cli._main()
output = json.loads(captured.getvalue())
tls_reports = output["smtp_tls_reports"]
self.assertEqual(len(tls_reports), 2)
report_ids = {r["report_id"] for r in tls_reports}
self.assertIn("allowed-1", report_ids)
self.assertIn("mixed-case-1", report_ids)
self.assertNotIn("unmapped-1", report_ids)
class TestMaildirConnection(unittest.TestCase):
"""Tests for MaildirConnection subdirectory creation."""
def test_create_subdirs_when_missing(self):
"""maildir_create=True creates cur/new/tmp in an empty directory."""
from parsedmarc.mail.maildir import MaildirConnection
with TemporaryDirectory() as d:
for subdir in ("cur", "new", "tmp"):
self.assertFalse(os.path.exists(os.path.join(d, subdir)))
conn = MaildirConnection(d, maildir_create=True)
for subdir in ("cur", "new", "tmp"):
self.assertTrue(os.path.isdir(os.path.join(d, subdir)))
# Should be able to list messages without error
self.assertEqual(conn.fetch_messages("INBOX"), [])
def test_create_subdirs_idempotent(self):
"""maildir_create=True is safe when subdirs already exist."""
from parsedmarc.mail.maildir import MaildirConnection
with TemporaryDirectory() as d:
for subdir in ("cur", "new", "tmp"):
os.makedirs(os.path.join(d, subdir))
# Should not raise
conn = MaildirConnection(d, maildir_create=True)
self.assertEqual(conn.fetch_messages("INBOX"), [])
def test_no_create_raises_on_missing_subdirs(self):
"""maildir_create=False does not create subdirs; keys() fails."""
from parsedmarc.mail.maildir import MaildirConnection
with TemporaryDirectory() as d:
conn = MaildirConnection(d, maildir_create=False)
with self.assertRaises(FileNotFoundError):
conn.fetch_messages("INBOX")
def test_fetch_and_delete_message(self):
"""Round-trip: add a message, fetch it, delete it."""
from parsedmarc.mail.maildir import MaildirConnection
with TemporaryDirectory() as d:
conn = MaildirConnection(d, maildir_create=True)
# Add a message via the underlying client
msg_key = conn._client.add("From: test@example.com\n\nHello")
keys = conn.fetch_messages("INBOX")
self.assertIn(msg_key, keys)
content = conn.fetch_message(msg_key)
self.assertIn("test@example.com", content)
conn.delete_message(msg_key)
self.assertEqual(conn.fetch_messages("INBOX"), [])
def test_move_message_creates_subfolder(self):
"""move_message auto-creates the destination subfolder."""
from parsedmarc.mail.maildir import MaildirConnection
with TemporaryDirectory() as d:
conn = MaildirConnection(d, maildir_create=True)
msg_key = conn._client.add("From: test@example.com\n\nHello")
conn.move_message(msg_key, "archive")
# Original should be gone
self.assertEqual(conn.fetch_messages("INBOX"), [])
# Archive subfolder should have the message
self.assertIn("archive", conn._subfolder_client)
self.assertEqual(len(conn._subfolder_client["archive"].keys()), 1)
class TestMaildirUidHandling(unittest.TestCase):
"""Tests for Maildir UID mismatch handling in Docker-like environments."""
def test_uid_mismatch_warns_instead_of_crashing(self):
"""UID mismatch logs a warning instead of raising an exception."""
from parsedmarc.mail.maildir import MaildirConnection
with TemporaryDirectory() as d:
# Create subdirs so Maildir works
for subdir in ("cur", "new", "tmp"):
os.makedirs(os.path.join(d, subdir))
# Mock os.stat to return a different UID than os.getuid
fake_stat = os.stat(d)
with (
patch("parsedmarc.mail.maildir.os.stat") as mock_stat,
patch("parsedmarc.mail.maildir.os.getuid", return_value=9999),
):
mock_stat.return_value = fake_stat
# Should not raise — just warn
conn = MaildirConnection(d, maildir_create=False)
self.assertEqual(conn.fetch_messages("INBOX"), [])
def test_uid_match_no_warning(self):
"""No warning when UIDs match."""
from parsedmarc.mail.maildir import MaildirConnection
with TemporaryDirectory() as d:
conn = MaildirConnection(d, maildir_create=True)
self.assertEqual(conn.fetch_messages("INBOX"), [])
def test_stat_failure_does_not_crash(self):
"""If os.stat fails on the maildir path, we don't crash."""
from parsedmarc.mail.maildir import MaildirConnection
with TemporaryDirectory() as d:
for subdir in ("cur", "new", "tmp"):
os.makedirs(os.path.join(d, subdir))
original_stat = os.stat
def stat_that_fails_once(path, *args, **kwargs):
"""Fail on the first call (UID check), pass through after."""
stat_that_fails_once.calls += 1
if stat_that_fails_once.calls == 1:
raise OSError("no stat")
return original_stat(path, *args, **kwargs)
stat_that_fails_once.calls = 0
with patch(
"parsedmarc.mail.maildir.os.stat", side_effect=stat_that_fails_once
):
conn = MaildirConnection(d, maildir_create=False)
self.assertEqual(conn.fetch_messages("INBOX"), [])
class TestExpandPath(unittest.TestCase):
"""Tests for _expand_path config path expansion."""
def test_expand_tilde(self):
from parsedmarc.cli import _expand_path
result = _expand_path("~/some/path")
self.assertFalse(result.startswith("~"))
self.assertTrue(result.endswith("/some/path"))
def test_expand_env_var(self):
from parsedmarc.cli import _expand_path
with patch.dict(os.environ, {"PARSEDMARC_TEST_DIR": "/opt/data"}):
result = _expand_path("$PARSEDMARC_TEST_DIR/tokens/.token")
self.assertEqual(result, "/opt/data/tokens/.token")
def test_expand_both(self):
from parsedmarc.cli import _expand_path
with patch.dict(os.environ, {"MY_APP": "parsedmarc"}):
result = _expand_path("~/$MY_APP/config")
self.assertNotIn("~", result)
self.assertIn("parsedmarc/config", result)
def test_no_expansion_needed(self):
from parsedmarc.cli import _expand_path
self.assertEqual(_expand_path("/absolute/path"), "/absolute/path")
self.assertEqual(_expand_path("relative/path"), "relative/path")
class TestTokenParentDirCreation(unittest.TestCase):
"""Tests for parent directory creation when writing token files."""
def test_graph_cache_creates_parent_dirs(self):
from parsedmarc.mail.graph import _cache_auth_record
with TemporaryDirectory() as d:
token_path = Path(d) / "subdir" / "nested" / ".token"
self.assertFalse(token_path.parent.exists())
mock_record = MagicMock()
mock_record.serialize.return_value = "serialized-token"
_cache_auth_record(mock_record, token_path)
self.assertTrue(token_path.exists())
self.assertEqual(token_path.read_text(), "serialized-token")
def test_gmail_token_write_creates_parent_dirs(self):
"""Gmail token write creates parent directories."""
with TemporaryDirectory() as d:
token_path = Path(d) / "deep" / "nested" / "token.json"
self.assertFalse(token_path.parent.exists())
# Directly test the mkdir + open pattern
token_path.parent.mkdir(parents=True, exist_ok=True)
with token_path.open("w") as f:
f.write('{"token": "test"}')
self.assertTrue(token_path.exists())
self.assertEqual(token_path.read_text(), '{"token": "test"}')
class TestEnvVarConfig(unittest.TestCase):
"""Tests for environment variable configuration support."""
def test_resolve_section_key_simple(self):
"""Simple section names resolve correctly."""
from parsedmarc.cli import _resolve_section_key
self.assertEqual(_resolve_section_key("IMAP_PASSWORD"), ("imap", "password"))
self.assertEqual(_resolve_section_key("GENERAL_DEBUG"), ("general", "debug"))
self.assertEqual(_resolve_section_key("S3_BUCKET"), ("s3", "bucket"))
self.assertEqual(_resolve_section_key("GELF_HOST"), ("gelf", "host"))
def test_resolve_section_key_underscore_sections(self):
"""Multi-word section names (splunk_hec, gmail_api, etc.) resolve correctly."""
from parsedmarc.cli import _resolve_section_key
self.assertEqual(
_resolve_section_key("SPLUNK_HEC_TOKEN"), ("splunk_hec", "token")
)
self.assertEqual(
_resolve_section_key("GMAIL_API_CREDENTIALS_FILE"),
("gmail_api", "credentials_file"),
)
self.assertEqual(
_resolve_section_key("LOG_ANALYTICS_CLIENT_ID"),
("log_analytics", "client_id"),
)
def test_resolve_section_key_unknown(self):
"""Unknown prefixes return (None, None)."""
from parsedmarc.cli import _resolve_section_key
self.assertEqual(_resolve_section_key("UNKNOWN_FOO"), (None, None))
# Just a section name with no key should not match
self.assertEqual(_resolve_section_key("IMAP"), (None, None))
def test_apply_env_overrides_injects_values(self):
"""Env vars are injected into an existing ConfigParser."""
from configparser import ConfigParser
from parsedmarc.cli import _apply_env_overrides
config = ConfigParser()
config.add_section("imap")
config.set("imap", "host", "original.example.com")
env = {
"PARSEDMARC_IMAP_HOST": "new.example.com",
"PARSEDMARC_IMAP_PASSWORD": "secret123",
}
with patch.dict(os.environ, env, clear=False):
_apply_env_overrides(config)
self.assertEqual(config.get("imap", "host"), "new.example.com")
self.assertEqual(config.get("imap", "password"), "secret123")
def test_apply_env_overrides_creates_sections(self):
"""Env vars create new sections when they don't exist."""
from configparser import ConfigParser
from parsedmarc.cli import _apply_env_overrides
config = ConfigParser()
env = {"PARSEDMARC_ELASTICSEARCH_HOSTS": "http://localhost:9200"}
with patch.dict(os.environ, env, clear=False):
_apply_env_overrides(config)
self.assertTrue(config.has_section("elasticsearch"))
self.assertEqual(config.get("elasticsearch", "hosts"), "http://localhost:9200")
def test_apply_env_overrides_ignores_config_file_var(self):
"""PARSEDMARC_CONFIG_FILE is not injected as a config key."""
from configparser import ConfigParser
from parsedmarc.cli import _apply_env_overrides
config = ConfigParser()
env = {"PARSEDMARC_CONFIG_FILE": "/some/path.ini"}
with patch.dict(os.environ, env, clear=False):
_apply_env_overrides(config)
self.assertEqual(config.sections(), [])
def test_load_config_with_file_and_env_override(self):
"""Env vars override values from an INI file."""
from parsedmarc.cli import _load_config
with NamedTemporaryFile(mode="w", suffix=".ini", delete=False) as f:
f.write(
"[imap]\nhost = file.example.com\nuser = alice\npassword = fromfile\n"
)
f.flush()
config_path = f.name
try:
env = {"PARSEDMARC_IMAP_PASSWORD": "fromenv"}
with patch.dict(os.environ, env, clear=False):
config = _load_config(config_path)
self.assertEqual(config.get("imap", "host"), "file.example.com")
self.assertEqual(config.get("imap", "user"), "alice")
self.assertEqual(config.get("imap", "password"), "fromenv")
finally:
os.unlink(config_path)
def test_load_config_env_only(self):
"""Config can be loaded purely from env vars with no file."""
from parsedmarc.cli import _load_config
env = {
"PARSEDMARC_GENERAL_DEBUG": "true",
"PARSEDMARC_ELASTICSEARCH_HOSTS": "http://localhost:9200",
}
with patch.dict(os.environ, env, clear=False):
config = _load_config(None)
self.assertEqual(config.get("general", "debug"), "true")
self.assertEqual(config.get("elasticsearch", "hosts"), "http://localhost:9200")
def test_parse_config_from_env(self):
"""Full round-trip: env vars -> ConfigParser -> opts."""
from argparse import Namespace
from parsedmarc.cli import _load_config, _parse_config
env = {
"PARSEDMARC_GENERAL_DEBUG": "true",
"PARSEDMARC_GENERAL_SAVE_AGGREGATE": "true",
"PARSEDMARC_GENERAL_OFFLINE": "true",
}
with patch.dict(os.environ, env, clear=False):
config = _load_config(None)
opts = Namespace()
_parse_config(config, opts)
self.assertTrue(opts.debug)
self.assertTrue(opts.save_aggregate)
self.assertTrue(opts.offline)
def test_config_file_env_var(self):
"""PARSEDMARC_CONFIG_FILE env var specifies the config file path."""
from argparse import Namespace
from parsedmarc.cli import _load_config, _parse_config
with NamedTemporaryFile(mode="w", suffix=".ini", delete=False) as f:
f.write("[general]\ndebug = true\noffline = true\n")
f.flush()
config_path = f.name
try:
env = {"PARSEDMARC_CONFIG_FILE": config_path}
with patch.dict(os.environ, env, clear=False):
config = _load_config(os.environ.get("PARSEDMARC_CONFIG_FILE"))
opts = Namespace()
_parse_config(config, opts)
self.assertTrue(opts.debug)
self.assertTrue(opts.offline)
finally:
os.unlink(config_path)
def test_boolean_values_from_env(self):
"""Various boolean string representations work through ConfigParser."""
from configparser import ConfigParser
from parsedmarc.cli import _apply_env_overrides
for true_val in ("true", "yes", "1", "on", "True", "YES"):
config = ConfigParser()
env = {"PARSEDMARC_GENERAL_DEBUG": true_val}
with patch.dict(os.environ, env, clear=False):
_apply_env_overrides(config)
self.assertTrue(
config.getboolean("general", "debug"),
f"Expected truthy for {true_val!r}",
)
for false_val in ("false", "no", "0", "off", "False", "NO"):
config = ConfigParser()
env = {"PARSEDMARC_GENERAL_DEBUG": false_val}
with patch.dict(os.environ, env, clear=False):
_apply_env_overrides(config)
self.assertFalse(
config.getboolean("general", "debug"),
f"Expected falsy for {false_val!r}",
)
if __name__ == "__main__":
unittest.main(verbosity=2)