mirror of
https://github.com/domainaware/parsedmarc.git
synced 2026-03-26 16:32:48 +00:00
Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fb3c38a8b8 | ||
|
|
c9a6145505 | ||
|
|
e1bdbeb257 | ||
|
|
12c4676b79 | ||
|
|
cda039ee27 | ||
|
|
ff0ca6538c |
2
.vscode/settings.json
vendored
2
.vscode/settings.json
vendored
@@ -52,6 +52,7 @@
|
||||
"geoipupdate",
|
||||
"Geolite",
|
||||
"geolocation",
|
||||
"getuid",
|
||||
"githubpages",
|
||||
"Grafana",
|
||||
"hostnames",
|
||||
@@ -75,6 +76,7 @@
|
||||
"LISTSERV",
|
||||
"loganalytics",
|
||||
"lxml",
|
||||
"Maildir",
|
||||
"mailparser",
|
||||
"mailrelay",
|
||||
"mailsuite",
|
||||
|
||||
30
CHANGELOG.md
30
CHANGELOG.md
@@ -1,5 +1,35 @@
|
||||
# Changelog
|
||||
|
||||
## 9.5.3
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed `FileNotFoundError` when using Maildir with Docker volume mounts. Python's `mailbox.Maildir(create=True)` only creates `cur/new/tmp` subdirectories when the top-level directory doesn't exist; Docker volume mounts pre-create the directory as empty, skipping subdirectory creation. parsedmarc now explicitly creates the subdirectories when `maildir_create` is enabled.
|
||||
- Maildir UID mismatch no longer crashes the process. In Docker containers where volume ownership differs from the container UID, parsedmarc now logs a warning instead of raising an exception. Also handles `os.setuid` failures gracefully in containers without `CAP_SETUID`.
|
||||
- Token file writes (MS Graph and Gmail) now create parent directories automatically, preventing `FileNotFoundError` when the token path points to a directory that doesn't yet exist.
|
||||
- File paths from config (`token_file`, `credentials_file`, `cert_path`, `log_file`, `output`, `ip_db_path`, `maildir_path`, syslog cert paths, etc.) now expand `~` and `$VAR` references via `os.path.expanduser`/`os.path.expandvars`.
|
||||
|
||||
## 9.5.2
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed `ValueError: invalid interpolation syntax` when config values (from env vars or INI files) contain `%` characters, such as in passwords. Disabled ConfigParser's `%`-based string interpolation.
|
||||
|
||||
## 9.5.1
|
||||
|
||||
### Changes
|
||||
|
||||
- Correct ISO format for MSGraphConnection timestamps (PR #706)
|
||||
|
||||
## 9.5.0
|
||||
|
||||
### Added
|
||||
|
||||
- Environment variable configuration support: any config option can now be set via `PARSEDMARC_{SECTION}_{KEY}` environment variables (e.g. `PARSEDMARC_IMAP_PASSWORD`, `PARSEDMARC_SPLUNK_HEC_TOKEN`). Environment variables override config file values but are overridden by CLI arguments.
|
||||
- `PARSEDMARC_CONFIG_FILE` environment variable to specify the config file path without the `-c` flag.
|
||||
- Env-only mode: parsedmarc can now run without a config file when `PARSEDMARC_*` environment variables are set, enabling fully file-less Docker deployments.
|
||||
- Explicit read permission check on config file, giving a clear error message when the container UID cannot read the file (e.g. `chmod 600` with a UID mismatch).
|
||||
|
||||
## 9.4.0
|
||||
|
||||
### Added
|
||||
|
||||
@@ -531,6 +531,96 @@ PUT _cluster/settings
|
||||
Increasing this value increases resource usage.
|
||||
:::
|
||||
|
||||
## Environment variable configuration
|
||||
|
||||
Any configuration option can be set via environment variables using the
|
||||
naming convention `PARSEDMARC_{SECTION}_{KEY}` (uppercase). This is
|
||||
especially useful for Docker deployments where file permissions make it
|
||||
difficult to use config files for secrets.
|
||||
|
||||
**Priority order:** CLI arguments > environment variables > config file > defaults
|
||||
|
||||
### Examples
|
||||
|
||||
```bash
|
||||
# Set IMAP credentials via env vars
|
||||
export PARSEDMARC_IMAP_HOST=imap.example.com
|
||||
export PARSEDMARC_IMAP_USER=dmarc@example.com
|
||||
export PARSEDMARC_IMAP_PASSWORD=secret
|
||||
|
||||
# Elasticsearch
|
||||
export PARSEDMARC_ELASTICSEARCH_HOSTS=http://localhost:9200
|
||||
export PARSEDMARC_ELASTICSEARCH_SSL=false
|
||||
|
||||
# Splunk HEC (note: section name splunk_hec becomes SPLUNK_HEC)
|
||||
export PARSEDMARC_SPLUNK_HEC_URL=https://splunk.example.com
|
||||
export PARSEDMARC_SPLUNK_HEC_TOKEN=my-hec-token
|
||||
export PARSEDMARC_SPLUNK_HEC_INDEX=email
|
||||
|
||||
# General settings
|
||||
export PARSEDMARC_GENERAL_SAVE_AGGREGATE=true
|
||||
export PARSEDMARC_GENERAL_DEBUG=true
|
||||
```
|
||||
|
||||
### Specifying the config file via environment variable
|
||||
|
||||
```bash
|
||||
export PARSEDMARC_CONFIG_FILE=/etc/parsedmarc.ini
|
||||
parsedmarc
|
||||
```
|
||||
|
||||
### Running without a config file (env-only mode)
|
||||
|
||||
When no config file is given (neither `-c` flag nor `PARSEDMARC_CONFIG_FILE`),
|
||||
parsedmarc will still pick up any `PARSEDMARC_*` environment variables. This
|
||||
enables fully file-less deployments:
|
||||
|
||||
```bash
|
||||
export PARSEDMARC_GENERAL_SAVE_AGGREGATE=true
|
||||
export PARSEDMARC_GENERAL_OFFLINE=true
|
||||
export PARSEDMARC_ELASTICSEARCH_HOSTS=http://elasticsearch:9200
|
||||
parsedmarc /path/to/reports/*
|
||||
```
|
||||
|
||||
### Docker Compose example
|
||||
|
||||
```yaml
|
||||
services:
|
||||
parsedmarc:
|
||||
image: parsedmarc:latest
|
||||
environment:
|
||||
PARSEDMARC_IMAP_HOST: imap.example.com
|
||||
PARSEDMARC_IMAP_USER: dmarc@example.com
|
||||
PARSEDMARC_IMAP_PASSWORD: ${IMAP_PASSWORD}
|
||||
PARSEDMARC_MAILBOX_WATCH: "true"
|
||||
PARSEDMARC_ELASTICSEARCH_HOSTS: http://elasticsearch:9200
|
||||
PARSEDMARC_GENERAL_SAVE_AGGREGATE: "true"
|
||||
PARSEDMARC_GENERAL_SAVE_FORENSIC: "true"
|
||||
```
|
||||
|
||||
### Section name mapping
|
||||
|
||||
For sections with underscores in the name, the full section name is used:
|
||||
|
||||
| Section | Env var prefix |
|
||||
|------------------|-------------------------------|
|
||||
| `general` | `PARSEDMARC_GENERAL_` |
|
||||
| `mailbox` | `PARSEDMARC_MAILBOX_` |
|
||||
| `imap` | `PARSEDMARC_IMAP_` |
|
||||
| `msgraph` | `PARSEDMARC_MSGRAPH_` |
|
||||
| `elasticsearch` | `PARSEDMARC_ELASTICSEARCH_` |
|
||||
| `opensearch` | `PARSEDMARC_OPENSEARCH_` |
|
||||
| `splunk_hec` | `PARSEDMARC_SPLUNK_HEC_` |
|
||||
| `kafka` | `PARSEDMARC_KAFKA_` |
|
||||
| `smtp` | `PARSEDMARC_SMTP_` |
|
||||
| `s3` | `PARSEDMARC_S3_` |
|
||||
| `syslog` | `PARSEDMARC_SYSLOG_` |
|
||||
| `gmail_api` | `PARSEDMARC_GMAIL_API_` |
|
||||
| `maildir` | `PARSEDMARC_MAILDIR_` |
|
||||
| `log_analytics` | `PARSEDMARC_LOG_ANALYTICS_` |
|
||||
| `gelf` | `PARSEDMARC_GELF_` |
|
||||
| `webhook` | `PARSEDMARC_WEBHOOK_` |
|
||||
|
||||
## Performance tuning
|
||||
|
||||
For large mailbox imports or backfills, parsedmarc can consume a noticeable amount
|
||||
|
||||
@@ -1955,9 +1955,7 @@ def get_dmarc_reports_from_mailbox(
|
||||
)
|
||||
current_time = datetime.now(timezone.utc).strftime("%d-%b-%Y")
|
||||
elif isinstance(connection, MSGraphConnection):
|
||||
since = (
|
||||
datetime.now(timezone.utc) - timedelta(minutes=_since)
|
||||
).isoformat() + "Z"
|
||||
since = (datetime.now(timezone.utc) - timedelta(minutes=_since)).isoformat()
|
||||
current_time = datetime.now(timezone.utc).isoformat() + "Z"
|
||||
elif isinstance(connection, GmailConnection):
|
||||
since = (datetime.now(timezone.utc) - timedelta(minutes=_since)).strftime(
|
||||
|
||||
@@ -75,6 +75,84 @@ def _str_to_list(s):
|
||||
return list(map(lambda i: i.lstrip(), _list))
|
||||
|
||||
|
||||
def _expand_path(p: str) -> str:
|
||||
"""Expand ``~`` and ``$VAR`` references in a file path."""
|
||||
return os.path.expanduser(os.path.expandvars(p))
|
||||
|
||||
|
||||
# All known INI config section names, used for env var resolution.
|
||||
_KNOWN_SECTIONS = frozenset(
|
||||
{
|
||||
"general",
|
||||
"mailbox",
|
||||
"imap",
|
||||
"msgraph",
|
||||
"elasticsearch",
|
||||
"opensearch",
|
||||
"splunk_hec",
|
||||
"kafka",
|
||||
"smtp",
|
||||
"s3",
|
||||
"syslog",
|
||||
"gmail_api",
|
||||
"maildir",
|
||||
"log_analytics",
|
||||
"gelf",
|
||||
"webhook",
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def _resolve_section_key(suffix: str) -> tuple:
    """Resolve an env var suffix like ``IMAP_PASSWORD`` to ``('imap', 'password')``.

    Uses longest-prefix matching against known section names so that
    multi-word sections like ``splunk_hec`` are handled correctly.

    Returns ``(None, None)`` when no known section matches.
    """
    lowered = suffix.lower()

    # Collect every section whose "<name>_" prefix matches and leaves a
    # non-empty key, then keep the longest section name among them.
    candidates = []
    for section in _KNOWN_SECTIONS:
        prefix = section + "_"
        if lowered.startswith(prefix) and len(lowered) > len(prefix):
            candidates.append((section, lowered[len(prefix):]))

    if not candidates:
        return None, None
    return max(candidates, key=lambda pair: len(pair[0]))
|
||||
|
||||
|
||||
def _apply_env_overrides(config: ConfigParser) -> None:
    """Inject ``PARSEDMARC_*`` environment variables into *config*.

    Environment variables matching ``PARSEDMARC_{SECTION}_{KEY}`` override
    (or create) the corresponding config-file values. Sections are created
    automatically when they do not yet exist.
    """
    env_prefix = "PARSEDMARC_"
    # This variable selects the config file itself and is not a config value.
    reserved = "PARSEDMARC_CONFIG_FILE"

    for name, value in os.environ.items():
        if name == reserved or not name.startswith(env_prefix):
            continue

        section, key = _resolve_section_key(name[len(env_prefix):])
        if section is None:
            logger.debug("Ignoring unrecognized env var: %s", name)
            continue

        if not config.has_section(section):
            config.add_section(section)
        config.set(section, key, value)
        logger.debug("Config override from env: [%s] %s", section, key)
|
||||
|
||||
|
||||
def _configure_logging(log_level, log_file=None):
|
||||
"""
|
||||
Configure logging for the current process.
|
||||
@@ -178,12 +256,39 @@ class ConfigurationError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def _parse_config_file(config_file, opts):
|
||||
"""Parse a config file and update opts in place.
|
||||
def _load_config(config_file: str | None = None) -> ConfigParser:
    """Load configuration from an INI file and/or environment variables.

    Args:
        config_file: Optional path to an .ini config file.

    Returns:
        A ``ConfigParser`` populated from the file (if given) and from any
        ``PARSEDMARC_*`` environment variables.

    Raises:
        ConfigurationError: If *config_file* is given but does not exist.
    """
    # interpolation=None keeps literal "%" characters (e.g. in passwords)
    # from triggering ConfigParser interpolation errors.
    parser = ConfigParser(interpolation=None)

    if config_file is not None:
        resolved = os.path.abspath(config_file)
        if not os.path.exists(resolved):
            raise ConfigurationError("A file does not exist at {0}".format(resolved))
        if not os.access(resolved, os.R_OK):
            raise ConfigurationError(
                "Unable to read {0} — check file permissions".format(resolved)
            )
        parser.read(config_file)

    # Env vars are applied after the file so they take precedence over it.
    _apply_env_overrides(parser)
    return parser
|
||||
|
||||
|
||||
def _parse_config(config: ConfigParser, opts):
|
||||
"""Apply a loaded ``ConfigParser`` to *opts* in place.
|
||||
|
||||
Args:
|
||||
config: A ``ConfigParser`` (from ``_load_config``).
|
||||
opts: Namespace object to update with parsed values.
|
||||
|
||||
Returns:
|
||||
index_prefix_domain_map or None
|
||||
@@ -191,13 +296,8 @@ def _parse_config_file(config_file, opts):
|
||||
Raises:
|
||||
ConfigurationError: If required settings are missing or invalid.
|
||||
"""
|
||||
abs_path = os.path.abspath(config_file)
|
||||
if not os.path.exists(abs_path):
|
||||
raise ConfigurationError("A file does not exist at {0}".format(abs_path))
|
||||
opts.silent = True
|
||||
config = ConfigParser()
|
||||
index_prefix_domain_map = None
|
||||
config.read(config_file)
|
||||
if "general" in config.sections():
|
||||
general_config = config["general"]
|
||||
if "silent" in general_config:
|
||||
@@ -207,7 +307,7 @@ def _parse_config_file(config_file, opts):
|
||||
"normalize_timespan_threshold_hours"
|
||||
)
|
||||
if "index_prefix_domain_map" in general_config:
|
||||
with open(general_config["index_prefix_domain_map"]) as f:
|
||||
with open(_expand_path(general_config["index_prefix_domain_map"])) as f:
|
||||
index_prefix_domain_map = yaml.safe_load(f)
|
||||
if "offline" in general_config:
|
||||
opts.offline = bool(general_config.getboolean("offline"))
|
||||
@@ -216,7 +316,7 @@ def _parse_config_file(config_file, opts):
|
||||
general_config.getboolean("strip_attachment_payloads")
|
||||
)
|
||||
if "output" in general_config:
|
||||
opts.output = general_config["output"]
|
||||
opts.output = _expand_path(general_config["output"])
|
||||
if "aggregate_json_filename" in general_config:
|
||||
opts.aggregate_json_filename = general_config["aggregate_json_filename"]
|
||||
if "forensic_json_filename" in general_config:
|
||||
@@ -272,11 +372,11 @@ def _parse_config_file(config_file, opts):
|
||||
general_config.getboolean("fail_on_output_error")
|
||||
)
|
||||
if "log_file" in general_config:
|
||||
opts.log_file = general_config["log_file"]
|
||||
opts.log_file = _expand_path(general_config["log_file"])
|
||||
if "n_procs" in general_config:
|
||||
opts.n_procs = general_config.getint("n_procs")
|
||||
if "ip_db_path" in general_config:
|
||||
opts.ip_db_path = general_config["ip_db_path"]
|
||||
opts.ip_db_path = _expand_path(general_config["ip_db_path"])
|
||||
else:
|
||||
opts.ip_db_path = None
|
||||
if "always_use_local_files" in general_config:
|
||||
@@ -284,7 +384,9 @@ def _parse_config_file(config_file, opts):
|
||||
general_config.getboolean("always_use_local_files")
|
||||
)
|
||||
if "local_reverse_dns_map_path" in general_config:
|
||||
opts.reverse_dns_map_path = general_config["local_reverse_dns_map_path"]
|
||||
opts.reverse_dns_map_path = _expand_path(
|
||||
general_config["local_reverse_dns_map_path"]
|
||||
)
|
||||
if "reverse_dns_map_url" in general_config:
|
||||
opts.reverse_dns_map_url = general_config["reverse_dns_map_url"]
|
||||
if "prettify_json" in general_config:
|
||||
@@ -399,7 +501,7 @@ def _parse_config_file(config_file, opts):
|
||||
|
||||
if "msgraph" in config.sections():
|
||||
graph_config = config["msgraph"]
|
||||
opts.graph_token_file = graph_config.get("token_file", ".token")
|
||||
opts.graph_token_file = _expand_path(graph_config.get("token_file", ".token"))
|
||||
|
||||
if "auth_method" not in graph_config:
|
||||
logger.info(
|
||||
@@ -453,7 +555,9 @@ def _parse_config_file(config_file, opts):
|
||||
|
||||
if opts.graph_auth_method == AuthMethod.Certificate.name:
|
||||
if "certificate_path" in graph_config:
|
||||
opts.graph_certificate_path = graph_config["certificate_path"]
|
||||
opts.graph_certificate_path = _expand_path(
|
||||
graph_config["certificate_path"]
|
||||
)
|
||||
else:
|
||||
raise ConfigurationError(
|
||||
"certificate_path setting missing from the msgraph config section"
|
||||
@@ -510,7 +614,9 @@ def _parse_config_file(config_file, opts):
|
||||
if "ssl" in elasticsearch_config:
|
||||
opts.elasticsearch_ssl = bool(elasticsearch_config.getboolean("ssl"))
|
||||
if "cert_path" in elasticsearch_config:
|
||||
opts.elasticsearch_ssl_cert_path = elasticsearch_config["cert_path"]
|
||||
opts.elasticsearch_ssl_cert_path = _expand_path(
|
||||
elasticsearch_config["cert_path"]
|
||||
)
|
||||
if "skip_certificate_verification" in elasticsearch_config:
|
||||
opts.elasticsearch_skip_certificate_verification = bool(
|
||||
elasticsearch_config.getboolean("skip_certificate_verification")
|
||||
@@ -553,7 +659,7 @@ def _parse_config_file(config_file, opts):
|
||||
if "ssl" in opensearch_config:
|
||||
opts.opensearch_ssl = bool(opensearch_config.getboolean("ssl"))
|
||||
if "cert_path" in opensearch_config:
|
||||
opts.opensearch_ssl_cert_path = opensearch_config["cert_path"]
|
||||
opts.opensearch_ssl_cert_path = _expand_path(opensearch_config["cert_path"])
|
||||
if "skip_certificate_verification" in opensearch_config:
|
||||
opts.opensearch_skip_certificate_verification = bool(
|
||||
opensearch_config.getboolean("skip_certificate_verification")
|
||||
@@ -680,7 +786,7 @@ def _parse_config_file(config_file, opts):
|
||||
if "subject" in smtp_config:
|
||||
opts.smtp_subject = smtp_config["subject"]
|
||||
if "attachment" in smtp_config:
|
||||
opts.smtp_attachment = smtp_config["attachment"]
|
||||
opts.smtp_attachment = _expand_path(smtp_config["attachment"])
|
||||
if "message" in smtp_config:
|
||||
opts.smtp_message = smtp_config["message"]
|
||||
|
||||
@@ -727,11 +833,11 @@ def _parse_config_file(config_file, opts):
|
||||
else:
|
||||
opts.syslog_protocol = "udp"
|
||||
if "cafile_path" in syslog_config:
|
||||
opts.syslog_cafile_path = syslog_config["cafile_path"]
|
||||
opts.syslog_cafile_path = _expand_path(syslog_config["cafile_path"])
|
||||
if "certfile_path" in syslog_config:
|
||||
opts.syslog_certfile_path = syslog_config["certfile_path"]
|
||||
opts.syslog_certfile_path = _expand_path(syslog_config["certfile_path"])
|
||||
if "keyfile_path" in syslog_config:
|
||||
opts.syslog_keyfile_path = syslog_config["keyfile_path"]
|
||||
opts.syslog_keyfile_path = _expand_path(syslog_config["keyfile_path"])
|
||||
if "timeout" in syslog_config:
|
||||
opts.syslog_timeout = float(syslog_config["timeout"])
|
||||
else:
|
||||
@@ -747,8 +853,13 @@ def _parse_config_file(config_file, opts):
|
||||
|
||||
if "gmail_api" in config.sections():
|
||||
gmail_api_config = config["gmail_api"]
|
||||
opts.gmail_api_credentials_file = gmail_api_config.get("credentials_file")
|
||||
opts.gmail_api_token_file = gmail_api_config.get("token_file", ".token")
|
||||
gmail_creds = gmail_api_config.get("credentials_file")
|
||||
opts.gmail_api_credentials_file = (
|
||||
_expand_path(gmail_creds) if gmail_creds else gmail_creds
|
||||
)
|
||||
opts.gmail_api_token_file = _expand_path(
|
||||
gmail_api_config.get("token_file", ".token")
|
||||
)
|
||||
opts.gmail_api_include_spam_trash = bool(
|
||||
gmail_api_config.getboolean("include_spam_trash", False)
|
||||
)
|
||||
@@ -773,7 +884,8 @@ def _parse_config_file(config_file, opts):
|
||||
|
||||
if "maildir" in config.sections():
|
||||
maildir_api_config = config["maildir"]
|
||||
opts.maildir_path = maildir_api_config.get("maildir_path")
|
||||
maildir_p = maildir_api_config.get("maildir_path")
|
||||
opts.maildir_path = _expand_path(maildir_p) if maildir_p else maildir_p
|
||||
opts.maildir_create = bool(
|
||||
maildir_api_config.getboolean("maildir_create", fallback=False)
|
||||
)
|
||||
@@ -1683,9 +1795,16 @@ def _main():
|
||||
|
||||
index_prefix_domain_map = None
|
||||
|
||||
if args.config_file:
|
||||
config_file = args.config_file or os.environ.get("PARSEDMARC_CONFIG_FILE")
|
||||
has_env_config = any(
|
||||
k.startswith("PARSEDMARC_") and k != "PARSEDMARC_CONFIG_FILE"
|
||||
for k in os.environ
|
||||
)
|
||||
|
||||
if config_file or has_env_config:
|
||||
try:
|
||||
index_prefix_domain_map = _parse_config_file(args.config_file, opts)
|
||||
config = _load_config(config_file)
|
||||
index_prefix_domain_map = _parse_config(config, opts)
|
||||
except ConfigurationError as e:
|
||||
logger.critical(str(e))
|
||||
exit(-1)
|
||||
@@ -2102,9 +2221,8 @@ def _main():
|
||||
# Build a fresh opts starting from CLI-only defaults so that
|
||||
# sections removed from the config file actually take effect.
|
||||
new_opts = Namespace(**vars(opts_from_cli))
|
||||
new_index_prefix_domain_map = _parse_config_file(
|
||||
args.config_file, new_opts
|
||||
)
|
||||
new_config = _load_config(config_file)
|
||||
new_index_prefix_domain_map = _parse_config(new_config, new_opts)
|
||||
new_clients = _init_output_clients(new_opts)
|
||||
|
||||
# All steps succeeded — commit the changes atomically.
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
__version__ = "9.4.0"
|
||||
__version__ = "9.5.3"
|
||||
|
||||
USER_AGENT = f"parsedmarc/{__version__}"
|
||||
|
||||
@@ -55,6 +55,7 @@ def _get_creds(
|
||||
flow = InstalledAppFlow.from_client_secrets_file(credentials_file, scopes)
|
||||
creds = flow.run_local_server(open_browser=False, oauth2_port=oauth2_port)
|
||||
# Save the credentials for the next run
|
||||
Path(token_file).parent.mkdir(parents=True, exist_ok=True)
|
||||
with Path(token_file).open("w") as token:
|
||||
token.write(creds.to_json())
|
||||
return creds
|
||||
|
||||
@@ -56,6 +56,7 @@ def _load_token(token_path: Path) -> Optional[str]:
|
||||
|
||||
def _cache_auth_record(record: AuthenticationRecord, token_path: Path):
|
||||
token = record.serialize()
|
||||
token_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with token_path.open("w") as token_file:
|
||||
token_file.write(token)
|
||||
|
||||
|
||||
@@ -19,18 +19,32 @@ class MaildirConnection(MailboxConnection):
|
||||
):
|
||||
self._maildir_path = maildir_path
|
||||
self._maildir_create = maildir_create
|
||||
maildir_owner = os.stat(maildir_path).st_uid
|
||||
if os.getuid() != maildir_owner:
|
||||
if os.getuid() == 0:
|
||||
logger.warning(
|
||||
"Switching uid to {} to access Maildir".format(maildir_owner)
|
||||
)
|
||||
os.setuid(maildir_owner)
|
||||
try:
|
||||
maildir_owner = os.stat(maildir_path).st_uid
|
||||
except OSError:
|
||||
maildir_owner = None
|
||||
current_uid = os.getuid()
|
||||
if maildir_owner is not None and current_uid != maildir_owner:
|
||||
if current_uid == 0:
|
||||
try:
|
||||
logger.warning(
|
||||
"Switching uid to {} to access Maildir".format(maildir_owner)
|
||||
)
|
||||
os.setuid(maildir_owner)
|
||||
except OSError as e:
|
||||
logger.warning(
|
||||
"Failed to switch uid to {}: {}".format(maildir_owner, e)
|
||||
)
|
||||
else:
|
||||
ex = "runtime uid {} differ from maildir {} owner {}".format(
|
||||
os.getuid(), maildir_path, maildir_owner
|
||||
logger.warning(
|
||||
"Runtime uid {} differs from maildir {} owner {}. "
|
||||
"Access may fail if permissions are insufficient.".format(
|
||||
current_uid, maildir_path, maildir_owner
|
||||
)
|
||||
)
|
||||
raise Exception(ex)
|
||||
if maildir_create:
|
||||
for subdir in ("cur", "new", "tmp"):
|
||||
os.makedirs(os.path.join(maildir_path, subdir), exist_ok=True)
|
||||
self._client = mailbox.Maildir(maildir_path, create=maildir_create)
|
||||
self._subfolder_client: Dict[str, mailbox.Maildir] = {}
|
||||
|
||||
|
||||
418
tests.py
418
tests.py
@@ -11,6 +11,7 @@ import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from base64 import urlsafe_b64encode
|
||||
from configparser import ConfigParser
|
||||
from glob import glob
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile, TemporaryDirectory
|
||||
@@ -1985,7 +1986,8 @@ watch = true
|
||||
"SIGHUP not available on this platform",
|
||||
)
|
||||
@patch("parsedmarc.cli._init_output_clients")
|
||||
@patch("parsedmarc.cli._parse_config_file")
|
||||
@patch("parsedmarc.cli._parse_config")
|
||||
@patch("parsedmarc.cli._load_config")
|
||||
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
||||
@patch("parsedmarc.cli.watch_inbox")
|
||||
@patch("parsedmarc.cli.IMAPConnection")
|
||||
@@ -1994,6 +1996,7 @@ watch = true
|
||||
mock_imap,
|
||||
mock_watch,
|
||||
mock_get_reports,
|
||||
mock_load_config,
|
||||
mock_parse_config,
|
||||
mock_init_clients,
|
||||
):
|
||||
@@ -2007,7 +2010,9 @@ watch = true
|
||||
"smtp_tls_reports": [],
|
||||
}
|
||||
|
||||
def parse_side_effect(config_file, opts):
|
||||
mock_load_config.return_value = ConfigParser()
|
||||
|
||||
def parse_side_effect(config, opts):
|
||||
opts.imap_host = "imap.example.com"
|
||||
opts.imap_user = "user"
|
||||
opts.imap_password = "pass"
|
||||
@@ -2046,7 +2051,7 @@ watch = true
|
||||
self.assertEqual(cm.exception.code, 1)
|
||||
# watch_inbox was called twice: initial run + after reload
|
||||
self.assertEqual(mock_watch.call_count, 2)
|
||||
# _parse_config_file called for initial load + reload
|
||||
# _parse_config called for initial load + reload
|
||||
self.assertGreaterEqual(mock_parse_config.call_count, 2)
|
||||
|
||||
@unittest.skipUnless(
|
||||
@@ -2054,7 +2059,8 @@ watch = true
|
||||
"SIGHUP not available on this platform",
|
||||
)
|
||||
@patch("parsedmarc.cli._init_output_clients")
|
||||
@patch("parsedmarc.cli._parse_config_file")
|
||||
@patch("parsedmarc.cli._parse_config")
|
||||
@patch("parsedmarc.cli._load_config")
|
||||
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
||||
@patch("parsedmarc.cli.watch_inbox")
|
||||
@patch("parsedmarc.cli.IMAPConnection")
|
||||
@@ -2063,6 +2069,7 @@ watch = true
|
||||
mock_imap,
|
||||
mock_watch,
|
||||
mock_get_reports,
|
||||
mock_load_config,
|
||||
mock_parse_config,
|
||||
mock_init_clients,
|
||||
):
|
||||
@@ -2076,11 +2083,13 @@ watch = true
|
||||
"smtp_tls_reports": [],
|
||||
}
|
||||
|
||||
mock_load_config.return_value = ConfigParser()
|
||||
|
||||
# Initial parse sets required opts; reload parse raises
|
||||
initial_map = {"prefix_": ["example.com"]}
|
||||
call_count = [0]
|
||||
|
||||
def parse_side_effect(config_file, opts):
|
||||
def parse_side_effect(config, opts):
|
||||
call_count[0] += 1
|
||||
opts.imap_host = "imap.example.com"
|
||||
opts.imap_user = "user"
|
||||
@@ -2130,7 +2139,8 @@ watch = true
|
||||
"SIGHUP not available on this platform",
|
||||
)
|
||||
@patch("parsedmarc.cli._init_output_clients")
|
||||
@patch("parsedmarc.cli._parse_config_file")
|
||||
@patch("parsedmarc.cli._parse_config")
|
||||
@patch("parsedmarc.cli._load_config")
|
||||
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
||||
@patch("parsedmarc.cli.watch_inbox")
|
||||
@patch("parsedmarc.cli.IMAPConnection")
|
||||
@@ -2139,6 +2149,7 @@ watch = true
|
||||
mock_imap,
|
||||
mock_watch,
|
||||
mock_get_reports,
|
||||
mock_load_config,
|
||||
mock_parse_config,
|
||||
mock_init_clients,
|
||||
):
|
||||
@@ -2152,7 +2163,9 @@ watch = true
|
||||
"smtp_tls_reports": [],
|
||||
}
|
||||
|
||||
def parse_side_effect(config_file, opts):
|
||||
mock_load_config.return_value = ConfigParser()
|
||||
|
||||
def parse_side_effect(config, opts):
|
||||
opts.imap_host = "imap.example.com"
|
||||
opts.imap_user = "user"
|
||||
opts.imap_password = "pass"
|
||||
@@ -2284,7 +2297,8 @@ watch = true
|
||||
"SIGHUP not available on this platform",
|
||||
)
|
||||
@patch("parsedmarc.cli._init_output_clients")
|
||||
@patch("parsedmarc.cli._parse_config_file")
|
||||
@patch("parsedmarc.cli._parse_config")
|
||||
@patch("parsedmarc.cli._load_config")
|
||||
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
||||
@patch("parsedmarc.cli.watch_inbox")
|
||||
@patch("parsedmarc.cli.IMAPConnection")
|
||||
@@ -2293,6 +2307,7 @@ watch = true
|
||||
mock_imap,
|
||||
mock_watch,
|
||||
mock_get_reports,
|
||||
mock_load_config,
|
||||
mock_parse_config,
|
||||
mock_init_clients,
|
||||
):
|
||||
@@ -2308,7 +2323,9 @@ watch = true
|
||||
"smtp_tls_reports": [],
|
||||
}
|
||||
|
||||
def parse_side_effect(config_file, opts):
|
||||
mock_load_config.return_value = ConfigParser()
|
||||
|
||||
def parse_side_effect(config, opts):
|
||||
opts.imap_host = "imap.example.com"
|
||||
opts.imap_user = "user"
|
||||
opts.imap_password = "pass"
|
||||
@@ -2474,5 +2491,388 @@ password = test-password
|
||||
self.assertNotIn("unmapped-1", report_ids)
|
||||
|
||||
|
||||
class TestMaildirConnection(unittest.TestCase):
    """Tests for MaildirConnection subdirectory creation."""

    def test_create_subdirs_when_missing(self):
        """maildir_create=True creates cur/new/tmp in an empty directory."""
        from parsedmarc.mail.maildir import MaildirConnection

        with TemporaryDirectory() as root:
            for name in ("cur", "new", "tmp"):
                self.assertFalse(os.path.exists(os.path.join(root, name)))

            connection = MaildirConnection(root, maildir_create=True)

            for name in ("cur", "new", "tmp"):
                self.assertTrue(os.path.isdir(os.path.join(root, name)))
            # Listing messages should succeed on the freshly created maildir
            self.assertEqual(connection.fetch_messages("INBOX"), [])

    def test_create_subdirs_idempotent(self):
        """maildir_create=True is safe when subdirs already exist."""
        from parsedmarc.mail.maildir import MaildirConnection

        with TemporaryDirectory() as root:
            for name in ("cur", "new", "tmp"):
                os.makedirs(os.path.join(root, name))

            # Constructing over pre-existing subdirs must not raise
            connection = MaildirConnection(root, maildir_create=True)
            self.assertEqual(connection.fetch_messages("INBOX"), [])

    def test_no_create_raises_on_missing_subdirs(self):
        """maildir_create=False does not create subdirs; keys() fails."""
        from parsedmarc.mail.maildir import MaildirConnection

        with TemporaryDirectory() as root:
            connection = MaildirConnection(root, maildir_create=False)

            with self.assertRaises(FileNotFoundError):
                connection.fetch_messages("INBOX")

    def test_fetch_and_delete_message(self):
        """Round-trip: add a message, fetch it, delete it."""
        from parsedmarc.mail.maildir import MaildirConnection

        with TemporaryDirectory() as root:
            connection = MaildirConnection(root, maildir_create=True)

            # Seed one message through the underlying mailbox client
            key = connection._client.add("From: test@example.com\n\nHello")
            listed = connection.fetch_messages("INBOX")
            self.assertIn(key, listed)

            body = connection.fetch_message(key)
            self.assertIn("test@example.com", body)

            connection.delete_message(key)
            self.assertEqual(connection.fetch_messages("INBOX"), [])

    def test_move_message_creates_subfolder(self):
        """move_message auto-creates the destination subfolder."""
        from parsedmarc.mail.maildir import MaildirConnection

        with TemporaryDirectory() as root:
            connection = MaildirConnection(root, maildir_create=True)

            key = connection._client.add("From: test@example.com\n\nHello")
            connection.move_message(key, "archive")

            # The message left the inbox...
            self.assertEqual(connection.fetch_messages("INBOX"), [])
            # ...and landed in the auto-created archive subfolder
            self.assertIn("archive", connection._subfolder_client)
            self.assertEqual(len(connection._subfolder_client["archive"].keys()), 1)
|
||||
|
||||
|
||||
class TestMaildirUidHandling(unittest.TestCase):
    """Tests for Maildir UID mismatch handling in Docker-like environments."""

    def test_uid_mismatch_warns_instead_of_crashing(self):
        """UID mismatch logs a warning instead of raising an exception."""
        from parsedmarc.mail.maildir import MaildirConnection

        with TemporaryDirectory() as root:
            # Lay out a valid maildir so the client itself works
            for name in ("cur", "new", "tmp"):
                os.makedirs(os.path.join(root, name))

            # Report the directory's real stat while faking a different uid
            real_stat = os.stat(root)
            with (
                patch("parsedmarc.mail.maildir.os.stat") as stat_mock,
                patch("parsedmarc.mail.maildir.os.getuid", return_value=9999),
            ):
                stat_mock.return_value = real_stat
                # Construction should warn rather than raise
                connection = MaildirConnection(root, maildir_create=False)
                self.assertEqual(connection.fetch_messages("INBOX"), [])

    def test_uid_match_no_warning(self):
        """No warning when UIDs match."""
        from parsedmarc.mail.maildir import MaildirConnection

        with TemporaryDirectory() as root:
            connection = MaildirConnection(root, maildir_create=True)
            self.assertEqual(connection.fetch_messages("INBOX"), [])

    def test_stat_failure_does_not_crash(self):
        """If os.stat fails on the maildir path, we don't crash."""
        from parsedmarc.mail.maildir import MaildirConnection

        with TemporaryDirectory() as root:
            for name in ("cur", "new", "tmp"):
                os.makedirs(os.path.join(root, name))

            passthrough = os.stat
            call_counter = [0]

            def flaky_stat(path, *args, **kwargs):
                """Raise on the first call (the UID check), delegate afterwards."""
                call_counter[0] += 1
                if call_counter[0] == 1:
                    raise OSError("no stat")
                return passthrough(path, *args, **kwargs)

            with patch(
                "parsedmarc.mail.maildir.os.stat", side_effect=flaky_stat
            ):
                connection = MaildirConnection(root, maildir_create=False)
                self.assertEqual(connection.fetch_messages("INBOX"), [])
|
||||
|
||||
|
||||
class TestExpandPath(unittest.TestCase):
    """Tests for _expand_path config path expansion."""

    def test_expand_tilde(self):
        """A leading ``~`` is replaced with the home directory."""
        from parsedmarc.cli import _expand_path

        expanded = _expand_path("~/some/path")
        self.assertFalse(expanded.startswith("~"))
        self.assertTrue(expanded.endswith("/some/path"))

    def test_expand_env_var(self):
        """``$VAR`` references are substituted from the environment."""
        from parsedmarc.cli import _expand_path

        with patch.dict(os.environ, {"PARSEDMARC_TEST_DIR": "/opt/data"}):
            expanded = _expand_path("$PARSEDMARC_TEST_DIR/tokens/.token")
            self.assertEqual(expanded, "/opt/data/tokens/.token")

    def test_expand_both(self):
        """Tilde and env-var expansion compose in a single path."""
        from parsedmarc.cli import _expand_path

        with patch.dict(os.environ, {"MY_APP": "parsedmarc"}):
            expanded = _expand_path("~/$MY_APP/config")
            self.assertNotIn("~", expanded)
            self.assertIn("parsedmarc/config", expanded)

    def test_no_expansion_needed(self):
        """Plain absolute and relative paths pass through untouched."""
        from parsedmarc.cli import _expand_path

        for plain in ("/absolute/path", "relative/path"):
            self.assertEqual(_expand_path(plain), plain)
|
||||
class TestTokenParentDirCreation(unittest.TestCase):
    """Tests for parent directory creation when writing token files."""

    def test_graph_cache_creates_parent_dirs(self):
        """_cache_auth_record creates missing parent directories."""
        from parsedmarc.mail.graph import _cache_auth_record

        with TemporaryDirectory() as tmp:
            token_file = Path(tmp) / "subdir" / "nested" / ".token"
            self.assertFalse(token_file.parent.exists())

            record = MagicMock()
            record.serialize.return_value = "serialized-token"

            _cache_auth_record(record, token_file)

            self.assertTrue(token_file.exists())
            self.assertEqual(token_file.read_text(), "serialized-token")

    def test_gmail_token_write_creates_parent_dirs(self):
        """Gmail token write creates parent directories."""
        with TemporaryDirectory() as tmp:
            token_file = Path(tmp) / "deep" / "nested" / "token.json"
            self.assertFalse(token_file.parent.exists())

            # Exercise the mkdir-then-open pattern used by the Gmail client.
            token_file.parent.mkdir(parents=True, exist_ok=True)
            with token_file.open("w") as handle:
                handle.write('{"token": "test"}')

            self.assertTrue(token_file.exists())
            self.assertEqual(token_file.read_text(), '{"token": "test"}')
|
||||
class TestEnvVarConfig(unittest.TestCase):
    """Tests for environment variable configuration support."""

    def test_resolve_section_key_simple(self):
        """Simple section names resolve correctly."""
        from parsedmarc.cli import _resolve_section_key

        expectations = {
            "IMAP_PASSWORD": ("imap", "password"),
            "GENERAL_DEBUG": ("general", "debug"),
            "S3_BUCKET": ("s3", "bucket"),
            "GELF_HOST": ("gelf", "host"),
        }
        for name, expected in expectations.items():
            self.assertEqual(_resolve_section_key(name), expected)

    def test_resolve_section_key_underscore_sections(self):
        """Multi-word section names (splunk_hec, gmail_api, etc.) resolve correctly."""
        from parsedmarc.cli import _resolve_section_key

        expectations = {
            "SPLUNK_HEC_TOKEN": ("splunk_hec", "token"),
            "GMAIL_API_CREDENTIALS_FILE": ("gmail_api", "credentials_file"),
            "LOG_ANALYTICS_CLIENT_ID": ("log_analytics", "client_id"),
        }
        for name, expected in expectations.items():
            self.assertEqual(_resolve_section_key(name), expected)

    def test_resolve_section_key_unknown(self):
        """Unknown prefixes return (None, None)."""
        from parsedmarc.cli import _resolve_section_key

        # An unrecognized prefix and a bare section name both fail to resolve.
        for name in ("UNKNOWN_FOO", "IMAP"):
            self.assertEqual(_resolve_section_key(name), (None, None))

    def test_apply_env_overrides_injects_values(self):
        """Env vars are injected into an existing ConfigParser."""
        from configparser import ConfigParser
        from parsedmarc.cli import _apply_env_overrides

        parser = ConfigParser()
        parser.add_section("imap")
        parser.set("imap", "host", "original.example.com")

        overrides = {
            "PARSEDMARC_IMAP_HOST": "new.example.com",
            "PARSEDMARC_IMAP_PASSWORD": "secret123",
        }
        with patch.dict(os.environ, overrides, clear=False):
            _apply_env_overrides(parser)

        self.assertEqual(parser.get("imap", "host"), "new.example.com")
        self.assertEqual(parser.get("imap", "password"), "secret123")

    def test_apply_env_overrides_creates_sections(self):
        """Env vars create new sections when they don't exist."""
        from configparser import ConfigParser
        from parsedmarc.cli import _apply_env_overrides

        parser = ConfigParser()

        overrides = {"PARSEDMARC_ELASTICSEARCH_HOSTS": "http://localhost:9200"}
        with patch.dict(os.environ, overrides, clear=False):
            _apply_env_overrides(parser)

        self.assertTrue(parser.has_section("elasticsearch"))
        self.assertEqual(
            parser.get("elasticsearch", "hosts"), "http://localhost:9200"
        )

    def test_apply_env_overrides_ignores_config_file_var(self):
        """PARSEDMARC_CONFIG_FILE is not injected as a config key."""
        from configparser import ConfigParser
        from parsedmarc.cli import _apply_env_overrides

        parser = ConfigParser()

        overrides = {"PARSEDMARC_CONFIG_FILE": "/some/path.ini"}
        with patch.dict(os.environ, overrides, clear=False):
            _apply_env_overrides(parser)

        # The reserved variable must not create any section.
        self.assertEqual(parser.sections(), [])

    def test_load_config_with_file_and_env_override(self):
        """Env vars override values from an INI file."""
        from parsedmarc.cli import _load_config

        with NamedTemporaryFile(mode="w", suffix=".ini", delete=False) as ini:
            ini.write(
                "[imap]\nhost = file.example.com\nuser = alice\npassword = fromfile\n"
            )
            ini.flush()
            ini_path = ini.name

        try:
            overrides = {"PARSEDMARC_IMAP_PASSWORD": "fromenv"}
            with patch.dict(os.environ, overrides, clear=False):
                config = _load_config(ini_path)

            # File values survive except where the env var overrides them.
            self.assertEqual(config.get("imap", "host"), "file.example.com")
            self.assertEqual(config.get("imap", "user"), "alice")
            self.assertEqual(config.get("imap", "password"), "fromenv")
        finally:
            os.unlink(ini_path)

    def test_load_config_env_only(self):
        """Config can be loaded purely from env vars with no file."""
        from parsedmarc.cli import _load_config

        overrides = {
            "PARSEDMARC_GENERAL_DEBUG": "true",
            "PARSEDMARC_ELASTICSEARCH_HOSTS": "http://localhost:9200",
        }
        with patch.dict(os.environ, overrides, clear=False):
            config = _load_config(None)

        self.assertEqual(config.get("general", "debug"), "true")
        self.assertEqual(
            config.get("elasticsearch", "hosts"), "http://localhost:9200"
        )

    def test_parse_config_from_env(self):
        """Full round-trip: env vars -> ConfigParser -> opts."""
        from argparse import Namespace
        from parsedmarc.cli import _load_config, _parse_config

        overrides = {
            "PARSEDMARC_GENERAL_DEBUG": "true",
            "PARSEDMARC_GENERAL_SAVE_AGGREGATE": "true",
            "PARSEDMARC_GENERAL_OFFLINE": "true",
        }
        with patch.dict(os.environ, overrides, clear=False):
            config = _load_config(None)

        opts = Namespace()
        _parse_config(config, opts)

        self.assertTrue(opts.debug)
        self.assertTrue(opts.save_aggregate)
        self.assertTrue(opts.offline)

    def test_config_file_env_var(self):
        """PARSEDMARC_CONFIG_FILE env var specifies the config file path."""
        from argparse import Namespace
        from parsedmarc.cli import _load_config, _parse_config

        with NamedTemporaryFile(mode="w", suffix=".ini", delete=False) as ini:
            ini.write("[general]\ndebug = true\noffline = true\n")
            ini.flush()
            ini_path = ini.name

        try:
            with patch.dict(
                os.environ, {"PARSEDMARC_CONFIG_FILE": ini_path}, clear=False
            ):
                config = _load_config(os.environ.get("PARSEDMARC_CONFIG_FILE"))

            opts = Namespace()
            _parse_config(config, opts)
            self.assertTrue(opts.debug)
            self.assertTrue(opts.offline)
        finally:
            os.unlink(ini_path)

    def test_boolean_values_from_env(self):
        """Various boolean string representations work through ConfigParser."""
        from configparser import ConfigParser
        from parsedmarc.cli import _apply_env_overrides

        cases = [
            ("true", True),
            ("yes", True),
            ("1", True),
            ("on", True),
            ("True", True),
            ("YES", True),
            ("false", False),
            ("no", False),
            ("0", False),
            ("off", False),
            ("False", False),
            ("NO", False),
        ]
        for raw, expected in cases:
            parser = ConfigParser()
            with patch.dict(
                os.environ, {"PARSEDMARC_GENERAL_DEBUG": raw}, clear=False
            ):
                _apply_env_overrides(parser)
            actual = parser.getboolean("general", "debug")
            if expected:
                self.assertTrue(actual, f"Expected truthy for {raw!r}")
            else:
                self.assertFalse(actual, f"Expected falsy for {raw!r}")
||||
|
||||
# Allow running this test module directly (e.g. `python tests.py`);
# verbosity=2 prints each test name as it runs.
if __name__ == "__main__":
    unittest.main(verbosity=2)
|
||||
|
||||
Reference in New Issue
Block a user