mirror of
https://github.com/domainaware/parsedmarc.git
synced 2026-03-28 01:12:54 +00:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6a13f38ac6 | ||
|
|
33ab4d9de9 | ||
|
|
f49ca0863d | ||
|
|
e1851d026a | ||
|
|
1542936468 |
3
.vscode/settings.json
vendored
3
.vscode/settings.json
vendored
@@ -26,11 +26,13 @@
|
||||
"boto",
|
||||
"brakhane",
|
||||
"Brightmail",
|
||||
"cafile",
|
||||
"CEST",
|
||||
"CHACHA",
|
||||
"checkdmarc",
|
||||
"Codecov",
|
||||
"confnew",
|
||||
"creds",
|
||||
"dateparser",
|
||||
"dateutil",
|
||||
"Davmail",
|
||||
@@ -130,6 +132,7 @@
|
||||
"sdist",
|
||||
"Servernameone",
|
||||
"setuptools",
|
||||
"signum",
|
||||
"smartquotes",
|
||||
"SMTPTLS",
|
||||
"sortlists",
|
||||
|
||||
@@ -42,7 +42,7 @@ To skip DNS lookups during testing, set `GITHUB_ACTIONS=true`.
|
||||
### Key modules
|
||||
|
||||
- `parsedmarc/__init__.py` — Core parsing logic. Main functions: `parse_report_file()`, `parse_report_email()`, `parse_aggregate_report_xml()`, `parse_forensic_report()`, `parse_smtp_tls_report_json()`, `get_dmarc_reports_from_mailbox()`, `watch_inbox()`
|
||||
- `parsedmarc/cli.py` — CLI entry point (`_main`), config file parsing, output orchestration
|
||||
- `parsedmarc/cli.py` — CLI entry point (`_main`), config file parsing (`_load_config` + `_parse_config`), output orchestration. Supports configuration via INI files, `PARSEDMARC_{SECTION}_{KEY}` environment variables, or both (env vars override file values).
|
||||
- `parsedmarc/types.py` — TypedDict definitions for all report types (`AggregateReport`, `ForensicReport`, `SMTPTLSReport`, `ParsingResults`)
|
||||
- `parsedmarc/utils.py` — IP/DNS/GeoIP enrichment, base64 decoding, compression handling
|
||||
- `parsedmarc/mail/` — Polymorphic mail connections: `IMAPConnection`, `GmailConnection`, `MSGraphConnection`, `MaildirConnection`
|
||||
@@ -52,6 +52,10 @@ To skip DNS lookups during testing, set `GITHUB_ACTIONS=true`.
|
||||
|
||||
`ReportType = Literal["aggregate", "forensic", "smtp_tls"]`. Exception hierarchy: `ParserError` → `InvalidDMARCReport` → `InvalidAggregateReport`/`InvalidForensicReport`, and `InvalidSMTPTLSReport`.
|
||||
|
||||
### Configuration
|
||||
|
||||
Config priority: CLI args > env vars > config file > defaults. Env var naming: `PARSEDMARC_{SECTION}_{KEY}` (e.g. `PARSEDMARC_IMAP_PASSWORD`). Section names with underscores use longest-prefix matching (`PARSEDMARC_SPLUNK_HEC_TOKEN` → `[splunk_hec] token`). Some INI keys have short aliases for env var friendliness (e.g. `[maildir] create` for `maildir_create`). File path values are expanded via `os.path.expanduser`/`os.path.expandvars`. Config can be loaded purely from env vars with no file (`PARSEDMARC_CONFIG_FILE` sets the file path).
|
||||
|
||||
### Caching
|
||||
|
||||
IP address info cached for 4 hours, seen aggregate report IDs cached for 1 hour (via `ExpiringDict`).
|
||||
@@ -62,3 +66,6 @@ IP address info cached for 4 hours, seen aggregate report IDs cached for 1 hour
|
||||
- TypedDict for structured data, type hints throughout
|
||||
- Python ≥3.10 required
|
||||
- Tests are in a single `tests.py` file using unittest; sample reports live in `samples/`
|
||||
- File path config values must be wrapped with `_expand_path()` in `cli.py`
|
||||
- Maildir UID checks are intentionally relaxed (warn, don't crash) for Docker compatibility
|
||||
- Token file writes must create parent directories before opening for write
|
||||
|
||||
20
CHANGELOG.md
20
CHANGELOG.md
@@ -1,5 +1,25 @@
|
||||
# Changelog
|
||||
|
||||
## 9.5.5
|
||||
|
||||
### Fixed
|
||||
|
||||
- Output client initialization now retries up to 4 times with exponential backoff before exiting. This fixes persistent `Connection refused` errors in Docker when OpenSearch or Elasticsearch is momentarily unavailable at startup.
|
||||
- Use tuple format for `http_auth` in OpenSearch and Elasticsearch connections, matching the documented convention and avoiding potential issues if the password contains a colon.
|
||||
- Fix current_time format for MSGraphConnection (current-time) (PR #708)
|
||||
|
||||
### Changes
|
||||
|
||||
- Added debug logging to all output client initialization (S3, syslog, Splunk HEC, Kafka, GELF, webhook, Elasticsearch, OpenSearch).
|
||||
- `DEBUG=true` and `PARSEDMARC_DEBUG=true` are now accepted as short aliases for `PARSEDMARC_GENERAL_DEBUG=true`.
|
||||
|
||||
## 9.5.4
|
||||
|
||||
### Fixed
|
||||
|
||||
- Maildir `fetch_messages` now respects the `reports_folder` argument. Previously it always read from the top-level Maildir, ignoring the configured reports folder. `fetch_message`, `delete_message`, and `move_message` now also operate on the correct active folder.
|
||||
- Config key aliases for env var compatibility: `[maildir] create` and `path` are now accepted as aliases for `maildir_create` and `maildir_path`, and `[msgraph] url` for `graph_url`. This allows natural env var names like `PARSEDMARC_MAILDIR_CREATE` to work without the redundant `PARSEDMARC_MAILDIR_MAILDIR_CREATE`.
|
||||
|
||||
## 9.5.3
|
||||
|
||||
### Fixed
|
||||
|
||||
@@ -1956,7 +1956,7 @@ def get_dmarc_reports_from_mailbox(
|
||||
current_time = datetime.now(timezone.utc).strftime("%d-%b-%Y")
|
||||
elif isinstance(connection, MSGraphConnection):
|
||||
since = (datetime.now(timezone.utc) - timedelta(minutes=_since)).isoformat()
|
||||
current_time = datetime.now(timezone.utc).isoformat() + "Z"
|
||||
current_time = datetime.now(timezone.utc).isoformat()
|
||||
elif isinstance(connection, GmailConnection):
|
||||
since = (datetime.now(timezone.utc) - timedelta(minutes=_since)).strftime(
|
||||
"%s"
|
||||
|
||||
@@ -9,6 +9,7 @@ import logging
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
import time
|
||||
from argparse import ArgumentParser, Namespace
|
||||
from configparser import ConfigParser
|
||||
from glob import glob
|
||||
@@ -135,12 +136,20 @@ def _apply_env_overrides(config: ConfigParser) -> None:
|
||||
"""
|
||||
prefix = "PARSEDMARC_"
|
||||
|
||||
for env_key, env_value in os.environ.items():
|
||||
if not env_key.startswith(prefix) or env_key == "PARSEDMARC_CONFIG_FILE":
|
||||
continue
|
||||
# Short aliases that don't follow the PARSEDMARC_{SECTION}_{KEY} pattern.
|
||||
_ENV_ALIASES = {
|
||||
"DEBUG": ("general", "debug"),
|
||||
"PARSEDMARC_DEBUG": ("general", "debug"),
|
||||
}
|
||||
|
||||
suffix = env_key[len(prefix) :]
|
||||
section, key = _resolve_section_key(suffix)
|
||||
for env_key, env_value in os.environ.items():
|
||||
if env_key in _ENV_ALIASES:
|
||||
section, key = _ENV_ALIASES[env_key]
|
||||
elif env_key.startswith(prefix) and env_key != "PARSEDMARC_CONFIG_FILE":
|
||||
suffix = env_key[len(prefix) :]
|
||||
section, key = _resolve_section_key(suffix)
|
||||
else:
|
||||
continue
|
||||
|
||||
if section is None:
|
||||
logger.debug("Ignoring unrecognized env var: %s", env_key)
|
||||
@@ -581,6 +590,8 @@ def _parse_config(config: ConfigParser, opts):
|
||||
|
||||
if "graph_url" in graph_config:
|
||||
opts.graph_url = graph_config["graph_url"]
|
||||
elif "url" in graph_config:
|
||||
opts.graph_url = graph_config["url"]
|
||||
|
||||
if "allow_unencrypted_storage" in graph_config:
|
||||
opts.graph_allow_unencrypted_storage = bool(
|
||||
@@ -884,10 +895,15 @@ def _parse_config(config: ConfigParser, opts):
|
||||
|
||||
if "maildir" in config.sections():
|
||||
maildir_api_config = config["maildir"]
|
||||
maildir_p = maildir_api_config.get("maildir_path")
|
||||
maildir_p = maildir_api_config.get(
|
||||
"maildir_path", maildir_api_config.get("path")
|
||||
)
|
||||
opts.maildir_path = _expand_path(maildir_p) if maildir_p else maildir_p
|
||||
opts.maildir_create = bool(
|
||||
maildir_api_config.getboolean("maildir_create", fallback=False)
|
||||
maildir_api_config.getboolean(
|
||||
"maildir_create",
|
||||
fallback=maildir_api_config.getboolean("create", fallback=False),
|
||||
)
|
||||
)
|
||||
|
||||
if "log_analytics" in config.sections():
|
||||
@@ -981,6 +997,7 @@ def _init_output_clients(opts):
|
||||
|
||||
try:
|
||||
if opts.s3_bucket:
|
||||
logger.debug("Initializing S3 client: bucket=%s", opts.s3_bucket)
|
||||
clients["s3_client"] = s3.S3Client(
|
||||
bucket_name=opts.s3_bucket,
|
||||
bucket_path=opts.s3_path,
|
||||
@@ -994,6 +1011,11 @@ def _init_output_clients(opts):
|
||||
|
||||
try:
|
||||
if opts.syslog_server:
|
||||
logger.debug(
|
||||
"Initializing syslog client: server=%s:%s",
|
||||
opts.syslog_server,
|
||||
opts.syslog_port,
|
||||
)
|
||||
clients["syslog_client"] = syslog.SyslogClient(
|
||||
server_name=opts.syslog_server,
|
||||
server_port=int(opts.syslog_port),
|
||||
@@ -1018,6 +1040,7 @@ def _init_output_clients(opts):
|
||||
"HEC token and HEC index are required when using HEC URL"
|
||||
)
|
||||
try:
|
||||
logger.debug("Initializing Splunk HEC client: url=%s", opts.hec)
|
||||
verify = True
|
||||
if opts.hec_skip_certificate_verification:
|
||||
verify = False
|
||||
@@ -1029,6 +1052,7 @@ def _init_output_clients(opts):
|
||||
|
||||
try:
|
||||
if opts.kafka_hosts:
|
||||
logger.debug("Initializing Kafka client: hosts=%s", opts.kafka_hosts)
|
||||
ssl_context = None
|
||||
if opts.kafka_skip_certificate_verification:
|
||||
logger.debug("Skipping Kafka certificate verification")
|
||||
@@ -1046,6 +1070,11 @@ def _init_output_clients(opts):
|
||||
|
||||
try:
|
||||
if opts.gelf_host:
|
||||
logger.debug(
|
||||
"Initializing GELF client: host=%s:%s",
|
||||
opts.gelf_host,
|
||||
opts.gelf_port,
|
||||
)
|
||||
clients["gelf_client"] = gelf.GelfClient(
|
||||
host=opts.gelf_host,
|
||||
port=int(opts.gelf_port),
|
||||
@@ -1060,6 +1089,7 @@ def _init_output_clients(opts):
|
||||
or opts.webhook_forensic_url
|
||||
or opts.webhook_smtp_tls_url
|
||||
):
|
||||
logger.debug("Initializing webhook client")
|
||||
clients["webhook_client"] = webhook.WebhookClient(
|
||||
aggregate_url=opts.webhook_aggregate_url,
|
||||
forensic_url=opts.webhook_forensic_url,
|
||||
@@ -1072,11 +1102,16 @@ def _init_output_clients(opts):
|
||||
# Elasticsearch and OpenSearch mutate module-level global state via
|
||||
# connections.create_connection(), which cannot be rolled back if a later
|
||||
# step fails. Initialise them last so that all other clients are created
|
||||
# successfully first; this minimises the window for partial-init problems
|
||||
# successfully first; this minimizes the window for partial-init problems
|
||||
# during config reload.
|
||||
if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls:
|
||||
try:
|
||||
if opts.elasticsearch_hosts:
|
||||
logger.debug(
|
||||
"Initializing Elasticsearch client: hosts=%s, ssl=%s",
|
||||
opts.elasticsearch_hosts,
|
||||
opts.elasticsearch_ssl,
|
||||
)
|
||||
es_aggregate_index = "dmarc_aggregate"
|
||||
es_forensic_index = "dmarc_forensic"
|
||||
es_smtp_tls_index = "smtp_tls"
|
||||
@@ -1115,6 +1150,11 @@ def _init_output_clients(opts):
|
||||
|
||||
try:
|
||||
if opts.opensearch_hosts:
|
||||
logger.debug(
|
||||
"Initializing OpenSearch client: hosts=%s, ssl=%s",
|
||||
opts.opensearch_hosts,
|
||||
opts.opensearch_ssl,
|
||||
)
|
||||
os_aggregate_index = "dmarc_aggregate"
|
||||
os_forensic_index = "dmarc_forensic"
|
||||
os_smtp_tls_index = "smtp_tls"
|
||||
@@ -1842,15 +1882,31 @@ def _main():
|
||||
|
||||
logger.info("Starting parsedmarc")
|
||||
|
||||
# Initialize output clients
|
||||
try:
|
||||
clients = _init_output_clients(opts)
|
||||
except ConfigurationError as e:
|
||||
logger.critical(str(e))
|
||||
exit(1)
|
||||
except Exception as error_:
|
||||
logger.error("Output client error: {0}".format(error_))
|
||||
exit(1)
|
||||
# Initialize output clients (with retry for transient connection errors)
|
||||
clients = {}
|
||||
max_retries = 4
|
||||
retry_delay = 5
|
||||
for attempt in range(max_retries + 1):
|
||||
try:
|
||||
clients = _init_output_clients(opts)
|
||||
break
|
||||
except ConfigurationError as e:
|
||||
logger.critical(str(e))
|
||||
exit(1)
|
||||
except Exception as error_:
|
||||
if attempt < max_retries:
|
||||
logger.warning(
|
||||
"Output client error (attempt %d/%d, retrying in %ds): %s",
|
||||
attempt + 1,
|
||||
max_retries + 1,
|
||||
retry_delay,
|
||||
error_,
|
||||
)
|
||||
time.sleep(retry_delay)
|
||||
retry_delay *= 2
|
||||
else:
|
||||
logger.error("Output client error: {0}".format(error_))
|
||||
exit(1)
|
||||
|
||||
file_paths = []
|
||||
mbox_paths = []
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
__version__ = "9.5.3"
|
||||
__version__ = "9.5.5"
|
||||
|
||||
USER_AGENT = f"parsedmarc/{__version__}"
|
||||
|
||||
@@ -299,7 +299,7 @@ def set_hosts(
|
||||
else:
|
||||
conn_params["verify_certs"] = True
|
||||
if username and password:
|
||||
conn_params["http_auth"] = username + ":" + password
|
||||
conn_params["http_auth"] = (username, password)
|
||||
if api_key:
|
||||
conn_params["api_key"] = api_key
|
||||
connections.create_connection(**conn_params)
|
||||
|
||||
@@ -46,16 +46,27 @@ class MaildirConnection(MailboxConnection):
|
||||
for subdir in ("cur", "new", "tmp"):
|
||||
os.makedirs(os.path.join(maildir_path, subdir), exist_ok=True)
|
||||
self._client = mailbox.Maildir(maildir_path, create=maildir_create)
|
||||
self._active_folder: mailbox.Maildir = self._client
|
||||
self._subfolder_client: Dict[str, mailbox.Maildir] = {}
|
||||
|
||||
def _get_folder(self, folder_name: str) -> mailbox.Maildir:
|
||||
"""Return a cached subfolder handle, creating it if needed."""
|
||||
if folder_name not in self._subfolder_client:
|
||||
self._subfolder_client[folder_name] = self._client.add_folder(folder_name)
|
||||
return self._subfolder_client[folder_name]
|
||||
|
||||
def create_folder(self, folder_name: str):
|
||||
self._subfolder_client[folder_name] = self._client.add_folder(folder_name)
|
||||
self._get_folder(folder_name)
|
||||
|
||||
def fetch_messages(self, reports_folder: str, **kwargs):
|
||||
return self._client.keys()
|
||||
if reports_folder and reports_folder != "INBOX":
|
||||
self._active_folder = self._get_folder(reports_folder)
|
||||
else:
|
||||
self._active_folder = self._client
|
||||
return self._active_folder.keys()
|
||||
|
||||
def fetch_message(self, message_id: str) -> str:
|
||||
msg = self._client.get(message_id)
|
||||
msg = self._active_folder.get(message_id)
|
||||
if msg is not None:
|
||||
msg = msg.as_string()
|
||||
if msg is not None:
|
||||
@@ -63,16 +74,15 @@ class MaildirConnection(MailboxConnection):
|
||||
return ""
|
||||
|
||||
def delete_message(self, message_id: str):
|
||||
self._client.remove(message_id)
|
||||
self._active_folder.remove(message_id)
|
||||
|
||||
def move_message(self, message_id: str, folder_name: str):
|
||||
message_data = self._client.get(message_id)
|
||||
message_data = self._active_folder.get(message_id)
|
||||
if message_data is None:
|
||||
return
|
||||
if folder_name not in self._subfolder_client:
|
||||
self._subfolder_client[folder_name] = self._client.add_folder(folder_name)
|
||||
self._subfolder_client[folder_name].add(message_data)
|
||||
self._client.remove(message_id)
|
||||
dest = self._get_folder(folder_name)
|
||||
dest.add(message_data)
|
||||
self._active_folder.remove(message_id)
|
||||
|
||||
def keepalive(self):
|
||||
return
|
||||
|
||||
@@ -298,6 +298,7 @@ def set_hosts(
|
||||
"""
|
||||
if not isinstance(hosts, list):
|
||||
hosts = [hosts]
|
||||
logger.debug("Connecting to OpenSearch: hosts=%s, use_ssl=%s", hosts, use_ssl)
|
||||
conn_params = {"hosts": hosts, "timeout": timeout}
|
||||
if use_ssl:
|
||||
conn_params["use_ssl"] = True
|
||||
@@ -323,7 +324,7 @@ def set_hosts(
|
||||
conn_params["connection_class"] = RequestsHttpConnection
|
||||
elif normalized_auth_type == "basic":
|
||||
if username and password:
|
||||
conn_params["http_auth"] = username + ":" + password
|
||||
conn_params["http_auth"] = (username, password)
|
||||
if api_key:
|
||||
conn_params["api_key"] = api_key
|
||||
else:
|
||||
|
||||
158
tests.py
158
tests.py
@@ -2566,6 +2566,164 @@ class TestMaildirConnection(unittest.TestCase):
|
||||
self.assertEqual(len(conn._subfolder_client["archive"].keys()), 1)
|
||||
|
||||
|
||||
class TestMaildirReportsFolder(unittest.TestCase):
|
||||
"""Tests for Maildir reports_folder support in fetch_messages."""
|
||||
|
||||
def test_fetch_from_subfolder(self):
|
||||
"""fetch_messages with a subfolder name reads from that subfolder."""
|
||||
from parsedmarc.mail.maildir import MaildirConnection
|
||||
|
||||
with TemporaryDirectory() as d:
|
||||
conn = MaildirConnection(d, maildir_create=True)
|
||||
|
||||
# Add message to a subfolder
|
||||
subfolder = conn._client.add_folder("reports")
|
||||
msg_key = subfolder.add("From: test@example.com\n\nSubfolder msg")
|
||||
|
||||
# Root should be empty
|
||||
self.assertEqual(conn.fetch_messages("INBOX"), [])
|
||||
|
||||
# Subfolder should have the message
|
||||
keys = conn.fetch_messages("reports")
|
||||
self.assertIn(msg_key, keys)
|
||||
|
||||
def test_fetch_message_uses_active_folder(self):
|
||||
"""fetch_message reads from the folder set by fetch_messages."""
|
||||
from parsedmarc.mail.maildir import MaildirConnection
|
||||
|
||||
with TemporaryDirectory() as d:
|
||||
conn = MaildirConnection(d, maildir_create=True)
|
||||
|
||||
subfolder = conn._client.add_folder("reports")
|
||||
msg_key = subfolder.add("From: sub@example.com\n\nIn subfolder")
|
||||
|
||||
conn.fetch_messages("reports")
|
||||
content = conn.fetch_message(msg_key)
|
||||
self.assertIn("sub@example.com", content)
|
||||
|
||||
def test_delete_message_uses_active_folder(self):
|
||||
"""delete_message removes from the folder set by fetch_messages."""
|
||||
from parsedmarc.mail.maildir import MaildirConnection
|
||||
|
||||
with TemporaryDirectory() as d:
|
||||
conn = MaildirConnection(d, maildir_create=True)
|
||||
|
||||
subfolder = conn._client.add_folder("reports")
|
||||
msg_key = subfolder.add("From: del@example.com\n\nDelete me")
|
||||
|
||||
conn.fetch_messages("reports")
|
||||
conn.delete_message(msg_key)
|
||||
self.assertEqual(conn.fetch_messages("reports"), [])
|
||||
|
||||
def test_move_message_from_subfolder(self):
|
||||
"""move_message works when active folder is a subfolder."""
|
||||
from parsedmarc.mail.maildir import MaildirConnection
|
||||
|
||||
with TemporaryDirectory() as d:
|
||||
conn = MaildirConnection(d, maildir_create=True)
|
||||
|
||||
subfolder = conn._client.add_folder("reports")
|
||||
msg_key = subfolder.add("From: move@example.com\n\nMove me")
|
||||
|
||||
conn.fetch_messages("reports")
|
||||
conn.move_message(msg_key, "archive")
|
||||
|
||||
# Source should be empty
|
||||
self.assertEqual(conn.fetch_messages("reports"), [])
|
||||
# Destination should have the message
|
||||
archive_keys = conn.fetch_messages("archive")
|
||||
self.assertEqual(len(archive_keys), 1)
|
||||
|
||||
def test_inbox_reads_root(self):
|
||||
"""INBOX reads from the top-level Maildir."""
|
||||
from parsedmarc.mail.maildir import MaildirConnection
|
||||
|
||||
with TemporaryDirectory() as d:
|
||||
conn = MaildirConnection(d, maildir_create=True)
|
||||
|
||||
msg_key = conn._client.add("From: root@example.com\n\nRoot msg")
|
||||
|
||||
keys = conn.fetch_messages("INBOX")
|
||||
self.assertIn(msg_key, keys)
|
||||
|
||||
def test_empty_folder_reads_root(self):
|
||||
"""Empty string reports_folder reads from the top-level Maildir."""
|
||||
from parsedmarc.mail.maildir import MaildirConnection
|
||||
|
||||
with TemporaryDirectory() as d:
|
||||
conn = MaildirConnection(d, maildir_create=True)
|
||||
|
||||
msg_key = conn._client.add("From: root@example.com\n\nRoot msg")
|
||||
|
||||
keys = conn.fetch_messages("")
|
||||
self.assertIn(msg_key, keys)
|
||||
|
||||
|
||||
class TestConfigAliases(unittest.TestCase):
|
||||
"""Tests for config key aliases (env var friendly short names)."""
|
||||
|
||||
def test_maildir_create_alias(self):
|
||||
"""[maildir] create works as alias for maildir_create."""
|
||||
from argparse import Namespace
|
||||
from parsedmarc.cli import _load_config, _parse_config
|
||||
|
||||
env = {
|
||||
"PARSEDMARC_MAILDIR_CREATE": "true",
|
||||
"PARSEDMARC_MAILDIR_PATH": "/tmp/test",
|
||||
}
|
||||
with patch.dict(os.environ, env, clear=False):
|
||||
config = _load_config(None)
|
||||
opts = Namespace()
|
||||
_parse_config(config, opts)
|
||||
self.assertTrue(opts.maildir_create)
|
||||
|
||||
def test_maildir_path_alias(self):
|
||||
"""[maildir] path works as alias for maildir_path."""
|
||||
from argparse import Namespace
|
||||
from parsedmarc.cli import _load_config, _parse_config
|
||||
|
||||
env = {"PARSEDMARC_MAILDIR_PATH": "/var/mail/dmarc"}
|
||||
with patch.dict(os.environ, env, clear=False):
|
||||
config = _load_config(None)
|
||||
opts = Namespace()
|
||||
_parse_config(config, opts)
|
||||
self.assertEqual(opts.maildir_path, "/var/mail/dmarc")
|
||||
|
||||
def test_msgraph_url_alias(self):
|
||||
"""[msgraph] url works as alias for graph_url."""
|
||||
from parsedmarc.cli import _load_config, _parse_config
|
||||
from argparse import Namespace
|
||||
|
||||
env = {
|
||||
"PARSEDMARC_MSGRAPH_AUTH_METHOD": "ClientSecret",
|
||||
"PARSEDMARC_MSGRAPH_CLIENT_ID": "test-id",
|
||||
"PARSEDMARC_MSGRAPH_CLIENT_SECRET": "test-secret",
|
||||
"PARSEDMARC_MSGRAPH_TENANT_ID": "test-tenant",
|
||||
"PARSEDMARC_MSGRAPH_MAILBOX": "test@example.com",
|
||||
"PARSEDMARC_MSGRAPH_URL": "https://custom.graph.example.com",
|
||||
}
|
||||
with patch.dict(os.environ, env, clear=False):
|
||||
config = _load_config(None)
|
||||
opts = Namespace()
|
||||
_parse_config(config, opts)
|
||||
self.assertEqual(opts.graph_url, "https://custom.graph.example.com")
|
||||
|
||||
def test_original_keys_still_work(self):
|
||||
"""Original INI key names (maildir_create, maildir_path) still work."""
|
||||
from argparse import Namespace
|
||||
from parsedmarc.cli import _parse_config
|
||||
|
||||
config = ConfigParser(interpolation=None)
|
||||
config.add_section("maildir")
|
||||
config.set("maildir", "maildir_path", "/original/path")
|
||||
config.set("maildir", "maildir_create", "true")
|
||||
|
||||
opts = Namespace()
|
||||
_parse_config(config, opts)
|
||||
self.assertEqual(opts.maildir_path, "/original/path")
|
||||
self.assertTrue(opts.maildir_create)
|
||||
|
||||
|
||||
class TestMaildirUidHandling(unittest.TestCase):
|
||||
"""Tests for Maildir UID mismatch handling in Docker-like environments."""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user