mirror of
https://github.com/domainaware/parsedmarc.git
synced 2026-03-26 16:32:48 +00:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1542936468 | ||
|
|
fb3c38a8b8 | ||
|
|
c9a6145505 |
2
.vscode/settings.json
vendored
2
.vscode/settings.json
vendored
@@ -52,6 +52,7 @@
|
||||
"geoipupdate",
|
||||
"Geolite",
|
||||
"geolocation",
|
||||
"getuid",
|
||||
"githubpages",
|
||||
"Grafana",
|
||||
"hostnames",
|
||||
@@ -75,6 +76,7 @@
|
||||
"LISTSERV",
|
||||
"loganalytics",
|
||||
"lxml",
|
||||
"Maildir",
|
||||
"mailparser",
|
||||
"mailrelay",
|
||||
"mailsuite",
|
||||
|
||||
@@ -42,7 +42,7 @@ To skip DNS lookups during testing, set `GITHUB_ACTIONS=true`.
|
||||
### Key modules
|
||||
|
||||
- `parsedmarc/__init__.py` — Core parsing logic. Main functions: `parse_report_file()`, `parse_report_email()`, `parse_aggregate_report_xml()`, `parse_forensic_report()`, `parse_smtp_tls_report_json()`, `get_dmarc_reports_from_mailbox()`, `watch_inbox()`
|
||||
- `parsedmarc/cli.py` — CLI entry point (`_main`), config file parsing, output orchestration
|
||||
- `parsedmarc/cli.py` — CLI entry point (`_main`), config file parsing (`_load_config` + `_parse_config`), output orchestration. Supports configuration via INI files, `PARSEDMARC_{SECTION}_{KEY}` environment variables, or both (env vars override file values).
|
||||
- `parsedmarc/types.py` — TypedDict definitions for all report types (`AggregateReport`, `ForensicReport`, `SMTPTLSReport`, `ParsingResults`)
|
||||
- `parsedmarc/utils.py` — IP/DNS/GeoIP enrichment, base64 decoding, compression handling
|
||||
- `parsedmarc/mail/` — Polymorphic mail connections: `IMAPConnection`, `GmailConnection`, `MSGraphConnection`, `MaildirConnection`
|
||||
@@ -52,6 +52,10 @@ To skip DNS lookups during testing, set `GITHUB_ACTIONS=true`.
|
||||
|
||||
`ReportType = Literal["aggregate", "forensic", "smtp_tls"]`. Exception hierarchy: `ParserError` → `InvalidDMARCReport` → `InvalidAggregateReport`/`InvalidForensicReport`, and `InvalidSMTPTLSReport`.
|
||||
|
||||
### Configuration
|
||||
|
||||
Config priority: CLI args > env vars > config file > defaults. Env var naming: `PARSEDMARC_{SECTION}_{KEY}` (e.g. `PARSEDMARC_IMAP_PASSWORD`). Section names with underscores use longest-prefix matching (`PARSEDMARC_SPLUNK_HEC_TOKEN` → `[splunk_hec] token`). Some INI keys have short aliases for env var friendliness (e.g. `[maildir] create` for `maildir_create`). File path values are expanded via `os.path.expanduser`/`os.path.expandvars`. Config can be loaded purely from env vars with no file (`PARSEDMARC_CONFIG_FILE` sets the file path).
|
||||
|
||||
### Caching
|
||||
|
||||
IP address info cached for 4 hours, seen aggregate report IDs cached for 1 hour (via `ExpiringDict`).
|
||||
@@ -62,3 +66,6 @@ IP address info cached for 4 hours, seen aggregate report IDs cached for 1 hour
|
||||
- TypedDict for structured data, type hints throughout
|
||||
- Python ≥3.10 required
|
||||
- Tests are in a single `tests.py` file using unittest; sample reports live in `samples/`
|
||||
- File path config values must be wrapped with `_expand_path()` in `cli.py`
|
||||
- Maildir UID checks are intentionally relaxed (warn, don't crash) for Docker compatibility
|
||||
- Token file writes must create parent directories before opening for write
|
||||
|
||||
16
CHANGELOG.md
16
CHANGELOG.md
@@ -1,5 +1,21 @@
|
||||
# Changelog
|
||||
|
||||
## 9.5.4
|
||||
|
||||
### Fixed
|
||||
|
||||
- Maildir `fetch_messages` now respects the `reports_folder` argument. Previously it always read from the top-level Maildir, ignoring the configured reports folder. `fetch_message`, `delete_message`, and `move_message` now also operate on the correct active folder.
|
||||
- Config key aliases for env var compatibility: `[maildir] create` and `path` are now accepted as aliases for `maildir_create` and `maildir_path`, and `[msgraph] url` for `graph_url`. This allows natural env var names like `PARSEDMARC_MAILDIR_CREATE` to work without the redundant `PARSEDMARC_MAILDIR_MAILDIR_CREATE`.
|
||||
|
||||
## 9.5.3
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed `FileNotFoundError` when using Maildir with Docker volume mounts. Python's `mailbox.Maildir(create=True)` only creates `cur/new/tmp` subdirectories when the top-level directory doesn't exist; Docker volume mounts pre-create the directory as empty, skipping subdirectory creation. parsedmarc now explicitly creates the subdirectories when `maildir_create` is enabled.
|
||||
- Maildir UID mismatch no longer crashes the process. In Docker containers where volume ownership differs from the container UID, parsedmarc now logs a warning instead of raising an exception. Also handles `os.setuid` failures gracefully in containers without `CAP_SETUID`.
|
||||
- Token file writes (MS Graph and Gmail) now create parent directories automatically, preventing `FileNotFoundError` when the token path points to a directory that doesn't yet exist.
|
||||
- File paths from config (`token_file`, `credentials_file`, `cert_path`, `log_file`, `output`, `ip_db_path`, `maildir_path`, syslog cert paths, etc.) now expand `~` and `$VAR` references via `os.path.expanduser`/`os.path.expandvars`.
|
||||
|
||||
## 9.5.2
|
||||
|
||||
### Fixed
|
||||
|
||||
@@ -75,6 +75,11 @@ def _str_to_list(s):
|
||||
return list(map(lambda i: i.lstrip(), _list))
|
||||
|
||||
|
||||
def _expand_path(p: str) -> str:
|
||||
"""Expand ``~`` and ``$VAR`` references in a file path."""
|
||||
return os.path.expanduser(os.path.expandvars(p))
|
||||
|
||||
|
||||
# All known INI config section names, used for env var resolution.
|
||||
_KNOWN_SECTIONS = frozenset(
|
||||
{
|
||||
@@ -302,7 +307,7 @@ def _parse_config(config: ConfigParser, opts):
|
||||
"normalize_timespan_threshold_hours"
|
||||
)
|
||||
if "index_prefix_domain_map" in general_config:
|
||||
with open(general_config["index_prefix_domain_map"]) as f:
|
||||
with open(_expand_path(general_config["index_prefix_domain_map"])) as f:
|
||||
index_prefix_domain_map = yaml.safe_load(f)
|
||||
if "offline" in general_config:
|
||||
opts.offline = bool(general_config.getboolean("offline"))
|
||||
@@ -311,7 +316,7 @@ def _parse_config(config: ConfigParser, opts):
|
||||
general_config.getboolean("strip_attachment_payloads")
|
||||
)
|
||||
if "output" in general_config:
|
||||
opts.output = general_config["output"]
|
||||
opts.output = _expand_path(general_config["output"])
|
||||
if "aggregate_json_filename" in general_config:
|
||||
opts.aggregate_json_filename = general_config["aggregate_json_filename"]
|
||||
if "forensic_json_filename" in general_config:
|
||||
@@ -367,11 +372,11 @@ def _parse_config(config: ConfigParser, opts):
|
||||
general_config.getboolean("fail_on_output_error")
|
||||
)
|
||||
if "log_file" in general_config:
|
||||
opts.log_file = general_config["log_file"]
|
||||
opts.log_file = _expand_path(general_config["log_file"])
|
||||
if "n_procs" in general_config:
|
||||
opts.n_procs = general_config.getint("n_procs")
|
||||
if "ip_db_path" in general_config:
|
||||
opts.ip_db_path = general_config["ip_db_path"]
|
||||
opts.ip_db_path = _expand_path(general_config["ip_db_path"])
|
||||
else:
|
||||
opts.ip_db_path = None
|
||||
if "always_use_local_files" in general_config:
|
||||
@@ -379,7 +384,9 @@ def _parse_config(config: ConfigParser, opts):
|
||||
general_config.getboolean("always_use_local_files")
|
||||
)
|
||||
if "local_reverse_dns_map_path" in general_config:
|
||||
opts.reverse_dns_map_path = general_config["local_reverse_dns_map_path"]
|
||||
opts.reverse_dns_map_path = _expand_path(
|
||||
general_config["local_reverse_dns_map_path"]
|
||||
)
|
||||
if "reverse_dns_map_url" in general_config:
|
||||
opts.reverse_dns_map_url = general_config["reverse_dns_map_url"]
|
||||
if "prettify_json" in general_config:
|
||||
@@ -494,7 +501,7 @@ def _parse_config(config: ConfigParser, opts):
|
||||
|
||||
if "msgraph" in config.sections():
|
||||
graph_config = config["msgraph"]
|
||||
opts.graph_token_file = graph_config.get("token_file", ".token")
|
||||
opts.graph_token_file = _expand_path(graph_config.get("token_file", ".token"))
|
||||
|
||||
if "auth_method" not in graph_config:
|
||||
logger.info(
|
||||
@@ -548,7 +555,9 @@ def _parse_config(config: ConfigParser, opts):
|
||||
|
||||
if opts.graph_auth_method == AuthMethod.Certificate.name:
|
||||
if "certificate_path" in graph_config:
|
||||
opts.graph_certificate_path = graph_config["certificate_path"]
|
||||
opts.graph_certificate_path = _expand_path(
|
||||
graph_config["certificate_path"]
|
||||
)
|
||||
else:
|
||||
raise ConfigurationError(
|
||||
"certificate_path setting missing from the msgraph config section"
|
||||
@@ -572,6 +581,8 @@ def _parse_config(config: ConfigParser, opts):
|
||||
|
||||
if "graph_url" in graph_config:
|
||||
opts.graph_url = graph_config["graph_url"]
|
||||
elif "url" in graph_config:
|
||||
opts.graph_url = graph_config["url"]
|
||||
|
||||
if "allow_unencrypted_storage" in graph_config:
|
||||
opts.graph_allow_unencrypted_storage = bool(
|
||||
@@ -605,7 +616,9 @@ def _parse_config(config: ConfigParser, opts):
|
||||
if "ssl" in elasticsearch_config:
|
||||
opts.elasticsearch_ssl = bool(elasticsearch_config.getboolean("ssl"))
|
||||
if "cert_path" in elasticsearch_config:
|
||||
opts.elasticsearch_ssl_cert_path = elasticsearch_config["cert_path"]
|
||||
opts.elasticsearch_ssl_cert_path = _expand_path(
|
||||
elasticsearch_config["cert_path"]
|
||||
)
|
||||
if "skip_certificate_verification" in elasticsearch_config:
|
||||
opts.elasticsearch_skip_certificate_verification = bool(
|
||||
elasticsearch_config.getboolean("skip_certificate_verification")
|
||||
@@ -648,7 +661,7 @@ def _parse_config(config: ConfigParser, opts):
|
||||
if "ssl" in opensearch_config:
|
||||
opts.opensearch_ssl = bool(opensearch_config.getboolean("ssl"))
|
||||
if "cert_path" in opensearch_config:
|
||||
opts.opensearch_ssl_cert_path = opensearch_config["cert_path"]
|
||||
opts.opensearch_ssl_cert_path = _expand_path(opensearch_config["cert_path"])
|
||||
if "skip_certificate_verification" in opensearch_config:
|
||||
opts.opensearch_skip_certificate_verification = bool(
|
||||
opensearch_config.getboolean("skip_certificate_verification")
|
||||
@@ -775,7 +788,7 @@ def _parse_config(config: ConfigParser, opts):
|
||||
if "subject" in smtp_config:
|
||||
opts.smtp_subject = smtp_config["subject"]
|
||||
if "attachment" in smtp_config:
|
||||
opts.smtp_attachment = smtp_config["attachment"]
|
||||
opts.smtp_attachment = _expand_path(smtp_config["attachment"])
|
||||
if "message" in smtp_config:
|
||||
opts.smtp_message = smtp_config["message"]
|
||||
|
||||
@@ -822,11 +835,11 @@ def _parse_config(config: ConfigParser, opts):
|
||||
else:
|
||||
opts.syslog_protocol = "udp"
|
||||
if "cafile_path" in syslog_config:
|
||||
opts.syslog_cafile_path = syslog_config["cafile_path"]
|
||||
opts.syslog_cafile_path = _expand_path(syslog_config["cafile_path"])
|
||||
if "certfile_path" in syslog_config:
|
||||
opts.syslog_certfile_path = syslog_config["certfile_path"]
|
||||
opts.syslog_certfile_path = _expand_path(syslog_config["certfile_path"])
|
||||
if "keyfile_path" in syslog_config:
|
||||
opts.syslog_keyfile_path = syslog_config["keyfile_path"]
|
||||
opts.syslog_keyfile_path = _expand_path(syslog_config["keyfile_path"])
|
||||
if "timeout" in syslog_config:
|
||||
opts.syslog_timeout = float(syslog_config["timeout"])
|
||||
else:
|
||||
@@ -842,8 +855,13 @@ def _parse_config(config: ConfigParser, opts):
|
||||
|
||||
if "gmail_api" in config.sections():
|
||||
gmail_api_config = config["gmail_api"]
|
||||
opts.gmail_api_credentials_file = gmail_api_config.get("credentials_file")
|
||||
opts.gmail_api_token_file = gmail_api_config.get("token_file", ".token")
|
||||
gmail_creds = gmail_api_config.get("credentials_file")
|
||||
opts.gmail_api_credentials_file = (
|
||||
_expand_path(gmail_creds) if gmail_creds else gmail_creds
|
||||
)
|
||||
opts.gmail_api_token_file = _expand_path(
|
||||
gmail_api_config.get("token_file", ".token")
|
||||
)
|
||||
opts.gmail_api_include_spam_trash = bool(
|
||||
gmail_api_config.getboolean("include_spam_trash", False)
|
||||
)
|
||||
@@ -868,9 +886,15 @@ def _parse_config(config: ConfigParser, opts):
|
||||
|
||||
if "maildir" in config.sections():
|
||||
maildir_api_config = config["maildir"]
|
||||
opts.maildir_path = maildir_api_config.get("maildir_path")
|
||||
maildir_p = maildir_api_config.get(
|
||||
"maildir_path", maildir_api_config.get("path")
|
||||
)
|
||||
opts.maildir_path = _expand_path(maildir_p) if maildir_p else maildir_p
|
||||
opts.maildir_create = bool(
|
||||
maildir_api_config.getboolean("maildir_create", fallback=False)
|
||||
maildir_api_config.getboolean(
|
||||
"maildir_create",
|
||||
fallback=maildir_api_config.getboolean("create", fallback=False),
|
||||
)
|
||||
)
|
||||
|
||||
if "log_analytics" in config.sections():
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
__version__ = "9.5.2"
|
||||
__version__ = "9.5.4"
|
||||
|
||||
USER_AGENT = f"parsedmarc/{__version__}"
|
||||
|
||||
@@ -55,6 +55,7 @@ def _get_creds(
|
||||
flow = InstalledAppFlow.from_client_secrets_file(credentials_file, scopes)
|
||||
creds = flow.run_local_server(open_browser=False, oauth2_port=oauth2_port)
|
||||
# Save the credentials for the next run
|
||||
Path(token_file).parent.mkdir(parents=True, exist_ok=True)
|
||||
with Path(token_file).open("w") as token:
|
||||
token.write(creds.to_json())
|
||||
return creds
|
||||
|
||||
@@ -56,6 +56,7 @@ def _load_token(token_path: Path) -> Optional[str]:
|
||||
|
||||
def _cache_auth_record(record: AuthenticationRecord, token_path: Path):
|
||||
token = record.serialize()
|
||||
token_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with token_path.open("w") as token_file:
|
||||
token_file.write(token)
|
||||
|
||||
|
||||
@@ -19,29 +19,54 @@ class MaildirConnection(MailboxConnection):
|
||||
):
|
||||
self._maildir_path = maildir_path
|
||||
self._maildir_create = maildir_create
|
||||
maildir_owner = os.stat(maildir_path).st_uid
|
||||
if os.getuid() != maildir_owner:
|
||||
if os.getuid() == 0:
|
||||
logger.warning(
|
||||
"Switching uid to {} to access Maildir".format(maildir_owner)
|
||||
)
|
||||
os.setuid(maildir_owner)
|
||||
try:
|
||||
maildir_owner = os.stat(maildir_path).st_uid
|
||||
except OSError:
|
||||
maildir_owner = None
|
||||
current_uid = os.getuid()
|
||||
if maildir_owner is not None and current_uid != maildir_owner:
|
||||
if current_uid == 0:
|
||||
try:
|
||||
logger.warning(
|
||||
"Switching uid to {} to access Maildir".format(maildir_owner)
|
||||
)
|
||||
os.setuid(maildir_owner)
|
||||
except OSError as e:
|
||||
logger.warning(
|
||||
"Failed to switch uid to {}: {}".format(maildir_owner, e)
|
||||
)
|
||||
else:
|
||||
ex = "runtime uid {} differ from maildir {} owner {}".format(
|
||||
os.getuid(), maildir_path, maildir_owner
|
||||
logger.warning(
|
||||
"Runtime uid {} differs from maildir {} owner {}. "
|
||||
"Access may fail if permissions are insufficient.".format(
|
||||
current_uid, maildir_path, maildir_owner
|
||||
)
|
||||
)
|
||||
raise Exception(ex)
|
||||
if maildir_create:
|
||||
for subdir in ("cur", "new", "tmp"):
|
||||
os.makedirs(os.path.join(maildir_path, subdir), exist_ok=True)
|
||||
self._client = mailbox.Maildir(maildir_path, create=maildir_create)
|
||||
self._active_folder: mailbox.Maildir = self._client
|
||||
self._subfolder_client: Dict[str, mailbox.Maildir] = {}
|
||||
|
||||
def _get_folder(self, folder_name: str) -> mailbox.Maildir:
|
||||
"""Return a cached subfolder handle, creating it if needed."""
|
||||
if folder_name not in self._subfolder_client:
|
||||
self._subfolder_client[folder_name] = self._client.add_folder(folder_name)
|
||||
return self._subfolder_client[folder_name]
|
||||
|
||||
def create_folder(self, folder_name: str):
|
||||
self._subfolder_client[folder_name] = self._client.add_folder(folder_name)
|
||||
self._get_folder(folder_name)
|
||||
|
||||
def fetch_messages(self, reports_folder: str, **kwargs):
|
||||
return self._client.keys()
|
||||
if reports_folder and reports_folder != "INBOX":
|
||||
self._active_folder = self._get_folder(reports_folder)
|
||||
else:
|
||||
self._active_folder = self._client
|
||||
return self._active_folder.keys()
|
||||
|
||||
def fetch_message(self, message_id: str) -> str:
|
||||
msg = self._client.get(message_id)
|
||||
msg = self._active_folder.get(message_id)
|
||||
if msg is not None:
|
||||
msg = msg.as_string()
|
||||
if msg is not None:
|
||||
@@ -49,16 +74,15 @@ class MaildirConnection(MailboxConnection):
|
||||
return ""
|
||||
|
||||
def delete_message(self, message_id: str):
|
||||
self._client.remove(message_id)
|
||||
self._active_folder.remove(message_id)
|
||||
|
||||
def move_message(self, message_id: str, folder_name: str):
|
||||
message_data = self._client.get(message_id)
|
||||
message_data = self._active_folder.get(message_id)
|
||||
if message_data is None:
|
||||
return
|
||||
if folder_name not in self._subfolder_client:
|
||||
self._subfolder_client[folder_name] = self._client.add_folder(folder_name)
|
||||
self._subfolder_client[folder_name].add(message_data)
|
||||
self._client.remove(message_id)
|
||||
dest = self._get_folder(folder_name)
|
||||
dest.add(message_data)
|
||||
self._active_folder.remove(message_id)
|
||||
|
||||
def keepalive(self):
|
||||
return
|
||||
|
||||
355
tests.py
355
tests.py
@@ -2491,6 +2491,361 @@ password = test-password
|
||||
self.assertNotIn("unmapped-1", report_ids)
|
||||
|
||||
|
||||
class TestMaildirConnection(unittest.TestCase):
|
||||
"""Tests for MaildirConnection subdirectory creation."""
|
||||
|
||||
def test_create_subdirs_when_missing(self):
|
||||
"""maildir_create=True creates cur/new/tmp in an empty directory."""
|
||||
from parsedmarc.mail.maildir import MaildirConnection
|
||||
|
||||
with TemporaryDirectory() as d:
|
||||
for subdir in ("cur", "new", "tmp"):
|
||||
self.assertFalse(os.path.exists(os.path.join(d, subdir)))
|
||||
|
||||
conn = MaildirConnection(d, maildir_create=True)
|
||||
|
||||
for subdir in ("cur", "new", "tmp"):
|
||||
self.assertTrue(os.path.isdir(os.path.join(d, subdir)))
|
||||
# Should be able to list messages without error
|
||||
self.assertEqual(conn.fetch_messages("INBOX"), [])
|
||||
|
||||
def test_create_subdirs_idempotent(self):
|
||||
"""maildir_create=True is safe when subdirs already exist."""
|
||||
from parsedmarc.mail.maildir import MaildirConnection
|
||||
|
||||
with TemporaryDirectory() as d:
|
||||
for subdir in ("cur", "new", "tmp"):
|
||||
os.makedirs(os.path.join(d, subdir))
|
||||
|
||||
# Should not raise
|
||||
conn = MaildirConnection(d, maildir_create=True)
|
||||
self.assertEqual(conn.fetch_messages("INBOX"), [])
|
||||
|
||||
def test_no_create_raises_on_missing_subdirs(self):
|
||||
"""maildir_create=False does not create subdirs; keys() fails."""
|
||||
from parsedmarc.mail.maildir import MaildirConnection
|
||||
|
||||
with TemporaryDirectory() as d:
|
||||
conn = MaildirConnection(d, maildir_create=False)
|
||||
|
||||
with self.assertRaises(FileNotFoundError):
|
||||
conn.fetch_messages("INBOX")
|
||||
|
||||
def test_fetch_and_delete_message(self):
|
||||
"""Round-trip: add a message, fetch it, delete it."""
|
||||
from parsedmarc.mail.maildir import MaildirConnection
|
||||
|
||||
with TemporaryDirectory() as d:
|
||||
conn = MaildirConnection(d, maildir_create=True)
|
||||
|
||||
# Add a message via the underlying client
|
||||
msg_key = conn._client.add("From: test@example.com\n\nHello")
|
||||
keys = conn.fetch_messages("INBOX")
|
||||
self.assertIn(msg_key, keys)
|
||||
|
||||
content = conn.fetch_message(msg_key)
|
||||
self.assertIn("test@example.com", content)
|
||||
|
||||
conn.delete_message(msg_key)
|
||||
self.assertEqual(conn.fetch_messages("INBOX"), [])
|
||||
|
||||
def test_move_message_creates_subfolder(self):
|
||||
"""move_message auto-creates the destination subfolder."""
|
||||
from parsedmarc.mail.maildir import MaildirConnection
|
||||
|
||||
with TemporaryDirectory() as d:
|
||||
conn = MaildirConnection(d, maildir_create=True)
|
||||
|
||||
msg_key = conn._client.add("From: test@example.com\n\nHello")
|
||||
conn.move_message(msg_key, "archive")
|
||||
|
||||
# Original should be gone
|
||||
self.assertEqual(conn.fetch_messages("INBOX"), [])
|
||||
# Archive subfolder should have the message
|
||||
self.assertIn("archive", conn._subfolder_client)
|
||||
self.assertEqual(len(conn._subfolder_client["archive"].keys()), 1)
|
||||
|
||||
|
||||
class TestMaildirReportsFolder(unittest.TestCase):
|
||||
"""Tests for Maildir reports_folder support in fetch_messages."""
|
||||
|
||||
def test_fetch_from_subfolder(self):
|
||||
"""fetch_messages with a subfolder name reads from that subfolder."""
|
||||
from parsedmarc.mail.maildir import MaildirConnection
|
||||
|
||||
with TemporaryDirectory() as d:
|
||||
conn = MaildirConnection(d, maildir_create=True)
|
||||
|
||||
# Add message to a subfolder
|
||||
subfolder = conn._client.add_folder("reports")
|
||||
msg_key = subfolder.add("From: test@example.com\n\nSubfolder msg")
|
||||
|
||||
# Root should be empty
|
||||
self.assertEqual(conn.fetch_messages("INBOX"), [])
|
||||
|
||||
# Subfolder should have the message
|
||||
keys = conn.fetch_messages("reports")
|
||||
self.assertIn(msg_key, keys)
|
||||
|
||||
def test_fetch_message_uses_active_folder(self):
|
||||
"""fetch_message reads from the folder set by fetch_messages."""
|
||||
from parsedmarc.mail.maildir import MaildirConnection
|
||||
|
||||
with TemporaryDirectory() as d:
|
||||
conn = MaildirConnection(d, maildir_create=True)
|
||||
|
||||
subfolder = conn._client.add_folder("reports")
|
||||
msg_key = subfolder.add("From: sub@example.com\n\nIn subfolder")
|
||||
|
||||
conn.fetch_messages("reports")
|
||||
content = conn.fetch_message(msg_key)
|
||||
self.assertIn("sub@example.com", content)
|
||||
|
||||
def test_delete_message_uses_active_folder(self):
|
||||
"""delete_message removes from the folder set by fetch_messages."""
|
||||
from parsedmarc.mail.maildir import MaildirConnection
|
||||
|
||||
with TemporaryDirectory() as d:
|
||||
conn = MaildirConnection(d, maildir_create=True)
|
||||
|
||||
subfolder = conn._client.add_folder("reports")
|
||||
msg_key = subfolder.add("From: del@example.com\n\nDelete me")
|
||||
|
||||
conn.fetch_messages("reports")
|
||||
conn.delete_message(msg_key)
|
||||
self.assertEqual(conn.fetch_messages("reports"), [])
|
||||
|
||||
def test_move_message_from_subfolder(self):
|
||||
"""move_message works when active folder is a subfolder."""
|
||||
from parsedmarc.mail.maildir import MaildirConnection
|
||||
|
||||
with TemporaryDirectory() as d:
|
||||
conn = MaildirConnection(d, maildir_create=True)
|
||||
|
||||
subfolder = conn._client.add_folder("reports")
|
||||
msg_key = subfolder.add("From: move@example.com\n\nMove me")
|
||||
|
||||
conn.fetch_messages("reports")
|
||||
conn.move_message(msg_key, "archive")
|
||||
|
||||
# Source should be empty
|
||||
self.assertEqual(conn.fetch_messages("reports"), [])
|
||||
# Destination should have the message
|
||||
archive_keys = conn.fetch_messages("archive")
|
||||
self.assertEqual(len(archive_keys), 1)
|
||||
|
||||
def test_inbox_reads_root(self):
|
||||
"""INBOX reads from the top-level Maildir."""
|
||||
from parsedmarc.mail.maildir import MaildirConnection
|
||||
|
||||
with TemporaryDirectory() as d:
|
||||
conn = MaildirConnection(d, maildir_create=True)
|
||||
|
||||
msg_key = conn._client.add("From: root@example.com\n\nRoot msg")
|
||||
|
||||
keys = conn.fetch_messages("INBOX")
|
||||
self.assertIn(msg_key, keys)
|
||||
|
||||
def test_empty_folder_reads_root(self):
|
||||
"""Empty string reports_folder reads from the top-level Maildir."""
|
||||
from parsedmarc.mail.maildir import MaildirConnection
|
||||
|
||||
with TemporaryDirectory() as d:
|
||||
conn = MaildirConnection(d, maildir_create=True)
|
||||
|
||||
msg_key = conn._client.add("From: root@example.com\n\nRoot msg")
|
||||
|
||||
keys = conn.fetch_messages("")
|
||||
self.assertIn(msg_key, keys)
|
||||
|
||||
|
||||
class TestConfigAliases(unittest.TestCase):
|
||||
"""Tests for config key aliases (env var friendly short names)."""
|
||||
|
||||
def test_maildir_create_alias(self):
|
||||
"""[maildir] create works as alias for maildir_create."""
|
||||
from argparse import Namespace
|
||||
from parsedmarc.cli import _load_config, _parse_config
|
||||
|
||||
env = {
|
||||
"PARSEDMARC_MAILDIR_CREATE": "true",
|
||||
"PARSEDMARC_MAILDIR_PATH": "/tmp/test",
|
||||
}
|
||||
with patch.dict(os.environ, env, clear=False):
|
||||
config = _load_config(None)
|
||||
opts = Namespace()
|
||||
_parse_config(config, opts)
|
||||
self.assertTrue(opts.maildir_create)
|
||||
|
||||
def test_maildir_path_alias(self):
|
||||
"""[maildir] path works as alias for maildir_path."""
|
||||
from argparse import Namespace
|
||||
from parsedmarc.cli import _load_config, _parse_config
|
||||
|
||||
env = {"PARSEDMARC_MAILDIR_PATH": "/var/mail/dmarc"}
|
||||
with patch.dict(os.environ, env, clear=False):
|
||||
config = _load_config(None)
|
||||
opts = Namespace()
|
||||
_parse_config(config, opts)
|
||||
self.assertEqual(opts.maildir_path, "/var/mail/dmarc")
|
||||
|
||||
def test_msgraph_url_alias(self):
|
||||
"""[msgraph] url works as alias for graph_url."""
|
||||
from parsedmarc.cli import _load_config, _parse_config
|
||||
from argparse import Namespace
|
||||
|
||||
env = {
|
||||
"PARSEDMARC_MSGRAPH_AUTH_METHOD": "ClientSecret",
|
||||
"PARSEDMARC_MSGRAPH_CLIENT_ID": "test-id",
|
||||
"PARSEDMARC_MSGRAPH_CLIENT_SECRET": "test-secret",
|
||||
"PARSEDMARC_MSGRAPH_TENANT_ID": "test-tenant",
|
||||
"PARSEDMARC_MSGRAPH_MAILBOX": "test@example.com",
|
||||
"PARSEDMARC_MSGRAPH_URL": "https://custom.graph.example.com",
|
||||
}
|
||||
with patch.dict(os.environ, env, clear=False):
|
||||
config = _load_config(None)
|
||||
opts = Namespace()
|
||||
_parse_config(config, opts)
|
||||
self.assertEqual(opts.graph_url, "https://custom.graph.example.com")
|
||||
|
||||
def test_original_keys_still_work(self):
|
||||
"""Original INI key names (maildir_create, maildir_path) still work."""
|
||||
from argparse import Namespace
|
||||
from parsedmarc.cli import _parse_config
|
||||
|
||||
config = ConfigParser(interpolation=None)
|
||||
config.add_section("maildir")
|
||||
config.set("maildir", "maildir_path", "/original/path")
|
||||
config.set("maildir", "maildir_create", "true")
|
||||
|
||||
opts = Namespace()
|
||||
_parse_config(config, opts)
|
||||
self.assertEqual(opts.maildir_path, "/original/path")
|
||||
self.assertTrue(opts.maildir_create)
|
||||
|
||||
|
||||
class TestMaildirUidHandling(unittest.TestCase):
|
||||
"""Tests for Maildir UID mismatch handling in Docker-like environments."""
|
||||
|
||||
def test_uid_mismatch_warns_instead_of_crashing(self):
|
||||
"""UID mismatch logs a warning instead of raising an exception."""
|
||||
from parsedmarc.mail.maildir import MaildirConnection
|
||||
|
||||
with TemporaryDirectory() as d:
|
||||
# Create subdirs so Maildir works
|
||||
for subdir in ("cur", "new", "tmp"):
|
||||
os.makedirs(os.path.join(d, subdir))
|
||||
|
||||
# Mock os.stat to return a different UID than os.getuid
|
||||
fake_stat = os.stat(d)
|
||||
with (
|
||||
patch("parsedmarc.mail.maildir.os.stat") as mock_stat,
|
||||
patch("parsedmarc.mail.maildir.os.getuid", return_value=9999),
|
||||
):
|
||||
mock_stat.return_value = fake_stat
|
||||
# Should not raise — just warn
|
||||
conn = MaildirConnection(d, maildir_create=False)
|
||||
self.assertEqual(conn.fetch_messages("INBOX"), [])
|
||||
|
||||
def test_uid_match_no_warning(self):
|
||||
"""No warning when UIDs match."""
|
||||
from parsedmarc.mail.maildir import MaildirConnection
|
||||
|
||||
with TemporaryDirectory() as d:
|
||||
conn = MaildirConnection(d, maildir_create=True)
|
||||
self.assertEqual(conn.fetch_messages("INBOX"), [])
|
||||
|
||||
def test_stat_failure_does_not_crash(self):
|
||||
"""If os.stat fails on the maildir path, we don't crash."""
|
||||
from parsedmarc.mail.maildir import MaildirConnection
|
||||
|
||||
with TemporaryDirectory() as d:
|
||||
for subdir in ("cur", "new", "tmp"):
|
||||
os.makedirs(os.path.join(d, subdir))
|
||||
|
||||
original_stat = os.stat
|
||||
|
||||
def stat_that_fails_once(path, *args, **kwargs):
|
||||
"""Fail on the first call (UID check), pass through after."""
|
||||
stat_that_fails_once.calls += 1
|
||||
if stat_that_fails_once.calls == 1:
|
||||
raise OSError("no stat")
|
||||
return original_stat(path, *args, **kwargs)
|
||||
|
||||
stat_that_fails_once.calls = 0
|
||||
|
||||
with patch(
|
||||
"parsedmarc.mail.maildir.os.stat", side_effect=stat_that_fails_once
|
||||
):
|
||||
conn = MaildirConnection(d, maildir_create=False)
|
||||
self.assertEqual(conn.fetch_messages("INBOX"), [])
|
||||
|
||||
|
||||
class TestExpandPath(unittest.TestCase):
|
||||
"""Tests for _expand_path config path expansion."""
|
||||
|
||||
def test_expand_tilde(self):
|
||||
from parsedmarc.cli import _expand_path
|
||||
|
||||
result = _expand_path("~/some/path")
|
||||
self.assertFalse(result.startswith("~"))
|
||||
self.assertTrue(result.endswith("/some/path"))
|
||||
|
||||
def test_expand_env_var(self):
|
||||
from parsedmarc.cli import _expand_path
|
||||
|
||||
with patch.dict(os.environ, {"PARSEDMARC_TEST_DIR": "/opt/data"}):
|
||||
result = _expand_path("$PARSEDMARC_TEST_DIR/tokens/.token")
|
||||
self.assertEqual(result, "/opt/data/tokens/.token")
|
||||
|
||||
def test_expand_both(self):
|
||||
from parsedmarc.cli import _expand_path
|
||||
|
||||
with patch.dict(os.environ, {"MY_APP": "parsedmarc"}):
|
||||
result = _expand_path("~/$MY_APP/config")
|
||||
self.assertNotIn("~", result)
|
||||
self.assertIn("parsedmarc/config", result)
|
||||
|
||||
def test_no_expansion_needed(self):
|
||||
from parsedmarc.cli import _expand_path
|
||||
|
||||
self.assertEqual(_expand_path("/absolute/path"), "/absolute/path")
|
||||
self.assertEqual(_expand_path("relative/path"), "relative/path")
|
||||
|
||||
|
||||
class TestTokenParentDirCreation(unittest.TestCase):
|
||||
"""Tests for parent directory creation when writing token files."""
|
||||
|
||||
def test_graph_cache_creates_parent_dirs(self):
|
||||
from parsedmarc.mail.graph import _cache_auth_record
|
||||
|
||||
with TemporaryDirectory() as d:
|
||||
token_path = Path(d) / "subdir" / "nested" / ".token"
|
||||
self.assertFalse(token_path.parent.exists())
|
||||
|
||||
mock_record = MagicMock()
|
||||
mock_record.serialize.return_value = "serialized-token"
|
||||
|
||||
_cache_auth_record(mock_record, token_path)
|
||||
|
||||
self.assertTrue(token_path.exists())
|
||||
self.assertEqual(token_path.read_text(), "serialized-token")
|
||||
|
||||
def test_gmail_token_write_creates_parent_dirs(self):
|
||||
"""Gmail token write creates parent directories."""
|
||||
with TemporaryDirectory() as d:
|
||||
token_path = Path(d) / "deep" / "nested" / "token.json"
|
||||
self.assertFalse(token_path.parent.exists())
|
||||
|
||||
# Directly test the mkdir + open pattern
|
||||
token_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with token_path.open("w") as f:
|
||||
f.write('{"token": "test"}')
|
||||
|
||||
self.assertTrue(token_path.exists())
|
||||
self.assertEqual(token_path.read_text(), '{"token": "test"}')
|
||||
|
||||
|
||||
class TestEnvVarConfig(unittest.TestCase):
|
||||
"""Tests for environment variable configuration support."""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user